Source code for fastspecfit.util

"""
fastspecfit.util
================

General utilities.

"""
import numpy as np
from numba import jit

from fastspecfit.logger import log

try:  # this fails when building the documentation
    from scipy import constants
    C_LIGHT = constants.c / 1000.0  # [km/s]
except:
    C_LIGHT = 299792.458  # [km/s]

FLUXNORM = 1e17  # flux normalization factor for all DESI spectra [erg/s/cm2/A]


[docs] class BoxedScalar(object): """ A BoxedScalar is an item of an Numpy structured scalar type that is initialized to all zeros and can then be passed around by reference. Access the .value field to unbox the scalar. """ def __init__(self, dtype): self.value = np.zeros(1, dtype=dtype)[0] def __getitem__(self, key): return self.value[key] def __setitem__(self, key, v): self.value[key] = v
[docs] class MPPool(object): """ A Pool encapsulates paraallel execution with a multiprocessing.Pool, falling back to sequential execution in the current process if just one worker is requested. Unlike multiprocessing.Pool, our starmap() function takes a list of keyword argument dictionaries, rather than a list of positional arguments. """ def __init__(self, nworkers, initializer=None, init_argdict=None): """ create a pool with nworkers workers, using the current process if nworkers is 1. If initiializer is not None, apply this function to the arguments in keyword dictionary init_argdict on startup in each each worker subprocess. """ initfunc = None if initializer is None else self.apply_to_dict if nworkers > 1: #try: # from mpi4py.futures import MPIPoolExecutor as Pool #except: # from multiprocessing import Pool from multiprocessing import Pool self.pool = Pool(nworkers, initializer=initfunc, initargs=(initializer, init_argdict,)) else: self.pool = None
[docs] def starmap(self, func, argdicts): """ apply function func to each of a list of inputs, represented as a list of keyword argument dictionaries. """ # we cannot pickle a local function, so we must pass # both func and the argument dictionary to the subprocess # worker and have it apply one to the other. wrapped_args = [ ( func, a, ) for a in argdicts ] if self.pool is not None: out = self.pool.starmap(self.apply_to_dict, wrapped_args) else: from itertools import starmap out = starmap(self.apply_to_dict, wrapped_args) return out
[docs] def close(self): """ close our multiprocess pool if we created one """ if self.pool is not None: self.pool.close()
@staticmethod def apply_to_dict(f, argdict): return f(**argdict)
[docs] class ZWarningMask(object): """ Mask bit definitions for zwarning. Taken from Redrock/0.15.4 WARNING on the warnings: not all of these are implemented yet. #- TODO: Consider using something like desispec.maskbits to provide a more #- convenient wrapper class (probably copy it here; don't make a dependency) #- That class as-is would bring in a yaml dependency. """ SKY = 2**0 #- sky fiber LITTLE_COVERAGE = 2**1 #- too little wavelength coverage SMALL_DELTA_CHI2 = 2**2 #- chi-squared of best fit is too close to that of second best NEGATIVE_MODEL = 2**3 #- synthetic spectrum is negative MANY_OUTLIERS = 2**4 #- fraction of points more than 5 sigma away from best model is too large (>0.05) Z_FITLIMIT = 2**5 #- chi-squared minimum at edge of the redshift fitting range NEGATIVE_EMISSION = 2**6 #- a QSO line exhibits negative emission, triggered only in QSO spectra, if C_IV, C_III, Mg_II, H_beta, or H_alpha has LINEAREA + 3 * LINEAREA_ERR < 0 UNPLUGGED = 2**7 #- the fiber was unplugged/broken, so no spectrum obtained BAD_TARGET = 2**8 #- catastrophically bad targeting data NODATA = 2**9 #- No data for this fiber, e.g. because spectrograph was broken during this exposure (ivar=0 for all pixels) BAD_MINFIT = 2**10 #- Bad parabola fit to the chi2 minimum POORDATA = 2**11 #- Poor input data quality but try fitting anyway @classmethod def flags(cls): flagmask = list() for key, value in cls.__dict__.items(): if not key.startswith('_') and key.isupper(): flagmask.append((key, value)) import numpy as np isort = np.argsort([x[1] for x in flagmask]) flagmask = [flagmask[i] for i in isort] return flagmask
[docs] def mwdust_transmission(ebv, filtername): """Convert SFD E(B-V) value to dust transmission 0-1 given the bandpass. Args: ebv (float or array-like): SFD E(B-V) value(s) filtername (str): Filter name, e.g., 'decam2014-r'. Returns: Scalar or array (same as ebv input), Milky Way dust transmission 0-1. Note: This function tabulates the total-to-selective extinction ratio, k_X=A(X)/E(B-V) for many different filter bandpasses, X, where A(X)=-2.5*log10(transmission in X band). And so given the ebv, it returns mwdust_transmission=10**(-0.4*k_X*ebv). Returns: scalar, total extinction A(band) = -2.5*log10(transmission(band)) Notes: Based on `desiutil.dust.mwdust_transmission`. """ k_X = { # From https://desi.lbl.gov/trac/wiki/ImagingStandardBandpass # DECam u 3881.6 3.994 # DECam g 4830.8 3.212 # DECam r 6409.0 2.164 # DECam i 7787.5 1.591 # DECam z 9142.7 1.211 # DECam Y 9854.5 1.063 # BASS g 4772.1 3.258 # BASS r 6383.6 2.176 # MzLS z 9185.1 1.199 # Consistent with the synthetic magnitudes and function dust_transmission 'BASS-g': 3.258, 'BASS-r': 2.176, 'MzLS-z': 1.199, 'decam2014-u': 3.994, 'decam2014-g': 3.212, 'decam2014-r': 2.164, 'decam2014-i': 1.591, 'decam2014-z': 1.211, 'decam2014-Y': 1.063, # Anand Raichoor, private communication 'cfht_megacam-u': 4.01, 'cfht_megacam-ustar': 4.12, 'odin-N419': 4.324, 'odin-N501': 3.540, 'odin-N673': 2.438, 'hsc2017-g': 3.24, 'hsc2017-r': 2.276, 'hsc2017-r2': 2.276, 'hsc2017-i': 1.633, 'hsc2017-i2': 1.633, 'hsc2017-z': 1.263, 'hsc2017-y': 1.075, 'suprime-IB427': 4.202, 'suprime-IB464': 3.894, 'suprime-IB484': 3.694, 'suprime-IB505': 3.490, 'suprime-IB527': 3.304, # Add WISE from # https://github.com/dstndstn/tractor/blob/main/tractor/sfd.py#L23-L35 'wise2010-W1': 0.184, 'wise2010-W2': 0.113, 'wise2010-W3': 0.0241, 'wise2010-W4': 0.00910, } if filtername not in k_X: errmsg = f'Filtername {filtername} is missing from dictionary of known bandpasses!' log.critical(errmsg) raise ValueError(errmsg) A_X = k_X[filtername] * ebv transmission = 10.**(-0.4 * A_X) return transmission
[docs] def ivar2var(ivar, clip=1e-3, sigma=False, allmasked_ok=False): """Safely convert an inverse variance to a variance. Note that we clip at 1e-3 by default, not zero. """ var = np.zeros_like(ivar) goodmask = ivar > clip # True is good if np.count_nonzero(goodmask) == 0: # Try clipping at zero. goodmask = ivar > 0 # True is good if np.count_nonzero(goodmask) == 0: if allmasked_ok: return var, goodmask errmsg = 'All values are masked!' log.critical(errmsg) raise ValueError(errmsg) var[goodmask] = 1 / ivar[goodmask] if sigma: var = np.sqrt(var) # return a sigma return var, goodmask
# currently unused - JDB
[docs] def air2vac(airwave): """http://www.astro.uu.se/valdwiki/Air-to-vacuum%20conversion""" if airwave <= 0: raise ValueError('Input wavelength is not defined.') ss = 1e4 / airwave nn = 1 + 0.00008336624212083 + 0.02408926869968 / (130.1065924522 - ss**2) + 0.0001599740894897 / (38.92568793293 - ss**2) return airwave * nn
[docs] def find_minima(x): """Return indices of local minima of x, including edges. The indices are sorted small to large. Note: this is somewhat conservative in the case of repeated values: find_minima([1,1,1,2,2,2]) -> [0,1,2,4,5] Args: x (array-like): The data array. Returns: (array): The indices. """ x = np.asarray(x) ii = np.where(np.r_[True, x[1:]<=x[:-1]] & np.r_[x[:-1]<=x[1:], True])[0] jj = np.argsort(x[ii]) return ii[jj]
[docs] def minfit(x, y, return_coeff=False): """Fits y = y0 + ((x-x0)/xerr)**2 See redrock.zwarning.ZWarningMask.BAD_MINFIT for zwarn failure flags Args: x (array): x values. y (array): y values. Returns: (tuple): (x0, xerr, y0, zwarn) where zwarn=0 is good fit. """ a, b, c = 0., 0., 0. if len(x) < 3: if return_coeff: return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c)) else: return (-1,-1,-1,ZWarningMask.BAD_MINFIT) try: #- y = a x^2 + b x + c a,b,c = np.polyfit(x,y,2) except np.linalg.LinAlgError: if return_coeff: return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c)) else: return (-1,-1,-1,ZWarningMask.BAD_MINFIT) if a == 0.0: if return_coeff: return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c)) else: return (-1,-1,-1,ZWarningMask.BAD_MINFIT) #- recast as y = y0 + ((x-x0)/xerr)^2 x0 = -b / (2*a) y0 = -(b**2) / (4*a) + c zwarn = 0 if (x0 <= np.min(x)) or (np.max(x) <= x0): zwarn |= ZWarningMask.BAD_MINFIT if (y0<=0.): zwarn |= ZWarningMask.BAD_MINFIT if a > 0.0: xerr = 1 / np.sqrt(a) else: xerr = 1 / np.sqrt(-a) zwarn |= ZWarningMask.BAD_MINFIT if return_coeff: return (x0, xerr, y0, zwarn,(a,b,c)) else: return (x0, xerr, y0, zwarn)
# # sigma clipping in Numba # Rewrite basic sigma clipping to avoid # array copies and redundant summation # on each iteration # @jit(nopython=True, nogil=True) def sigmaclip(c, low=3., high=3.): n = len(c) mask = np.full(n, True, dtype=np.bool_) s = np.sum(c) s2 = np.sum(c*c) delta = 1 while delta > 0: mean = s/n # differences very close to zero can end up negative # due to limited floating-point precision std = np.sqrt(np.maximum(s2/n - mean*mean, 0.)) clo = mean - std * low chi = mean + std * high if std == 0.: # don't mask everything break n0 = n for j, cval in enumerate(c): if mask[j] and (cval < clo or cval > chi): mask[j] = False n -= 1 s -= cval s2 -= cval * cval delta = n0-n return c[mask], mask # Numba's quantile impl is much faster # than Numpy's standard version @jit(nopython=True, nogil=True) def quantile(A, q): return np.quantile(A, q) # Numba's median impl is also faster @jit(nopython=True, nogil=True) def median(A): return np.median(A) # Open-coded Numba trapz is much faster than np.traz @jit(nopython=True, nogil=True, fastmath=True) def trapz(y, x): res = 0. for i in range(len(x) - 1): res += (x[i+1] - x[i]) * (y[i+1] + y[i]) return 0.5 * res
[docs] def trapz_rebin(src_x, src_y, bin_centers, out=None, pre=None): """ Resample an signal src_y sampled at points src_x into into a series of x-axis bins, in an area-conserving way Parameters ---------- src_y : :class:`numpy.ndarray` [npix] Input signal src_x : :class:`numpy.ndarray` [npix] array of x coordinates corresponing to src_y bin_centers : :class:`numpy.ndarray` [noutpix] center x coordinates of output bins out: :class:`numpy.ndarray` or None if not None, an array of correct size and type that will receive the result of the computation. If None, a new array will be allocated. pre : preprocessing data computed by trapz_rebin_pre(), if available. If not None, it must correspond to the input bin_centers array. Returns ------- :class:`numpy.ndarray` [noutpix] Resampled signal at centers of each bin """ if pre == None: pre = trapz_rebin_pre(bin_centers) edges, ibw = pre return _trapz_rebin(src_x, src_y, edges, ibw, out)
[docs] def trapz_rebin_pre(bin_centers): """ Perform preprocessing on the bin centers passed to trapz_rebin() to avoid some computations each time it is called with these same centers. Parameters ---------- bin_centers : :class:`numpy.ndarray` [npix] array of bin centers for trapz_rebin() Returns ------- A preprocessing data structure that can be passed as the 'pre' argument of trapz_rebin whenever it is called with the given bin_centers (for *any* src_x and src_y) """ edges = centers2edges(bin_centers) ibw = 1. / np.diff(edges) return (edges, ibw)
@jit(nopython=True, nogil=True, fastmath=True) def _trapz_rebin(x, y, edges, ibw, out): """ Trapezoidal rebinning This implementation is optimized for compilation with Numba. It agrees with the earlier fastspecfit implementation to within around 1e-12 and is somewhat faster. It also tolerates the first bin being arbitrarily greater than the first x without performance loss. ibw is the array of inverse bin widths. We require, as did the original version of the code, that the values of x extend strictly beyond the edges array in both directions. """ # interpolated value of y at edge_x, which lies between x[j] and x[j+1] def y_at(edge_x, j): # j: largest j s.t. x[j] < edge_x return y[j] + (edge_x - x[j]) * (y[j+1] - y[j]) / (x[j+1] - x[j]) if edges[0] < x[0] or x[-1] < edges[-1]: raise ValueError('edges must lie strictly within x range') nbins = len(edges) - 1 results = np.empty(nbins, dtype=y.dtype) if out is None else out # greatest j s.t. x[j] < edges[0] j = np.searchsorted(x, edges[0], 'right') xlo = edges[0] ylo = y_at(xlo, j) # loop invariant: on entry to iteration i, # x[j] is greatest x < edges[i] # xlo = edges[i] # ylo = y_at(edges[i], j) for i in range(nbins): a = 0. while x[j+1] < edges[i+1]: xhi = x[j+1] yhi = y[j+1] # add area from prev boundary to x[j+1] a += (xhi - xlo) * (yhi + ylo) xlo = xhi ylo = yhi j += 1 # partial area up to edge i+1 xhi = edges[i+1] yhi = y_at(xhi, j) a += (xhi - xlo) * (yhi + ylo) xlo = xhi ylo = yhi results[i] = 0.5 * ibw[i] * a return results @jit(nopython=True, nogil=True) def centers2edges(centers): """ Convert bin centers to bin edges, guessing at what you probably meant. Parameters ---------- centers: :class:`numpy.ndarray` bin centers Returns ------- array: bin edges, length = len(centers) + 1 """ edges = np.empty(len(centers) + 1, dtype=centers.dtype) #- Interior edges are just points half way between bin centers edges[1:-1] = 0.5 * (centers[:-1] + centers[1:]) #- edge edges are extrapolation of interior bin sizes edges[0] = centers[0] - (centers[1] - edges[1]) edges[-1] = centers[-1] + (centers[-1] - edges[-2]) return edges