Source code for sum_stat.twopcf.spatial

"""3D two-point correlation function ξ(r).

Provides:
- treecorr-based measurement via :func:`xi_r`
"""

from __future__ import annotations

import numpy as np
import treecorr
from astropy.cosmology import FlatLambdaCDM

from ..catalogue import GalaxyCatalogue


[docs] def xi_r( gal: GalaxyCatalogue, rand: GalaxyCatalogue, r_bins: np.ndarray, cosmo: FlatLambdaCDM, n_threads: int = 4, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """3D two-point correlation function ξ(r) via Landy-Szalay estimator. Comoving distances are computed from redshifts using the provided cosmology. treecorr operates in 3D Euclidean metric. Parameters ---------- gal : GalaxyCatalogue Galaxy catalogue (ra, dec, redshift required). rand : GalaxyCatalogue Random catalogue (ra, dec, redshift required). r_bins : ndarray, shape (n_bins+1,) Comoving separation bin edges [Mpc]. cosmo : FlatLambdaCDM Cosmology for redshift-to-distance conversion. n_threads : int Number of OpenMP threads for treecorr. Returns ------- r_centres : ndarray, shape (n_bins,) Mean comoving separation per bin [Mpc]. xi : ndarray, shape (n_bins,) 3D correlation function (dimensionless). var_xi : ndarray, shape (n_bins,) Variance estimate (Poisson / shot noise). Performance ----------- ~8 s/call (N_gal=5000, N_rand=50000, n_bins=20, treecorr, 4 threads, CPU x86-64) """ rand = rand.subsample(5 * gal.n) r_bins = np.asarray(r_bins, dtype=np.float64) r_gal = gal.comoving_distance(cosmo).astype(np.float64) r_rand = rand.comoving_distance(cosmo).astype(np.float64) config = dict( min_sep=r_bins[0], max_sep=r_bins[-1], nbins=len(r_bins) - 1, metric="Euclidean", num_threads=n_threads, ) cat_g = treecorr.Catalog( ra=gal.ra, dec=gal.dec, r=r_gal, w=gal.weight, ra_units="degrees", dec_units="degrees" ) cat_r = treecorr.Catalog( ra=rand.ra, dec=rand.dec, r=r_rand, w=rand.weight, ra_units="degrees", dec_units="degrees" ) dd = treecorr.NNCorrelation(config) dr = treecorr.NNCorrelation(config) rr = treecorr.NNCorrelation(config) dd.process(cat_g) dr.process(cat_g, cat_r) rr.process(cat_r) xi, varxi = dd.calculateXi(rr=rr, dr=dr) r_centres = np.exp(dd.meanlogr) return r_centres, np.array(xi), np.array(varxi)