Source code for sum_stat.lf_smf.swml

"""Step-wise maximum likelihood (SWML) estimator for the luminosity function
and stellar mass function.

Reference
---------
Efstathiou, Ellis & Peterson (1988), MNRAS 232, 431.
Johnston (2011), arXiv:1106.2039v3, §5.5, Eq. 87–91.
"""

from __future__ import annotations

import numpy as np
from astropy.cosmology import FlatLambdaCDM

from ..catalogue import GalaxyCatalogue


def _swml_iteration(
    n_k: np.ndarray,
    H: np.ndarray,
    weight: np.ndarray,
    max_iter: int = 500,
    tol: float = 1e-8,
) -> tuple[np.ndarray, int]:
    """Fixed-point SWML iteration (Johnston 2011 Eq. 88).

    Solves the equation::

        φ_k = n_k / Σ_i [w_i H_{ik} / Σ_j φ_j H_{ij}]

    iteratively until convergence.  The result is normalised to unit sum
    during iteration for numerical stability; absolute normalisation is
    applied by the caller.

    Parameters
    ----------
    n_k : ndarray, shape (n_bins,)
        Weighted galaxy count per bin.
    H : ndarray, shape (N, n_bins)
        Accessibility matrix: H_{ij} = ΔM_j if bin j is accessible to
        galaxy i (i.e. M_centre_j < M_faint(z_i)), else 0.
    weight : ndarray, shape (N,)
        Per-galaxy weights.
    max_iter : int
        Maximum number of iterations.
    tol : float
        Convergence tolerance on the maximum relative change in φ.

    Returns
    -------
    phi : ndarray, shape (n_bins,)
        Converged shape estimate (unit-sum normalised).
    n_iter : int
        Number of iterations performed.
    """
    total_n = n_k.sum()
    phi = np.where(n_k > 0, n_k / (total_n + 1e-300), 1.0 / len(n_k))
    phi = phi / phi.sum()

    for n_iter in range(1, max_iter + 1):
        phi_old = phi.copy()
        d_i = H @ phi
        safe_d = np.where(d_i > 0, d_i, 1.0)
        update = np.einsum("i,ij->j", weight / safe_d, H)
        safe_update = np.where(update > 0, update, 1.0)
        phi = np.where(update > 0, n_k / safe_update, 0.0)
        s = phi.sum()
        if s > 0:
            phi /= s
        rel_change = np.max(np.abs(phi - phi_old) / (np.abs(phi_old) + 1e-10))
        if rel_change < tol:
            break

    return phi, n_iter


[docs] def luminosity_function_swml( cat: GalaxyCatalogue, z_min: float, z_max: float, mag_limit: float, mag_bins: np.ndarray, area_sr: float, cosmo: FlatLambdaCDM, z_max_individual: np.ndarray | None = None, max_iter: int = 500, tol: float = 1e-8, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Step-wise maximum likelihood (SWML) luminosity function estimator. Non-parametric estimator. The LF is modelled as a step function; its shape is found by maximising the normalisation-free likelihood (Johnston 2011 Eq. 87):: ln L = Σ_j n_j ln φ_j − Σ_i w_i ln[Σ_j φ_j H_{ij}] where H_{ij} = ΔM_j if absolute-magnitude bin j is accessible to galaxy i (M_centre_j < M_faint(z_i)) and 0 otherwise. The fixed-point iteration (Johnston 2011 Eq. 88) is run until convergence. Absolute normalisation is set by matching the integral of φ to the total 1/V_max density. Error bars are the Poisson approximation σ_k = φ_k / sqrt(n_k) (Johnston 2011 Eq. 90). Parameters ---------- cat : GalaxyCatalogue Catalogue with abs_mag, redshift, and weight populated. z_min, z_max : float Redshift limits of the survey. mag_limit : float Survey apparent-magnitude limit [mag]. mag_bins : ndarray, shape (n_bins+1,) Absolute-magnitude bin edges [mag]. area_sr : float Survey solid angle [sr]. cosmo : FlatLambdaCDM Cosmology for distance modulus and V_max. z_max_individual : ndarray, optional Per-galaxy maximum observable redshift. Defaults to z_max. max_iter : int Maximum SWML iterations. tol : float Convergence tolerance on relative change in φ. Returns ------- mag_centres : ndarray, shape (n_bins,) Absolute-magnitude bin centres [mag]. phi : ndarray, shape (n_bins,) Luminosity function [Mpc^-3 mag^-1]. phi_err : ndarray, shape (n_bins,) Poisson uncertainty φ_k / sqrt(n_k) [Mpc^-3 mag^-1]. References ---------- Efstathiou, Ellis & Peterson (1988), MNRAS 232, 431. Johnston (2011), arXiv:1106.2039v3, §5.5, Eq. 87–91. """ if cat.abs_mag is None: raise ValueError("GalaxyCatalogue.abs_mag must be set for SWML.") abs_mag = cat.abs_mag if cat.abs_mag.ndim == 1 else cat.abs_mag[:, 0] mask = (cat.redshift >= z_min) & (cat.redshift <= z_max) abs_mag_m = abs_mag[mask] z_m = cat.redshift[mask] weight_m = cat.weight[mask] mag_bins = np.asarray(mag_bins, dtype=np.float64) n_bins = len(mag_bins) - 1 dm = mag_bins[1:] - mag_bins[:-1] mag_centres = 0.5 * (mag_bins[:-1] + mag_bins[1:]) # Faintest observable absolute magnitude per galaxy mu = cosmo.distmod(z_m).value M_faint = mag_limit - mu # Accessibility matrix H_{ij}: ΔM_j if bin centre j < M_faint[i], else 0 H = np.where(mag_centres[None, :] < M_faint[:, None], dm[None, :], 0.0) # Weighted count per bin n_k = np.array( [ np.sum(weight_m[(abs_mag_m >= mag_bins[k]) & (abs_mag_m < mag_bins[k + 1])]) for k in range(n_bins) ] ) phi_shape, _ = _swml_iteration(n_k, H, weight_m, max_iter=max_iter, tol=tol) # Absolute normalisation: match integral to 1/Vmax total density vmax_all = cat.vmax(z_min, z_max, area_sr, cosmo, z_max_individual) vmax_m = vmax_all[mask] total_density = np.sum(weight_m / np.clip(vmax_m, 1e-30, None)) integral = np.sum(phi_shape * dm) norm = total_density / integral if integral > 0 else 1.0 phi = phi_shape * norm # Poisson uncertainty: φ_k / sqrt(n_k) (Johnston 2011 Eq. 90) safe_n = np.where(n_k > 0, n_k, 1.0) phi_err = np.where(n_k > 0, phi / np.sqrt(safe_n), 0.0) return mag_centres, phi, phi_err
[docs] def stellar_mass_function_swml( cat: GalaxyCatalogue, z_min: float, z_max: float, log10_mstar_limit: np.ndarray | float, mstar_bins: np.ndarray, area_sr: float, cosmo: FlatLambdaCDM, z_max_individual: np.ndarray | None = None, max_iter: int = 500, tol: float = 1e-8, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Step-wise maximum likelihood (SWML) stellar mass function estimator. Non-parametric estimator. The SMF is modelled as a step function; its shape is found by maximising the normalisation-free likelihood (Johnston 2011 Eq. 87):: ln L = Σ_j n_j ln φ_j − Σ_i w_i ln[Σ_j φ_j H_{ij}] where H_{ij} = Δm_j if stellar-mass bin j is accessible to galaxy i (mstar_centre_j > log10_M_lim_i) and 0 otherwise. The fixed-point iteration (Johnston 2011 Eq. 88) is run until convergence. Absolute normalisation is set by matching the integral of φ to the total 1/V_max density. Error bars are the Poisson approximation σ_k = φ_k / sqrt(n_k) (Johnston 2011 Eq. 90). Parameters ---------- cat : GalaxyCatalogue Catalogue with log10_mstar, redshift, and weight populated. z_min, z_max : float Redshift limits of the survey. log10_mstar_limit : ndarray, shape (N,), or float Per-galaxy log10(M_*/M_sun) completeness limit — the minimum stellar mass observable by the survey at each galaxy's redshift. A scalar applies a uniform limit to all galaxies. mstar_bins : ndarray, shape (n_bins+1,) log10(M_*/M_sun) bin edges [dex]. area_sr : float Survey solid angle [sr]. cosmo : FlatLambdaCDM Cosmology for V_max computation (used for absolute normalisation). z_max_individual : ndarray, optional Per-galaxy maximum observable redshift. Defaults to z_max. max_iter : int Maximum SWML iterations. tol : float Convergence tolerance on relative change in φ. Returns ------- mstar_centres : ndarray, shape (n_bins,) log10(M_*/M_sun) bin centres [dex]. phi : ndarray, shape (n_bins,) Stellar mass function [Mpc^-3 dex^-1]. phi_err : ndarray, shape (n_bins,) Poisson uncertainty φ_k / sqrt(n_k) [Mpc^-3 dex^-1]. References ---------- Efstathiou, Ellis & Peterson (1988), MNRAS 232, 431. Johnston (2011), arXiv:1106.2039v3, §5.5, Eq. 87–91. """ if cat.log10_mstar is None: raise ValueError("GalaxyCatalogue.log10_mstar must be set for SWML.") mask = (cat.redshift >= z_min) & (cat.redshift <= z_max) m_arr = cat.log10_mstar[mask] weight_m = cat.weight[mask] mstar_bins = np.asarray(mstar_bins, dtype=np.float64) n_bins = len(mstar_bins) - 1 dm = mstar_bins[1:] - mstar_bins[:-1] mstar_centres = 0.5 * (mstar_bins[:-1] + mstar_bins[1:]) lim = np.broadcast_to(np.asarray(log10_mstar_limit, dtype=np.float64), m_arr.shape).copy() # Accessibility matrix H_{ij}: Δm_j if bin centre j > lim[i], else 0. # A galaxy's survey is complete for masses above its mass limit. H = np.where(mstar_centres[None, :] > lim[:, None], dm[None, :], 0.0) n_k = np.array( [ np.sum(weight_m[(m_arr >= mstar_bins[k]) & (m_arr < mstar_bins[k + 1])]) for k in range(n_bins) ] ) phi_shape, _ = _swml_iteration(n_k, H, weight_m, max_iter=max_iter, tol=tol) vmax_all = cat.vmax(z_min, z_max, area_sr, cosmo, z_max_individual) vmax_m = vmax_all[mask] total_density = np.sum(weight_m / np.clip(vmax_m, 1e-30, None)) integral = np.sum(phi_shape * dm) norm = total_density / integral if integral > 0 else 1.0 phi = phi_shape * norm safe_n = np.where(n_k > 0, n_k, 1.0) phi_err = np.where(n_k > 0, phi / np.sqrt(safe_n), 0.0) return mstar_centres, phi, phi_err