Source code for corfu

'''corfu: projected angular correlation functions

author: Nicolas Tessore <n.tessore@ucl.ac.uk>
license: MIT

'''

__version__     = '0.2.dev'

__all__ = [
    'ptoxi',
    'theta',
    'xitow',
    'xitow_limber',
    'wtocl',
    'cltow',
]

import numpy as np
from scipy.special import loggamma, poch
from scipy.interpolate import RectBivariateSpline

# constants
PI = 3.1415926535897932384626433832795028841971693993751E+00
PI_HALF = 1.5707963267948966192313216916397514420985846996876E+00
TWO_PI = 6.2831853071795864769252867665590057683943387987502E+00
FOUR_PI = 1.2566370614359172953850573533118011536788677597500E+01
LN_2 = 6.9314718055994530941723212145817656807550013436026E-01


[docs]def ptoxi(k, p, q=0.0, d=0.0, limber=False): '''compute 3d correlation function from power spectrum Parameters ---------- k : array_like (N,) Wavenumbers. p : array_like (..., N) Power spectrum. Can be multidimensional. Last axis must agree with the wavenumber axis. q : float, optional Exponent of power law bias for fast Hankel transform. d : float, optional Logarithmic shift of output sequence. limber : bool, optional Compute Limber correlation function for equal time approximation. Default is `False`. ''' assert np.ndim(k) == 1, 'k must be 1d array' assert np.shape(p)[-1] == len(k), 'last axis of p must agree with size of k' # Limber correlation function or exact if limber: mu = 0 else: mu = 0.5 # set up log space k n = len(k) lnk1 = np.log(k[0]) lnkn = np.log(k[-1]) lnkc = (lnk1 + lnkn)/2 dlnk = (lnkn - lnk1)/(n-1) jc = (n-1)/2 j = np.arange(n) # make sure given k is linear in log space if not np.allclose(k, np.exp(lnkc + (j-jc)*dlnk)): raise ValueError('k array not linear in log space') # tweak d to fulfil low-ringing condition xp = (mu+1+q)/2 xm = (mu+1-q)/2 y = PI_HALF/dlnk zp = loggamma(xp + 1j*y) zm = loggamma(xm + 1j*y) u = (LN_2 - d)/dlnk + (zp.imag + zm.imag)/PI d = d + (u - np.round(u))*dlnk # compute Hankel transform coefficients y = np.linspace(0, np.pi*(n//2)/(n*dlnk), n//2+1) u = np.empty(n//2+1, dtype=complex) v = np.empty(n//2+1, dtype=complex) u.imag[:] = y u.real[:] = xm loggamma(u, out=v) u.real[:] = xp loggamma(u, out=u) y *= 2*(LN_2 - d) u.real -= v.real u.real += LN_2*q u.imag += v.imag u.imag += y np.exp(u, out=u) # deal with special cases if not np.isfinite(u[0]): # write u_0 = 2^q Gamma(xp)/Gamma(xm) = 2^q poch(xm, xp-xm) # poch() handles special cases for negative integers correctly u[0] = 2**q * poch(xm, xp-xm) # the transform may still be singular if np.isinf(u[0]): raise ValueError(f'singular transform for q = {q}') # ensure that kr is good for n even if not n&1: # low ringing kr should make last coefficient real if not np.isclose(u[-1].imag, 0): raise ValueError('unable to construct low-ringing transform, ' 'try odd number of points or different q') # fix last coefficient to be real u.imag[-1] = 0 # input array for transform xi = np.copy(p) # allocates memory # factor of (k/k_c)^{mu+1-q} for input array xi *= np.exp((mu+1-q)*(j-jc)*dlnk) # Hankel transform via real FFT xi = np.fft.rfft(xi, axis=-1) xi *= u xi = np.fft.irfft(xi, n, axis=-1) xi[..., :] = xi[..., ::-1] # factor of (r/r_c)^{mu+1-q} (k_c r_c)^{mu+1-q} for output array xi *= np.exp((mu+1-q)*((j-jc)*dlnk + d)) # set up r in log space r = np.exp(d)/k[::-1] # prefactor for correlation function xi /= TWO_PI**(1+mu) xi /= r**3 # done, return separations and correlations return r, xi
[docs]def theta(n): r'''compute nodes for the angular correlation function Returns :math:`n` angles :math:`\theta_0, \ldots, \theta_{n-1}` at which to compute the angular correlation function when estimating the angular power spectrum using :func:`corfu.wtocl`. Parameters ---------- n : int Number of nodes. Returns ------- theta : array_like (n,) Angles in radians. ''' x = np.arange(n, dtype=float) x += 0.5 x *= np.pi/n return x
[docs]def xitow(theta, f1, f2, xi, progress=False): '''unequal time projection''' assert np.ndim(theta) == 1, 'theta must be 1d array' assert len(f1) == 2, 'f1 must be tuple of radii and weights' assert np.ndim(f1[0]) == 1, 'f1[0] must be 1d array' assert np.ndim(f1[1]) == 1, 'f1[1] must be 1d array' assert np.shape(f1[0]) == np.shape(f1[1]), 'shapes of f1[0] and f1[1] must match' assert len(f2) == 2, 'f2 must be tuple of radii and weights' assert np.ndim(f2[0]) == 1, 'f2[0] must be 1d array' assert np.ndim(f2[1]) == 1, 'f2[1] must be 1d array' assert np.shape(f2[0]) == np.shape(f2[1]), 'shapes of f2[0] and f2[1] must match' assert len(xi) == 4, 'xi must be tuple of radii, radii, separations, and correlations' assert np.ndim(xi[0]) == 1, 'xi[0] must be 1d array' assert np.ndim(xi[1]) == 1, 'xi[1] must be 1d array' assert np.ndim(xi[2]) == 1, 'xi[2] must be 1d array' assert np.ndim(xi[3]) == 3, 'xi[3] must be 3d array' assert np.shape(xi[3]) == (len(xi[0]), len(xi[1]), len(xi[2])), 'shape of xi[3] must match xi[0], xi[1], xi[2]' # use tqdm to report on progress if progress: if progress == 'gui': from tqdm import tqdm_gui as prog elif progress == 'notebook': from tqdm.notebook import tqdm as prog else: from tqdm import tqdm as prog else: def prog(x, total): return x # expand inputs x1_f1, f1 = f1 x2_f2, f2 = f2 x1_xi, x2_xi, r_xi, xi = xi # index array for x1_xi n1_xi = np.arange(len(x1_xi)) # log(separation) values for log-linear interpolation of xi lnr_xi = np.log(r_xi) # minimum and maximum of log(separation) min_lnr_xi = np.min(lnr_xi) max_lnr_xi = np.max(lnr_xi) # select support = radii where the filters are nonzero supp_f1 = np.nonzero(f1) supp_f2 = np.nonzero(f2) # precompute trig values sin_theta = np.sin(theta) cos_theta = np.cos(theta) # differences of xi along x1 axis dxi = np.diff(xi, axis=0) # correlation function at fixed x1 xi1 = np.empty(np.shape(xi)[1:]) # number of x2 solutions for each (theta, r, x1) n_x2 = np.empty(len(theta), dtype=int) c_x2 = np.zeros(len(theta)+1, dtype=int) # partial results for integration along line of sight x1 w1 = np.zeros((len(theta), len(f1))) # treat each x1 value independently for i, x1 in prog(zip(supp_f1[0], x1_f1[supp_f1]), total=len(supp_f1[0])): # linearly interpolate xi along x1 axis n, u = divmod(np.interp(x1, x1_xi, n1_xi), 1) xi1[:, :] = dxi[int(n)] xi1 *= u xi1 += xi[int(n)] # construct interpolator for xi(x2, r | x1) xi1_f = RectBivariateSpline(x2_xi, lnr_xi, xi1, kx=1, ky=1) # precompute some numbers x1_sin_theta = x1*sin_theta x1_cos_theta = x1*cos_theta # all x2 solutions for this x1 and all theta, r x2 = [] # go through theta, collect solutions for x2 given theta, r, x1 # combine with the values x2_f2 where the filter is given for j, (x1_st, x1_ct) in enumerate(zip(x1_sin_theta, x1_cos_theta)): dx = r_xi[r_xi >= x1_st]**2 dx -= x1_st**2 np.sqrt(dx, out=dx) x2_1 = dx # same memory x2_2 = -dx[dx <= x1_ct] x2_1 += x1_ct x2_2 += x1_ct x2.extend([x2_f2[supp_f2], x2_1, x2_2]) n_x2[j] = len(x2[-3]) + len(x2[-2]) + len(x2[-1]) # stack all x2 value subarrays x2 = np.concatenate(x2) # cumulative counts of x2 values in subarrays, for indexing np.cumsum(n_x2, out=c_x2[1:]) # sort x2 subarrays for k, l in zip(c_x2, c_x2[1:]): x2[k:l].sort(kind='mergesort') # get function values at all distict x2 values f2_x2 = np.interp(x2, x2_f2, f2, left=0, right=0) # find support of f2_x2 supp_f2_x2 = np.nonzero(f2_x2) # x2 values in support x2_s = x2[supp_f2_x2] # expand arrays for subarrays x1_sin_theta = np.repeat(x1_sin_theta, n_x2)[supp_f2_x2] x1_cos_theta = np.repeat(x1_cos_theta, n_x2)[supp_f2_x2] # separation array lnr = x1_sin_theta**2 + (x1_cos_theta - x2_s)**2 np.log(lnr, out=lnr) lnr *= 0.5 # bounds check assert np.min(lnr) >= min_lnr_xi, 'r < min(r_xi)' assert np.max(lnr) <= max_lnr_xi, 'r > max(r_xi)' # interpolate correlation values over support w2_s = xi1_f.ev(x2_s, lnr) # multiply by filter over x2 where nonzero w2_s *= f2_x2[supp_f2_x2] # full correlation function along line of sight x2 w2 = np.zeros(len(x2)) w2[supp_f2_x2] = w2_s # integrate along x2 line of sight for this (theta, x1) for j, (k, l) in enumerate(zip(c_x2, c_x2[1:])): w1[j, i] = np.trapz(w2[k:l], x2[k:l]) # multiply by filter over x1 w1 *= f1 # integrate along x1 line of sight w = np.trapz(w1, x1_f1) # done, return projection return w
[docs]def xitow_limber(theta, f12, xi): '''equal time projection''' assert np.ndim(theta) == 1, 'theta must be 1d array' assert len(f12) == 2, 'f12 must be tuple of radii and weights' assert np.ndim(f12[0]) == 1, 'f12[0] must be 1d array' assert np.ndim(f12[1]) == 1, 'f12[1] must be 1d array' assert np.shape(f12[0]) == np.shape(f12[1]), 'shapes of f12[0] and f12[1] must match' assert len(xi) == 3, 'xi must be tuple of radii, separations, and correlations' assert np.ndim(xi[0]) == 1, 'xi[0] must be 1d array' assert np.ndim(xi[1]) == 1, 'xi[1] must be 1d array' assert np.ndim(xi[2]) == 2, 'xi[2] must be 2d array' assert np.shape(xi[2]) == (len(xi[0]), len(xi[1])), 'shape of xi[2] must match xi[0], xi[1]' # expand inputs xf, f12 = f12 xxi, rxi, xi = xi # index array for xxi nxi = np.arange(len(xxi)) # log(separation) values for log-linear interpolation of xi lnrxi = np.log(rxi) # select support = radii where the filter is nonzero supp = np.nonzero(f12) # partial results for integration along line of sight w12 = np.zeros((len(theta), len(xf))) # correlation functions at fixed x xi12 = np.empty(np.shape(xi)[1:]) # loop over x in support for i, x in zip(supp[0], xf[supp]): # linearly interpolate xi along x axis n, u = divmod(np.interp(x, xxi, nxi), 1) np.multiply(xi[int(n)], 1 - u, out=xi12) xi12 += xi[int(n)+1]*u # separation array lnr = x*theta np.log(lnr, out=lnr) # bounds check minr = np.exp(np.min(lnr)) maxr = np.exp(np.max(lnr)) assert minr >= np.min(rxi), f'minimum separation {minr} not in r for xi' assert maxr <= np.max(rxi), f'maximum separation {maxr} not in r for xi' # set correlations for this x w12[:, i] = np.interp(lnr, lnrxi, xi12) # multiply by filter over x and theta*x w12 *= f12 w12 *= xf w12 *= theta[:, None] # integrate along line of sight w = np.trapz(w12, xf, axis=1) # done, return projection return w
[docs]def wtocl(theta, w, lmax=None): assert np.ndim(theta) == 1, 'theta must be 1d array' assert np.ndim(w) >= 1, 'w must be at least 1d array' assert len(theta) > 0, 'theta must not be empty' assert len(theta) == np.shape(w)[-1], 'shapes of theta and w must agree' if lmax is not None: assert np.isscalar(lmax) and lmax > 0, 'lmax must be a positive number' # get default lmax, or force integer lmax if lmax is None: lmax = len(theta)-1 else: lmax = int(lmax) # force integer lmax lmax = int(lmax) # get evaluation points for Legendre fit x = np.cos(theta) # fit Legendre polynomial to angular correlation function c = np.polynomial.legendre.legfit(x, np.transpose(w), lmax, full=False).T # use only coefficients up to lmax c = c[..., :lmax+1] # scale Legendre coefficients to obtain angular power spectrum s = np.arange(lmax+1, dtype=float) s *= 2 s += 1 s /= FOUR_PI c /= s # done, return Cls return c
[docs]def cltow(cl, theta): assert np.ndim(cl) >= 1, 'cl must be at least 1d' assert np.ndim(theta) == 1, 'theta must be 1d array' # get evaluation points for Legendre series x = np.cos(theta) # ell numbers ell = np.arange(np.shape(cl)[-1]) # series coefficients from Cls c = (2*ell+1)/FOUR_PI*cl # evaluate Legendre polynomial w = np.polynomial.legendre.legval(x, c.T) # done, return correlation function return w