Source code for bossa.sampling

# TODO: Add module documentation
# TODO: Complete documentation

"""Sampling of arbitrary distributions, galaxy parameters and binary populations."""
import gc
import logging
import pathlib
import warnings
from time import time
from datetime import datetime
from pathlib import Path
from functools import cached_property
from typing import Union

from astropy.cosmology import WMAP9 as cosmo

import numpy as np
import pandas as pd
from numpy._typing import NDArray, ArrayLike
from numpy.lib.recfunctions import unstructured_to_structured
from scipy.integrate import quad
from scipy.optimize import fsolve, fmin
from scipy.interpolate import UnivariateSpline
from concurrent.futures import ProcessPoolExecutor

#import sys
#sys.path.append('..')
import bossa.imf as imf
import bossa.sfh as sfh
from bossa.imf import Star, IGIMF
from bossa.sfh import MZR, SFMR, Corrections, GSMF
from bossa.zams import ZAMSSystemGenerator, MultipleFraction
from bossa.utils import interpolate, ZOH_to_FeH, create_logger, format_time, Length, get_bin_centers, \
    enumerate_bin_edges
from bossa.constants import (
    Z_SUN, LOG_PATH, BINARIES_CORRELATED_TABLE_PATH, BINARIES_UNCORRELATED_TABLE_PATH,
    IGIMF_ZAMS_DIR_PATH, GALAXYGRID_DIR_PATH, PHYSICAL_CORE_COUNT, TOTAL_PHYSICAL_MEMORY
)


IMFLike = Union[imf.Star, imf.EmbeddedCluster, imf.IGIMF]
"""Classes from :mod:`imf` with an ``imf(m)`` method."""


[docs] def powerlaw(x, k, a): """Return power-law with norm ``k`` and index ``a`` at ``x``.""" return k * x ** a
[docs] class RandomSampling: """Randomly sample an arbitrary IMF. This class is meant to speed up the sampling of an IMF defined as a numerical integral, as with :class:`imf.IGIMF`, by setting up an interpolator to compute probabilities. Parameters ---------- imf : :const:`IMFLike` Instance of an IMF class with an ``imf(m)`` method. discretization_points : int Number of masses on which to compute the IMF. Attributes ---------- imf : IMFLike Instance of an IMF class with an ``imf(m)`` method. m_trunc_min : float Lower truncation mass. m_trunc_max : float Upper truncation mass. sample : NDArray Last drawn sample. """ def __init__(self, imf: IMFLike, discretization_points:int = 100) -> None: self.imf = imf self.m_trunc_min = imf.m_trunc_min self.m_trunc_max = imf.m_trunc_max self._discretization_points = discretization_points self._discretization_masses = None self.discrete_imf = None self.sample = None # TODO: set discretization_masses with np.logspace @property def discretization_masses(self) -> NDArray[float]: """NDArray: Masses on which to compute the IMF.""" if self._discretization_masses is None: size = self._discretization_points // 5 self._discretization_masses = np.concatenate(( np.linspace(self.m_trunc_min + 0.01, 0.1, 1 + int(size * (-1 - np.log10(self.m_trunc_min)))), np.linspace(0.1, 1, 1 + size)[1:], np.linspace(1, 10, 1 + size)[1:], np.linspace(10, 100, 1 + size)[1:], np.linspace(100, self.m_trunc_max, 1 + int(size * (np.log10(self.m_trunc_max) - 2)))[1:] )) return self._discretization_masses
[docs] def compute_imf(self) -> None: """Compute the IMF for interpolation. Computes the IMF at :attr:`discretization_points` mass values for the interpolator. """ self.discrete_imf = np.empty((0,), np.float64) for m in self.discretization_masses: imf = self.imf.imf(m) self.discrete_imf = np.append(self.discrete_imf, imf)
def _get_probabilities(self, sampling_masses: ArrayLike) -> ArrayLike: """Return probabilities at ``sampling_masses``. Parameters ---------- sampling_masses : NDArray Array of masses for which to compute the probability. Returns ------- sampling_probs : NDArray Array of probabilities corresponding to sampling_masses. Sums to 1. """ ipY = self.discrete_imf.reshape((1, self.discretization_masses.shape[0])) ipX = self.discretization_masses.reshape((1, self.discretization_masses.shape[0])) sampling_probs = interpolate(ipX, ipY, sampling_masses)[0] # Near the truncation masses, where the IMF sharply drops to # zero, the interpolator may yield negative values, which we # account for here. for i, prob in enumerate(sampling_probs): if prob < 0: sampling_probs[i] = 0 sampling_probs /= sampling_probs.sum() return sampling_probs
[docs] def get_sample(self, m_min: float, m_max: float, n: int) -> NDArray[float]: """Return a sample of size ``n`` from ``m_min`` to ``m_max``. Returns a sample of size ``n`` between ``mmin`` and ``m_max`` and stores it as :attr:`sample`. Parameters ---------- m_min : float Sampling interval lower limit. m_max : float Sampling interval upper limit. n : int Sample size. Returns ------- sample : NDArray ``(n,)``-shaped array containing the sample. """ sampling_masses = np.linspace(m_min, m_max, 10 * n) probabilities = self._get_probabilities(sampling_masses) self.sample = np.sort(np.random.choice(sampling_masses, p=probabilities, size=n)) return self.sample
[docs] class GalaxyStellarMassSampling: """Sample galaxy stellar masses from a GSMF. This class performs a number- or mass-weighted sampling of galaxy stellar mass from the galaxy stellar mass function (GSMF) in :class:`sfh.GSMF`. Parameters ---------- gsmf : :class:`sfh.GSMF` GSMF to sample. logm_min : float Log of sampling interval lower limit. logm_max : float Log of sampling interval upper limit. size : int Sample size. sampling : {'number', 'mass', 'uniform'}, default : 'number' Whether to sample by galaxy number, stellar mass, or with uniform mass bins. Attributes ---------- gsmf : :class:`sfh.GSMF` GSMF to sample. logm_min : float Log of sampling interval lower limit. logm_max : float Log of sampling interval upper limit. sample_size : int Sample size. bin_limits : NDArray Limits of sampled mass bins. grid_ndensity_array : NDArray Number density within each mass bin. grid_density_array : NDArray Mass density within each mass bin. grid_logmasses : NDArray Sampled log galaxy stellar masses. See Also -------- GalaxyGrid : Implements this class to generate a grid of galaxy metallicities and star-formation rates over redshift. Notes ----- The sampling method implemented in this class is equivalent to computing :attr:`sample_size` quantiles of the GSMF and assigning each one its average stellar mass. Option ``sampling='number'`` implements this for the ``GSMF(m)`` directly, while option ``sampling='number'`` does it for ``m*GSMF(m)``. In the future this class might be streamlined with Numpy's quantile function. Option ``sampling='uniform'`` sets :attr:`sample_size` uniform-width log mass bins. Sampling is performed for a fixed redshift (defined within :attr:`gsmf`). Besides the log stellar masses (:attr:`grid_logmasses`), this class also stores the total mass and number densities contained by each mass bin (:attr:`grid_density_array` and :attr:`grid_ndensity_array` respectively). Examples -------- >>> from bossa.sfh import GSMF >>> gsmf = GSMF(redshift=0) >>> galaxy_mass_sampler = GalaxyStellarMassSampling(gsmf,size=10) >>> galaxy_mass_sampler.sample() >>> galaxy_mass_sampler.grid_logmasses array([9.89241753, 8.99773241, 8.50334364, 8.14752827, 7.86839714, 7.64216579, 7.45822559, 7.30385785, 7.17084443, 7.05398244]) """ def __init__(self, gsmf: sfh.GSMF, logm_min: float = 7., logm_max: float = 12., size : int = 3, sampling: str = 'number') -> None: self.gsmf = gsmf self.logm_min = logm_min self.logm_max = logm_max self.sample_size = size self.sampling = sampling self.bin_limits = np.empty(size + 1, np.float64) self.grid_ndensity_array = np.empty(size, np.float64) self.grid_density_array = np.empty(size, np.float64) self.grid_logmasses = np.empty(size, np.float64) @property def sampling(self) -> str: """str: Whether to sample by galaxy number or stellar mass.""" return self._sampling @sampling.setter def sampling(self, sampling: str) -> None: if sampling == 'number': self._sampling = 'number' elif sampling == 'mass': self._sampling = 'mass' elif sampling == 'uniform': self._sampling = 'uniform' else: raise ValueError('Parameter "sampling" must be one of ' '"number", "mass".') def _ratio(self, logm_im1: int, logm_i :int, logm_ip1: int) -> float: """Compute the ratio of the GSMF integral in a bin. Integrate either ``GSMF(m)`` or ``m*GSMF(m)`` according to :attr:`sampling`. This function is used to check whether two consecutive mass bins hold the same mass/number density. Not called for uniform sampling. Parameters ---------- logm_im1 : float Log m_(i minus 1). Lower limit of the first bin. logm_i : float Log m_i. Upper limit of the first bin, lower of the second. logm_ip1 : float Log m_(i plus 1). Upper limit of the second bin. Returns ------- float Ratio of the integral in the first over the second bin. """ if self.sampling == 'number': int1 = quad(lambda x: 10 ** self.gsmf.log_gsmf(x), logm_ip1, logm_i)[0] int2 = quad(lambda x: 10 ** self.gsmf.log_gsmf(x), logm_i, logm_im1)[0] elif self.sampling == 'mass': int1 = quad(lambda x: 10 ** x * 10 ** self.gsmf.log_gsmf(x), logm_ip1, logm_i)[0] int2 = quad(lambda x: 10 ** x * 10 ** self.gsmf.log_gsmf(x), logm_i, logm_im1)[0] return int1 / int2 def _constraint(self, vec: NDArray[float]) -> NDArray[float]: """Returns a vector to be minimized during sampling. Vector containing the (ratios - 1) between the integrals within successive mass bins. The integrals are computed by :meth:`_ratio`. The number of bins is fixed to :attr:`sample_size`, but their limits can be shifted around until they become the :attr:`sample_size`-quantiles of the GSMF, i.e., until `bin_density_ratios` becomes null. This is done by :meth:`sample`. Not called for uniform sampling. Parameters ---------- vec : NDArray Boundaries of the mass bins, except for the extremes, i.e., first lower boundary and last upper boundary. Returns ------- bin_density_ratios : NDArray (Ratios - 1) of the number or mass integral between successive bins. """ # Add extremes to bin limits. bin_limits = np.concatenate(([self.logm_max], vec, [self.logm_min])) bin_density_ratios = np.empty(self.sample_size - 1, np.float64) for i, logm_i in enumerate(bin_limits[1:-1]): logm_im1 = bin_limits[i] # m_(i minus 1) logm_ip1 = bin_limits[i + 2] # m_(i plus 1) # The first two conditions stops bins from "crossing over". if logm_i > self.logm_max or logm_ip1 > self.logm_max: bin_density_ratios[i] = 1000 elif logm_i < self.logm_min or logm_ip1 < self.logm_min: bin_density_ratios[i] = 1000 # Only if there is no crossover is the actual ratio taken. else: r = self._ratio(logm_im1, logm_i, logm_ip1) bin_density_ratios[i] = r - 1 return bin_density_ratios def _set_grid_density(self) -> None: """Integrates within each bin for mass and number densities.""" for i, (m2, m1) in enumerate(zip(self.bin_limits[:-1], self.bin_limits[1:])): ndens = quad(lambda x: 10 ** self.gsmf.log_gsmf(x), m1, m2)[0] dens = quad(lambda x: 10 ** x * 10 ** self.gsmf.log_gsmf(x), m1, m2)[0] self.grid_ndensity_array[i] = ndens self.grid_density_array[i] = dens
[docs] def sample(self) -> None: """Sample galaxy stellar masses from the GSMF. Generates the galaxy stellar mass samples according to :attr:`sampling` and stores it in :attr:`grid_logmasses`. Number and mass densities of galaxies in each bin are also computed and stored in :attr:`grid_density_array` and :attr:`grid_ndensity_array`, respectively. """ # No minimization problem if bins are uniform. if self.sampling == 'uniform': self.bin_limits = np.linspace(self.logm_max, self.logm_min, self.sample_size + 1) else: # Use uniform bins as initial guesses. # A number-weighted sample is always weighted towards lower # masses relative to a mass-weighted one because less # massive galaxies are much more common. if self.sampling == 'number': initial_guess = np.linspace(9, self.logm_min, self.sample_size + 1)[1:-1] elif self.sampling == 'mass': initial_guess = np.linspace(11, 9, self.sample_size + 1)[1:-1] # Minimize (ratio-1) bin integral vector. # See the _constraint docstring. solution = fsolve(self._constraint, initial_guess, maxfev=(initial_guess.shape[0] + 1) * 1000) self.bin_limits = np.concatenate(([self.logm_max], solution, [self.logm_min])) self._set_grid_density() for i, (m2, m1) in enumerate(zip(self.bin_limits[:-1], self.bin_limits[1:])): number_density_in_bin = quad(lambda x: 10 ** self.gsmf.log_gsmf(x), m1, m2)[0] logmass_density_in_bin = quad(lambda x: x * 10 ** self.gsmf.log_gsmf(x), m1, m2)[0] self.grid_logmasses[i] = logmass_density_in_bin / number_density_in_bin
[docs] class GalaxyGrid: """Generate a grid of galaxy properties over redshift. This class uses the galaxy stellar mass function (GSMF), star formation-mass relation (SFMR) and mass-metallicity relation (MZR) models from the :mod:`sfh` module to sample the space of galaxy parameters (stellar mass, redshift, star formation rate and metallicity). A set of :attr:`n_redshift` redshifts is sampled first, and only then are the other three parameters sampled, :attr:`logm_per_redshift` sets per redshift. Unless :attr:`scatter_model` is set to `normal`, the redshift plus any one parameter fully determines the others. Mass is given in solar masses, star formation rate in solar masses per year, and the metallicity is [Fe/H]. Parameters ---------- n_redshift : int Number of redshift to sample. redshift_min : float, default : 0. Minimum redshift to sample. redshift_max : float, default : 10. Maximum redshift to sample. force_boundary_redshift : bool, default : True, Whether to manually add ``redshift_min`` and ``redshift_max`` to the sample. logm_per_redshift : int, default : 3 Number of masses to sample per redshift. logm_min : float, default : 7 Minimum log10(mass) to sample. logm_max : float, default : 12 Maximum log10(mass) to sample. mzr_model : {'KK04', 'T04', 'M09', 'PP04'}, default: 'KK04' Option of MZR model. sfmr_flattening : {'none', 'moderate', 'sharp'}, default: 'none' SFMR model flattening option. gsmf_slope_fixed : bool, default: True Whether to use the fixed (True) or the varying (False) GSMF low-mass slope model. sampling_mode : {'mass', 'number', 'uniform'}, default : 'mass' Method for sampling masse from the GSMF. scatter_model : str, default : 'none' Scatter model to use in the SFMR and MZR. apply_igimf_corrections : bool, default : True, Whether to correct the SFR for :class:`imf.IGIMF`. random_state : int Random number generator seed. Attributes ---------- n_redshift : int Number of redshift values in the grid. redshift_min : float Minimum redshift to sample. redshift_max : float Maximum redshift to sample. force_boundary_redshift : bool Whether to forcefully add redshifts :attr:`redshift_min` and :attr:`redshift_min` to the sample, thus making its size ``(n_redshift+2)*``:attr:`logm_per_redshift`. logm_per_redshift : int Number of galactic stellar masses to sample per redshift. logm_min : float Minimum log10(mass) to sample. logm_max : float Maximum log10(mass) to sample. sample_redshift_array : NDArray Redshift sample defining the grid. sample_redshift_bins : NDArray Limits of the bins represented by :attr:`sample_redshift_array`. sample_logm_array : NDArray Galaxy stellar mass samples for each redshift in :attr:`sample_redshift_array`. sample_logm_bins : NDArray Limits of the bins represented by :attr:`sample_logm_array`, per redshift. gsmf_slope_fixed : bool Whether the GSMF low-mass slope should be fixed or not. random_state : int Random number generator seed. apply_igimf_corrections : bool Whether to correct the SFR for :class:`imf.IGIMF`. zoh_bin_array : NDArray Edges of Z_OH bins represented by :attr:`zoh_array`. zoh_array : NDArray Z_OH values sampled at each redshift. ndensity_array : NDArray Number density of galaxies represented by each grid point. density_array : NDArray Stellar mass density of galaxies represented by each grid point. mass_list : list List of :attr:`n_redshift` arrays, containing the galaxy stellar masses sampled at each redshift. log_gsmf_list : list List of :attr:`n_redshift` arrays, containing the log10(gsmf) values (galaxy number density) sampled at each redshift. zoh_list : list List of :attr:`n_redshift` arrays, containing the Z_OH values sampled at each redshift. feh_list : list List of :attr:`n_redshift`, containing the [Fe/H] values sampled at each redshift. sfr_list : list List of :attr:`n_redshift` arrays, containing the SFR values sampled at each redshift. grid_array : numpy_array Shape ``(n_redshift, logm_per_redshift, 6)`` array containing the full grid of galaxy properties. Notes ----- This class first samples the redshift, and then for each redshift a fixed number of "galaxies", i.e., (mass, metallicity, SFR) sets. The final grid of galaxies is stored as :attr:`grid_array`, and can also be written to disk as a .pkl file with Pandas by calling :meth:`save_grid`. The ``_array`` attributes are used to build :attr:`grid_array`. The ``_list`` attributes are not used internally, but were instead necessary for older versions of data analysis/processing and test notebooks. :attr:`sample_redshift_array` is initialized as a sample of evenly-space redshifts between the set minimum and maximum. :meth:`sample_redshift` must be run to get a sample from the GSMF. It is recommended not to rely on the ``_list`` attributes, as they should be removed in the future. References ---------- .. [1] Chruslinska, M., Jerabkova, T., Nelemans, G., Yan, Z. (2020). The effect of the environment-dependent IMF on the formation and metallicities of stars over cosmic history. A&A, 636, A10. doi:10.1051/0004-6361/202037688 .. [2] Jerabkova, T., Zonoozi, A. H., Kroupa, P., Beccari, G., Yan, Z., Vazdekis, A., Zhang, Z.-Y. (2018). Impact of metallicity and star formation rate on the time-dependent, galaxy-wide stellar initial mass function. A&A, 620, A39. doi:10.1051/0004-6361/20183 """ def __init__(self, n_redshift: int, redshift_min: int = 0., redshift_max: float = 10., force_boundary_redshift: bool = True, logm_per_redshift: int = 3, logm_min: float = 6., logm_max: float = 12., mzr_model: str = 'KK04', sfmr_flattening: str = 'none', gsmf_slope_fixed: bool = True, sampling_mode: str = 'sfr', scatter_model: str = 'none', apply_igimf_corrections: bool = True, random_state: bool = None) -> None: # Redshift settings self.n_redshift = n_redshift self.redshift_min = redshift_min self.redshift_max = redshift_max self.force_boundary_redshift = force_boundary_redshift # Logm settings self.logm_per_redshift = logm_per_redshift self.logm_min = logm_min self.logm_max = logm_max # Galaxy sampling storage self.sample_redshift_array = self._get_sample_redshift_array() self.sample_redshift_bins = np.zeros(self.sample_redshift_array.shape[0] + 1) self.sample_logm_array = np.zeros((self.sample_redshift_array.shape[0], self.logm_per_redshift)) self.sample_logm_bins = np.zeros((self.sample_redshift_array.shape[0], self.logm_per_redshift + 1)) # Physical models self.mzr_model = mzr_model self.sfmr_flattening = sfmr_flattening self.gsmf_slope_fixed = gsmf_slope_fixed # Sampling settings self.sampling_mode = sampling_mode self.scatter_model = scatter_model self.random_state = random_state self.apply_igimf_corrections = apply_igimf_corrections # Logm sampling storage self.zoh_bin_array = np.empty((0, self.logm_per_redshift + 1), np.float64) self.zoh_array = np.empty((0, self.logm_per_redshift), np.float64) self.ndensity_array = np.empty((0, self.logm_per_redshift), np.float64) self.density_array = np.empty((0, self.logm_per_redshift), np.float64) self.mass_list = list() self.log_gsmf_list = list() self.zoh_list = list() self.feh_list = list() self.sfr_list = list() # Grid storage self.grid_array = np.empty((0, 5), np.float64) self._save_path = None # TODO: document sampling grid attributes # Sampling grid attributes self.sampling_grid_side = self.n_redshift*5 self.sampling_grid_samplesize_per_bin = self.logm_per_redshift*10 self.sampling_grid_redshift_bins = None self.sampling_grid_logm_bins = None self.sampling_grid = None self.sampling_grid_zoh_axis = None self.sampling_grid_logsfr_axis = None self.sampling_grid_logsfrd_overlay = None if self.random_state is not None: np.random.seed(self.random_state) def _get_sample_redshift_array(self) -> NDArray[float]: """Return initial uniform redshift array.""" if self.force_boundary_redshift: return np.linspace(self.redshift_min, self.redshift_max, self.n_redshift+2) else: return np.linspace(self.redshift_min, self.redshift_max, self.n_redshift) @property def mzr_model(self) -> str: """Mass-(gas) metallicity relation model choice.""" return self._mzr_model @mzr_model.setter def mzr_model(self, model: str) -> None: models = ['KK04', 'T04', 'M09', 'PP04'] if model not in models: raise ValueError(f'mzr_model must be one of {models}.') self._mzr_model = model @property def sfmr_flattening(self) -> str: """Star formation-mass relation model choice.""" return self._sfmr_flattening @sfmr_flattening.setter def sfmr_flattening(self, flattening: str) -> None: models = ['none', 'moderate', 'sharp'] if flattening not in models: raise ValueError(f'sfmr_flattening must be one of {models}.') self._sfmr_flattening = flattening @property def sampling_mode(self) -> str: """Sampling mode choice.""" return self._sampling_mode @sampling_mode.setter def sampling_mode(self, mode: str) -> None: modes = ['sfr', 'mass', 'number', 'uniform'] if mode not in modes: raise ValueError(f'sampling mode must be one of {modes}.') self._sampling_mode = mode @property def scatter_model(self) -> str: """Scattering model choice for the SFMR and the MZR.""" return self._scatter_model @scatter_model.setter def scatter_model(self, model: str) -> None: models = ['none', 'normal', 'min', 'max'] if model not in models: raise ValueError(f'sampling mode must be one of {models}.') self._scatter_model = model @property def save_path(self) -> pathlib.Path: """pathlib.Path: Path which to save the grid to.""" if self._save_path is None: fname = (f'galgrid_{self.mzr_model}_{self.sfmr_flattening}_{self.gsmf_slope_fixed}_' f'{self.sampling_mode}_{len(self.sample_redshift_array)}z_' f'{self.logm_per_redshift}Z.pkl') self._save_path = Path(GALAXYGRID_DIR_PATH, fname) return self._save_path def _discrete_redshift_probs(self, min_z: float, max_z: float, size: int, ) -> tuple[NDArray[float], NDArray[float]]: """Return probabilities for a uniform redshift pool. Generates and returns a ``pool`` of evenly-space ``size``` redshifts between ``min_z`` and ``max_z``. Computes and returns their probabilities (``probs``) from the number of galaxies at that redshift, found by integrating either ``m*GSMF(m)`` or ``SFR(m)*GSMF(m)`` over the entire mass span at each redshift. """ bin_edges = np.linspace(min_z, max_z, size + 1) pool = np.zeros(size) probs = np.zeros(size) for i, (z_llim, z_ulim) in enumerate(zip(bin_edges[:-1], bin_edges[1:])): pool[i] = (z_llim + z_ulim) / 2 gsmf = GSMF(redshift=pool[i], fixed_slope=self.gsmf_slope_fixed) if self.sampling_mode == 'sfr': sfmr = SFMR(redshift=pool[i], flattening=self.sfmr_flattening, scatter_model=self.scatter_model) def sfrd_dlogm(logm): # Return the star formation rate density per # logarithmic galaxy stellar mass bin. return 10.**gsmf.log_gsmf(logm) * 10.**sfmr.logsfr(logm) # Get the total star formation rate density at redshift. sfrd = quad(sfrd_dlogm, self.logm_min, self.logm_max)[0] probs[i] = sfrd elif self.sampling_mode == 'mass': def density_dlogm(logm): # Return the (stellar) mass density of galaxies # of (stellar) mass logm per logarithmic galaxy # stellar mass bin. return logm * 10.**gsmf.log_gsmf(logm) # Get the total galaxy (stellar) mass density at # redshift. # We assume the density is uniform within the redshift bin. c_vol = cosmo.comoving_volume(z_ulim).value - cosmo.comoving_volume(z_llim).value density = quad(density_dlogm, self.logm_min, self.logm_max)[0] probs[i] = density * c_vol elif self.sampling_mode == 'number': def ndensity_dlogm(logm): # Return the number density of galaxies of (stellar) # mass logm per logarithmic galaxy stellar mass bin. return 10.**gsmf.log_gsmf(logm) # Get the total number density at redshift. # We assume the density is uniform within the redshift bin. c_vol = cosmo.comoving_volume(z_ulim).value - cosmo.comoving_volume(z_llim).value density = quad(ndensity_dlogm, self.logm_min, self.logm_max)[0] probs[i] = density * c_vol elif self.sampling_mode == 'uniform': probs[i] = 1. probs /= probs.sum() return pool, probs def _gsmf_sample_masses(self, redshift: float) -> GalaxyStellarMassSampling: """Sample masses from the GSMF at ``redshift``. Returns a :class:`sfh.GSMF` object which holds the sampled masses and respective densities as attributes. Warnings -------- This method is deprecated and will be removed in the next version. """ gsmf = GSMF(redshift, self.gsmf_slope_fixed) sample = GalaxyStellarMassSampling(gsmf, self.logm_min, self.logm_max, self.logm_per_redshift, self.sampling_mode) sample.sample() return sample def _discrete_mass_probs(self, min_logm: float, max_logm: float, redshift: float, size: int, ) -> tuple[NDArray[float], NDArray[float]]: """Return probabilities for a uniform mass pool at a redshift. Generates and returns a ``pool`` of evenly-space ``size`` galaxy stellar masses between ``min_logm`` and ``max_logm``. Computes and returns their probabilities (``probs``) weighted by either density (mass or number), or SFRD. """ bin_edges = np.linspace(min_logm, max_logm, size + 1) pool = np.zeros(size) probs = np.zeros(size) gsmf = GSMF(redshift=redshift, fixed_slope=self.gsmf_slope_fixed) sfmr = SFMR(redshift=redshift, flattening=self.sfmr_flattening, scatter_model=self.scatter_model) for i, (logm0, logm1) in enumerate(zip(bin_edges[:-1], bin_edges[1:])): pool[i] = (logm0 + logm1) / 2 if self.sampling_mode == 'sfr': def sfrd_dlogm(logm): # Return the star formation rate density per # logarithmic galaxy stellar mass bin. return 10. ** gsmf.log_gsmf(logm) * 10. ** sfmr.logsfr(logm) probs[i] = sfrd_dlogm(pool[i]) elif self.sampling_mode == 'mass': def density_dlogm(logm): # Return the (stellar) mass density of galaxies # of (stellar) mass logm per logarithmic galaxy # stellar mass bin. return logm * 10. ** gsmf.log_gsmf(logm) # Get the total galaxy (stellar) mass at redshift. c_vol = cosmo.differential_comoving_volume(redshift) density = density_dlogm(pool[i]) probs[i] = density * c_vol elif self.sampling_mode == 'number': def ndensity_dlogm(logm): # Return the number density of galaxies of (stellar) # mass logm per logarithmic galaxy stellar mass bin. return 10. ** gsmf.log_gsmf(logm) # Get the total number at redshift. c_vol = cosmo.differential_comoving_volume(redshift) density = ndensity_dlogm(pool[i]) probs[i] = density * c_vol elif self.sampling_mode == 'uniform': probs[i] = 1. probs /= probs.sum() return pool, probs # TODO: implement density-weighted sampling within this method def _sample_masses(self) -> None: """Sample galaxy stellar mass at ``redshift``. Depending on ``sampling_mode``, sample galaxy stellar mass with probability weighted by galaxy number density, galaxy stellar mass density or star formation rate density (SFRD). """ for redshift_i, redshift in enumerate(self.sample_redshift_array): logm_pool, logm_probs = self._discrete_mass_probs(self.logm_min, self.logm_max, redshift=redshift, size=100*self.logm_per_redshift) # With probabilities calculated, we can generate a # representative sample from which we find logm_per_redshift) # quantiles. Repetition is not an issue because only the # quantiles are of interest (so we can ask for a sample larger # than logm_pool). logm_choices = np.random.choice(logm_pool, p=logm_probs, size=int(1e4*self.logm_per_redshift)) self.sample_logm_bins[redshift_i] = np.quantile( logm_choices, np.linspace(0, 1, self.logm_per_redshift+1) ) # Correct for the granularity of the sampling. self.sample_logm_bins[redshift_i, 0] = self.logm_min self.sample_logm_bins[redshift_i, -1] = self.logm_max # Finding uniform quantiles defines which regions of the # redshift range should be equally represented in order # to reproduce the GSMF as well as possible. The quantiles # themselves are represented in the sample by the averaged # redshift of their respective galaxies. Weighting depends on # :attr:`sampling_mode`. logm_i = 0 for quantile0, quantile1 in zip(self.sample_logm_bins[redshift_i, :-1], self.sample_logm_bins[redshift_i, 1:]): logm_pool, logm_probs = self._discrete_mass_probs(quantile0, quantile1, redshift=redshift, size=100) average_logm = np.average(logm_pool, weights=logm_probs) self.sample_logm_array[redshift_i, logm_i] = average_logm logm_i += 1 def _sample_galaxies( self, redshift: float ) -> tuple[NDArray[float], NDArray[float], NDArray[float], NDArray[float], NDArray[float], NDArray[float], NDArray[float], NDArray[bool], NDArray[float], NDArray[bool]]: """Return a sample of galaxies properties at ``redshift``. Samples a number :attr:`logm_per_redshift` of galaxies at ``redshift`` from the GSMF, SFMR and MZR. Returns a tuple of ``(len(logm_per_redshift),)``-shaped arrays. Parameters ---------- redshift : float Redshift. Defines the GSMF, MZR and SFMR. Returns ------- ndensity_array : NDArray ``(len(logm_per_redshift),)``-shaped array containing the galaxy number density within the mass bin represented by each galaxy. density_array : NDArray ``(len(logm_per_redshift),)``-shaped array containing the galaxy stellar mass density within the mass bin represented by each galaxy. logm_array : NDArray ``(len(logm_per_redshift),)``-shaped array containing the log stellar mass of each galaxy. log_gsmf_array : NDArray ``(len(logm_per_redshift),)``-shaped array containing the log GSMF evaluated along ``logm_array``. zoh_array : NDArray ``(len(logm_per_redshift),)``-shaped array containing the Z_OH of each galaxy. zoh_bins : NDArray ``(len(logm_per_redshift)+1,)``-shaped array containing the limits of the Z_OH bins represented by each galaxy. feh_array : NDArray ``(len(logm_per_redshift),)``-shaped array containing the [Fe/H] of each galaxy. feh_mask : NDArray ``(len(logm_per_redshift),)``-shaped array acting as a boolean mask for values of ``feh_array`` within the bounds of :class:`sfh.Corrections`. log_sfr_array ; NDArray ``(len(logm_per_redshift),)``-shaped array containing the log SFR of each galaxy. sfr_mask : NDArray ``(len(logm_per_redshift),)``-shaped array acting as a boolean mask for values of ``sfr_array`` within the bounds of :class:`sfh.Corrections`. """ i_redshift = np.argmin(np.abs(redshift-self.sample_redshift_array)) logm_array = self.sample_logm_array[i_redshift] logm_bins = self.sample_logm_bins[i_redshift] gsmf = GSMF(redshift) sfmr = SFMR(redshift, flattening=self.sfmr_flattening, scatter_model=self.scatter_model) mzr = MZR(redshift, self.mzr_model, scatter_model=self.scatter_model) mzr.set_params() density_array = np.zeros(self.logm_per_redshift) ndensity_array = np.zeros(self.logm_per_redshift) for i in range(self.logm_per_redshift): logm0, logm1 = logm_bins[[i, i+1]] density_array[i] = quad(lambda logm: logm * 10.**gsmf.log_gsmf(logm), logm0, logm1)[0] ndensity_array[i] = quad(lambda logm: 10. ** gsmf.log_gsmf(logm), logm0, logm1)[0] density_array = density_array.reshape(1, self.logm_per_redshift) ndensity_array = ndensity_array.reshape(1, self.logm_per_redshift) log_gsmf_array = np.array([[np.float64(gsmf.log_gsmf(logm)) for logm in logm_array]]) zoh_bins = np.array([[mzr.zoh(logm) for logm in logm_bins]]) zoh_array = np.array([[mzr.zoh(logm) for logm in logm_array]]) log_sfr_array = np.array([[sfmr.logsfr(logm) for logm in logm_array]]) feh_array = np.array([[ZOH_to_FeH(zoh) for zoh in zoh_array.flatten()]]) feh_mask = np.ones(feh_array.shape) if self.apply_igimf_corrections: for i, feh in enumerate(feh_array.flatten()): if feh > 1.3 or feh < -5: feh_mask[0, i] = 0 feh_mask = feh_mask.astype(bool) sfr_mask = np.ones(log_sfr_array.shape) if self.apply_igimf_corrections: for i, sfr in enumerate(log_sfr_array.flatten()): if np.abs(sfr) > 3.3: sfr_mask[0, i] = 0 sfr_mask = sfr_mask.astype(bool) return (ndensity_array, density_array, logm_array, log_gsmf_array, zoh_array, zoh_bins, feh_array, feh_mask, log_sfr_array, sfr_mask) def _correct_sample( self, mass_array: NDArray[float], log_gsmf_array: NDArray[float], zoh_array: NDArray[float], feh_array: NDArray[float], sfr_array:NDArray[float], mask_array: NDArray[bool] ) -> tuple[list[NDArray[float]], list[NDArray[float]], list[NDArray[float]], list[NDArray[float]], list[NDArray[float]]]: """Applies SFR corrections for a variant IMF. Applies the corrections from Chruslinska et al. (2020) [1]_, through :class:`sfh.Corrections`, to the SFR, for the variant IGIMF from Jerabkova et al. (2018) [2]_. Requires on a boolean mask ``mask_array`` that filters out SFR-[Fe/H] pairs outside the bounds of the corrections grid from the original paper, :data:`constants.C20_CORRECTIONS_PATH`. Input arrays will have shape (:attr:`n_redshift`, :attr:`logm_per_redshift`) or (:attr:`n_redshift`+2, :attr:`logm_per_redshift`) depending on whether :attr:`force_boundary_redshift` is ``True`` or ``False``, respectively. Output includes lists of arrays with potentially varying lengths. Each list corresponds to a parameter (mass) and each array to a redshift, but containing only parameters for galaxies within the correction boundaries, which can lead to different lengths. Parameters ---------- mass_array : NDArray Galaxy stellar masses. log_gsmf_array : NDArray Log GSMF evaluated over ``mass_array``. zoh_array : NDArray Z_OH of each galaxy. feh_array : NDArray [Fe/H] of each galaxy. sfr_array : NDArray SFR of each galaxy. mask_array : NDAray Boolean mask filtering out [Fe/H],SFR pairs outside of the correction boundaries. Returns ------- mass_list : list List of arrays. Galaxy stellar masses within correction boundaries. log_gsmf_list : list List of arrays. Log GSMF evaluated over ``mass_list``. zoh_list : list List of arrays. Z_OH of each galaxy within correction boundaries. feh_list : list List of arrays. [Fe/H] of each galaxy within correction boundaries. sfr_list : list List of arrays. SFR of each galaxy within correction boundaries. """ mass_list = list() log_gsmf_list = list() zoh_list = list() feh_list = list() sfr_list = list() for masses, log_gsmfs, zohs, fehs, sfrs, mask in zip(mass_array, log_gsmf_array, zoh_array, feh_array, sfr_array, mask_array): f_masses = masses[mask] f_log_gsmfs = log_gsmfs[mask] f_zohs = zohs[mask] f_fehs = fehs[mask] f_sfrs = sfrs[mask] n_samples = f_fehs.shape[0] f_sfrs = np.tile(f_sfrs, (n_samples, 1)) corrections = Corrections(f_fehs, f_sfrs) corrections.load_data() corrs = np.diag(corrections.get_corrections()) try: corr_sfrs = f_sfrs[0] + corrs except IndexError: corr_sfrs = np.array([]) mass_list.append(f_masses) log_gsmf_list.append(f_log_gsmfs) zoh_list.append(f_zohs) feh_list.append(f_fehs) sfr_list.append(corr_sfrs) return mass_list, log_gsmf_list, zoh_list, feh_list, sfr_list
[docs] def sample_redshift(self) -> None: """Sample redshifts from the GSMF integrated over mass. Integrating the GSMF over mass yields a star forming mass-over- redshift distribution. A redshift sample is building by dividing the redshift range :attr:`redshift_min`-:attr:`redshift_max` into :attr:`n_redshift` quantiles and assigning each one its mass-weighted average redshift. If :attr:`force_boundary_redshift` is ``True``, the redshift upper and lower limits are also added to the sample. """ redshift_pool, redshift_probs = self._discrete_redshift_probs(self.redshift_min, self.redshift_max, 100*self.n_redshift) # With probabilities calculated, we can generate a # representative sample from which we find n_redshift quantiles. # Repetition is not an issue because only the quantiles are of # interest. redshift_choices = np.random.choice(redshift_pool, p=redshift_probs, size=int(1e4*self.n_redshift)) self.sample_redshift_bins = np.quantile(redshift_choices, np.linspace(0, 1, self.n_redshift + 1)) # Correct for the granularity of the sampling. self.sample_redshift_bins[0] = self.redshift_min self.sample_redshift_bins[-1] = self.redshift_max # Finding uniform quantiles defines which regions of the # redshift range should be equally represented in order # to reproduce the GSMF as well as possible. The quantiles # themselves are represented in the sample by the mass-averaged # redshift of their respective galaxies. redshift_i = 0 if self.force_boundary_redshift: self.sample_redshift_array[0] = self.redshift_min self.sample_redshift_array[-1] = self.redshift_max redshift_i += 1 for quantile0, quantile1 in zip(self.sample_redshift_bins[:-1], self.sample_redshift_bins[1:]): redshift_pool, redshift_probs = self._discrete_redshift_probs(quantile0, quantile1, 100) massaverage_redshift = np.average(redshift_pool, weights=redshift_probs) self.sample_redshift_array[redshift_i] = massaverage_redshift redshift_i += 1 min_redshift_bin_upper_edge = (self.sample_redshift_array[0] + self.sample_redshift_array[1]) / 2 max_redshift_bin_lower_edge = (self.sample_redshift_array[-1] + self.sample_redshift_array[-2]) / 2 self.sample_redshift_bins = np.sort(np.concatenate(([min_redshift_bin_upper_edge], self.sample_redshift_bins, [max_redshift_bin_lower_edge])))
def _get_sfr_zoh(self, logm, redshift): sfmr = SFMR(redshift=redshift, flattening=self.sfmr_flattening, scatter_model=self.scatter_model) mzr = MZR(redshift=redshift, model=self.mzr_model, scatter_model=self.scatter_model) mzr.set_params() logsfr = sfmr.logsfr(logm) sfr = 10.**logsfr zoh = np.array(mzr.zoh(logm)).flatten() feh = np.array([ZOH_to_FeH(zoh) for zoh in zoh]) if self.apply_igimf_corrections: corrections = Corrections(metallicity=feh, sfr=np.tile(logsfr.reshape((logsfr.shape[0], 1)), (1, feh.shape[0]))) corrections.load_data() try: corr = np.diag(corrections.get_corrections()) except ValueError: corr = np.tile(np.nan, sfr.shape[0]) finally: sfr *= 10.**corr return sfr, zoh def _set_redshift_logm_sampling_grid(self): # First 2d bins on the redshift-logm plane are set. self.sampling_grid_redshift_bins = np.linspace(self.redshift_min, self.redshift_max, self.sampling_grid_side + 1) self.sampling_grid_logm_bins = np.linspace(self.logm_min, self.logm_max, self.sampling_grid_side + 1) sampling_grid_redshift_centers = get_bin_centers(self.sampling_grid_redshift_bins) sampling_grid_logm_centers = get_bin_centers(self.sampling_grid_logm_bins) # Initialize the arrays for number density, SFR and Z_O/H # of galaxies at the centers of the redshift-logm bins. sampling_grid_ndensity_centers = np.zeros((self.sampling_grid_side, self.sampling_grid_side, self.sampling_grid_samplesize_per_bin)) sampling_grid_sfr_centers = np.zeros((self.sampling_grid_side, self.sampling_grid_side, self.sampling_grid_samplesize_per_bin)) sampling_grid_zoh_centers = np.zeros((self.sampling_grid_side, self.sampling_grid_side, self.sampling_grid_samplesize_per_bin)) # Fill the three arrays initialized above. # Iterate over redshift bins. for row, (z0, z1) in enumerate_bin_edges(self.sampling_grid_redshift_bins): redshift = (z0 + z1) / 2 gsmf = GSMF(redshift=redshift, fixed_slope=self.gsmf_slope_fixed) # Iterate over logm bins. for col, (logm0, logm1) in enumerate_bin_edges(self.sampling_grid_logm_bins): # Compute the total number density of galaxies with the # 2d bin. ndensity = np.abs(quad(lambda logm: 10.**gsmf.log_gsmf(logm), logm0, logm1)[0]) # Compute the number density of galaxies represented by # each galaxy (logm) to be drawn within the bin. ndensity_sample = np.tile(ndensity/self.sampling_grid_samplesize_per_bin, self.sampling_grid_samplesize_per_bin) # Draw a logm sample, and then for each logm a SFR, # Z_O/H pair. This draw accounts for a normal spread # around the mean values set by the SFMR and MZR. logm_sample = np.random.uniform(logm0, logm1, self.sampling_grid_samplesize_per_bin) sfr_sample, zoh_sample = self._get_sfr_zoh(logm_sample, redshift) # Fill the grid arrays. sampling_grid_ndensity_centers[row, col] = ndensity_sample sampling_grid_sfr_centers[row, col] = sfr_sample sampling_grid_zoh_centers[row, col] = zoh_sample # Reorganize the sampling grid arrays into a single array. # First make the redshift and logm arrays the same shape. sampling_grid_redshift_centers = np.tile( sampling_grid_logm_centers.reshape((self.sampling_grid_side, 1)), (1, self.sampling_grid_side) ).reshape((self.sampling_grid_side, self.sampling_grid_side, 1)) sampling_grid_redshift_centers = np.tile( sampling_grid_redshift_centers, (1, 1, self.sampling_grid_samplesize_per_bin) ) sampling_grid_logm_centers = np.tile( sampling_grid_logm_centers, (self.sampling_grid_side, 1) ).reshape(self.sampling_grid_side, self.sampling_grid_side, 1) sampling_grid_logm_centers = np.tile( sampling_grid_logm_centers, (1, 1, self.sampling_grid_samplesize_per_bin) ) # Now reorganize. self.sampling_grid = np.array([sampling_grid_redshift_centers, sampling_grid_logm_centers, sampling_grid_ndensity_centers, np.log10(sampling_grid_sfr_centers), sampling_grid_zoh_centers]) self.sampling_grid = self.sampling_grid.T.reshape( (self.sampling_grid_side*self.sampling_grid_side*self.sampling_grid_samplesize_per_bin, 5) ) # Now each line of sampling_grid is a "galaxy" with 5 properties. self.sampling_grid = unstructured_to_structured( self.sampling_grid, np.dtype([('redshift', float), ('logm', float), ('ndensity', float), ('logsfr', float), ('zoh', float)]) ) print('SAMPLING GRID CREATED WITH SHAPE {self.sampling_grid.shape}') # TODO: Initialize arrays in get_grid with the appropriate shape def _scatterless_get_sample(self) -> None: """Generate the (redshift, mass, metallicity, SFR) grid. For each redshift in :attr:`sample_redshift_array`, samples galaxy stellar masses from :class:`sfh.GSMF`. Star-formation rate and metallicity are assigned through :class:`sfh.SFMR` and :class:`sfh.MZR`, respectively. If :attr:`apply_igimf_corrections` is ``True``, then the corrections to the IMF by Chruslinska et al. (2020) [1]-for the IMF from Jerabkova et al. (2018) [2]_ are applied. Note that the grid of corrections goes from -5 to 1.3 in [Fe/H], and from -3.3 to 3.3 in log(SFR). Points outside of this region are removed from the grid if corrections are on. """ self._sample_masses() mass_array = np.empty((0, self.logm_per_redshift), np.float64) log_gsmf_array = np.empty((0, self.logm_per_redshift), np.float64) feh_array = np.empty((0, self.logm_per_redshift), np.float64) sfr_array = np.empty((0, self.logm_per_redshift), np.float64) feh_mask_array = np.empty((0, self.logm_per_redshift), np.float64) sfr_mask_array = np.empty((0, self.logm_per_redshift), np.float64) for redshift in self.sample_redshift_array: (ndensity_array, density_array, masses, log_gsmfs, zohs, bin_zohs, fehs, feh_mask, sfrs, sfr_mask) = self._sample_galaxies(redshift) mass_array = np.append(mass_array, [masses], axis=0) log_gsmf_array = np.append(log_gsmf_array, log_gsmfs, axis=0) self.zoh_array = np.append(self.zoh_array, zohs, axis=0) feh_array = np.append(feh_array, fehs, axis=0) sfr_array = np.append(sfr_array, sfrs, axis=0) sfr_mask_array = np.append(sfr_mask_array, sfr_mask, axis=0) feh_mask_array = np.append(feh_mask_array, feh_mask, axis=0) self.ndensity_array = np.append(self.ndensity_array, ndensity_array, axis=0) self.density_array = np.append(self.density_array, density_array, axis=0) self.zoh_bin_array = np.append(self.zoh_bin_array, bin_zohs, axis=0) mask_array = np.logical_and(feh_mask_array, sfr_mask_array) if self.apply_igimf_corrections: self.grid_array = self._correct_sample(mass_array, log_gsmf_array, self.zoh_array, feh_array, sfr_array, mask_array) else: self.grid_array = mass_array, log_gsmf_array, self.zoh_array, feh_array, sfr_array for i, sublist in enumerate(self.grid_array): for j, ssublist in enumerate(sublist): try: self.grid_array[i][j] = np.pad(ssublist, (0, self.logm_per_redshift-len(ssublist)), mode='edge') except ValueError: self.grid_array[i][j] = np.pad(ssublist, (0, self.logm_per_redshift - len(ssublist)), mode='empty') self.grid_array = np.array(self.grid_array, np.float64) (self.mass_list, self.log_gsmf_list, self.zoh_list, self.feh_list, self.sfr_list) = self.grid_array redshift_grid = self.sample_redshift_array.reshape(*self.sample_redshift_array.shape, 1) redshift_grid = np.tile(redshift_grid, (1, self.logm_per_redshift)) redshift_grid = redshift_grid.reshape(1, *redshift_grid.shape) self.grid_array = np.append(redshift_grid, np.array(self.grid_array), axis=0) # TODO: complete scatter_get_sample() # TODO: document scatter_get_sample() def _set_zoh_logsfr_grid(self) -> None: # First set up the bins to be used when sampling "galaxies". self._set_redshift_logm_sampling_grid() # Then build the axes over which to sample. self.sampling_grid_logsfr_axis = np.linspace(np.nanmin(self.sampling_grid[:]['logsfr']), np.nanmax(self.sampling_grid[:]['logsfr']), self.sampling_grid_side + 1) self.sampling_grid_zoh_axis = np.linspace(np.nanmin(self.sampling_grid[:]['zoh']), np.nanmax(self.sampling_grid[:]['zoh']), self.sampling_grid_side + 1) # And the array which will hold the SFRD computed over those # axes. self.sampling_grid_logsfrd_overlay = np.zeros((self.sampling_grid_side, self.sampling_grid_side)) # Iterate over the axes and fill the SFRD overlay. for row, (zoh0, zoh1) in enumerate_bin_edges(self.sampling_grid_zoh_axis): # Get all galaxies from the grid that fall within this # metallicity bin. zoh_sample = self.sampling_grid[(self.sampling_grid[:]['zoh'] >= zoh0) & (self.sampling_grid[:]['zoh'] < zoh1)] for col, (logsfr0, logsfr1) in enumerate_bin_edges(self.sampling_grid_logsfr_axis): # Now all the galaxies that fall within this logSFR bin. sfr_sample = zoh_sample[(zoh_sample[:]['logsfr'] >= logsfr0) & (zoh_sample[:]['logsfr'] < logsfr1)] # The limits of the SFR corrections grid cause some SFRs # to become NaN. Eliminate those values. sfr_sample = sfr_sample[~np.isnan(sfr_sample[:]['logsfr'])] # Now get the SFRD represented by each galaxy, from the # galaxy number density it represents, and sum to get # the total SFRD within the metallicity-SFR bin, and # fill the overlay. sfrd = np.sum(sfr_sample[:]['ndensity'] * 10.**sfr_sample[:]['logsfr']) self.sampling_grid_logsfrd_overlay[row, col] = np.log10(sfrd) return # TODO: have _scatter_get_sample() set the same arrays as _scatterless_get_sample() def _scatter_get_sample(self) -> None: # Compute the SFRD on the metallicity-log SFR plane. This will # set the sampling weights. self._set_zoh_logsfr_grid() # Set the sampling pool for metallicity and log SFR. zoh_pool = get_bin_centers(self.sampling_grid_zoh_axis) logsfr_pool = get_bin_centers(self.sampling_grid_logsfr_axis) # Set the sampling pool as a 2D array the elements of which # correspond to their indices, then ravel it to a list of index # pairs. pool = np.moveaxis(np.indices(self.sampling_grid_logsfrd_overlay.shape), 0, -1) pool = pool.reshape(pool.shape[0]*pool.shape[1], pool.shape[2]) # Set the weights from the SFRD grid. probs = np.copy(10.**self.sampling_grid_logsfrd_overlay.ravel()) min_prob = np.nanmin(probs[probs != -np.inf]) probs -= min_prob probs[probs == -np.inf] = 0. probs /= probs.sum() # Randomly sample zoh-SFR pairs through their indices, stored in # pool, weighted by probs. Start by drawing an index from pool. sample_indices = np.random.choice(pool.shape[0], p=probs, size=self.n_redshift*self.logm_per_redshift) sample_zoh_i, sample_logsfr_i = pool[sample_indices].T sample_zoh = zoh_pool[sample_zoh_i] sample_logsfr = logsfr_pool[sample_logsfr_i] sample = [[zoh, logsfr] for zoh, logsfr in zip(sample_zoh, sample_logsfr)] sample = np.array(sample, dtype=[('zoh', float), ('logsfr', float)]) # Now iterate over the sample to complete it with redshifts. sample_redshift = np.zeros(sample_zoh.shape) for i, galaxy in enumerate(sample): zoh, logsfr = galaxy # Search for the zoh-logsfr bin in which this galaxy falls. zoh_i = np.searchsorted(self.sampling_grid_zoh_axis, zoh, side='right') zoh_bin0, zoh_bin1 = self.sampling_grid_zoh_axis[zoh_i-1:zoh_i+1] logsfr_i = np.searchsorted(self.sampling_grid_logsfr_axis, logsfr, side='right') logsfr_bin0, logsfr_bin1 = self.sampling_grid_logsfr_axis[logsfr_i-1:logsfr_i+1] # Get the redshifts of the galaxies within this bin. logsfr_bin_sample = sample[(self.sampling_grid[:]['logsfr'] >= logsfr_bin0) & (self.sampling_grid[:]['logsfr'] < logsfr_bin1)] zoh_bin_sample = logsfr_bin_sample[(logsfr_bin_sample[:]['zoh'] >= zoh_bin0) & (logsfr_bin_sample[:]['zoh'] < zoh_bin1)] redshift_bin_sample = zoh_bin_sample[:]['redshift'] # Use the redshift distribution in the bin to draw a # redshift for this galaxy. Here only probs, redshift_edges = np.histogram(redshift_bin_sample, bins=10) redshift_pool = get_bin_centers(redshift_edges) sample_redshift[i] = np.random.choice(redshift_pool, p=probs, size=1)[0] # Complete the sample. sample = [[zoh, logsfr, redshift] for zoh, logsfr, redshift in zip(sample_zoh, sample_logsfr, sample_redshift)] sample = np.array(sample, dtype=[('zoh', float), ('logsfr', float), ('redshift', float)])
[docs] def get_sample(self) -> None: if self.scatter_model == 'normal': self._scatter_get_sample() else: self._scatterless_get_sample()
[docs] def save_grid(self) -> None: """Save :attr:`grid_array` to disk.""" columns = ['Redshift', 'Log(Mgal/Msun)', 'Log(Number density [Mpc-3 Msun-1])', 'Log(SFR [Msun yr-1])', '12+log(O/H)', '[Fe/H]'] grid_df = pd.DataFrame(self.grid_array, columns=columns) grid_df.to_pickle(self.save_path)
[docs] class SimpleBinaryPopulation: """Generate a sample of zero-age main sequence binaries. For a given redshift, star formation rate (SFR) and [Fe/H], generate a sample of multiple systems with component masses between m_min and m_max, and up to max_comp_number companions. Each system is represented by the parameters of its innermost binaries and its total corresponding mass, including all companions. Masses are drawn from the integrated galaxy-wide initial mass function (IGIMF) and orbital parameters from correlated distributions. Attributes ---------- save_path redshift : float Redshift at which to generate the sample. sfr : float SFR for which to generate the samples, in Msun yr-1. feh : float [Fe/H] metallicity for which to generate the sample. z_abs : float Metallicity feh in Z. m_min : float Minimum sampled mass. m_max : float Maximum sampled mass. max_comp_number : int Maximum number of companions. poolsize : int Size of the pool from which masses are drawn, without repetition. col_n : int Number of columns (parameters) defining a binary. sample : numpy array (n, col_n) array of n binaries, each defined by a set of col_n parameters. sample_mass : float Total sample mass, including all system components. qe_max_tries : int Maximum number of attempts to draw a valid system for a given primary mass, orbital period pair. galaxy_descriptor : str String describing the sampled population, to be appended to the sample filename. m1_min : float Minimum primary mass allowed. lowmass_powerlaw_index : float Index of the power law from which < 0.8 Msun mass options are drawn. lowmass_powerlaw_norm : float Norm of the power law from which < 0.8 Msun mass options are drawn. igimf_arr : numpy array Array of >= 0.8 Msun mass options drawn from the IGIMF. sampling_pool : numpy array Complete pool of mass options for sampling. prioritize_high_masses : bool Whether to bias the sampler towards drawing higher masses first or not. print_progress : bool Whether to print progress updates (percentage, elapsed time, remaining time) to stdout or not. Methods ------- set_sampling_pool() Compute the array of mass options for sampling. get_sample() Generate full sample, save parameters to sample and the total mass to sample_mass. save_sample() Save sample to a parquet file at _save_path. Warns ----- UserWarning If get_sample() is run before set_sampling_pool(). UserWarning If sample is empty when save_sample() is called. Warnings -------- For a pool of poolsize possible masses, a sample of <~ 0.7*poolsize/2 binaries is generated. This is because two mass arrays are generated, one above and one below 0.8 Msun, containing poolsize/2 elements each; and because as the sampling pool is exhausted, remaining possible multiples tend to be invalid (fail the tolerance test in class ZAMSSystemGenerator), and the sampling stops after a certain number of consecutive failed iterations. See Also ------- zams.ZAMSSystemGenerator Sampling of an individual multiple system. zams.MultipleFraction Sampling of the number of companions. Notes ----- This sampler generates a population that simultaneously follows to correlated orbital parameter distributions by Moe & Di Stefano (2017) [3]_ and the IGIMF by Jerabkova et al. (2018) [2]_. The sample is only representative of the IGIMF between 0.8 and 150 Msun, because the sampling of the primary mass m1 is restricted to this range in order as per the minimum mass sampled by the orbital parameter distributions. Components with masses between 0.08 and 0.8 Msun appear as companions, but they will not reproduce the IGIMF below 0.8 Msun as all < 0.8 Msun primaries and their companions will be missing. On the other hand, because for the mass ratio 0.1 <= q <= 1.0, the range between 0.8 and 150 Msun should be complete to good approximation, as discussed in OUR WORK. References ---------- .. [3] Moe, M., Di Stefano, R. (2017). Mind Your Ps and Qs: The Interrelation between Period (P) and Mass-ratio (Q) Distributions of Binary Stars. ApJS, 230(2), 55. doi:10.3847/1538-4365/aa6fb6 """ inner_binary_sample_columns = ['Mass_ZAMS1_Found', #0 'Mass_ZAMS1_Choice', #1 'RelDev_Mass_ZAMS1', #2 'Mass_ZAMS2_Found', #3 'Mass_ZAMS2_Choice', #4 'RelDev_Mass_ZAMS2', #5 'MassRatioFound_ZAMS', #6 'MassRatioChoice_ZAMS', #7 'LogOrbitalPeriod_ZAMS', #8 'Eccentricity_ZAMS', #9 'CompanionNumber', #10 'SystemMass'] #11 """Column titles for the 12 parameters identifying each inner binary.""" outer_pair_columns = ['Mass_ZAMS3_Found', 'Mass_ZAMS3_Choice', 'LogOrbitalPeriod_ZAMS3', 'Eccentricity_ZAMS3'] """Columns saved for each further outer companion.""" def __init__(self, redshift, sfr, feh, m_min, m_max, max_comp_number, poolsize, qe_max_tries=1, only_binaries=False, invariant_imf=False, correlated_orbital_parameters=True, galaxy_descriptor='', parent_logger=None, prioritize_high_masses=False, print_progress=True, save_dir=None): """ Parameters ---------- redshift : float Redshift at which to generate the sample. sfr : float SFR for which to generate the samples, in Msun yr-1. feh : float [Fe/H] metallicity for which to generate the sample. m_min : float Minimum sampled mass. m_max : float Maximum sampled mass. max_comp_number : int Maximum number of companions. poolsize : int Size of the pool from which masses are drawn, without repetition. qe_max_tries : int, default : 1 Maximum number of attempts to draw a valid system for a given primary mass, orbital period pair. galaxy_descriptor : str, default : '' String describing the sampled population, to be appended to the sample filename. parent_logger : logging Logger, default : None Logger of the class or module from which this class was instantiated. prioritize_high_masses : bool, default : False Whether to bias the sampler towards drawing higher masses first or not. print_progress : bool, default : True Whether to print progress updates (percentage, elapsed time, remaining time) to stdout or not. """ self.redshift = redshift self.sfr = sfr self.feh = feh self.z_abs = 10 ** feh * Z_SUN self.m_min = m_min self.m_max = m_max self.max_comp_number = max_comp_number self.only_binaries = only_binaries self.invariant_imf = invariant_imf self.correlated_orbital_parameters = correlated_orbital_parameters self.poolsize = poolsize self.sample = None self.col_n = len(self.sample_columns) #self.sample_mass = 0 self.qe_max_tries = qe_max_tries self.galaxy_descriptor = galaxy_descriptor self.m1_min = np.float32(0.8) self.lowmass_powerlaw_index = np.float32(0) self.lowmass_powerlaw_norm = np.float32(0) self.igimf_arr = np.zeros((0, 2), np.float64) self.sampling_pool = np.zeros(poolsize, np.float64) self.prioritize_high_masses = prioritize_high_masses self.print_progress = print_progress self.max_m1_draws = 1000 self.logger = self._get_logger(parent_logger) self.save_dir = save_dir @cached_property def pairs_table_path(self): """Path to the orbital parameter sampling table.""" if self.correlated_orbital_parameters: pairs_table_path = BINARIES_CORRELATED_TABLE_PATH else: pairs_table_path = BINARIES_UNCORRELATED_TABLE_PATH return pairs_table_path @cached_property def save_path(self): """Path to which to save the sample as a parquet file.""" fname = f'z={self.redshift:.3f}_feh={self.feh:.3f}_logsfr={np.log10(self.sfr):.3f}_' \ f'{self.galaxy_descriptor}_logpool={np.log10(self.poolsize):.2f}_igimf_zams_sample.parquet' if self.save_dir is not None: save_path = Path(IGIMF_ZAMS_DIR_PATH, self.save_dir, fname) save_path.parent.mkdir(parents=True, exist_ok=True) else: save_path = Path(IGIMF_ZAMS_DIR_PATH, fname) return save_path @cached_property def sample_columns(self): """Sample column titles.""" outer_pair_columns = list() for cp_order in range(3, self.max_comp_number+2): columns = [0]*len(self.outer_pair_columns) for i, col in enumerate(self.outer_pair_columns): columns[i] = col.replace('3', str(cp_order)) outer_pair_columns += columns sample_columns = self.inner_binary_sample_columns + outer_pair_columns return sample_columns def _get_logger(self, parent_logger): """Create and return a class logger, as a child of a parent logger if provided.""" if parent_logger is None: loggername = '.'.join([__name__, self.__class__.__name__]) log_path = Path(LOG_PATH, loggername, datetime.now().strftime('%d-%m-%Y_%H:%M:%S.log')) log_path.parent.mkdir(parents=True, exist_ok=True) logger = create_logger(name=loggername, fpath=log_path) else: loggername = '.'.join([parent_logger.name, self.__class__.__name__]) logger = logging.getLogger(loggername) logger.setLevel(logging.DEBUG) return logger def _lowmass_powerlaw(self, m): """Evaluate the low-mass power law at m.""" return powerlaw(m, self.lowmass_powerlaw_norm, self.lowmass_powerlaw_index) def _get_imf_random_sample(self, m_min, m_max, samplesize): """Generate a random sampling of the IMF of size samplesize, between m_min and m_max. Compute the values imf_arr of the IMF at masses imf_mass_arr, then take a random sample randomsample from imf_mass_arr with imf_arr as weights. Parameters ---------- m_min : float Minimum mass for sampling m_max : float Maximum mass for sampling. samplesize : float Size of sample. Returns ------- imf_mass_arr : numpy array Masses on which the IGIMF was computed. imf_arr : numpy array IGIMF values computed on imf_mass_arr. randomsample : numpy array Mass sample from the IGIMF. """ self.logger.debug(f'Starting IGIMF random sampling: n={samplesize}, mmin={m_min} Msun, mmax = {m_max} Msun.') time0 = time() if self.invariant_imf: total_star_mass = self.sfr * 1e7 imf = Star(total_star_mass, feh=self.feh, invariant=True) imf.get_mmax_k() else: imf = IGIMF(self.sfr, self.feh) imf.set_clusters() self.logger.debug(f'IMF created with logSFR = {np.log10(self.sfr)} Msun yr-1 and [Fe/H] = {self.feh}') randomsampler = RandomSampling(imf) self.logger.debug(f'Created random sampler.') randomsampler.compute_imf() randomsample = randomsampler.get_sample(m_min, m_max, samplesize).astype(np.float32) imf_mass_arr = randomsampler.discretization_masses imf_arr = randomsampler.discrete_imf time1 = time() - time0 self.logger.debug(f'IMF random sampling completed in {time1:.6f} s.') return imf_mass_arr, imf_arr, randomsample def _low_high_mass_area_diff(self, lowmass_index, highmass_spline, highmass_area): """Compute the difference in area between the power law IMF at low masses and the IGIMF at high masses.""" if lowmass_index < -1 or lowmass_index > 0: area_diff = 1e7 else: lowmass_norm = highmass_spline(self.m1_min) / self.m1_min ** lowmass_index lowmass_area = lowmass_norm * (self.m1_min ** (lowmass_index + 1) - self.m_min ** (lowmass_index + 1)) / (lowmass_index + 1) area_diff = np.abs(highmass_area - lowmass_area) return area_diff def _set_lowmass_powerlaw(self, highmass_mass_arr, highmass_igimf_arr): """Sets the power law IMF at low masses such that its area is the same as the IGIMF's at high masses.""" self.logger.debug('Fitting IMF m < 0.8 power law.') time0 = time() hmass_spline = UnivariateSpline(highmass_mass_arr, highmass_igimf_arr, k=3) hmass_area = hmass_spline.integral(self.m1_min, self.m_max) self.lowmass_powerlaw_index = fmin(self._low_high_mass_area_diff, x0=-0.3, args=(hmass_spline, hmass_area), disp=False)[0] self.lowmass_powerlaw_norm = hmass_spline(self.m1_min) / self.m1_min ** self.lowmass_powerlaw_index time1 = time() - time0 self.logger.debug(f'Power law fitted with a = {self.lowmass_powerlaw_index},' \ f'k = {self.lowmass_powerlaw_norm} in {time1:.6f} s.')
[docs] def set_sampling_pool(self): """Set the mass pool from which to draw the sample. Set the mass pool from which to draw the final sample as a random sampling of poolsize/2 from the IGIMF, above m1_min; and of a power law with the same area between m_min and m1_min as the IGIMF between m1_min and m_max, below m1_min. Notes ----- Because the primary mass sampling is limited to m1 >= m1_min, in any case the IMF cannot be reproduced in the m < m1_min region; at the same time, an IMF at < m1_min is still necessary for the sampling of light companions. Thus the IMF for m < m1_min is defined to be a power law continuous with the IGIMF at m >= m1_min, with a slope such that its area below m1_min is the same as that of the IGIMF above. This choice is made in order to conform with the drawing of the same amount poolsize/2 of masses from both sides of m1_min. """ self.logger.debug('Setting sampling pool...') time0 = time() hmass_pool_size = int(self.poolsize / 2) lmass_pool_size = int(self.poolsize - hmass_pool_size) imf_mass_arr, imf_arr, hmass_pool = self._get_imf_random_sample(self.m1_min, self.m_max, hmass_pool_size) self.logger.debug('Got m > 0.8 sampling pool.') self.igimf_arr = np.array([imf_mass_arr, imf_arr], np.float64) self._set_lowmass_powerlaw(imf_mass_arr, imf_arr) # the IMF sample sets the equal area constraint lmass_mass_options = np.linspace(self.m_min, self.m1_min, 10 * lmass_pool_size, dtype=np.float32) lmass_option_probs = np.array([self._lowmass_powerlaw(m) for m in lmass_mass_options]) lmass_option_probs /= np.sum(lmass_option_probs) lmass_pool = np.random.choice(lmass_mass_options, p=lmass_option_probs, size=lmass_pool_size) lmass_pool = np.sort(lmass_pool) self.logger.debug('Got m < 0.8 sampling pool.') self.sampling_pool = np.concatenate((lmass_pool, hmass_pool)) time1 = time() - time0 self.logger.info(f'Sampling pool set in {time1:.6f} s.')
[docs] def get_sample(self): """Generate the binary sample.""" if self.sampling_pool[-1] == 0.0: warnings.warn('Sampling pool not set up. Please run set_sampling_pool() first.') return self.logger.info('Getting sample...') time0 = time() # The ZAMSSystemGenerator class samples the parameters of individual binaries, with the masses being taken from # imf_array. The innermost pair is returned in the case of higher order multiples, but all companion masses are # removed from imf_array and taken into account in the total system mass. systemgenerator = ZAMSSystemGenerator(imf_array=self.sampling_pool, pairs_table_path=self.pairs_table_path, qe_max_tries=self.qe_max_tries, dmcomp_tol=0.05, parent_logger=self.logger) self.logger.info(f'Started ZAMSSystemGenerator with binaries_table_path={self.pairs_table_path},' \ f'eq_max_tries = {self.qe_max_tries} and dm2tol = {0.05}.') systemgenerator.setup_sampler() # The MultipleFraction class provides the probability distribution of the number of companions as a function of # primary mass. multiple_fractions = MultipleFraction(mmin=self.m_min, mmax=self.m_max, nmax=self.max_comp_number, only_binaries=self.only_binaries) multiple_fractions.solve() ncomp_options = np.arange(0, self.max_comp_number+1, 1) self.logger.info('Starting sampling loop.') self.sample = np.empty((0, self.col_n), np.float32) sample_list = [] if self.print_progress: prev_progress = 0 progress_update_step = 0.01 iteration_counter = 0 progress = 0 prog_norm = systemgenerator.m1array_n fail_counter = 0 start_time = time() iteration_timer = start_time # systemgenerator keeps track of the remaining number of m1 options as m1array_n. while systemgenerator.m1array_n > 0: # The code below draws m1 as an index of imf_array, randomly taken from the available range. if not self.prioritize_high_masses: m1choice_i = np.random.randint(0, systemgenerator.m1array_n, 1)[0] # Alternatively, the code below prioritizes drawing the highest available masses first. else: m1ops = np.arange(0, systemgenerator.m1array_n, 1) if systemgenerator.m1array_n > 1: m1ps = m1ops/m1ops.sum() else: m1ps = [1] m1choice_i = np.random.choice(m1ops, size=1, p=m1ps)[0] # All binary parameters are taken from a table structured in PyTable Groups, for each primary mass; and # Tables within Groups, for each orbital period. Table lines contain equiprobable mass ratio, eccentricity # pairs. systemgenerator.open_m1group(m1choice_i) # m1choice is drawn from imf_array, while m1_table is its closest counterpart in m1group. For m1group the # mean number of companions is calculated and defines the probability distribution from which a companion # number is drawn for this primary. ncomp_mean = multiple_fractions.ncomp_mean(systemgenerator.m1_table) nmean_probs = multiple_fractions.prob(ncomp_mean, ncomp_options) ncomp = np.random.choice(ncomp_options, p=nmean_probs, size=1)[0] # All binary parameters must be taken from the referred to table, but masses are drawn from imf_array. A # system is only accepted if the array and table masses pass a tolerance test. If they do not, inner_binary # is an empty list, and system mass is 0. #inner_binary, system_mass = systemgenerator.sample_system(ncomp=ncomp) sampled_pairs = systemgenerator.sample_system(ncomp=ncomp, ncomp_max=self.max_comp_number) #inner_binary = inner_binary.flatten() #if len(inner_binary) != 0: if len(sampled_pairs) != 0: # While only the inner binary joins the sample, the actual corresponding total system mass is kept track # of. This quantity is important for normalizing the frequency of particular events within a population. #self.sample = np.append(self.sample, inner_binary, axis=0) #sample_list.append(inner_binary) sample_list.append(sampled_pairs) #self.sample_mass += system_mass progress = 1 - systemgenerator.m1array_n / prog_norm fail_counter = 0 else: fail_counter += 1 # Eventually imf_array becomes exhausted between 0.8 and 15 Msun, and the remaining primary mass options # (which are all > 15 Msun) have no valid companion masses above 0.8 Msun (because mass ratio > 0.1). # This can lead the sampler to stall if the mass ratio distribution does not pair very massive stars. if fail_counter == self.max_m1_draws: minm1 = systemgenerator.highmass_imf_array.min() maxm2 = systemgenerator.lowmass_imf_array.max() self.logger.info(f'No valid m_comp after {fail_counter} iterations, with minimum sampling pool m1 ' f'as {minm1} and maximum m_comp as {maxm2}. Ending sampling with ' f'{systemgenerator.m1array_n} remaining m1 options.') break # This sections updates the user on progress based on exhaustion of m1 options and estimates roughly the # remaining time. progress_update_step controls the frequency of updates. if self.print_progress: iteration_counter += 1 if progress > prev_progress + progress_update_step: elapsed_time = time() - start_time iteration_time = time() - iteration_timer overall_m1_mean_exh_rate = (prog_norm - systemgenerator.m1array_n) / elapsed_time m1_mean_exh_rate = prog_norm*progress_update_step / iteration_time expected_time = systemgenerator.m1array_n / overall_m1_mean_exh_rate self.logger.info(f'Progress: {(100*progress):.4f}% {format_time(elapsed_time)}<' \ f'{format_time(expected_time)} at {m1_mean_exh_rate:.2f} M1 options / s' \ f'({iteration_counter/iteration_time:.2f} iterations/s).') prev_progress = progress iteration_counter = 0 iteration_timer = time() self.sample = np.array(sample_list, np.float32) del(sample_list) gc.collect() # make sure memory is freed total_time = time() - start_time self.logger.info(f'Sampling loop completed in {total_time:.6f} s.') systemgenerator.close_pairs_table() time1 = time() - time0 self.logger.info(f'Sampling completed in {time1:.6f} s with {len(self.sample)} binaries.') return self.sample #, self.sample_mass
[docs] def save_sample(self): if self.sample is None: warnings.warn('No sample to save.') return self.logger.info('Saving sample...') df = pd.DataFrame(self.sample, columns=self.sample_columns) df.to_parquet(path=self.save_path, engine='pyarrow', compression='snappy') self.logger.info(f'Sample saved to {self.save_path}.')
[docs] class CompositeBinaryPopulation: """Sample binary populations for a grid of galaxies.""" def __init__(self, galaxy_grid_path, mmin, mmax, max_comp_number, mass_poolsize, qe_max_tries, only_binaries=False, invariant_imf=False, correlated_orbital_parameters=True, parent_logger=None, n_parallel_processes=int(0.9 * PHYSICAL_CORE_COUNT), memory_limit=0.8*TOTAL_PHYSICAL_MEMORY, save_dir=None): self.galaxy_grid_path = galaxy_grid_path self.mmin = mmin self.mmax = mmax self.max_comp_number = max_comp_number self.only_binaries = only_binaries self.invariant_imf = invariant_imf self.correlated_orbital_parameters = correlated_orbital_parameters self.mass_poolsize = mass_poolsize self.qe_max_tries = qe_max_tries self.n_parallel_processes = n_parallel_processes self.memory_limit = memory_limit self.logger = self._get_logger(parent_logger) self.grid_arr = None self.save_dir = save_dir def _get_logger(self, parent_logger): if parent_logger is None: loggername = '.'.join([__name__, self.__class__.__name__]) log_path = Path(LOG_PATH, loggername, datetime.now().strftime('%d-%m-%Y_%H:%M:%S.log')) log_path.parent.mkdir(parents=True, exist_ok=True) logger = create_logger(name=loggername, fpath=log_path, propagate=False) else: loggername = '.'.join([__name__, self.__class__.__name__]) logger = logging.getLogger(name=loggername) logger.setLevel(logging.DEBUG) return logger def _get_zams_sampler(self, redshift, logsfr, feh, zoh=None, mgal=None, logn_gal=None): galaxy_descr_list = [] if zoh is not None: zoh_descr = f'1e4ZOH={int(1e4 * zoh)}' galaxy_descr_list.append(zoh_descr) if mgal is not None: mgal_descr = f'1e2Mgal={int(1e2 * mgal)}' galaxy_descr_list.append(mgal_descr) if logn_gal is not None: logn_gal_descr = f'1e2logn={int(1e2 * logn_gal)}' galaxy_descr_list.append(logn_gal_descr) galaxy_descr = '_'.join(galaxy_descr_list) zams_sampler = SimpleBinaryPopulation(redshift=redshift, sfr=10 ** logsfr, feh=feh, m_min=self.mmin, m_max=self.mmax, max_comp_number=self.max_comp_number, only_binaries=self.only_binaries, invariant_imf=self.invariant_imf, correlated_orbital_parameters=self.correlated_orbital_parameters, poolsize=self.mass_poolsize, qe_max_tries=self.qe_max_tries, galaxy_descriptor=galaxy_descr, parent_logger=self.logger, save_dir=self.save_dir) return zams_sampler
[docs] def load_grid(self): grid_df = pd.read_pickle(self.galaxy_grid_path) grid_arr = np.empty((len(grid_df), 6), np.float64) grid_arr[:, 0] = grid_df['Redshift'].to_numpy() grid_arr[:, 1] = grid_df['Log(SFR [Msun yr-1])'].to_numpy() grid_arr[:, 2] = grid_df['[Fe/H]'].to_numpy() grid_arr[:, 3] = grid_df['12+log(O/H)'].to_numpy() grid_arr[:, 4] = grid_df['Log(Mgal/Msun)'].to_numpy() grid_arr[:, 5] = grid_df['Log(Number density [Mpc-3 Msun-1])'].to_numpy() self.grid = list(enumerate(grid_arr)) self.logger.info(f'Loaded grid {self.galaxy_grid_path}.')
def _get_sample(self, sampler_tuple): sampler_id, sampler_param_arr = sampler_tuple # Sampler_id is an int denoting the line number of the sample in the original galaxygrid file. While at present # it is not used for anything, it could be employed as way to keep track of which galaxy is being sampled. zams_sampler = self._get_zams_sampler(*sampler_param_arr) self.logger.debug(f'Got sampler {sampler_id}.') if zams_sampler.save_path.exists(): self.logger.warning(f'File {zams_sampler.save_path} exists. Skipping...') else: zams_sampler.set_sampling_pool() zams_sampler.get_sample() zams_sampler.save_sample()
[docs] def sample_grid(self): self.logger.info(f'Calling ProcessPoolExecutor with {self.n_parallel_processes} workers.') with ProcessPoolExecutor(self.n_parallel_processes) as executor: futures = executor.map(self._get_sample, self.grid) for _ in futures: pass