"""
fastspecfit.util
================
General utilities.
"""
import numpy as np
import numba
from desiutil.log import get_logger
log = get_logger()
try: # this fails when building the documentation
from scipy import constants
C_LIGHT = constants.c / 1000.0 # [km/s]
except:
C_LIGHT = 299792.458 # [km/s]
[docs]
def mwdust_transmission(ebv, filtername):
"""Convert SFD E(B-V) value to dust transmission 0-1 given the bandpass.
Args:
ebv (float or array-like): SFD E(B-V) value(s)
filtername (str): Filter name, e.g., 'decam2014-r'.
Returns:
Scalar or array (same as ebv input), Milky Way dust transmission 0-1.
Note:
This function tabulates the total-to-selective extinction ratio,
k_X=A(X)/E(B-V) for many different filter bandpasses, X, where
A(X)=-2.5*log10(transmission in X band). And so given the ebv, it
returns mwdust_transmission=10**(-0.4*k_X*ebv).
Returns:
scalar, total extinction A(band) = -2.5*log10(transmission(band))
Notes:
Based on `desiutil.dust.mwdust_transmission`.
"""
k_X = {
# From https://desi.lbl.gov/trac/wiki/ImagingStandardBandpass
# DECam u 3881.6 3.994
# DECam g 4830.8 3.212
# DECam r 6409.0 2.164
# DECam i 7787.5 1.591
# DECam z 9142.7 1.211
# DECam Y 9854.5 1.063
# BASS g 4772.1 3.258
# BASS r 6383.6 2.176
# MzLS z 9185.1 1.199
# Consistent with the synthetic magnitudes and function dust_transmission
'BASS-g': 3.258,
'BASS-r': 2.176,
'MzLS-z': 1.199,
'decam2014-u': 3.994,
'decam2014-g': 3.212,
'decam2014-r': 2.164,
'decam2014-i': 1.591,
'decam2014-z': 1.211,
'decam2014-Y': 1.063,
# Anand Raichoor, private communication
'cfht_megacam-u': 4.01,
'cfht_megacam-ustar': 4.12,
'odin-N419': 4.324,
'odin-N501': 3.540,
'odin-N673': 2.438,
'hsc2017-g': 3.24,
'hsc2017-r': 2.276,
'hsc2017-r2': 2.276,
'hsc2017-i': 1.633,
'hsc2017-i2': 1.633,
'hsc2017-z': 1.263,
'hsc2017-y': 1.075,
'suprime-IB427': 4.202,
'suprime-IB464': 3.894,
'suprime-IB484': 3.694,
'suprime-IB505': 3.490,
'suprime-IB527': 3.304,
# Add WISE from
# https://github.com/dstndstn/tractor/blob/main/tractor/sfd.py#L23-L35
'wise2010-W1': 0.184,
'wise2010-W2': 0.113,
'wise2010-W3': 0.0241,
'wise2010-W4': 0.00910,
}
if filtername not in k_X.keys():
errmsg = f'Filtername {filtername} is missing from dictionary of known bandpasses!'
log.critical(errmsg)
raise ValueError(errmsg)
A_X = k_X[filtername] * ebv
transmission = 10**(-0.4 * A_X)
return transmission
[docs]
def ivar2var(ivar, clip=1e-3, sigma=False, allmasked_ok=False):
"""Safely convert an inverse variance to a variance. Note that we clip at 1e-3
by default, not zero.
"""
var = np.zeros_like(ivar)
goodmask = ivar > clip # True is good
if np.count_nonzero(goodmask) == 0:
# Try clipping at zero.
goodmask = ivar > 0 # True is good
if np.count_nonzero(goodmask) == 0:
if allmasked_ok:
return var, goodmask
errmsg = 'All values are masked!'
log.critical(errmsg)
raise ValueError(errmsg)
var[goodmask] = 1 / ivar[goodmask]
if sigma:
var = np.sqrt(var) # return a sigma
return var, goodmask
[docs]
def air2vac(airwave):
"""http://www.astro.uu.se/valdwiki/Air-to-vacuum%20conversion"""
if airwave <= 0:
raise ValueError('Input wavelength is not defined.')
ss = 1e4 / airwave
nn = 1 + 0.00008336624212083 + 0.02408926869968 / (130.1065924522 - ss**2) + 0.0001599740894897 / (38.92568793293 - ss**2)
return airwave * nn
[docs]
def centers2edges(centers):
"""Convert bin centers to bin edges, guessing at what you probably meant
Args:
centers (array): bin centers,
Returns:
array: bin edges, lenth = len(centers) + 1
"""
centers = np.asarray(centers)
edges = np.zeros(len(centers)+1)
#- Interior edges are just points half way between bin centers
edges[1:-1] = (centers[0:-1] + centers[1:]) / 2.0
#- edge edges are extrapolation of interior bin sizes
edges[0] = centers[0] - (centers[1]-edges[1])
edges[-1] = centers[-1] + (centers[-1]-edges[-2])
return edges
# This code is purposely written in a very "C-like" way. The logic
# being that it may help numba optimization and also makes it easier
# if it ever needs to be ported to Cython. Actually Cython versions
# of this code have already been tested and shown to perform no better
# than numba on Intel haswell and KNL architectures.
@numba.jit(nopython=True)
def _trapz_rebin(x, y, edges, results):
'''
Numba-friendly version of trapezoidal rebinning
See redrock.rebin.trapz_rebin() for input descriptions.
`results` is pre-allocated array of length len(edges)-1 to keep results
'''
nbin = len(edges) - 1
i = 0 #- index counter for output
j = 0 #- index counter for inputs
yedge = 0.0
area = 0.0
while i < nbin:
#- Seek next sample beyond bin edge
while x[j] <= edges[i]:
j += 1
#- What is the y value where the interpolation crossed the edge?
yedge = y[j-1] + (edges[i]-x[j-1]) * (y[j]-y[j-1]) / (x[j]-x[j-1])
#- Is this sample inside this bin?
if x[j] < edges[i+1]:
area = 0.5 * (y[j] + yedge) * (x[j] - edges[i])
results[i] += area
#- Continue with interior bins
while x[j+1] < edges[i+1]:
j += 1
area = 0.5 * (y[j] + y[j-1]) * (x[j] - x[j-1])
results[i] += area
#- Next sample will be outside this bin; handle upper edge
yedge = y[j] + (edges[i+1]-x[j]) * (y[j+1]-y[j]) / (x[j+1]-x[j])
area = 0.5 * (yedge + y[j]) * (edges[i+1] - x[j])
results[i] += area
#- Otherwise the samples span over this bin
else:
ylo = y[j] + (edges[i]-x[j]) * (y[j] - y[j-1]) / (x[j] - x[j-1])
yhi = y[j] + (edges[i+1]-x[j]) * (y[j] - y[j-1]) / (x[j] - x[j-1])
area = 0.5 * (ylo+yhi) * (edges[i+1]-edges[i])
results[i] += area
i += 1
for i in range(nbin):
results[i] /= edges[i+1] - edges[i]
return
[docs]
def trapz_rebin(x, y, xnew=None, edges=None):
"""Rebin y(x) flux density using trapezoidal integration between bin edges
Notes:
y is interpreted as a density, as is the output, e.g.
>>> x = np.arange(10)
>>> y = np.ones(10)
>>> trapz_rebin(x, y, edges=[0,2,4,6,8]) #- density still 1, not 2
array([ 1., 1., 1., 1.])
Args:
x (array): input x values.
y (array): input y values.
edges (array): (optional) new bin edges.
Returns:
array: integrated results with len(results) = len(edges)-1
Raises:
ValueError: if edges are outside the range of x or if len(x) != len(y)
"""
if edges is None:
edges = centers2edges(xnew)
else:
edges = np.asarray(edges)
if edges[0] < x[0] or x[-1] < edges[-1]:
raise ValueError('edges must be within input x range')
result = np.zeros(len(edges)-1, dtype=np.float64)
_trapz_rebin(x, y, edges, result)
return result
[docs]
class ZWarningMask(object):
"""
Mask bit definitions for zwarning.
WARNING on the warnings: not all of these are implemented yet.
#- TODO: Consider using something like desispec.maskbits to provide a more
#- convenient wrapper class (probably copy it here; don't make a dependency)
#- That class as-is would bring in a yaml dependency.
"""
SKY = 2**0 #- sky fiber
LITTLE_COVERAGE = 2**1 #- too little wavelength coverage
SMALL_DELTA_CHI2 = 2**2 #- chi-squared of best fit is too close to that of second best
NEGATIVE_MODEL = 2**3 #- synthetic spectrum is negative
MANY_OUTLIERS = 2**4 #- fraction of points more than 5 sigma away from best model is too large (>0.05)
Z_FITLIMIT = 2**5 #- chi-squared minimum at edge of the redshift fitting range
NEGATIVE_EMISSION = 2**6 #- a QSO line exhibits negative emission, triggered only in QSO spectra, if C_IV, C_III, Mg_II, H_beta, or H_alpha has LINEAREA + 3 * LINEAREA_ERR < 0
UNPLUGGED = 2**7 #- the fiber was unplugged/broken, so no spectrum obtained
BAD_TARGET = 2**8 #- catastrophically bad targeting data
NODATA = 2**9 #- No data for this fiber, e.g. because spectrograph was broken during this exposure (ivar=0 for all pixels)
BAD_MINFIT = 2**10 #- Bad parabola fit to the chi2 minimum
POORDATA = 2**11 #- Poor input data quality but try fitting anyway
#- The following bits are reserved for experiment-specific post-redrock
#- afterburner updates to ZWARN; redrock commits to *not* setting these bits
RESERVED16 = 2**16
RESERVED17 = 2**17
RESERVED18 = 2**18
RESERVED19 = 2**19
RESERVED20 = 2**20
RESERVED21 = 2**21
RESERVED22 = 2**22
RESERVED23 = 2**23
@classmethod
def flags(cls):
flagmask = list()
for key, value in cls.__dict__.items():
if not key.startswith('_') and key.isupper():
flagmask.append((key, value))
import numpy as np
isort = np.argsort([x[1] for x in flagmask])
flagmask = [flagmask[i] for i in isort]
return flagmask
[docs]
def find_minima(x):
"""Return indices of local minima of x, including edges.
The indices are sorted small to large.
Note:
this is somewhat conservative in the case of repeated values:
find_minima([1,1,1,2,2,2]) -> [0,1,2,4,5]
Args:
x (array-like): The data array.
Returns:
(array): The indices.
"""
x = np.asarray(x)
ii = np.where(np.r_[True, x[1:]<=x[:-1]] & np.r_[x[:-1]<=x[1:], True])[0]
jj = np.argsort(x[ii])
return ii[jj]
[docs]
def minfit(x, y, return_coeff=False):
"""Fits y = y0 + ((x-x0)/xerr)**2
See redrock.zwarning.ZWarningMask.BAD_MINFIT for zwarn failure flags
Args:
x (array): x values.
y (array): y values.
Returns:
(tuple): (x0, xerr, y0, zwarn) where zwarn=0 is good fit.
"""
a, b, c = 0., 0., 0.
if len(x) < 3:
if return_coeff:
return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c))
else:
return (-1,-1,-1,ZWarningMask.BAD_MINFIT)
try:
#- y = a x^2 + b x + c
a,b,c = np.polyfit(x,y,2)
except np.linalg.LinAlgError:
if return_coeff:
return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c))
else:
return (-1,-1,-1,ZWarningMask.BAD_MINFIT)
if a == 0.0:
if return_coeff:
return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c))
else:
return (-1,-1,-1,ZWarningMask.BAD_MINFIT)
#- recast as y = y0 + ((x-x0)/xerr)^2
x0 = -b / (2*a)
y0 = -(b**2) / (4*a) + c
zwarn = 0
if (x0 <= np.min(x)) or (np.max(x) <= x0):
zwarn |= ZWarningMask.BAD_MINFIT
if (y0<=0.):
zwarn |= ZWarningMask.BAD_MINFIT
if a > 0.0:
xerr = 1 / np.sqrt(a)
else:
xerr = 1 / np.sqrt(-a)
zwarn |= ZWarningMask.BAD_MINFIT
if return_coeff:
return (x0, xerr, y0, zwarn,(a,b,c))
else:
return (x0, xerr, y0, zwarn)
[docs]
class TabulatedDESI(object):
"""
Class to load tabulated z->E(z) and z->comoving_radial_distance(z) relations within DESI fiducial cosmology
(in LSS/data/desi_fiducial_cosmology.dat) and perform the (linear) interpolations at any z.
>>> cosmo = TabulatedDESI()
>>> distance = cosmo.comoving_radial_distance([0.1, 0.2])
>>> efunc = cosmo.efunc(0.3)
The cosmology is defined in https://github.com/abacusorg/AbacusSummit/blob/master/Cosmologies/abacus_cosm000/CLASS.ini
and the tabulated file was obtained using https://github.com/adematti/cosmoprimo/blob/main/cosmoprimo/fiducial.py.
Note
----
Redshift interpolation range is [0, 100].
"""
def __init__(self):
from importlib import resources
cosmofile = resources.files('fastspecfit').joinpath('data/desi_fiducial_cosmology.dat')
self._z, self._efunc, self._comoving_radial_distance = np.loadtxt(cosmofile, comments='#', usecols=None, unpack=True)
self.H0 = 100.0
self.h = self.H0 / 100
self.hubble_time = 3.08567758e19 / 3.15576e16 / self.H0 # Hubble time [Gyr]
[docs]
def efunc(self, z):
r"""Return :math:`E(z)`, where the Hubble parameter is defined as :math:`H(z) = H_{0} E(z)`, unitless."""
z = np.asarray(z)
mask = (z < self._z[0]) | (z > self._z[-1])
if mask.any():
raise ValueError('Input z outside of tabulated range.')
return np.interp(z, self._z, self._efunc, left=None, right=None)
[docs]
def comoving_radial_distance(self, z):
r"""Return comoving radial distance, in :math:`\mathrm{Mpc}/h`."""
z = np.asarray(z)
mask = (z < self._z[0]) | (z > self._z[-1])
if mask.any():
raise ValueError('Input z outside of tabulated range.')
return np.interp(z, self._z, self._comoving_radial_distance, left=None, right=None)
[docs]
def luminosity_distance(self, z):
r"""Return luminosity distance, in :math:`\mathrm{Mpc}/h`."""
return self.comoving_radial_distance(z) * (1.0+z)
[docs]
def distance_modulus(self, z):
"""Return the distance modulus at the given redshift (Hogg Eq. 24)."""
return 5. * np.log10(self.luminosity_distance(z)) + 25
[docs]
def universe_age(self, z):
"""Return the age of the universe at the given redshift.
"""
from scipy.integrate import quad
def _agefunc(z):
return 1.0 / self.efunc(z) / (1.0 + z)
if np.isscalar(z):
integ, _ = quad(_agefunc, z, self._z[-1])
return integ * self.hubble_time
else:
age = []
for _z in z:
integ, _ = quad(_agefunc, _z, self._z[-1])
age.append(integ * self.hubble_time)
return np.array(age)