Source code for sum_stat.lf_smf.completeness_scan

"""Completeness scan estimator following Johnston, Teodoro & Hendry (2007–2012).

Scans a completeness threshold m* from bright to the survey faint limit and
returns the Tc(m*) and Tv(m*) statistics as arrays, implementing all three
contributions from the companion paper series:

* **Paper I** — bright-limit extension of the associated sets and the full
  magnitude scan (Johnston, Teodoro & Hendry 2007, MNRAS 376, 1757).
* **Paper II** — adaptive S/N smoothing so that each scan point rests on an
  equal information content regardless of local galaxy density (Teodoro,
  Johnston & Hendry 2010, MNRAS 405, 1187).
* **Paper III** — error propagation for Tc and Tv; characterisation of
  incompleteness signatures (Johnston, Teodoro & Hendry 2012, MNRAS 421, 270).

The underlying rank variable is the Rauzy (2001) ζ statistic (MNRAS 324, 51),
normalised so that under the null hypothesis of completeness ζ_i ~ Uniform(0,1)
and both Tc, Tv ~ N(0, 1).

References
----------
Johnston, Teodoro & Hendry (2007), MNRAS 376, 1757.
Teodoro, Johnston & Hendry (2010), MNRAS 405, 1187.
Johnston, Teodoro & Hendry (2012), MNRAS 421, 270.
Rauzy (2001), MNRAS 324, 51.
Johnston (2011), arXiv:1106.2039v3, §10.2, Eq. 160–166.
"""

from __future__ import annotations

import numpy as np
from astropy.cosmology import FlatLambdaCDM

# ---------------------------------------------------------------------------
# Private kernel
# ---------------------------------------------------------------------------


def _tc_tv_kernel(
    abs_mag: np.ndarray,
    redshift: np.ndarray,
    m_thresh: float,
    mu: np.ndarray,
    m_lim_bright: float | None,
) -> tuple[float, float, float, float, int]:
    """Compute Tc and Tv at a single completeness threshold m_thresh.

    Implements the extended Rauzy (2001) statistics with support for a
    bright apparent-magnitude limit following Johnston et al. (2007).

    The associated set for galaxy i is::

        J_i = {j : z_j ≤ z_i
                   AND  M_j ≤ m_thresh − μ(z_i)          [faint limit]
                   AND  M_j ≥ m_lim_bright − μ(z_i) }    [bright limit, if given]

    The rank variable ζ_i = R_i / (|J_i| + 1) is uniform on (0, 1) under the
    null hypothesis of a complete flux-limited sample.  The standardised
    statistics Tc and Tv are asymptotically N(0, 1) under H₀.

    Parameters
    ----------
    abs_mag : ndarray, shape (N,)
        Absolute magnitudes of the subsample.
    redshift : ndarray, shape (N,)
        Redshifts of the subsample.
    m_thresh : float
        Effective survey faint limit for this evaluation point [mag].
    mu : ndarray, shape (N,)
        Distance moduli distmod(z_i) for each galaxy.
    m_lim_bright : float or None
        Bright apparent-magnitude limit [mag].  ``None`` means no bright cut.

    Returns
    -------
    Tc : float
        Standardised mean-ζ statistic (N(0,1) under H₀).  NaN if n_used < 3.
    Tv : float
        Standardised variance-ζ statistic (N(0,1) under H₀).  NaN if n_used < 3.
    Tc_err : float
        Formal uncertainty on Tc (= 1.0 by construction for the fixed-grid mode).
    Tv_err : float
        Formal uncertainty on Tv (= 1.0 by construction for the fixed-grid mode).
    n_used : int
        Number of galaxies with |J_i| ≥ 2 that contribute to the statistics.
    """
    N = len(abs_mag)
    if N < 3:
        return np.nan, np.nan, 1.0, 1.0, 0

    # Faint observable abs-mag window at z_i given threshold m_thresh
    M_faint = m_thresh - mu  # shape (N,)

    # Build associated-set mask: J_mask[i, j] = True if j ∈ J_i
    # Condition 1: z_j ≤ z_i
    z_le = redshift[None, :] <= redshift[:, None]          # (N, N)
    # Condition 2: M_j ≤ M_faint(z_i)  [faint limit at z_i]
    m_faint_ok = abs_mag[None, :] <= M_faint[:, None]     # (N, N)

    J_mask = z_le & m_faint_ok

    if m_lim_bright is not None:
        M_bright = m_lim_bright - mu                          # shape (N,)
        # Condition 3: M_j ≥ M_bright(z_i)  [bright limit at z_i]
        m_bright_ok = abs_mag[None, :] >= M_bright[:, None]  # (N, N)
        J_mask &= m_bright_ok
        # Galaxy i is valid only if it falls within its own observable window
        in_window = (abs_mag >= M_bright) & (abs_mag <= M_faint)
    else:
        in_window = abs_mag <= M_faint

    J_size = J_mask.sum(axis=1)                                          # (N,)
    R = (J_mask & (abs_mag[None, :] <= abs_mag[:, None])).sum(axis=1)   # (N,)

    valid = (J_size >= 2) & in_window
    n_used = int(valid.sum())
    if n_used < 3:
        return np.nan, np.nan, 1.0, 1.0, n_used

    Jv = J_size[valid].astype(np.float64)
    Rv = R[valid].astype(np.float64)

    zeta = Rv / (Jv + 1.0)
    zeta_mean = float(np.mean(zeta))
    zeta_var = float(np.var(zeta, ddof=1))

    sigma_mean = np.sqrt(1.0 / (12.0 * n_used))
    sigma_var = np.sqrt(1.0 / (80.0 * n_used))

    Tc = (zeta_mean - 0.5) / sigma_mean if sigma_mean > 0.0 else np.nan
    Tv = (zeta_var - 1.0 / 12.0) / sigma_var if sigma_var > 0.0 else np.nan

    # Paper III: formal errors — both statistics are already standardised to
    # N(0,1), so the propagated uncertainty on the statistic itself equals 1.0
    # for the fixed-grid cumulative mode.  In adaptive mode the caller may
    # override these based on effective sample size.
    return float(Tc), float(Tv), 1.0, 1.0, n_used


# ---------------------------------------------------------------------------
# Public estimator
# ---------------------------------------------------------------------------


[docs] def completeness_scan( app_mag: np.ndarray, abs_mag: np.ndarray, redshift: np.ndarray, m_lim_faint: float, cosmo: FlatLambdaCDM, *, m_lim_bright: float | None = None, n_test: int = 50, sn_target: float | None = None, ) -> dict[str, np.ndarray]: """Johnston-Teodoro-Hendry (2007–2012) completeness scan. Scans a completeness threshold m* across the apparent-magnitude range of the sample and returns the Tc(m*) and Tv(m*) completeness statistics as functions of m*. Under the null hypothesis that the sample is complete down to m*, both Tc and Tv are drawn from N(0, 1). Significant negative excursions (Tc or Tv ≪ 0) indicate that the sample is incomplete at that magnitude. Two scan modes are available: **Fixed-grid** (``sn_target=None``, Paper I): ``n_test`` equally-spaced thresholds from ``min(app_mag)`` to ``m_lim_faint``. At each threshold m*, ALL galaxies with ``app_mag ≤ m*`` (and ``app_mag ≥ m_lim_bright`` if given) are used — a cumulative test. **Adaptive S/N** (``sn_target`` given, Paper II): Galaxies are sorted by ``app_mag`` and grouped into sliding windows of ``N_target = max(int(sn_target**2), 10)`` galaxies. Each window provides one scan point at ``m* = max(app_mag in window)``, maintaining a constant signal-to-noise ratio per point regardless of the local galaxy density (Teodoro et al. 2010 §3). The associated sets used in both modes account for an optional bright apparent-magnitude limit ``m_lim_bright`` following the extension in Johnston et al. (2007) §3. Parameters ---------- app_mag : ndarray, shape (N,) Observed apparent magnitudes. abs_mag : ndarray, shape (N,) Absolute magnitudes (must satisfy ``abs_mag ≈ app_mag − distmod(z)``). redshift : ndarray, shape (N,) Spectroscopic redshifts. m_lim_faint : float Survey faint apparent-magnitude limit [mag]. cosmo : FlatLambdaCDM Cosmology used to compute distance moduli. m_lim_bright : float, optional Survey bright apparent-magnitude limit [mag]. When given, the associated sets are clipped on both sides (Paper I §3). n_test : int, optional Number of threshold values in fixed-grid mode. Default 50. Ignored when ``sn_target`` is provided. sn_target : float, optional Target signal-to-noise ratio per scan point in adaptive mode (Paper II §3). Each window contains ``max(int(sn_target**2), 10)`` galaxies. When ``None`` (default), fixed-grid mode is used. Returns ------- dict with the following keys, all ``np.ndarray`` of the same length: m_star : ndarray Scan threshold values m* [mag]. Tc : ndarray Completeness statistic based on mean(ζ). N(0,1) under H₀. NaN where fewer than 3 galaxies contribute. Tv : ndarray Completeness statistic based on var(ζ). N(0,1) under H₀. NaN where fewer than 3 galaxies contribute. Tc_err : ndarray Formal uncertainty on Tc (Paper III §4). Equals 1.0 for fixed-grid mode; scales as ``1 / sqrt(n_galaxies / N_target)`` in adaptive mode. Tv_err : ndarray Formal uncertainty on Tv (Paper III §4). Same scaling as Tc_err. n_galaxies : ndarray, dtype int Number of galaxies contributing to each scan point. Notes ----- An incompleteness signature appears as a **drop** in Tc (and/or Tv) as m* moves through the missing region, sometimes followed by a peak at slightly fainter magnitudes (Johnston et al. 2012 §3). The statistic Tc is more sensitive to a deficit in the *number* of faint galaxies; Tv detects changes in the *spread* of the rank distribution and can flag over-completeness (excess objects) as well. References ---------- Johnston, Teodoro & Hendry (2007), MNRAS 376, 1757. Teodoro, Johnston & Hendry (2010), MNRAS 405, 1187. Johnston, Teodoro & Hendry (2012), MNRAS 421, 270. Rauzy (2001), MNRAS 324, 51. Johnston (2011), arXiv:1106.2039v3, §10.2, Eq. 160–166. """ app_mag = np.asarray(app_mag, dtype=np.float64) abs_mag = np.asarray(abs_mag, dtype=np.float64) redshift = np.asarray(redshift, dtype=np.float64) if app_mag.shape != abs_mag.shape or app_mag.shape != redshift.shape: raise ValueError("app_mag, abs_mag, and redshift must have the same shape.") if app_mag.ndim != 1: raise ValueError("Input arrays must be 1-D.") mu_all = cosmo.distmod(redshift).value # distance moduli if sn_target is None: # ------------------------------------------------------------------ # Fixed-grid mode (Paper I) # ------------------------------------------------------------------ m_min = float(np.min(app_mag)) m_test = np.linspace(m_min + 1e-6, m_lim_faint, n_test) Tc_arr = np.full(n_test, np.nan) Tv_arr = np.full(n_test, np.nan) Tc_err_arr = np.ones(n_test) Tv_err_arr = np.ones(n_test) n_arr = np.zeros(n_test, dtype=int) for k, m_star in enumerate(m_test): sel = app_mag <= m_star if m_lim_bright is not None: sel &= app_mag >= m_lim_bright if sel.sum() < 3: continue Tc_k, Tv_k, Tc_e_k, Tv_e_k, n_k = _tc_tv_kernel( abs_mag[sel], redshift[sel], m_star, mu_all[sel], m_lim_bright ) Tc_arr[k] = Tc_k Tv_arr[k] = Tv_k Tc_err_arr[k] = Tc_e_k Tv_err_arr[k] = Tv_e_k n_arr[k] = n_k return { "m_star": m_test, "Tc": Tc_arr, "Tv": Tv_arr, "Tc_err": Tc_err_arr, "Tv_err": Tv_err_arr, "n_galaxies": n_arr, } # ---------------------------------------------------------------------- # Adaptive S/N mode (Paper II) # ---------------------------------------------------------------------- N_target = max(int(sn_target**2), 10) # Sort by apparent magnitude order = np.argsort(app_mag) app_s = app_mag[order] abs_s = abs_mag[order] z_s = redshift[order] mu_s = mu_all[order] N = len(app_s) if N < N_target: raise ValueError( f"Sample has {N} galaxies but N_target={N_target} (sn_target={sn_target}). " "Reduce sn_target or provide a larger sample." ) n_windows = N - N_target + 1 m_star_list: list[float] = [] Tc_list: list[float] = [] Tv_list: list[float] = [] Tc_err_list: list[float] = [] Tv_err_list: list[float] = [] n_list: list[int] = [] for start in range(n_windows): end = start + N_target win_app = app_s[start:end] win_abs = abs_s[start:end] win_z = z_s[start:end] win_mu = mu_s[start:end] m_star_k = float(win_app[-1]) # faint edge of window = effective limit if m_lim_bright is not None and win_app[0] < m_lim_bright: # Skip windows that straddle the bright limit continue Tc_k, Tv_k, _, _, n_k = _tc_tv_kernel( win_abs, win_z, m_star_k, win_mu, m_lim_bright ) # Paper III error scaling: effective S/N relative to N_target baseline scale = np.sqrt(max(n_k, 1) / N_target) if N_target > 0 else 1.0 Tc_err_k = 1.0 / scale if scale > 0 else np.nan Tv_err_k = 1.0 / scale if scale > 0 else np.nan m_star_list.append(m_star_k) Tc_list.append(Tc_k) Tv_list.append(Tv_k) Tc_err_list.append(Tc_err_k) Tv_err_list.append(Tv_err_k) n_list.append(n_k) return { "m_star": np.array(m_star_list), "Tc": np.array(Tc_list), "Tv": np.array(Tv_list), "Tc_err": np.array(Tc_err_list), "Tv_err": np.array(Tv_err_list), "n_galaxies": np.array(n_list, dtype=int), }