"""Step-wise maximum likelihood (SWML) estimator for the luminosity function
and stellar mass function.
Reference
---------
Efstathiou, Ellis & Peterson (1988), MNRAS 232, 431.
Johnston (2011), arXiv:1106.2039v3, §5.5, Eq. 87–91.
"""
from __future__ import annotations
import numpy as np
from astropy.cosmology import FlatLambdaCDM
from ..catalogue import GalaxyCatalogue
def _swml_iteration(
n_k: np.ndarray,
H: np.ndarray,
weight: np.ndarray,
max_iter: int = 500,
tol: float = 1e-8,
) -> tuple[np.ndarray, int]:
"""Fixed-point SWML iteration (Johnston 2011 Eq. 88).
Solves the equation::
φ_k = n_k / Σ_i [w_i H_{ik} / Σ_j φ_j H_{ij}]
iteratively until convergence. The result is normalised to unit sum
during iteration for numerical stability; absolute normalisation is
applied by the caller.
Parameters
----------
n_k : ndarray, shape (n_bins,)
Weighted galaxy count per bin.
H : ndarray, shape (N, n_bins)
Accessibility matrix: H_{ij} = ΔM_j if bin j is accessible to
galaxy i (i.e. M_centre_j < M_faint(z_i)), else 0.
weight : ndarray, shape (N,)
Per-galaxy weights.
max_iter : int
Maximum number of iterations.
tol : float
Convergence tolerance on the maximum relative change in φ.
Returns
-------
phi : ndarray, shape (n_bins,)
Converged shape estimate (unit-sum normalised).
n_iter : int
Number of iterations performed.
"""
total_n = n_k.sum()
phi = np.where(n_k > 0, n_k / (total_n + 1e-300), 1.0 / len(n_k))
phi = phi / phi.sum()
for n_iter in range(1, max_iter + 1):
phi_old = phi.copy()
d_i = H @ phi
safe_d = np.where(d_i > 0, d_i, 1.0)
update = np.einsum("i,ij->j", weight / safe_d, H)
safe_update = np.where(update > 0, update, 1.0)
phi = np.where(update > 0, n_k / safe_update, 0.0)
s = phi.sum()
if s > 0:
phi /= s
rel_change = np.max(np.abs(phi - phi_old) / (np.abs(phi_old) + 1e-10))
if rel_change < tol:
break
return phi, n_iter
[docs]
def luminosity_function_swml(
cat: GalaxyCatalogue,
z_min: float,
z_max: float,
mag_limit: float,
mag_bins: np.ndarray,
area_sr: float,
cosmo: FlatLambdaCDM,
z_max_individual: np.ndarray | None = None,
max_iter: int = 500,
tol: float = 1e-8,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Step-wise maximum likelihood (SWML) luminosity function estimator.
Non-parametric estimator. The LF is modelled as a step function; its
shape is found by maximising the normalisation-free likelihood
(Johnston 2011 Eq. 87)::
ln L = Σ_j n_j ln φ_j − Σ_i w_i ln[Σ_j φ_j H_{ij}]
where H_{ij} = ΔM_j if absolute-magnitude bin j is accessible to galaxy i
(M_centre_j < M_faint(z_i)) and 0 otherwise. The fixed-point iteration
(Johnston 2011 Eq. 88) is run until convergence. Absolute normalisation
is set by matching the integral of φ to the total 1/V_max density.
Error bars are the Poisson approximation σ_k = φ_k / sqrt(n_k)
(Johnston 2011 Eq. 90).
Parameters
----------
cat : GalaxyCatalogue
Catalogue with abs_mag, redshift, and weight populated.
z_min, z_max : float
Redshift limits of the survey.
mag_limit : float
Survey apparent-magnitude limit [mag].
mag_bins : ndarray, shape (n_bins+1,)
Absolute-magnitude bin edges [mag].
area_sr : float
Survey solid angle [sr].
cosmo : FlatLambdaCDM
Cosmology for distance modulus and V_max.
z_max_individual : ndarray, optional
Per-galaxy maximum observable redshift. Defaults to z_max.
max_iter : int
Maximum SWML iterations.
tol : float
Convergence tolerance on relative change in φ.
Returns
-------
mag_centres : ndarray, shape (n_bins,)
Absolute-magnitude bin centres [mag].
phi : ndarray, shape (n_bins,)
Luminosity function [Mpc^-3 mag^-1].
phi_err : ndarray, shape (n_bins,)
Poisson uncertainty φ_k / sqrt(n_k) [Mpc^-3 mag^-1].
References
----------
Efstathiou, Ellis & Peterson (1988), MNRAS 232, 431.
Johnston (2011), arXiv:1106.2039v3, §5.5, Eq. 87–91.
"""
if cat.abs_mag is None:
raise ValueError("GalaxyCatalogue.abs_mag must be set for SWML.")
abs_mag = cat.abs_mag if cat.abs_mag.ndim == 1 else cat.abs_mag[:, 0]
mask = (cat.redshift >= z_min) & (cat.redshift <= z_max)
abs_mag_m = abs_mag[mask]
z_m = cat.redshift[mask]
weight_m = cat.weight[mask]
mag_bins = np.asarray(mag_bins, dtype=np.float64)
n_bins = len(mag_bins) - 1
dm = mag_bins[1:] - mag_bins[:-1]
mag_centres = 0.5 * (mag_bins[:-1] + mag_bins[1:])
# Faintest observable absolute magnitude per galaxy
mu = cosmo.distmod(z_m).value
M_faint = mag_limit - mu
# Accessibility matrix H_{ij}: ΔM_j if bin centre j < M_faint[i], else 0
H = np.where(mag_centres[None, :] < M_faint[:, None], dm[None, :], 0.0)
# Weighted count per bin
n_k = np.array(
[
np.sum(weight_m[(abs_mag_m >= mag_bins[k]) & (abs_mag_m < mag_bins[k + 1])])
for k in range(n_bins)
]
)
phi_shape, _ = _swml_iteration(n_k, H, weight_m, max_iter=max_iter, tol=tol)
# Absolute normalisation: match integral to 1/Vmax total density
vmax_all = cat.vmax(z_min, z_max, area_sr, cosmo, z_max_individual)
vmax_m = vmax_all[mask]
total_density = np.sum(weight_m / np.clip(vmax_m, 1e-30, None))
integral = np.sum(phi_shape * dm)
norm = total_density / integral if integral > 0 else 1.0
phi = phi_shape * norm
# Poisson uncertainty: φ_k / sqrt(n_k) (Johnston 2011 Eq. 90)
safe_n = np.where(n_k > 0, n_k, 1.0)
phi_err = np.where(n_k > 0, phi / np.sqrt(safe_n), 0.0)
return mag_centres, phi, phi_err
[docs]
def stellar_mass_function_swml(
cat: GalaxyCatalogue,
z_min: float,
z_max: float,
log10_mstar_limit: np.ndarray | float,
mstar_bins: np.ndarray,
area_sr: float,
cosmo: FlatLambdaCDM,
z_max_individual: np.ndarray | None = None,
max_iter: int = 500,
tol: float = 1e-8,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Step-wise maximum likelihood (SWML) stellar mass function estimator.
Non-parametric estimator. The SMF is modelled as a step function; its
shape is found by maximising the normalisation-free likelihood
(Johnston 2011 Eq. 87)::
ln L = Σ_j n_j ln φ_j − Σ_i w_i ln[Σ_j φ_j H_{ij}]
where H_{ij} = Δm_j if stellar-mass bin j is accessible to galaxy i
(mstar_centre_j > log10_M_lim_i) and 0 otherwise. The fixed-point
iteration (Johnston 2011 Eq. 88) is run until convergence. Absolute
normalisation is set by matching the integral of φ to the total 1/V_max
density.
Error bars are the Poisson approximation σ_k = φ_k / sqrt(n_k)
(Johnston 2011 Eq. 90).
Parameters
----------
cat : GalaxyCatalogue
Catalogue with log10_mstar, redshift, and weight populated.
z_min, z_max : float
Redshift limits of the survey.
log10_mstar_limit : ndarray, shape (N,), or float
Per-galaxy log10(M_*/M_sun) completeness limit — the minimum stellar
mass observable by the survey at each galaxy's redshift. A scalar
applies a uniform limit to all galaxies.
mstar_bins : ndarray, shape (n_bins+1,)
log10(M_*/M_sun) bin edges [dex].
area_sr : float
Survey solid angle [sr].
cosmo : FlatLambdaCDM
Cosmology for V_max computation (used for absolute normalisation).
z_max_individual : ndarray, optional
Per-galaxy maximum observable redshift. Defaults to z_max.
max_iter : int
Maximum SWML iterations.
tol : float
Convergence tolerance on relative change in φ.
Returns
-------
mstar_centres : ndarray, shape (n_bins,)
log10(M_*/M_sun) bin centres [dex].
phi : ndarray, shape (n_bins,)
Stellar mass function [Mpc^-3 dex^-1].
phi_err : ndarray, shape (n_bins,)
Poisson uncertainty φ_k / sqrt(n_k) [Mpc^-3 dex^-1].
References
----------
Efstathiou, Ellis & Peterson (1988), MNRAS 232, 431.
Johnston (2011), arXiv:1106.2039v3, §5.5, Eq. 87–91.
"""
if cat.log10_mstar is None:
raise ValueError("GalaxyCatalogue.log10_mstar must be set for SWML.")
mask = (cat.redshift >= z_min) & (cat.redshift <= z_max)
m_arr = cat.log10_mstar[mask]
weight_m = cat.weight[mask]
mstar_bins = np.asarray(mstar_bins, dtype=np.float64)
n_bins = len(mstar_bins) - 1
dm = mstar_bins[1:] - mstar_bins[:-1]
mstar_centres = 0.5 * (mstar_bins[:-1] + mstar_bins[1:])
lim = np.broadcast_to(np.asarray(log10_mstar_limit, dtype=np.float64), m_arr.shape).copy()
# Accessibility matrix H_{ij}: Δm_j if bin centre j > lim[i], else 0.
# A galaxy's survey is complete for masses above its mass limit.
H = np.where(mstar_centres[None, :] > lim[:, None], dm[None, :], 0.0)
n_k = np.array(
[
np.sum(weight_m[(m_arr >= mstar_bins[k]) & (m_arr < mstar_bins[k + 1])])
for k in range(n_bins)
]
)
phi_shape, _ = _swml_iteration(n_k, H, weight_m, max_iter=max_iter, tol=tol)
vmax_all = cat.vmax(z_min, z_max, area_sr, cosmo, z_max_individual)
vmax_m = vmax_all[mask]
total_density = np.sum(weight_m / np.clip(vmax_m, 1e-30, None))
integral = np.sum(phi_shape * dm)
norm = total_density / integral if integral > 0 else 1.0
phi = phi_shape * norm
safe_n = np.where(n_k > 0, n_k, 1.0)
phi_err = np.where(n_k > 0, phi / np.sqrt(safe_n), 0.0)
return mstar_centres, phi, phi_err