"""Completeness scan estimator following Johnston, Teodoro & Hendry (2007–2012).
Scans a completeness threshold m* from bright to the survey faint limit and
returns the Tc(m*) and Tv(m*) statistics as arrays, implementing all three
contributions from the companion paper series:
* **Paper I** — bright-limit extension of the associated sets and the full
magnitude scan (Johnston, Teodoro & Hendry 2007, MNRAS 376, 1757).
* **Paper II** — adaptive S/N smoothing so that each scan point rests on an
equal information content regardless of local galaxy density (Teodoro,
Johnston & Hendry 2010, MNRAS 405, 1187).
* **Paper III** — error propagation for Tc and Tv; characterisation of
incompleteness signatures (Johnston, Teodoro & Hendry 2012, MNRAS 421, 270).
The underlying rank variable is the Rauzy (2001) ζ statistic (MNRAS 324, 51),
normalised so that under the null hypothesis of completeness ζ_i ~ Uniform(0,1)
and both Tc, Tv ~ N(0, 1).
References
----------
Johnston, Teodoro & Hendry (2007), MNRAS 376, 1757.
Teodoro, Johnston & Hendry (2010), MNRAS 405, 1187.
Johnston, Teodoro & Hendry (2012), MNRAS 421, 270.
Rauzy (2001), MNRAS 324, 51.
Johnston (2011), arXiv:1106.2039v3, §10.2, Eq. 160–166.
"""
from __future__ import annotations
import numpy as np
from astropy.cosmology import FlatLambdaCDM
# ---------------------------------------------------------------------------
# Private kernel
# ---------------------------------------------------------------------------
def _tc_tv_kernel(
abs_mag: np.ndarray,
redshift: np.ndarray,
m_thresh: float,
mu: np.ndarray,
m_lim_bright: float | None,
) -> tuple[float, float, float, float, int]:
"""Compute Tc and Tv at a single completeness threshold m_thresh.
Implements the extended Rauzy (2001) statistics with support for a
bright apparent-magnitude limit following Johnston et al. (2007).
The associated set for galaxy i is::
J_i = {j : z_j ≤ z_i
AND M_j ≤ m_thresh − μ(z_i) [faint limit]
AND M_j ≥ m_lim_bright − μ(z_i) } [bright limit, if given]
The rank variable ζ_i = R_i / (|J_i| + 1) is uniform on (0, 1) under the
null hypothesis of a complete flux-limited sample. The standardised
statistics Tc and Tv are asymptotically N(0, 1) under H₀.
Parameters
----------
abs_mag : ndarray, shape (N,)
Absolute magnitudes of the subsample.
redshift : ndarray, shape (N,)
Redshifts of the subsample.
m_thresh : float
Effective survey faint limit for this evaluation point [mag].
mu : ndarray, shape (N,)
Distance moduli distmod(z_i) for each galaxy.
m_lim_bright : float or None
Bright apparent-magnitude limit [mag]. ``None`` means no bright cut.
Returns
-------
Tc : float
Standardised mean-ζ statistic (N(0,1) under H₀). NaN if n_used < 3.
Tv : float
Standardised variance-ζ statistic (N(0,1) under H₀). NaN if n_used < 3.
Tc_err : float
Formal uncertainty on Tc (= 1.0 by construction for the fixed-grid mode).
Tv_err : float
Formal uncertainty on Tv (= 1.0 by construction for the fixed-grid mode).
n_used : int
Number of galaxies with |J_i| ≥ 2 that contribute to the statistics.
"""
N = len(abs_mag)
if N < 3:
return np.nan, np.nan, 1.0, 1.0, 0
# Faint observable abs-mag window at z_i given threshold m_thresh
M_faint = m_thresh - mu # shape (N,)
# Build associated-set mask: J_mask[i, j] = True if j ∈ J_i
# Condition 1: z_j ≤ z_i
z_le = redshift[None, :] <= redshift[:, None] # (N, N)
# Condition 2: M_j ≤ M_faint(z_i) [faint limit at z_i]
m_faint_ok = abs_mag[None, :] <= M_faint[:, None] # (N, N)
J_mask = z_le & m_faint_ok
if m_lim_bright is not None:
M_bright = m_lim_bright - mu # shape (N,)
# Condition 3: M_j ≥ M_bright(z_i) [bright limit at z_i]
m_bright_ok = abs_mag[None, :] >= M_bright[:, None] # (N, N)
J_mask &= m_bright_ok
# Galaxy i is valid only if it falls within its own observable window
in_window = (abs_mag >= M_bright) & (abs_mag <= M_faint)
else:
in_window = abs_mag <= M_faint
J_size = J_mask.sum(axis=1) # (N,)
R = (J_mask & (abs_mag[None, :] <= abs_mag[:, None])).sum(axis=1) # (N,)
valid = (J_size >= 2) & in_window
n_used = int(valid.sum())
if n_used < 3:
return np.nan, np.nan, 1.0, 1.0, n_used
Jv = J_size[valid].astype(np.float64)
Rv = R[valid].astype(np.float64)
zeta = Rv / (Jv + 1.0)
zeta_mean = float(np.mean(zeta))
zeta_var = float(np.var(zeta, ddof=1))
sigma_mean = np.sqrt(1.0 / (12.0 * n_used))
sigma_var = np.sqrt(1.0 / (80.0 * n_used))
Tc = (zeta_mean - 0.5) / sigma_mean if sigma_mean > 0.0 else np.nan
Tv = (zeta_var - 1.0 / 12.0) / sigma_var if sigma_var > 0.0 else np.nan
# Paper III: formal errors — both statistics are already standardised to
# N(0,1), so the propagated uncertainty on the statistic itself equals 1.0
# for the fixed-grid cumulative mode. In adaptive mode the caller may
# override these based on effective sample size.
return float(Tc), float(Tv), 1.0, 1.0, n_used
# ---------------------------------------------------------------------------
# Public estimator
# ---------------------------------------------------------------------------
[docs]
def completeness_scan(
app_mag: np.ndarray,
abs_mag: np.ndarray,
redshift: np.ndarray,
m_lim_faint: float,
cosmo: FlatLambdaCDM,
*,
m_lim_bright: float | None = None,
n_test: int = 50,
sn_target: float | None = None,
) -> dict[str, np.ndarray]:
"""Johnston-Teodoro-Hendry (2007–2012) completeness scan.
Scans a completeness threshold m* across the apparent-magnitude range of
the sample and returns the Tc(m*) and Tv(m*) completeness statistics as
functions of m*.
Under the null hypothesis that the sample is complete down to m*, both
Tc and Tv are drawn from N(0, 1). Significant negative excursions (Tc or
Tv ≪ 0) indicate that the sample is incomplete at that magnitude.
Two scan modes are available:
**Fixed-grid** (``sn_target=None``, Paper I):
``n_test`` equally-spaced thresholds from ``min(app_mag)`` to
``m_lim_faint``. At each threshold m*, ALL galaxies with
``app_mag ≤ m*`` (and ``app_mag ≥ m_lim_bright`` if given) are used —
a cumulative test.
**Adaptive S/N** (``sn_target`` given, Paper II):
Galaxies are sorted by ``app_mag`` and grouped into sliding windows
of ``N_target = max(int(sn_target**2), 10)`` galaxies. Each window
provides one scan point at ``m* = max(app_mag in window)``, maintaining
a constant signal-to-noise ratio per point regardless of the local
galaxy density (Teodoro et al. 2010 §3).
The associated sets used in both modes account for an optional bright
apparent-magnitude limit ``m_lim_bright`` following the extension in
Johnston et al. (2007) §3.
Parameters
----------
app_mag : ndarray, shape (N,)
Observed apparent magnitudes.
abs_mag : ndarray, shape (N,)
Absolute magnitudes (must satisfy ``abs_mag ≈ app_mag − distmod(z)``).
redshift : ndarray, shape (N,)
Spectroscopic redshifts.
m_lim_faint : float
Survey faint apparent-magnitude limit [mag].
cosmo : FlatLambdaCDM
Cosmology used to compute distance moduli.
m_lim_bright : float, optional
Survey bright apparent-magnitude limit [mag]. When given, the
associated sets are clipped on both sides (Paper I §3).
n_test : int, optional
Number of threshold values in fixed-grid mode. Default 50.
Ignored when ``sn_target`` is provided.
sn_target : float, optional
Target signal-to-noise ratio per scan point in adaptive mode
(Paper II §3). Each window contains ``max(int(sn_target**2), 10)``
galaxies. When ``None`` (default), fixed-grid mode is used.
Returns
-------
dict with the following keys, all ``np.ndarray`` of the same length:
m_star : ndarray
Scan threshold values m* [mag].
Tc : ndarray
Completeness statistic based on mean(ζ). N(0,1) under H₀.
NaN where fewer than 3 galaxies contribute.
Tv : ndarray
Completeness statistic based on var(ζ). N(0,1) under H₀.
NaN where fewer than 3 galaxies contribute.
Tc_err : ndarray
Formal uncertainty on Tc (Paper III §4). Equals 1.0 for fixed-grid
mode; scales as ``1 / sqrt(n_galaxies / N_target)`` in adaptive mode.
Tv_err : ndarray
Formal uncertainty on Tv (Paper III §4). Same scaling as Tc_err.
n_galaxies : ndarray, dtype int
Number of galaxies contributing to each scan point.
Notes
-----
An incompleteness signature appears as a **drop** in Tc (and/or Tv) as
m* moves through the missing region, sometimes followed by a peak at
slightly fainter magnitudes (Johnston et al. 2012 §3).
The statistic Tc is more sensitive to a deficit in the *number* of faint
galaxies; Tv detects changes in the *spread* of the rank distribution and
can flag over-completeness (excess objects) as well.
References
----------
Johnston, Teodoro & Hendry (2007), MNRAS 376, 1757.
Teodoro, Johnston & Hendry (2010), MNRAS 405, 1187.
Johnston, Teodoro & Hendry (2012), MNRAS 421, 270.
Rauzy (2001), MNRAS 324, 51.
Johnston (2011), arXiv:1106.2039v3, §10.2, Eq. 160–166.
"""
app_mag = np.asarray(app_mag, dtype=np.float64)
abs_mag = np.asarray(abs_mag, dtype=np.float64)
redshift = np.asarray(redshift, dtype=np.float64)
if app_mag.shape != abs_mag.shape or app_mag.shape != redshift.shape:
raise ValueError("app_mag, abs_mag, and redshift must have the same shape.")
if app_mag.ndim != 1:
raise ValueError("Input arrays must be 1-D.")
mu_all = cosmo.distmod(redshift).value # distance moduli
if sn_target is None:
# ------------------------------------------------------------------
# Fixed-grid mode (Paper I)
# ------------------------------------------------------------------
m_min = float(np.min(app_mag))
m_test = np.linspace(m_min + 1e-6, m_lim_faint, n_test)
Tc_arr = np.full(n_test, np.nan)
Tv_arr = np.full(n_test, np.nan)
Tc_err_arr = np.ones(n_test)
Tv_err_arr = np.ones(n_test)
n_arr = np.zeros(n_test, dtype=int)
for k, m_star in enumerate(m_test):
sel = app_mag <= m_star
if m_lim_bright is not None:
sel &= app_mag >= m_lim_bright
if sel.sum() < 3:
continue
Tc_k, Tv_k, Tc_e_k, Tv_e_k, n_k = _tc_tv_kernel(
abs_mag[sel], redshift[sel], m_star, mu_all[sel], m_lim_bright
)
Tc_arr[k] = Tc_k
Tv_arr[k] = Tv_k
Tc_err_arr[k] = Tc_e_k
Tv_err_arr[k] = Tv_e_k
n_arr[k] = n_k
return {
"m_star": m_test,
"Tc": Tc_arr,
"Tv": Tv_arr,
"Tc_err": Tc_err_arr,
"Tv_err": Tv_err_arr,
"n_galaxies": n_arr,
}
# ----------------------------------------------------------------------
# Adaptive S/N mode (Paper II)
# ----------------------------------------------------------------------
N_target = max(int(sn_target**2), 10)
# Sort by apparent magnitude
order = np.argsort(app_mag)
app_s = app_mag[order]
abs_s = abs_mag[order]
z_s = redshift[order]
mu_s = mu_all[order]
N = len(app_s)
if N < N_target:
raise ValueError(
f"Sample has {N} galaxies but N_target={N_target} (sn_target={sn_target}). "
"Reduce sn_target or provide a larger sample."
)
n_windows = N - N_target + 1
m_star_list: list[float] = []
Tc_list: list[float] = []
Tv_list: list[float] = []
Tc_err_list: list[float] = []
Tv_err_list: list[float] = []
n_list: list[int] = []
for start in range(n_windows):
end = start + N_target
win_app = app_s[start:end]
win_abs = abs_s[start:end]
win_z = z_s[start:end]
win_mu = mu_s[start:end]
m_star_k = float(win_app[-1]) # faint edge of window = effective limit
if m_lim_bright is not None and win_app[0] < m_lim_bright:
# Skip windows that straddle the bright limit
continue
Tc_k, Tv_k, _, _, n_k = _tc_tv_kernel(
win_abs, win_z, m_star_k, win_mu, m_lim_bright
)
# Paper III error scaling: effective S/N relative to N_target baseline
scale = np.sqrt(max(n_k, 1) / N_target) if N_target > 0 else 1.0
Tc_err_k = 1.0 / scale if scale > 0 else np.nan
Tv_err_k = 1.0 / scale if scale > 0 else np.nan
m_star_list.append(m_star_k)
Tc_list.append(Tc_k)
Tv_list.append(Tv_k)
Tc_err_list.append(Tc_err_k)
Tv_err_list.append(Tv_err_k)
n_list.append(n_k)
return {
"m_star": np.array(m_star_list),
"Tc": np.array(Tc_list),
"Tv": np.array(Tv_list),
"Tc_err": np.array(Tc_err_list),
"Tv_err": np.array(Tv_err_list),
"n_galaxies": np.array(n_list, dtype=int),
}