__all__ = ("Pk2D", "parse_pk2d", "parse_pk",)
import warnings
import numpy as np
from . import (
CCLObject, DEFAULT_POWER_SPECTRUM, UnlockInstance, check, get_pk_spline_a,
get_pk_spline_lk, lib, unlock_instance)
from . import CCLWarning, CCLError
from .pyutils import _get_spline1d_arrays, _get_spline2d_arrays
[docs]class Pk2D(CCLObject):
"""A power spectrum class holding the information needed to reconstruct an
arbitrary function of wavenumber and scale factor.
.. note::
The ``Pk2D`` class is a wrapper around CCL's bicubic interpolators.
Addition, subtraction, multiplication, division returning new objects
or in-place is supported between ``Pk2D`` objects and other ``Pk2D``
objects, integers, or floats. When the second object is also of type
``Pk2D``, the a- and k-range changes to the most restrictive range.
Exponentiation is also supported for integers and floats.
.. note::
The power spectrum can be evaluated by directly calling the instance
``pk(k, a)``. This is vectorized in both ``k`` and ``a``.
Args:
a_arr (`array`):
An array holding values of the scale factor
lk_arr (`array`):
An array holding values of the natural logarithm of the wavenumber
(in :math:`\\mathrm{Mpc}^{-1}`).
pk_arr (`array`) :
A 2-D array of shape ``(na, nk)``, of the values of the power spectrum
at ``a_arr`` and ``lk_arr``. Input array could be flattened, provided
that its size is ``nk*na``. The array can hold the values of the
natural logarithm of the power spectrum, depending on the value of
``is_logp``. Users must ensure that the power spectrum is sufficiently
well sampled (i.e. the resolution of `a_arr` and `lk_arr` is high enough
to sample the main features in the power spectrum). CCL uses bicubic
interpolation to evaluate the power spectrum at any intermediate point
in k and a.
extrap_order_lok (:obj:`int`): ``{0, 1, 2}``.
Extrapolation order to be used on k-values below the minimum
the splines. Note that extrapolation is either in :math:`\\log(P(k))`
or in :math:`P(k)`, depending on the value of ``is_logp``.
extrap_order_hik (:obj:`int`) : ``{0, 1, 2}``.
Extrapolation order to be used on k-values above the maximum of the
splines. Note that extrapolation is either in :math:`\\log(P(k))` or
in :math:`P(k)`, depending on the value of ``is_logp``.
is_logp (:obj:`bool`):
If True, ``pkarr`` holds the natural logarithm of the
power spectrum. Otherwise, the true value of the power spectrum is
expected. If ``is_logp`` is ``True``, arrays are interpolated in
log-space.
.. automethod:: __call__
""" # noqa E501
from ._core.repr_ import build_string_Pk2D as __repr__
def __init__(self, *, a_arr=None, lk_arr=None, pk_arr=None,
is_logp=True, extrap_order_lok=1, extrap_order_hik=2):
# Make sure input makes sense
if (a_arr is None) or (lk_arr is None) or (pk_arr is None):
raise ValueError("If you do not provide a function, "
"you must provide arrays")
# Check that `a` is a monotonically increasing array.
if not (np.diff(a_arr) > 0).all():
raise ValueError("Input scale factor array in `a_arr` is not "
"monotonically increasing.")
pkflat = pk_arr.flatten()
# Check dimensions make sense
if len(pkflat) != len(a_arr)*len(lk_arr):
raise ValueError("Size of input arrays is inconsistent")
status = 0
self.psp, status = lib.set_pk2d_new_from_arrays(lk_arr, a_arr, pkflat,
int(extrap_order_lok),
int(extrap_order_hik),
int(is_logp), status)
check(status)
[docs] @classmethod
def from_function(cls, pkfunc, *, is_logp=True,
spline_params=None,
extrap_order_lok=1, extrap_order_hik=2):
""" Generates a `Pk2D` object from a function that calculates a power
spectrum.
Args:
pkfunc (:obj:`callable`):
Function with signature ``f(k, a)`` which takes vectorized
input in ``k`` (wavenumber in :math:`\\mathrm{Mpc}^{-1}`)
and a scale factor`a``, and returns the value of the
corresponding quantity. ``pkfunc`` will be sampled at the
values of ``k`` and ``a`` used internally by CCL to store the
linear and non-linear power spectra.
spline_params (:obj:`~pyccl._core.parameters.parameters_base.SplineParameters`):
optional spline parameters. Used to determine sampling rates
in scale factor and wavenumber. Custom parameters can be passed
via the :class:`~pyccl.cosmology.Cosmology` object with
``cosmo.cosmo.spline_params`` (C API), or with an instance of
``ccl.parameters.SplineParameters`` (Python API). If ``None``, it
defaults to the global accuracy parameters in CCL at the moment
this function is called.
Returns:
:class:`~pyccl.pk2d.Pk2D`. Power spectrum object.
""" # noqa E501
if spline_params is None:
from . import spline_params
# Set k and a sampling from CCL parameters
a_arr = get_pk_spline_a(spline_params=spline_params)
lk_arr = get_pk_spline_lk(spline_params=spline_params)
# Compute power spectrum on 2D grid
pk_arr = np.array([pkfunc(k=np.exp(lk_arr), a=a) for a in a_arr])
return cls(a_arr=a_arr, lk_arr=lk_arr, pk_arr=pk_arr,
is_logp=is_logp, extrap_order_lok=extrap_order_lok,
extrap_order_hik=extrap_order_hik)
def __eq__(self, other):
# Check object id.
if self is other:
return True
# Check the object class.
if type(self) is not type(other):
return False
# If the objects contain no data, return early.
if not (self or other):
return True
# Check extrapolation orders.
if not (self.extrap_order_lok == other.extrap_order_lok
and self.extrap_order_hik == other.extrap_order_hik):
return False
# Check the individual splines.
a1, lk1, pk1 = self.get_spline_arrays()
a2, lk2, pk2 = other.get_spline_arrays()
return ((a1 == a2).all() and (lk1 == lk2).all()
and np.array_equal(pk1, pk2))
def __hash__(self):
return hash(repr(self))
@property
def has_psp(self):
return 'psp' in vars(self)
@property
def extrap_order_lok(self):
return self.psp.extrap_order_lok if self else None
@property
def extrap_order_hik(self):
return self.psp.extrap_order_hik if self else None
[docs] @classmethod
def from_model(cls, cosmo, model):
""":class:`Pk2D` constructor returning the power spectrum
associated with a given numerical model.
Arguments:
cosmo (:class:`~pyccl.cosmology.Cosmology`): A Cosmology object.
model (:obj:`str`): Model to use. These models allowed:
- ``'bbks'`` (`Bardeen et al. 1986 <https://ui.adsabs.harvard.edu/abs/1986ApJ...304...15B/abstract>`_).
- ``'eisenstein_hu'`` (`Eisenstein & Hu 1997 <https://arxiv.org/abs/astro-ph/9709112>`_).
- ``'eisenstein_hu_nowiggles'`` (`Eisenstein & Hu 1997 <https://arxiv.org/abs/astro-ph/9709112>`_, no-wiggles version).
Returns:
:class:`~pyccl.pk2d.Pk2D`. The power spectrum of the input model.
""" # noqa E501
pk2d = Pk2D.__new__(cls)
status = 0
if model == 'bbks':
cosmo.compute_growth()
ret = lib.compute_linpower_bbks(cosmo.cosmo, status)
elif model == 'eisenstein_hu':
cosmo.compute_growth()
ret = lib.compute_linpower_eh(cosmo.cosmo, 1, status)
elif model == 'eisenstein_hu_nowiggles':
cosmo.compute_growth()
ret = lib.compute_linpower_eh(cosmo.cosmo, 0, status)
else:
raise ValueError(f"Invalid model {model}.")
if np.ndim(ret) == 0:
status = ret
else:
with UnlockInstance(pk2d):
pk2d.psp, status = ret
check(status, cosmo)
return pk2d
[docs] def apply_halofit(self, cosmo):
"""Apply the "HALOFIT" transformation of
`Takahashi et al. 2012 <https://arxiv.org/abs/1208.2701>`_ on the linear
power spectrum represented by this :class:`Pk2D` object, and return the
result as a :class:`Pk2D` object.
Args:
cosmo (:class:`~pyccl.cosmology.Cosmology`): a Cosmology object.
Return:
:class:`Pk2D` object containing the non-linear power spectrum after
applying HALOFIT.
""" # noqa 501
if cosmo["wa"] != 0:
# HALOFIT translates (w0, wa) to a w0_eff. This requires computing
# the comoving distance to the CMB, which requires the background
# splines being sampled to sufficiently high redshifts.
cosmo.compute_distances()
_, a = _get_spline1d_arrays(cosmo.cosmo.data.achi)
if min(a) > 1/(1 + 3000):
raise CCLError("Comoving distance spline does not cover "
"sufficiently high redshifts for HALOFIT. "
"HALOFIT translates (w0, wa) to a w0_eff. This "
"requires computing the comoving distance to "
"the CMB, which requires the background "
"splines being sampled to sufficiently high "
"redshifts. If using the calculator mode, "
"check the support of the background data.")
pk2d = Pk2D.__new__(Pk2D)
status = 0
ret = lib.apply_halofit(cosmo.cosmo, self.psp, status)
if np.ndim(ret) == 0:
status = ret
else:
with UnlockInstance(pk2d):
pk2d.psp, status = ret
check(status, cosmo)
return pk2d
[docs] def __call__(self, k, a, cosmo=None, *, derivative=False):
"""Evaluate the power spectrum or its logarithmic derivative at
a single value of the scale factor.
Arguments
---------
k : :obj:`float` or `array`
Wavenumber value(s) in units of :math:`{\\ rm Mpc}^{-1}`.
a : :obj:`float` or `array`
Value of the scale factor
cosmo : :class:`~pyccl.cosmology.Cosmology`
Cosmology object. Used to evaluate the power spectrum outside
of the interpolation range in ``a``, thorugh the linear growth
factor. If ``cosmo`` is ``None``, attempting to evaluate the power
spectrum outside of the interpolation range will raise an error.
derivative : :obj:`bool`
If ``False``, evaluate the power spectrum. If ``True``, evaluate
the logarithmic derivative of the power spectrum,
:math:`d\\log P(k)/d\\log k`.
Returns
-------
P(k, a) : (:obj:`float` or `array`)
Value(s) of the power spectrum. or its derivative.
"""
# determine if logarithmic derivative is needed
if not derivative:
eval_func = lib.pk2d_eval_multi
else:
eval_func = lib.pk2d_der_eval_multi
# handle scale factor extrapolation
if cosmo is None:
cosmo = self.__call__._cosmo
self.psp.extrap_linear_growth = 404 # flag no extrapolation
else:
cosmo.compute_growth() # growth factors for extrapolation
self.psp.extrap_linear_growth = 401 # flag extrapolation
a_use = np.atleast_1d(a).astype(float)
k_use = np.atleast_1d(k).astype(float)
lk_use = np.log(k_use)
status = 0
out = np.zeros([len(a_use), len(k_use)])
for ia, aa in enumerate(a_use):
f, status = eval_func(self.psp, lk_use, aa,
cosmo.cosmo, k_use.size, status)
# Catch scale factor extrapolation bounds error.
if status == lib.CCL_ERROR_SPLINE_EV:
raise ValueError(
"Pk2D evaluation scale factor is outside of the "
"interpolation range. To extrapolate, pass a Cosmology.")
check(status, cosmo)
out[ia] = f
if np.ndim(k) == 0:
out = np.squeeze(out, axis=-1)
if np.ndim(a) == 0:
out = np.squeeze(out, axis=0)
return out
# Save a dummy cosmology as an attribute of the `__call__` method
# so we don't have to initialize one every time no `cosmo` is passed.
# This is gentle with memory too, as `free` does not work for an empty
# cosmology.
__call__._cosmo = type("Dummy", (object,), {"cosmo": lib.cosmology()})()
[docs] def copy(self):
"""Return a copy of this Pk2D object."""
if not self:
return Pk2D.__new__(Pk2D)
return self + 0
[docs] def get_spline_arrays(self):
"""Get the spline data arrays internally stored by this object to
interpolate the power spectrum.
Returns:
Tuple containing
- a_arr: Array of scale factors.
- lk_arr: Array of natural logarithm of wavenumber k.
- pk_arr: Array of the power spectrum :math:`P(k, a)`. The shape
is ``(a_arr.size, lk_arr.size)``.
"""
if not self:
raise ValueError("Pk2D object does not have data.")
a_arr, lk_arr, pk_arr = _get_spline2d_arrays(self.psp.fka)
if self.psp.is_log:
pk_arr = np.exp(pk_arr)
return a_arr, lk_arr, pk_arr
def __del__(self):
"""Free memory associated with this Pk2D structure."""
if self:
lib.f2d_t_free(self.psp)
def __bool__(self):
return self.has_psp
def __contains__(self, other):
if not (self.psp.lkmin <= other.psp.lkmin
and self.psp.lkmax >= other.psp.lkmax
and self.psp.amin <= other.psp.amin
and self.psp.amax >= other.psp.amax):
return False
return True
def _get_binary_operator_arrays(self, other):
if not (self and other):
raise ValueError("Pk2D object does not have data.")
if self not in other:
raise ValueError(
"The 2nd operand has its data defined over a smaller range "
"than the 1st operand. To avoid extrapolation, this operation "
"is forbidden. If you want to operate on the smaller support, "
"try swapping the operands.")
a_arr_a, lk_arr_a, pk_arr_a = self.get_spline_arrays()
a_arr_b, lk_arr_b, pk_arr_b = other.get_spline_arrays()
if not (a_arr_a.size == a_arr_b.size
and lk_arr_a.size == lk_arr_b.size
and np.allclose(a_arr_a, a_arr_b)
and np.allclose(lk_arr_a, lk_arr_b)):
warnings.warn(
"Operands defined over different ranges. "
"The result will be interpolated and clipped to "
f"{self.psp.lkmin} <= log k <= {self.psp.lkmax} and "
f"{self.psp.amin} <= a <= {self.psp.amax}.", CCLWarning)
pk_arr_b = other(np.exp(lk_arr_a), a_arr_a)
return a_arr_a, lk_arr_a, pk_arr_a, pk_arr_b
def __add__(self, other):
"""Adds two Pk2D instances.
The a and k ranges of the 2nd operand need to be the same or smaller
than the 1st operand.
The returned Pk2D object uses the same a and k arrays as the first
operand.
"""
if isinstance(other, (float, int)):
a_arr_a, lk_arr_a, pk_arr_a = self.get_spline_arrays()
pk_arr_new = pk_arr_a + other
elif isinstance(other, Pk2D):
a_arr_a, lk_arr_a, pk_arr_a, pk_arr_b = \
self._get_binary_operator_arrays(other)
pk_arr_new = pk_arr_a + pk_arr_b
else:
raise TypeError("Addition of Pk2D is only defined for "
"floats, ints, and Pk2D objects.")
logp = np.all(pk_arr_new > 0)
if logp:
pk_arr_new = np.log(pk_arr_new)
new = Pk2D(a_arr=a_arr_a, lk_arr=lk_arr_a, pk_arr=pk_arr_new,
is_logp=logp,
extrap_order_lok=self.extrap_order_lok,
extrap_order_hik=self.extrap_order_hik)
return new
def __mul__(self, other):
"""Multiply two Pk2D instances.
The a and k ranges of the 2nd operand need to be the same or smaller
than the 1st operand.
The returned Pk2D object uses the same a and k arrays as the first
operand.
"""
if isinstance(other, (float, int)):
a_arr_a, lk_arr_a, pk_arr_a = self.get_spline_arrays()
pk_arr_new = other * pk_arr_a
elif isinstance(other, Pk2D):
a_arr_a, lk_arr_a, pk_arr_a, pk_arr_b = \
self._get_binary_operator_arrays(other)
pk_arr_new = pk_arr_a * pk_arr_b
else:
raise TypeError("Multiplication of Pk2D is only defined for "
"floats, ints, and Pk2D objects.")
logp = np.all(pk_arr_new > 0)
if logp:
pk_arr_new = np.log(pk_arr_new)
new = Pk2D(a_arr=a_arr_a, lk_arr=lk_arr_a, pk_arr=pk_arr_new,
is_logp=logp,
extrap_order_lok=self.extrap_order_lok,
extrap_order_hik=self.extrap_order_hik)
return new
def __pow__(self, exponent):
"""Take a Pk2D instance to a power.
"""
if not isinstance(exponent, (float, int)):
raise TypeError(
"Exponentiation of Pk2D is only defined for floats and ints.")
a_arr_a, lk_arr_a, pk_arr_a = self.get_spline_arrays()
if np.any(pk_arr_a < 0) and exponent % 1 != 0:
warnings.warn(
"Taking a non-positive Pk2D object to a non-integer "
"power may lead to unexpected results", CCLWarning)
pk_arr_new = pk_arr_a**exponent
logp = np.all(pk_arr_new > 0)
if logp:
pk_arr_new = np.log(pk_arr_new)
new = Pk2D(a_arr=a_arr_a, lk_arr=lk_arr_a, pk_arr=pk_arr_new,
is_logp=logp,
extrap_order_lok=self.extrap_order_lok,
extrap_order_hik=self.extrap_order_hik)
return new
def __sub__(self, other):
return self + (-1)*other
def __truediv__(self, other):
return self * other**(-1)
__radd__ = __add__
__rmul__ = __mul__
def __rsub__(self, other):
return other + (-1)*self
def __rtruediv__(self, other):
return other * self**(-1)
@unlock_instance
def __iadd__(self, other):
self = self + other
return self
@unlock_instance
def __imul__(self, other):
self = self * other
return self
@unlock_instance
def __isub__(self, other):
self = self - other
return self
@unlock_instance
def __itruediv__(self, other):
self = self / other
return self
@unlock_instance
def __ipow__(self, other):
self = self**other
return self
[docs]def parse_pk2d(cosmo, p_of_k_a=DEFAULT_POWER_SPECTRUM, *, is_linear=False):
""" Return the C-level `f2d` spline associated with a
:class:`Pk2D` object.
Args:
cosmo (:class:`~pyccl.cosmology.Cosmology`): A Cosmology object.
p_of_k_a (:class:`Pk2D` or :obj:`str`): if a
:class:`Pk2D` object, its `f2d` spline will be used. If
a string, the linear or non-linear power spectrum stored
by ``cosmo`` under this name will be used. Defaults to the
matter power spectrum stored in `cosmo`.
is_linear (:obj:`bool`): if ``True``, and if ``p_of_k_a`` is a
string or ``None``, the linear version of the corresponding
power spectrum will be used (otherwise it'll be the
non-linear version).
"""
if isinstance(p_of_k_a, Pk2D):
psp = p_of_k_a.psp
else:
if isinstance(p_of_k_a, str):
name = p_of_k_a
else:
raise ValueError("p_of_k_a must be a pyccl.Pk2D object or "
"a string")
if is_linear:
cosmo.compute_linear_power()
pk = cosmo.get_linear_power(name)
else:
cosmo.compute_nonlin_power()
pk = cosmo.get_nonlin_power(name)
psp = pk.psp
return psp
[docs]def parse_pk(cosmo, p_of_k_a=None):
"""Helper to retrieve the right :class:`Pk2D` object.
Args:
cosmo (:class:`~pyccl.cosmology.Cosmology`): a Cosmology object.
p_of_k_a (:class:`~pyccl.pk2d.Pk2D`, :obj:`str` or :obj:`None`):
3D Power spectrum to return. If a `Pk2D`, it is just returned.
If `None` or `"linear"`, the linear power spectrum stored in
``cosmo`` is returned. If `"nonlinear"`, the nonlinear matter
power spectrum stored in ``cosmo`` is returned.
Returns:
:class:`Pk2D` object corresponding to ``p_of_k_a``.
"""
if not (p_of_k_a is None or isinstance(p_of_k_a, (str, Pk2D))):
raise TypeError("p_of_k_a must be None, 'linear', 'nonlinear', Pk2D.")
if isinstance(p_of_k_a, Pk2D):
return p_of_k_a
elif p_of_k_a is None or p_of_k_a == "linear":
cosmo.compute_linear_power()
return cosmo.get_linear_power()
elif p_of_k_a == "nonlinear":
cosmo.compute_nonlin_power()
return cosmo.get_nonlin_power()