'''corfu: projected angular correlation functions
author: Nicolas Tessore <n.tessore@ucl.ac.uk>
license: MIT
'''
__version__ = '0.1.1'
__all__ = [
'ptoxi',
'uneqt',
'eqt',
'wtocl',
'cltow',
]
import numpy as np
from scipy.special import loggamma
from scipy.interpolate import splrep, splev, RectBivariateSpline
# constants
PI = 3.1415926535897932384626433832795028841971693993751E+00
PI_HALF = 1.5707963267948966192313216916397514420985846996876E+00
TWO_PI = 6.2831853071795864769252867665590057683943387987502E+00
FOUR_PI = 1.2566370614359172953850573533118011536788677597500E+01
LN_2 = 6.9314718055994530941723212145817656807550013436026E-01
LN_10 = 2.3025850929940456840179914546843642076011014886288E+00
[docs]def ptoxi(k, p, q=0, limber=False):
'''compute 3d correlation function from power spectrum
Parameters
----------
k : array_like (N,)
Wavenumbers.
p : array_like (..., N)
Power spectrum. Can be multidimensional. Last axis must agree with the
wavenumber axis.
q : float, optional
Bias parameter for FFTLog.
limber : bool, optional
Compute Limber correlation function for equal time approximation.
Default is `False`.
'''
assert np.ndim(k) == 1, 'k must be 1d array'
assert np.shape(p)[-1] == len(k), 'last axis of p must agree with size of k'
# Limber correlation function or exact
if limber:
mu = 0
else:
mu = 0.5
# set up log space k
n = len(k)
lnk1 = np.log(k[0])
lnkn = np.log(k[-1])
lnkc = (lnk1 + lnkn)/2
dlnk = (lnkn - lnk1)/(n-1)
# make sure given k is linear in log space
assert np.allclose(k, np.logspace(lnk1/LN_10, lnkn/LN_10, n)), 'k array not linear in log space'
# find low-ringing kr near unity
a = (mu+1+q)/2
b = (mu+1-q)/2
c = PI_HALF/dlnk
u = loggamma(a + 1j*c)
v = loggamma(b + 1j*c)
u = LN_2/dlnk + (u.imag + v.imag)/PI
lnkr = (u - np.round(u))*dlnk
# compute Hankel transform coefficients
c = np.arange(0, n//2+1, dtype=float)
c *= PI/(n*dlnk)
u = np.empty(len(c), dtype=complex)
v = np.empty(len(c), dtype=complex)
u.real[:] = a
v.real[:] = b
u.imag[:] = c
v.imag[:] = c
loggamma(u, out=u)
loggamma(v, out=v)
c *= 2*(LN_2 - lnkr)
u.real -= v.real
u.real += LN_2*q
u.imag += v.imag
u.imag += c
np.exp(u, out=u)
if np.isnan(u[0]):
if a > 0:
u[0] = 0
else:
raise ValueError(f'singular transform for q = {q:g}')
del(v)
# ensure that kr is good
assert n & 1 or np.isclose(u[-1].real, u[-1].real+u[-1].imag), 'unable to construct low-ringing transform, try odd number of points'
# input array for transform
xi = np.copy(p) # allocates memory
# input factor for transform
j = np.arange(1, n+1, dtype=float)
j -= (n+1)/2
xi *= np.exp((1+mu-q)*j*dlnk)
# Hankel transform via real FFT
xi = np.fft.rfft(xi, axis=-1)
xi *= u
xi = np.fft.irfft(xi, n, axis=-1)
xi[..., :] = xi[..., ::-1]
# output factor for transform
xi *= np.exp(-((2-mu+q)*j*dlnk + (2-mu+q)*lnkr - 3*lnkc))
# prefactor for correlation function
xi /= TWO_PI**(1+mu)
# set up r in log space
r = np.exp(lnkr)/k[::-1]
# done, return separations and correlations
return r, xi
[docs]def uneqt(theta, f1, f2, xi, progress=False):
'''unequal time projection'''
assert np.ndim(theta) == 1, 'theta must be 1d array'
assert len(f1) == 2, 'f1 must be tuple of radii and weights'
assert np.ndim(f1[0]) == 1, 'f1[0] must be 1d array'
assert np.ndim(f1[1]) == 1, 'f1[1] must be 1d array'
assert np.shape(f1[0]) == np.shape(f1[1]), 'shapes of f1[0] and f1[1] must match'
assert len(f2) == 2, 'f2 must be tuple of radii and weights'
assert np.ndim(f2[0]) == 1, 'f2[0] must be 1d array'
assert np.ndim(f2[1]) == 1, 'f2[1] must be 1d array'
assert np.shape(f2[0]) == np.shape(f2[1]), 'shapes of f2[0] and f2[1] must match'
assert len(xi) == 4, 'xi must be tuple of radii, radii, separations, and correlations'
assert np.ndim(xi[0]) == 1, 'xi[0] must be 1d array'
assert np.ndim(xi[1]) == 1, 'xi[1] must be 1d array'
assert np.ndim(xi[2]) == 1, 'xi[2] must be 1d array'
assert np.ndim(xi[3]) == 3, 'xi[3] must be 3d array'
assert np.shape(xi[3]) == (len(xi[0]), len(xi[1]), len(xi[2])), 'shape of xi[3] must match xi[0], xi[1], xi[2]'
# use tqdm to report on progress
if progress:
if progress == 'gui':
from tqdm import tqdm_gui as prog
elif progress == 'notebook':
from tqdm.notebook import tqdm as prog
else:
from tqdm import tqdm as prog
else:
def prog(x, total): return x
# expand inputs
x1_f1, f1 = f1
x2_f2, f2 = f2
x1_xi, x2_xi, r_xi, xi = xi
# index array for x1_xi
n1_xi = np.arange(len(x1_xi))
# log(separation) values for log-linear interpolation of xi
lnr_xi = np.log(r_xi)
# minimum and maximum of log(separation)
min_lnr_xi = np.min(lnr_xi)
max_lnr_xi = np.max(lnr_xi)
# select support = radii where the filters are nonzero
supp_f1 = np.nonzero(f1)
supp_f2 = np.nonzero(f2)
# precompute trig values
sin_theta = np.sin(theta)
cos_theta = np.cos(theta)
# differences of xi along x1 axis
dxi = np.diff(xi, axis=0)
# correlation function at fixed x1
xi1 = np.empty(np.shape(xi)[1:])
# number of x2 solutions for each (theta, r, x1)
n_x2 = np.empty(len(theta), dtype=int)
c_x2 = np.zeros(len(theta)+1, dtype=int)
# partial results for integration along line of sight x1
w1 = np.zeros((len(theta), len(f1)))
# treat each x1 value independently
for i, x1 in prog(zip(supp_f1[0], x1_f1[supp_f1]), total=len(supp_f1[0])):
# linearly interpolate xi along x1 axis
n, u = divmod(np.interp(x1, x1_xi, n1_xi), 1)
xi1[:, :] = dxi[int(n)]
xi1 *= u
xi1 += xi[int(n)]
# construct interpolator for xi(x2, r | x1)
xi1_f = RectBivariateSpline(x2_xi, lnr_xi, xi1, kx=1, ky=1)
# precompute some numbers
x1_sin_theta = x1*sin_theta
x1_cos_theta = x1*cos_theta
# all x2 solutions for this x1 and all theta, r
x2 = []
# go through theta, collect solutions for x2 given theta, r, x1
# combine with the values x2_f2 where the filter is given
for j, (x1_st, x1_ct) in enumerate(zip(x1_sin_theta, x1_cos_theta)):
dx = r_xi[r_xi >= x1_st]**2
dx -= x1_st**2
np.sqrt(dx, out=dx)
x2_1 = dx # same memory
x2_2 = -dx[dx <= x1_ct]
x2_1 += x1_ct
x2_2 += x1_ct
x2.extend([x2_f2[supp_f2], x2_1, x2_2])
n_x2[j] = len(x2[-3]) + len(x2[-2]) + len(x2[-1])
# stack all x2 value subarrays
x2 = np.concatenate(x2)
# cumulative counts of x2 values in subarrays, for indexing
np.cumsum(n_x2, out=c_x2[1:])
# sort x2 subarrays
for k, l in zip(c_x2, c_x2[1:]):
x2[k:l].sort(kind='mergesort')
# get function values at all distict x2 values
f2_x2 = np.interp(x2, x2_f2, f2, left=0, right=0)
# find support of f2_x2
supp_f2_x2 = np.nonzero(f2_x2)
# x2 values in support
x2_s = x2[supp_f2_x2]
# expand arrays for subarrays
x1_sin_theta = np.repeat(x1_sin_theta, n_x2)[supp_f2_x2]
x1_cos_theta = np.repeat(x1_cos_theta, n_x2)[supp_f2_x2]
# separation array
lnr = x1_sin_theta**2 + (x1_cos_theta - x2_s)**2
np.log(lnr, out=lnr)
lnr *= 0.5
# bounds check
assert np.min(lnr) >= min_lnr_xi, 'r < min(r_xi)'
assert np.max(lnr) <= max_lnr_xi, 'r > max(r_xi)'
# interpolate correlation values over support
w2_s = xi1_f.ev(x2_s, lnr)
# multiply by filter over x2 where nonzero
w2_s *= f2_x2[supp_f2_x2]
# full correlation function along line of sight x2
w2 = np.zeros(len(x2))
w2[supp_f2_x2] = w2_s
# integrate along x2 line of sight for this (theta, x1)
for j, (k, l) in enumerate(zip(c_x2, c_x2[1:])):
w1[j, i] = np.trapz(w2[k:l], x2[k:l])
# multiply by filter over x1
w1 *= f1
# integrate along x1 line of sight
w = np.trapz(w1, x1_f1)
# done, return projection
return w
[docs]def eqt(theta, f12, xi):
'''equal time projection'''
assert np.ndim(theta) == 1, 'theta must be 1d array'
assert len(f12) == 2, 'f12 must be tuple of radii and weights'
assert np.ndim(f12[0]) == 1, 'f12[0] must be 1d array'
assert np.ndim(f12[1]) == 1, 'f12[1] must be 1d array'
assert np.shape(f12[0]) == np.shape(f12[1]), 'shapes of f12[0] and f12[1] must match'
assert len(xi) == 3, 'xi must be tuple of radii, separations, and correlations'
assert np.ndim(xi[0]) == 1, 'xi[0] must be 1d array'
assert np.ndim(xi[1]) == 1, 'xi[1] must be 1d array'
assert np.ndim(xi[2]) == 2, 'xi[2] must be 2d array'
assert np.shape(xi[2]) == (len(xi[0]), len(xi[1])), 'shape of xi[2] must match xi[0], xi[1]'
# expand inputs
xf, f12 = f12
xxi, rxi, xi = xi
# index array for xxi
nxi = np.arange(len(xxi))
# log(separation) values for log-linear interpolation of xi
lnrxi = np.log(rxi)
# select support = radii where the filter is nonzero
supp = np.nonzero(f12)
# partial results for integration along line of sight
w12 = np.zeros((len(theta), len(xf)))
# correlation functions at fixed x
xi12 = np.empty(np.shape(xi)[1:])
# loop over x in support
for i, x in zip(supp[0], xf[supp]):
# linearly interpolate xi along x axis
n, u = divmod(np.interp(x, xxi, nxi), 1)
np.multiply(xi[int(n)], 1 - u, out=xi12)
xi12 += xi[int(n)+1]*u
# separation array
lnr = x*theta
np.log(lnr, out=lnr)
# bounds check
minr = np.exp(np.min(lnr))
maxr = np.exp(np.max(lnr))
assert minr >= np.min(rxi), f'minimum separation {minr} not in r for xi'
assert maxr <= np.max(rxi), f'maximum separation {maxr} not in r for xi'
# set correlations for this x
w12[:, i] = np.interp(lnr, lnrxi, xi12)
# multiply by filter over x and theta*x
w12 *= f12
w12 *= xf
w12 *= theta[:, None]
# integrate along line of sight
w = np.trapz(w12, xf, axis=1)
# done, return projection
return w
[docs]def wtocl(theta, w, lmax):
assert np.ndim(theta) == 1, 'theta must be 1d array'
assert np.ndim(w) == 1, 'w must be 1d array'
assert len(theta) == len(w), 'length of theta and w must agree'
assert np.isscalar(lmax) and lmax > 0, 'lmax must be a positive number'
# force integer lmax
lmax = int(lmax)
# get evaluation points for Legendre fit
t = np.linspace(np.min(theta), np.max(theta), 2*lmax)
x = np.cos(t)
np.log(t, out=t)
# interpolate function values in log space
spl = splrep(np.log(theta), w)
f = splev(t, spl, der=0, ext=0)
# fit Legendre polynomial to interpolated function
c = np.polynomial.legendre.legfit(x, f, 2*lmax-1, rcond=0, full=False)
# use only coefficients up to lmax
c = c[:lmax+1]
# ell values
ell = np.arange(lmax+1)
# scale to obtain Cls
c /= 2*ell+1
c *= FOUR_PI
# done, return ls and Cls
return ell, c
[docs]def cltow(cl, theta):
assert np.ndim(cl) == 1, 'cl must be 1d array'
assert np.ndim(theta) == 1, 'theta must be 1d array'
# get evaluation points for Legendre series
x = np.cos(theta)
# ell numbers
ell = np.arange(len(cl))
# series coefficients from Cls
c = (2*ell+1)/FOUR_PI*cl
# evaluate Legendre polynomial
w = np.polynomial.legendre.legval(x, c)
# done, return correlation function
return w