r"""
.. module:: foreground
The ``Foreground`` class calculates foreground spectra , using :math:`\ell` ranges,
array of frequencies,etc.
The inherited ``BandpowerForeground`` adds integration over bandpowers, using the
bandpass transmissions.
If one wants to use this class as standalone, the ``bands`` dictionary is filled
when initializing ``BandpowerForeground``.
The default values of the systematic parameters are set in the
``TTTEEE/TEEE/TT/EE/TE/etc.yaml`` files. They have to be named as
``cal/calT/calE/alpha`` + ``_`` + experiment_channel string (e.g.
``LAT_93/dr6_pa4_f150``).
The default values of the foreground parameters are set in the ``fg_TT/TE/EE.yaml`` files.
If you want to set different parameters settings, do that in the ``params`` block of
the ``yaml`` file you will use for running (see the
`examples/mflike_example.yaml <https://github.com/simonsobs/LAT_MFLike/blob/master/examples/mflike_example.yaml>`_).
.. note::
Note that when you set different foregrounds/systematics parameters in the ``params`` block
of your running ``yaml``, Cobaya will use the new parameters settings you indicated. If you
don't change a parameter setting in the ``params`` block, Cobaya will use its default stored
in the files mentioned above. Note that, in the ``.updated.yaml`` file generated by Cobaya,
the ``mflike.BandpowerForeground`` theory block will have a ``params`` block with all the
foreground parameters default settings which may be different from what you are defining in
the general ``params`` block.
The bandpass shifts are applied within the ``_bandpass_construction`` function. There are
two possibilities:
* reading the passband :math:`\tau(\nu)` stored in a sacc file
(which is the default now)
* building the passbands :math:`\tau(\nu)`, either as Dirac delta or as top-hat
For the first option, it is necessary to leave the ``top_hat_band`` key empty under
``mflike.BandpowerForeground``:
.. code-block:: yaml
theory:
mflike.BandpowerForeground:
top_hat_band: null
For the second option, the ``top_hat_band`` dictionary under the
``mflike.BandpowerForeground`` theory block has to be filled with two keys:
* ``nsteps``: setting the number of frequencies used in the band integration
(either 1 for a Dirac delta or > 1)
* ``bandwidth``: setting the relative width :math:`\delta` of the band with respect to
the central frequency, such that the frequency extremes are
:math:`\nu_{\rm{low/high}} = \nu_{\rm{center}}(1 \mp \delta/2) + \Delta^{\nu}_{\rm band}`
(with :math:`\Delta^{\nu}_{\rm band}` being the possible bandpass shift).
``bandwidth`` has to be 0 if ``nstep`` = 1, > 0 otherwise.
``bandwidth`` can be a list if you want a different width for each band
e.g. ``bandwidth: [0.3,0.2,0.3]`` for 3 bands.
The effective frequencies, used as central frequencies to build the bandpasses, are read from the
``bands`` dictionary as before. To build a Dirac delta, use:
.. code-block:: yaml
theory:
mflike.BandpowerForeground:
top_hat_band:
nsteps: 1
bandwidth: 0
If we don't want to include the beam chromaticity effect, just leave the ``beam_profile`` key empty:
.. code-block:: yaml
theory:
mflike.BandpowerForeground:
beam_profile:
If we want to consider it, we have several options on how to compute/read the beam profiles.
Notice that we need arrays(freqs, ells+2) (computed from :math:`\ell = 0`), since
we want a beam window function for each freq in the bandpasses. We should use this
block under ``mflike.BandpowerForeground``:
.. code-block:: yaml
theory:
mflike.BandpowerForeground:
beam_profile:
beam_from_file: filename/False/null
There are two options:
* reading the beams from the sacc file (``beam_from_file: False/null``).
The beams have to be stored in the ``sacc.tracers[exp].beam`` tracer
* reading the beams from an external yaml file (``beam_from_file: filename``).
``filename`` has to be the absolute path for the file, with the ``.yaml/.yml`` extension.
The yaml file has to be a dictionary ``{"{exp}_s0": {"nu": nu, "beams": array(freqs, ells+2)},
"{exp}_s2": {"nu": nu, "beams": array(freqs, ells+2)},...}``
Once computed/read, the beam profiles are saved in
.. code-block:: python
self.beams = {"{exp}_s0": {"nu": nu, "beams": array(freqs, ells+2)}, "{exp}_s2":
{"nu": nu, "beams": array(freqs, ells+2)},...}.
The beams are appropriately normalized, then we select the :math:`\ell` range used in
the rest of the code.
In case of bandpass shifts :math:`\Delta \nu \neq 0`, you can decide whether to
propagate the bandpass shift effect to :math:`b_{\ell}(\nu)` or not. If you want to
leave :math:`b_{\ell}(\nu)` unchanged even if :math:`\Delta \nu \neq 0` (assuming this
modification is a second order effect), you just need to leave the ``beam_profile``
block as it is, i.e. ``Bandpass_shifted_beams: null``.
In case you want to propagate this effect, the chromatic beams are derived as: :
math:`b^{T/P}_{\ell}(\nu + \Delta \nu) = b^{T/P}_{\ell (\nu / \nu_0)^{-\alpha / 2}}
(\nu_0 + \Delta \nu)`, starting from a monochromatic beam :math:`b^{T/P}_{\ell}(\nu_0 +
\Delta \nu)`. This monochromatic beam is derived from measurements of the planet beam
and assuming a certain bandpass shift :math:`\Delta \nu`. So we need a dictionary of
these :math:`b^{T/P}_{\ell}(\nu_0 + \Delta \nu)` for the several values of
:math:`\Delta \nu` that could be sampled in the MCMC. To apply the scaling :math:`b^{T/
P}_{\ell (\nu / \nu_0)^{-\alpha / 2}}(\nu_0 + \Delta \nu)` we also need :math:`\nu_0`
and :math:`\alpha` for each experiment/array.
The array of frequencies :math:`\nu` for each experiment/array is derived from the
corresponding bandpass file.
This means that, to propagate the bandpass shifts to :math:`b_{\ell}(\nu)`, we need to
provide a yaml file under the key ``Bandpass_shifted_beams``:
.. code-block:: yaml
beam_profile:
Bandpass_shifted_beams: bandpass_shifted_beams
beam_from_file: filename/False/null
where the ``bandpass_shifted_beams.yaml`` file is structured as:
.. code-block:: yaml
LAT_93_s0:
beams: {..., '-2.0': b_ell(nu_0 -2),
'-1.0': b_ell(nu_0 -1),
...
'5.0': b_ell(nu_0 + 5),
...}
nu_0: ...
alpha: ...
LAT_93_s2:
beams: {'-10.0': b_ell(nu_0 - 10), ...}
nu_0: ...
alpha: ...
LAT_145_s0:
beams: ...
nu_0: ...
alpha: ...
...
``bandpass_shifted_beams`` has to be an absolute path, with the ``.yaml/.yml``
extension, which is added by the code.
It is important the keys of ``beam_profile["Bandpass_shifted_beams"]["{exp}_s0/2"]
["beams"]`` are strings of floats representing the value of :math:`\Delta \nu` (if they
are strings of int the code to read the associated beams would not work).
"""
import os
from itertools import product
import numpy as np
from cobaya.log import LoggedError
from cobaya.theory import Theory
from cobaya.typing import empty_dict
from cobaya.yaml import yaml_load, yaml_load_file
from scipy import constants
try:
from numpy import trapezoid
except ImportError:
from numpy import trapz as trapezoid
# Converts from cmb temperature to differential source intensity
# (see eq. 8 of https://arxiv.org/abs/1303.5070).
# The bandpass transmission needs to be divided by
# nu^2 if measured with respect to a RJ source.
# This factor is already included here.
def _cmb2bb(nu: np.ndarray) -> np.ndarray:
r"""
Computes the conversion factor :math:`\frac{\partial B_{\nu}}{\partial T}`
from CMB thermodynamic units to differential source intensity.
Passbands measured with respect to a RJ source have to be divided by a
:math:`\nu^2` factor.
Numerical constants are not included, which is not a problem when using this
conversion both at numerator and denominator.
:param nu: frequency array
:return: the array :math:`\frac{\partial B_{\nu}}{\partial T}`. See note above.
"""
T_CMB = 2.72548
x = nu * constants.h * 1e9 / constants.k / T_CMB
return np.exp(x) * (nu * x / np.expm1(x)) ** 2
[docs]
class Foreground(Theory):
normalisation: dict
components: dict
experiments: list[str]
lmin: int
lmax: int
requested_cls: list[str]
bandint_freqs: list[float]
ells: np.ndarray
[docs]
@classmethod
def get_modified_defaults(cls, defaults: dict, input_options: dict = empty_dict) -> dict:
"""
Adds the appropriate foreground parameters based on the requested_cls
"""
requested_cls = input_options.get("requested_cls") or defaults.get(
"requested_cls", ["tt", "te", "ee"]
)
for spec in requested_cls:
defaults["params"] |= yaml_load(cls.get_text_file_content("fg_%s.yaml" % spec.upper()))
return defaults
# Initializes the foreground model. It sets the SED and reads the templates
def initialize(self):
"""
Initializes the foreground models from ``fgspectra``. Sets the SED
of kSZ, tSZ, dust, radio, CIB Poisson and clustered,
tSZxCIB, and reads the templates for CIB and tSZxCIB.
"""
from fgspectra import cross as fgc
from fgspectra import frequency as fgf
from fgspectra import power as fgp
self.fg_component_list = {s: self.components[s] for s in self.requested_cls}
self.bandint_freqs_T = self.bandint_freqs
self.bandint_freqs_P = self.bandint_freqs
template_path = os.path.join(os.path.dirname(os.path.abspath(fgp.__file__)), "data")
# set pivot freq and multipole
self.fg_nu_0 = self.normalisation["nu_0"]
self.fg_ell_0 = self.normalisation["ell_0"]
if "tt" in self.requested_cls:
tsz_file = os.path.join(template_path, "cl_tsz_150_bat.dat")
cibc_file = os.path.join(template_path, "cl_cib_Choi2020.dat")
cibxtsz_file = os.path.join(template_path, "cl_sz_x_cib.dat")
# We don't seem to be using this
# cirrus = fgc.FactorizedCrossSpectrum(fgf.PowerLaw(), fgp.PowerLaw())
self.ksz = fgc.FactorizedCrossSpectrum(fgf.ConstantSED(), fgp.kSZ_bat())
self.cibp = fgc.FactorizedCrossSpectrum(fgf.ModifiedBlackBody(), fgp.PowerLaw())
self.tsz = fgc.FactorizedCrossSpectrum(
fgf.ThermalSZ(), fgp.PowerLawRescaledTemplate(tsz_file)
)
self.cibc = fgc.FactorizedCrossSpectrum(fgf.CIB(), fgp.PowerSpectrumFromFile(cibc_file))
tsz_cib_sed = fgf.Join(fgf.ThermalSZ(), fgf.CIB())
tsz_cib_power_spectra = [
fgp.PowerLawRescaledTemplate(tsz_file),
fgp.PowerSpectrumFromFile(cibc_file),
fgp.PowerSpectrumFromFile(cibxtsz_file),
]
tsz_cib_cl = fgp.PowerSpectraAndCovariance(*tsz_cib_power_spectra)
self.tSZ_and_CIB = fgc.CorrelatedFactorizedCrossSpectrum(tsz_cib_sed, tsz_cib_cl)
if "te" in self.requested_cls:
self.radioTE = fgc.FactorizedCrossSpectrumTE(
fgf.PowerLaw(), fgf.PowerLaw(), fgp.PowerLaw()
)
self.dustTE = fgc.FactorizedCrossSpectrumTE(
fgf.ModifiedBlackBody(), fgf.ModifiedBlackBody(), fgp.PowerLaw()
)
self.radio = fgc.FactorizedCrossSpectrum(fgf.PowerLaw(), fgp.PowerLaw())
self.dust = fgc.FactorizedCrossSpectrum(fgf.ModifiedBlackBody(), fgp.PowerLaw())
if self.ells is None:
self.ells = np.arange(self.lmin, self.lmax + 1)
# Gets the actual power spectrum of foregrounds given the passed parameters
[docs]
def _get_foreground_model_arrays(self, fg_params: dict, ell: np.ndarray | None = None) -> dict:
r"""
Gets the foreground power spectra for each component computed by ``fgspectra``.
Integration over frequency is performed using bandint_freqs.
:param fg_params: parameters of the foreground components
:param ell: ell range. If ``None`` the default range
set in ``mflike.ells`` is used
:return: the foreground dictionary of arrays
"""
# if ell = None, it uses self.ells, otherwise the ell array provided
# useful to make tests at different l_max than the data
if ell is None:
ell = self.ells
ell_0 = self.fg_ell_0
nu_0 = self.fg_nu_0
# Normalisation of radio sources
ell_clp = ell * (ell + 1.0)
ell_0clp = ell_0 * (ell_0 + 1.0)
model = {}
if "tt" in self.requested_cls:
model["tt", "kSZ"] = fg_params["a_kSZ"] * self.ksz(
{"nu": self.bandint_freqs_T}, {"ell": ell, "ell_0": ell_0}
)
model["tt", "cibp"] = fg_params["a_p"] * self.cibp(
{
"nu": self.bandint_freqs_T,
"nu_0": nu_0,
"temp": fg_params["T_d"],
"beta": fg_params["beta_p"],
},
{"ell": ell_clp, "ell_0": ell_0clp, "alpha": fg_params["alpha_p"]},
)
model["tt", "radio"] = fg_params["a_s"] * self.radio(
{"nu": self.bandint_freqs_T, "nu_0": nu_0, "beta": fg_params["beta_s"]},
{"ell": ell_clp, "ell_0": ell_0clp, "alpha": fg_params["alpha_s"]},
)
model["tt", "tSZ"] = fg_params["a_tSZ"] * self.tsz(
{"nu": self.bandint_freqs_T, "nu_0": nu_0},
{"ell": ell, "ell_0": ell_0, "alpha": fg_params["alpha_tSZ"]},
)
model["tt", "cibc"] = fg_params["a_c"] * self.cibc(
{
"nu": self.bandint_freqs_T,
"nu_0": nu_0,
"temp": fg_params["T_d"],
"beta": fg_params["beta_c"],
},
{"ell": ell, "ell_0": ell_0},
)
model["tt", "dust"] = fg_params["a_gtt"] * self.dust(
{
"nu": self.bandint_freqs_T,
"nu_0": nu_0,
"temp": fg_params["T_effd"],
"beta": fg_params["beta_d"],
},
{"ell": ell, "ell_0": 500.0, "alpha": fg_params["alpha_dT"]},
)
model["tt", "tSZ_and_CIB"] = self.tSZ_and_CIB(
{
"kwseq": (
{"nu": self.bandint_freqs_T, "nu_0": nu_0},
{
"nu": self.bandint_freqs_T,
"nu_0": nu_0,
"temp": fg_params["T_d"],
"beta": fg_params["beta_c"],
},
)
},
{
"kwseq": (
{
"ell": ell,
"ell_0": ell_0,
"amp": fg_params["a_tSZ"],
"alpha": fg_params["alpha_tSZ"],
},
{"ell": ell, "ell_0": ell_0, "amp": fg_params["a_c"]},
{
"ell": ell,
"ell_0": ell_0,
"amp": -fg_params["xi"]
* np.sqrt(fg_params["a_tSZ"] * fg_params["a_c"]),
},
)
},
)
if "ee" in self.requested_cls:
model["ee", "radio"] = fg_params["a_psee"] * self.radio(
{"nu": self.bandint_freqs_P, "nu_0": nu_0, "beta": fg_params["beta_s"]},
{"ell": ell_clp, "ell_0": ell_0clp, "alpha": fg_params["alpha_s"]},
)
model["ee", "dust"] = fg_params["a_gee"] * self.dust(
{
"nu": self.bandint_freqs_P,
"nu_0": nu_0,
"temp": fg_params["T_effd"],
"beta": fg_params["beta_d"],
},
{"ell": ell, "ell_0": 500.0, "alpha": fg_params["alpha_dE"]},
)
if "te" in self.requested_cls:
model["te", "radio"] = fg_params["a_pste"] * self.radioTE(
{"nu": self.bandint_freqs_T, "nu_0": nu_0, "beta": fg_params["beta_s"]},
{"nu": self.bandint_freqs_P, "nu_0": nu_0, "beta": fg_params["beta_s"]},
{"ell": ell_clp, "ell_0": ell_0clp, "alpha": fg_params["alpha_s"]},
)
model["te", "dust"] = fg_params["a_gte"] * self.dustTE(
{
"nu": self.bandint_freqs_T,
"nu_0": nu_0,
"temp": fg_params["T_effd"],
"beta": fg_params["beta_d"],
},
{
"nu": self.bandint_freqs_P,
"nu_0": nu_0,
"temp": fg_params["T_effd"],
"beta": fg_params["beta_d"],
},
{"ell": ell, "ell_0": 500.0, "alpha": fg_params["alpha_dE"]},
)
return model
[docs]
def get_foreground_model(
self, ell: np.ndarray | None = None, freqs_order: list[str] | None = None, **fg_params
) -> dict:
r"""
Gets the foreground power spectra for each component computed by ``fgspectra``.
Integration over frequency is performed using bandint_freqs.
This function is not used by Cobaya, but can be used to get the individual
foreground components and total as a dictionary when accessing the class
separately.
:param ell: ell range. If ``None`` the default range
set in ``ells`` is used
:param freqs_order: list of the effective frequencies for each channel
used to compute the foreground components. Useful when
this class is called outside of mflike, used in place of
``self.experiments``
:param fg_params: parameters of the foreground components
:return: the foreground dictionary
"""
if ell is None:
ell = self.ells
model = self._get_foreground_model_arrays(fg_params, ell=ell)
experiments = self.experiments if freqs_order is None else freqs_order
fg_dict = {}
for c1, exp1 in enumerate(experiments):
for c2, exp2 in enumerate(experiments):
for s in self.requested_cls:
sum_all = np.zeros(len(ell))
for comp in self.fg_component_list[s]:
term = model[s, comp][c1, c2]
if comp == "tSZ_and_CIB":
fg_dict[s, "tSZ", exp1, exp2] = model[s, "tSZ"][c1, c2]
fg_dict[s, "cibc", exp1, exp2] = model[s, "cibc"][c1, c2]
fg_dict[s, "tSZxCIB", exp1, exp2] = (
term - model[s, "tSZ"][c1, c2] - model[s, "cibc"][c1, c2]
)
else:
fg_dict[s, comp, exp1, exp2] = term
sum_all += term
fg_dict[s, "all", exp1, exp2] = sum_all
return fg_dict
[docs]
def calculate(self, state, want_derived=False, **params_values_dict):
"""
Fills the ``state`` dictionary of the ``Foreground`` Theory class
with the foreground spectra, computed using the bandpass
transmissions the sampled foreground parameters.
:param state: ``state`` dictionary to be filled with computed foreground
spectra
:param want_derived: if derived wanted (none here)
:param params_values_dict: dictionary of parameters from the sampler
"""
state["fg_totals"] = self.get_foreground_model_totals(**params_values_dict)
[docs]
def get_foreground_model_totals(self, requested_cl=(), **params_values_dict):
"""
Get total foregrounds for each cl type and frequency channel.
:param requested_cl: optional list of cl types to compute (tt, ee, te)
:param params_values_dict: foreground parameters
:return: list of arrays for each requested_cl
"""
# get total foregrounds; model is dictionary of arrays for each frequency combo
model = self._get_foreground_model_arrays(params_values_dict)
return [
np.sum([model[s, comp] for comp in self.fg_component_list[s]], axis=0)
for s in (requested_cl if requested_cl else self.requested_cls)
]
[docs]
def get_fg_totals(self) -> dict:
"""
Returns the ``state`` dictionary of foreground spectra, when used with Cobaya.
Should only be called after the model is calculated by Cobaya.
"""
return self.current_state["fg_totals"]
[docs]
def must_provide(self, **requirements):
if (req := requirements.get("fg_totals")) is not None:
if set(self.requested_cls) != set(req.get("requested_cls", ["tt", "te", "ee"])):
raise ValueError("requested_cls must be the same in Foreground and MFLike")
self.ells = req.get("ells", self.ells)
self.experiments = req.get("experiments", self.experiments)
[docs]
class BandpowerForeground(Foreground):
# foregrounds integrated over bandpass windows
top_hat_band: dict | None = None
bands: dict | None = None
beams: dict | None = None
beam_profile: dict | None = None
def initialize(self):
super().initialize()
if self.bands is None:
self.bands = {
f"{exp}_{spin}": {"nu": [self.bandint_freqs_T[iexp]], "bandpass": [1.0]}
for spin in ["s0", "s2"]
for iexp, exp in enumerate(self.experiments)
}
self._initialized = False
self.init_bandpowers()
def init_bandpowers(self):
self.use_top_hat_band = bool(self.top_hat_band)
# Parameters for band integration
if self.use_top_hat_band:
self.bandint_nsteps = self.top_hat_band["nsteps"]
self.bandint_width = self.top_hat_band["bandwidth"]
# checks on the bandpass input params, to be done only at the initialization
if not hasattr(self.bandint_width, "__len__"):
self.bandint_width = np.full_like(self.experiments, self.bandint_width, dtype=float)
if self.bandint_nsteps > 1 and np.any(np.array(self.bandint_width) == 0):
raise LoggedError(
self.log, "One band has width = 0, set a positive width and run again"
)
# initialize beam params after mflike initialization
if not self._initialized:
self.use_beam_profile = False
else:
self.use_beam_profile = bool(self.beam_profile)
if self.use_beam_profile:
# reading the beams either from an external file or
# from the sacc file
self.beam_file = self.beam_profile.get("beam_from_file")
self._init_beam_from_file()
# reading the possible dictionary with beam profiles associated to bandpass shifts
# this has to be present if we want to propagate bandpass shifts to the chromatic beams
# otherwise they are left unchanged
self.bandsh_beams_path = self.beam_profile.get("Bandpass_shifted_beams")
if self.bandsh_beams_path:
self.bandpass_shifted_beams = yaml_load_file(file_name=self.bandsh_beams_path)
self._bandint_shift_params = [f"bandint_shift_{f}" for f in self.experiments]
# default bandpass when shift is 0
shift_params = dict.fromkeys(self._bandint_shift_params, 0.0)
self._bandpass_construction(**shift_params)
[docs]
def must_provide(self, **requirements):
# fg_dict is required by mflike
# and requires some params to be computed
# Assign those as requested or us defaults
# otherwise use default values
# Foreground requires bandint_freqs from BandPass
# TODO: not clear that changing number of freqs actually works (for shift parameters)
super().must_provide(**requirements)
if (req := requirements.get("fg_totals")) is not None:
self.bands = req.get("bands", self.bands)
self.beams = req.get("beams", self.beams)
self.top_hat_band = req.get("top_hat_band", self.top_hat_band)
self.init_bandpowers()
[docs]
def get_can_support_params(self) -> list[str]:
return self._bandint_shift_params
[docs]
def _get_foreground_model_arrays(self, fg_params: dict, ell: np.ndarray | None = None) -> dict:
r"""
Gets the foreground power spectra for each component computed by ``fgspectra``.
The computation assumes the bandpass transmissions computed in ``_bandpass_construction``
and integration in frequency is performed if the passbands are not Dirac delta.
:param fg_params: parameters of the foreground components
:param ell: ell range. If ``None`` the default range
set in ``self.ells`` is used
:return: the foreground dictionary of arrays
"""
# compute bandpasses at each step only if bandint_shift params are not null
# and bandint_freqs has been computed at least once
if any(fg_params.get(k) for k in self._bandint_shift_params):
self._bandpass_construction(**fg_params)
return super()._get_foreground_model_arrays(fg_params, ell=ell)
# Takes care of the bandpass construction. It returns a list of nu-transmittance
# for each frequency or an array with the effective freqs.
# bandpasses saved in the sacc file have to be divided by nu^2
# if measured with respect to a RJ source.
# This factor is already included in the _cmb2bb function
[docs]
def _bandpass_construction(self, _initialize: bool = False, **params):
r"""
Builds the bandpass transmission with or without beam.
When chromatic beam is not considered, we compute:
:math:`\frac{\frac{\partial B_{\nu+\Delta \nu}}{\partial T}
\tau(\nu+\Delta \nu)}{\int d\nu
\frac{\partial B_{\nu+\Delta \nu}}{\partial T} \tau(\nu+\Delta \nu)}`
using passbands :math:`\tau(\nu)` (divided by :math:`\nu^2` if
measured with respect to a RJ source, not read from a txt
file) and bandpass shift :math:`\Delta \nu`. As a default,
:math:`\tau(\nu)` is read from the sacc file.
If ``use_top_hat_band``, :math:`\tau(\nu)` is built as a top-hat
with width ``bandint_width`` and number of samples ``nsteps``,
read from the ``MFLike.yaml``.
If ``nstep = 1`` and ``bandint_width = 0``, the passband is a Dirac delta
centered at :math:`\nu+\Delta \nu`.
When the chromatic beam is considered, we compute
:math:`r_{\ell}^T(\nu+\Delta \nu) = \frac{\frac{\partial B_{\nu+\Delta \nu}}{\partial T}
\tau(\nu+\Delta \nu) b^T_{\ell}(\nu)}
{\int d\nu
\frac{\partial B_{\nu+\Delta \nu}}{\partial T} \tau(\nu+\Delta \nu)
b^T_{\ell}(\nu)}`
for the temperature field, and a corresponding expression for the polarization field,
replacing the temperature beam with the polarization one
:math:`b^P_{\ell}(\nu)`. If we want to propagate the bandpass shifts to the beam,
we compute instead
:math:`r_{\ell}^T(\nu+\Delta \nu) = \frac{\frac{\partial B_{\nu+\Delta \nu}}{
\partial T} \tau(\nu+\Delta \nu) b^T_{\ell}(\nu + \Delta \nu)}
{\int d\nu \frac{\partial B_{\nu+\Delta \nu}}{\partial T} \tau(\nu+\Delta \nu)
b^T_{\ell}(\nu + \Delta \nu)}`.
:param \**params: dictionary of nuisance parameters
:return: the list of [nu, transmission] in the multifrequency case
or just an array of frequencies in the single frequency one.
We distinguish between T and pol transmission when a chromatic
beam is included
"""
data_are_monofreq = False
self.bandint_freqs_T = []
self.bandint_freqs_P = []
for iexp, (band_shift, exp) in enumerate(zip(self._bandint_shift_params, self.experiments)):
# Only temperature bandpass for the time being
bands = self.bands[f"{exp}_s0"]
shift = params.get(band_shift, 0.0)
nu_ghz, bp = np.asarray(bands["nu"]), np.asarray(bands["bandpass"])
# computing top-hat bandpass to make band integration
if self.use_top_hat_band:
# Compute central frequency given bandpass in the sacc file
fr = nu_ghz @ bp / bp.sum()
if self.bandint_nsteps > 1:
bandlow = fr * (1 - self.bandint_width[iexp] * 0.5)
bandhigh = fr * (1 + self.bandint_width[iexp] * 0.5)
# nubtrue = np.linspace(bandlow, bandhigh, self.bandint_nsteps, dtype=float)
nub = np.linspace(
bandlow + shift,
bandhigh + shift,
self.bandint_nsteps,
dtype=float,
)
if not self.use_beam_profile:
tranb = _cmb2bb(nub)
# normalization integral to be evaluated at the shifted freqs
# in order to have cmb component calibrated to 1
tranb_norm = trapezoid(_cmb2bb(nub), nub)
if "tt" in self.requested_cls or "te" in self.requested_cls:
self.bandint_freqs_T.append([nub, tranb / tranb_norm])
if "te" in self.requested_cls or "ee" in self.requested_cls:
self.bandint_freqs_P.append([nub, tranb / tranb_norm])
else:
if self.bandsh_beams_path:
blT, blP = self.return_beams(exp, nu_ghz, shift)
else:
# not propagating bandpass shifts to the chromatic beams
blT, blP = self.return_beams(exp, nu_ghz, 0.0)
if "tt" in self.requested_cls or "te" in self.requested_cls:
bpT = _cmb2bb(nub)[..., np.newaxis] * blT
self.bandint_freqs_T.append([nub, bpT / trapezoid(bpT, nub, axis=0)])
if "te" in self.requested_cls or "ee" in self.requested_cls:
bpP = _cmb2bb(nub)[..., np.newaxis] * blP
self.bandint_freqs_P.append([nub, bpP / trapezoid(bpP, nub, axis=0)])
# in case we don't want to do band integration, e.g. when
# we have multifreq bandpass in sacc file
if self.bandint_nsteps == 1:
nub = fr + shift
data_are_monofreq = True
if "tt" in self.requested_cls or "te" in self.requested_cls:
self.bandint_freqs_T.append(nub)
if "te" in self.requested_cls or "ee" in self.requested_cls:
self.bandint_freqs_P.append(nub)
# using the bandpass from sacc file
else:
nub = nu_ghz + shift
if len(bp) == 1:
# Monofrequency channel
data_are_monofreq = True
if "tt" in self.requested_cls or "te" in self.requested_cls:
self.bandint_freqs_T.append(nub[0])
if "te" in self.requested_cls or "ee" in self.requested_cls:
self.bandint_freqs_P.append(nub[0])
else:
if not self.use_beam_profile:
trans_norm = trapezoid(bp * _cmb2bb(nub), nub)
trans = bp / trans_norm * _cmb2bb(nub)
if "tt" in self.requested_cls or "te" in self.requested_cls:
self.bandint_freqs_T.append([nub, trans])
if "te" in self.requested_cls or "ee" in self.requested_cls:
self.bandint_freqs_P.append([nub, trans])
else:
if self.bandsh_beams_path:
blT, blP = self.return_beams(exp, nu_ghz, shift)
else:
# not propagating bandpass shifts to the chromatic beams
blT, blP = self.return_beams(exp, nu_ghz, 0.0)
if "tt" in self.requested_cls or "te" in self.requested_cls:
bpT = bp[..., np.newaxis] * _cmb2bb(nub)[..., np.newaxis] * blT
self.bandint_freqs_T.append([nub, bpT / trapezoid(bpT, nub, axis=0)])
if "te" in self.requested_cls or "ee" in self.requested_cls:
bpP = bp[..., np.newaxis] * _cmb2bb(nub)[..., np.newaxis] * blP
self.bandint_freqs_P.append([nub, bpP / trapezoid(bpP, nub, axis=0)])
# fgspectra can't mix monofrequency with [nu, bp]. If one channel is mono-frequency then we
# assume all of them and pass to fgspectra an array (not list!!) of frequencies
if data_are_monofreq:
self.bandint_freqs_T = np.asarray(self.bandint_freqs_T)
self.bandint_freqs_P = np.asarray(self.bandint_freqs_P)
if self._initialized:
self.log.info("bandpass is delta function, no band integration performed")
self._initialized = True
###########################################################################
## This part deals with beam functions, i.e. reading beam from file.
## We also have a function to compute
## the correction to the beams in case of bandpass shifts
###########################################################################
[docs]
def _init_beam_from_file(self):
"""
Reads the beam profile from an external file or the sacc file.
It has to be a dictionary ``{"{exp}_s0": {"nu": nu, "beams": array(freqs, ells+2)},
"{exp}_s2": {"nu": nu, "beams": array(freqs, ells+2)},...}``
including temperature and polarization beams.
"""
if not self.beam_file:
# option to read beam from sacc
if not self.beams:
raise ValueError("Beams not stored in sacc files!")
else:
self.beams = yaml_load_file(file_name=self.beam_file)
# checking that the freq array is compatible with the bandpass one
for exp, spin in product(self.experiments, ["s0", "s2"]):
key = f"{exp}_{spin}"
# checking nu is the same as the bandpass one
if not np.allclose(self.beams[key]["nu"], self.bands[key]["nu"], atol=1e-5):
raise LoggedError(
self.log,
f"Frequency array for beam {key} is not the same as the bandpass one!",
)
# checking beam shape is correct
if not self.beams[key]["beams"].shape[0] == self.bands[key]["nu"].size:
shape_b = self.beams[key]["beams"].shape[0]
shape_n = self.bands[key]["nu"].size
raise LoggedError(
self.log,
f"beam {key} has a wrong shape! It is shape ({shape_b}, ells), "
+ f"but shoule be ({shape_n}, ells)",
)
[docs]
def beam_interpolation(
self,
b_ell_template: np.ndarray,
f_ell: np.ndarray,
ells: np.ndarray,
freqs: np.ndarray,
freq_ref: float,
alpha: float,
) -> np.ndarray:
r"""
Computing :math:`b_{\ell}(\nu)` from monochromatic beam :math:`b_{\ell}` using the
frequency scaling: :math:`(b \cdot f)_{\ell \cdot (\nu / \nu_0)^{-\alpha / 2}}`
:param b_ell_template: (nell array) Template for :math:`b_{\ell}`, should be 1 at ell=0.
:param f_ell: (nell array) Multiplicate correction to the :math:`b_{\ell}` template.
Should be 1 at ell=0.
:param ells: (nell array) ell array
:param freqs: (nfreq array) Frequency for that experiment/array
:param freq_ref: (float) Reference frequency.
:param alpha: (float) Power law index.
:return: a (nfreq, nell) array: :math:`b_{\ell}(\nu)` at each input frequency.
"""
from scipy.interpolate import interp1d
# f_ell = np.ones_like(b_ell_template)
fi = interp1d(ells, b_ell_template * f_ell, kind="linear", fill_value="extrapolate")
bnu = fi(ells[:, np.newaxis] * (freqs / freq_ref) ** (-alpha / 2))
# Because we extrapolate beyond lmax, output can become negative, that is
# unphysical so we set these to zero.
bnu[bnu < 0] = 0
# transposing to have an object (nfreq, nell)
return bnu.T
[docs]
def return_beams(self, exp: str, nu: np.ndarray, dnu: float) -> tuple[np.ndarray, np.ndarray]:
r"""
Returns the temperature and polarization beams, properly normalized and from
:math:`\ell = 2` (same ell range as ``self.ells``). We compute them from :math:`\ell = 0`
to normalize them in the correct way (temperature beam = 1 for :math:`\ell = 0`).
The polarization beam is normalized by the temperature one (as in ``hp.gauss_beam``).
If we want to propagate bandpass shifts to the beams, we have to select the
monochromatic beam :math:`b_{\ell}` computed from the planet beam assuming
that bandpass shift. This has to be present in the ``self.bandpass_shifted_beams``
dictionary. From each of these :math:`b_{\ell}`, the chromatic beam is computed
with the scaling :math:`b_{\ell (\nu / \nu_0)^{-\alpha / 2}}`, where :math:`\nu_0`
and :math:`\alpha` are also found in the same dictionary.
:param nu: the frequency array in GHz (for now, the math:`\nu` array is the same
between bandpass file and beam file for the same experiment/array.
It is passed from the ``_bandpass_construction`` function
for consistency.)
:param dnu: the bandpass shift :math:`\Delta \nu`
:return: The temperature and polarization chromatic beams
"""
if dnu != 0:
bandsh_beams = self.bandpass_shifted_beams[f"{exp}_s0"]
# reading the Delta nu list in the file
dnulist = np.array([float(dn) for dn in bandsh_beams["beams"].keys()])
# finding the Delta nu closer to the sampled one
Dnu = dnulist[abs(dnulist - dnu).argmin()]
# reading the corresponding monochromatic beam
# the dnu keys have to be strings of floats, not int
b = bandsh_beams["beams"][f"{Dnu}"]
nu = np.asarray(self.bands[f"{exp}_s0"]["nu"])
nu0 = np.asarray(bandsh_beams["nu_0"])
alpha = np.asarray(bandsh_beams["alpha"])
blT = self.beam_interpolation(
b[: self.ells[-1] + 1],
np.ones(self.ells[-1] + 1),
np.arange(self.ells[-1] + 1),
nu,
nu0,
alpha,
)
bandsh_beams = self.bandpass_shifted_beams[f"{exp}_s2"]
# reading the Delta nu list in the file
dnulist = np.array([float(dn) for dn in bandsh_beams["beams"].keys()])
# finding the Delta nu closer to the sampled one
Dnu = dnulist[abs(dnulist - dnu).argmin()]
# reading the corresponding monochromatic beam
b = bandsh_beams["beams"][f"{Dnu}"]
# using the same freq array as the bandpass one
# nu pol should be the same as the T one, I'll comment it for now
# nu = np.asarray(self.bands[f"{exp}_s2"]['nu'])
nu0 = np.asarray(bandsh_beams["nu_0"])
alpha = np.asarray(bandsh_beams["alpha"])
blP = self.beam_interpolation(
b[: self.ells[-1] + 1],
np.ones(self.ells[-1] + 1),
np.arange(self.ells[-1] + 1),
nu,
nu0,
alpha,
)
else:
blT = self.beams[f"{exp}_s0"]["beams"]
blP = self.beams[f"{exp}_s2"]["beams"]
# normalizing the pol beam by the T one for each freq
# if already normalized, this operation has no effect
blP /= blT[:, 0][..., np.newaxis]
# normalizing the beam profile such that it has a max at 1 at ell = 0
blT /= blT[:, 0][..., np.newaxis]
# selecting the ell range from the data, since bl start from ell = 0
return blT[:, self.ells], blP[:, self.ells]
class TTForeground(BandpowerForeground):
requested_cls = ["tt"]
class EEForeground(BandpowerForeground):
requested_cls = ["ee"]
class TEForeground(BandpowerForeground):
requested_cls = ["te"]
class TTEEForeground(BandpowerForeground):
requested_cls = ["tt", "ee"]
class TTTEForeground(BandpowerForeground):
requested_cls = ["tt", "te"]
class TEEEForeground(BandpowerForeground):
requested_cls = ["te", "ee"]