"""C⁻ (Lynden-Bell 1971) non-parametric cumulative LF and SMF estimators.
Reference
---------
Lynden-Bell (1971), MNRAS 155, 95.
Johnston (2011), arXiv:1106.2039v3, §5.3, Eq. 74–77.
"""
from __future__ import annotations
import numpy as np
from astropy.cosmology import FlatLambdaCDM
from ..catalogue import GalaxyCatalogue
def _cminus_product(
M: np.ndarray,
M_faint: np.ndarray,
weight: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""C⁻ product estimator of the cumulative LF shape.
Galaxies are sorted bright→faint. For each galaxy k, the C⁻ comparison
set is all brighter galaxies j whose survey limit reaches M_k
(Johnston 2011 Eq. 75)::
c_k = Σ_{j < k : M_faint(z_j) > M_{(k)}} w_j
The cumulative LF is built by the product recursion (Eq. 77)::
Φ(M_{(k+1)}) = Φ(M_{(k)}) × (c_k + w_k) / c_k
Parameters
----------
M : ndarray, shape (N,)
Absolute magnitudes (unsorted).
M_faint : ndarray, shape (N,)
Faintest observable absolute magnitude at each galaxy's redshift,
M_faint = m_lim − μ(z).
weight : ndarray, shape (N,)
Per-galaxy weights.
Returns
-------
M_sorted : ndarray, shape (N,)
Absolute magnitudes sorted brightest → faintest.
Phi : ndarray, shape (N,)
Unnormalised cumulative LF shape (monotonically increasing).
Phi_err : ndarray, shape (N,)
Uncertainty from the Lynden-Bell product-formula variance.
"""
order = np.argsort(M)
M_s = M[order]
M_faint_s = M_faint[order]
w_s = weight[order]
N = len(M_s)
log_phi = np.zeros(N)
log_var = np.zeros(N)
log_phi_acc = 0.0
log_var_acc = 0.0
for k in range(N):
log_phi[k] = log_phi_acc
log_var[k] = log_var_acc
if k < N - 1:
# c_k: weighted count of brighter galaxies that can observe M_s[k+1]
accessible = M_faint_s[: k + 1] > M_s[k + 1]
c_k = np.sum(w_s[: k + 1][accessible])
if c_k > 0:
log_phi_acc += np.log1p(w_s[k + 1] / c_k)
log_var_acc += w_s[k + 1] / (c_k * (c_k + w_s[k + 1]))
Phi = np.exp(log_phi)
Phi_err = Phi * np.sqrt(np.maximum(log_var, 0.0))
return M_s, Phi, Phi_err
[docs]
def cumulative_luminosity_function_cminus(
cat: GalaxyCatalogue,
z_min: float,
z_max: float,
mag_limit: float,
area_sr: float,
cosmo: FlatLambdaCDM,
z_max_individual: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""C⁻ (Lynden-Bell 1971) cumulative luminosity function estimator.
Non-parametric, density-independent estimator of the cumulative LF
Φ(< M) — the number density of galaxies brighter than M — based on the
product formula of Lynden-Bell (1971) as described in Johnston (2011)
§5.3, Eq. 74–77::
Φ(M_{(k+1)}) / Φ(M_{(k)}) = (c_k + w_k) / c_k
where c_k = Σ_{j≤k : M_faint(z_j) > M_{(k+1)}} w_j is the weighted
count of brighter galaxies at positions where M_{(k+1)} is still
observable.
Absolute normalisation is set by matching Φ(M_faintest) to the total
1/V_max number density.
Parameters
----------
cat : GalaxyCatalogue
Catalogue with abs_mag, redshift, and weight populated.
z_min, z_max : float
Redshift limits.
mag_limit : float
Survey apparent-magnitude limit [mag].
area_sr : float
Survey solid angle [sr].
cosmo : FlatLambdaCDM
Cosmology for distance modulus and V_max.
z_max_individual : ndarray, optional
Per-galaxy maximum observable redshift. Defaults to z_max.
Returns
-------
M : ndarray, shape (N,)
Sorted absolute magnitudes, brightest → faintest [mag].
Phi : ndarray, shape (N,)
Cumulative LF Φ(< M) [Mpc^-3].
Phi_err : ndarray, shape (N,)
Uncertainty on Φ [Mpc^-3].
References
----------
Lynden-Bell (1971), MNRAS 155, 95.
Johnston (2011), arXiv:1106.2039v3, §5.3, Eq. 74–77.
"""
if cat.abs_mag is None:
raise ValueError("GalaxyCatalogue.abs_mag must be set for C⁻ estimator.")
abs_mag = cat.abs_mag if cat.abs_mag.ndim == 1 else cat.abs_mag[:, 0]
mask = (cat.redshift >= z_min) & (cat.redshift <= z_max)
abs_mag_m = abs_mag[mask]
z_m = cat.redshift[mask]
weight_m = cat.weight[mask]
mu = cosmo.distmod(z_m).value
M_faint = mag_limit - mu
M_s, Phi_shape, Phi_err_shape = _cminus_product(abs_mag_m, M_faint, weight_m)
# Absolute normalisation: Φ(M_faintest) = total 1/Vmax density
vmax_all = cat.vmax(z_min, z_max, area_sr, cosmo, z_max_individual)
vmax_m = vmax_all[mask]
order = np.argsort(abs_mag_m)
vmax_s = vmax_m[order]
weight_s = weight_m[order]
total_density = np.sum(weight_s / np.clip(vmax_s, 1e-30, None))
Phi_max = Phi_shape[-1]
norm = total_density / Phi_max if Phi_max > 0 else 1.0
Phi = Phi_shape * norm
Phi_err = Phi_err_shape * norm
return M_s, Phi, Phi_err
def _cminus_smf_product(
m: np.ndarray,
log10_M_lim: np.ndarray,
weight: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""C⁻ product estimator of the cumulative SMF shape.
Galaxies are sorted massive→light. For each galaxy k at log10_M_{(k)},
the C⁻ comparison set is all more-massive galaxies j whose survey is
complete at log10_M_{(k+1)} (Johnston 2011 Eq. 75 adapted for SMF)::
c_k = Σ_{j < k : log10_M_lim(z_j) < log10_M_{(k+1)}} w_j
The cumulative SMF Φ(> m_{(k)}) is built by the product recursion
(Eq. 77)::
Φ(m_{(k+1)}) = Φ(m_{(k)}) × (c_k + w_k) / c_k
Parameters
----------
m : ndarray, shape (N,)
log10(M_*/M_sun) values (unsorted).
log10_M_lim : ndarray, shape (N,)
Per-galaxy log10(M_*/M_sun) completeness limit (minimum observable
mass at each galaxy's redshift).
weight : ndarray, shape (N,)
Per-galaxy weights.
Returns
-------
m_sorted : ndarray, shape (N,)
log10(M_*/M_sun) sorted massive → light (descending).
Phi : ndarray, shape (N,)
Unnormalised cumulative SMF shape Φ(> m), monotonically increasing
as m decreases.
Phi_err : ndarray, shape (N,)
Uncertainty from the Lynden-Bell product-formula variance.
"""
order = np.argsort(-m) # descending: most massive first
m_s = m[order]
lim_s = log10_M_lim[order]
w_s = weight[order]
N = len(m_s)
log_phi = np.zeros(N)
log_var = np.zeros(N)
log_phi_acc = 0.0
log_var_acc = 0.0
for k in range(N):
log_phi[k] = log_phi_acc
log_var[k] = log_var_acc
if k < N - 1:
# c_k: weighted count of more-massive galaxies (j<=k) whose survey
# is complete at m_s[k+1] (i.e. lim_s[j] < m_s[k+1])
accessible = lim_s[: k + 1] < m_s[k + 1]
c_k = np.sum(w_s[: k + 1][accessible])
if c_k > 0:
log_phi_acc += np.log1p(w_s[k + 1] / c_k)
log_var_acc += w_s[k + 1] / (c_k * (c_k + w_s[k + 1]))
Phi = np.exp(log_phi)
Phi_err = Phi * np.sqrt(np.maximum(log_var, 0.0))
return m_s, Phi, Phi_err
[docs]
def cumulative_stellar_mass_function_cminus(
cat: GalaxyCatalogue,
z_min: float,
z_max: float,
log10_mstar_limit: np.ndarray | float,
area_sr: float,
cosmo: FlatLambdaCDM,
z_max_individual: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""C⁻ (Lynden-Bell 1971) cumulative stellar mass function estimator.
Non-parametric, density-independent estimator of the cumulative SMF
Φ(> m) — the number density of galaxies more massive than m — based on
the product formula of Lynden-Bell (1971) as described in Johnston (2011)
§5.3, Eq. 74–77::
Φ(m_{(k+1)}) / Φ(m_{(k)}) = (c_k + w_k) / c_k
where c_k = Σ_{j≤k : log10_M_lim(z_j) < m_{(k+1)}} w_j is the weighted
count of more-massive galaxies whose survey is complete at m_{(k+1)}.
Galaxies are sorted massive → light (descending log10_M).
Absolute normalisation is set by matching Φ(m_lightest) to the total
1/V_max number density.
Parameters
----------
cat : GalaxyCatalogue
Catalogue with log10_mstar, redshift, and weight populated.
z_min, z_max : float
Redshift limits.
log10_mstar_limit : ndarray, shape (N,), or float
Per-galaxy log10(M_*/M_sun) completeness limit — the minimum stellar
mass observable by the survey at each galaxy's redshift. A scalar
applies a uniform limit to all galaxies.
area_sr : float
Survey solid angle [sr].
cosmo : FlatLambdaCDM
Cosmology for V_max computation.
z_max_individual : ndarray, optional
Per-galaxy maximum observable redshift. Defaults to z_max.
Returns
-------
m : ndarray, shape (N,)
log10(M_*/M_sun) sorted massive → light [dex].
Phi : ndarray, shape (N,)
Cumulative SMF Φ(> m) [Mpc^-3].
Phi_err : ndarray, shape (N,)
Uncertainty on Φ [Mpc^-3].
References
----------
Lynden-Bell (1971), MNRAS 155, 95.
Johnston (2011), arXiv:1106.2039v3, §5.3, Eq. 74–77.
"""
if cat.log10_mstar is None:
raise ValueError("GalaxyCatalogue.log10_mstar must be set for C⁻ estimator.")
mask = (cat.redshift >= z_min) & (cat.redshift <= z_max)
m_arr = cat.log10_mstar[mask]
weight_m = cat.weight[mask]
lim = np.broadcast_to(np.asarray(log10_mstar_limit, dtype=np.float64), m_arr.shape).copy()
m_s, Phi_shape, Phi_err_shape = _cminus_smf_product(m_arr, lim, weight_m)
# Absolute normalisation: Φ(m_lightest) = total 1/Vmax density
vmax_all = cat.vmax(z_min, z_max, area_sr, cosmo, z_max_individual)
vmax_m = vmax_all[mask]
order = np.argsort(-m_arr) # same sort as in _cminus_smf_product
vmax_s = vmax_m[order]
weight_s = weight_m[order]
total_density = np.sum(weight_s / np.clip(vmax_s, 1e-30, None))
Phi_max = Phi_shape[-1]
norm = total_density / Phi_max if Phi_max > 0 else 1.0
Phi = Phi_shape * norm
Phi_err = Phi_err_shape * norm
return m_s, Phi, Phi_err