Source code for fastspecfit.util

"""
fastspecfit.util
================

General utilities.

"""
import re
import datetime
import numpy as np
from numba import jit

from fastspecfit.logger import log

try:  # this fails when building the documentation
    from scipy import constants
    C_LIGHT = constants.c / 1000.0  # [km/s]
except:
    C_LIGHT = 299792.458  # [km/s]

FLUXNORM = 1e17 # flux normalization factor for all DESI spectra [erg/s/cm2/A]
MASSNORM = 1e10 # mass normalization factor
NMONTE_DEFAULT = 50

TINY = np.nextafter(0, 1, dtype=np.float32)
SQTINY = np.sqrt(TINY)
F32MAX = np.finfo(np.float32).max


[docs] def fsftime(operation, duration, context=None): """Return a standardized, parseable timing message string. Durations under 60 seconds are reported in seconds; longer durations are reported in minutes. Use :func:`parse_fsftime` to parse these messages back out of log files. Parameters ---------- operation : :class:`str` Short name of the pipeline stage being timed, e.g. ``'continuum_specfit'`` or ``'write_fastspecfit'``. duration : :class:`float` Elapsed wall time in seconds. context : :class:`str` or None, optional Optional comma-separated key=value context string appended in square brackets, e.g. ``'targetid=39627739988231708'``. Returns ------- :class:`str` Formatted timing string ready to pass to a logger. Examples -------- >>> log.info(fsftime('continuum_specfit', 3.45, context='targetid=12345')) fsftime 3.45 sec for continuum_specfit [targetid=12345] at 2026-... >>> log.info(fsftime('fit_all', 125.3, context='nobj=500')) fsftime 2.09 min for fit_all [nobj=500] at 2026-... """ time_str = f'{duration:.2f} sec' if duration < 60. else f'{duration/60.:.2f} min' timestamp = datetime.datetime.now().isoformat(timespec='milliseconds') if context: return f'fsftime {time_str} for {operation} [{context}] at {timestamp}' return f'fsftime {time_str} for {operation} at {timestamp}'
_fsftime_re = re.compile( r'.*fsftime ([\d.]+) (sec|min) for (\S+)(?:\s+\[([^\]]*)\])? at ([\d\-T:.]+)')
[docs] def _uid(data): """Return a log-friendly uniqueid that includes the output basename when set. Use in warning/error messages so SLURM logs can be linked back to a specific healpix output file without changing plot filenames. """ uid = data['uniqueid'] base = data.get('outfile_base') return f'{uid},{base}' if base else str(uid)
[docs] def parse_fsftime(line): """Parse a line for an fsftime timing message produced by :func:`fsftime`. Parameters ---------- line : :class:`str` Log line to parse. Returns ------- :class:`dict` or None Dict with keys ``operation`` (:class:`str`), ``duration_sec`` (:class:`float`), ``context`` (:class:`str` or None), and ``timestamp`` (:class:`str`); or ``None`` if the line does not contain an fsftime message. """ m = _fsftime_re.match(line) if m is None: return None value, unit, operation, context, timestamp = m.groups() duration_sec = float(value) * (60. if unit == 'min' else 1.) return dict(operation=operation, duration_sec=duration_sec, context=context, timestamp=timestamp)
[docs] class BoxedScalar(object): """A zero-initialized NumPy structured scalar that can be passed by reference. Parameters ---------- dtype : dtype-like NumPy dtype of the scalar value. Attributes ---------- value : numpy scalar The boxed value; access via ``.value`` to unbox. """ def __init__(self, dtype): self.value = np.zeros(1, dtype=dtype)[0] def __getitem__(self, key): return self.value[key] def __setitem__(self, key, v): self.value[key] = v
[docs] class MPPool(object): """Parallel execution pool that falls back to sequential for a single worker. Unlike :class:`multiprocessing.Pool`, :meth:`starmap` accepts a list of keyword-argument dictionaries rather than positional arguments. Parameters ---------- nworkers : :class:`int` Number of worker processes. When 1, work runs in the current process. initializer : callable or None, optional Function to call in each worker subprocess at startup. init_argdict : :class:`dict` or None, optional Keyword arguments passed to ``initializer`` at startup. """ def __init__(self, nworkers, initializer=None, init_argdict=None): initfunc = None if initializer is None else self.apply_to_dict # If multiprocessing, create a pool of worker processes and initialize # single-copy objects in each worker. if nworkers > 1: from multiprocessing import Pool self.pool = Pool(nworkers, initializer=initfunc, initargs=(initializer, init_argdict,)) else: self.pool = None
[docs] def starmap(self, func, argdicts): """Apply ``func`` to each keyword-argument dict in ``argdicts``.""" # we cannot pickle a local function, so we must pass # both func and the argument dictionary to the subprocess # worker and have it apply one to the other. wrapped_args = [ ( func, a, ) for a in argdicts ] if self.pool is not None: out = self.pool.starmap(self.apply_to_dict, wrapped_args) else: from itertools import starmap out = starmap(self.apply_to_dict, wrapped_args) return out
[docs] def close(self): """Close the multiprocessing pool if one was created.""" if self.pool is not None: self.pool.close() self.pool.join()
@staticmethod def apply_to_dict(f, argdict): return f(**argdict)
[docs] class ZWarningMask(object): """Redrock ``ZWARN`` bitmask definitions (from Redrock 0.15.4). Not all flags are currently used by fastspecfit. """ SKY = 2**0 #- sky fiber LITTLE_COVERAGE = 2**1 #- too little wavelength coverage SMALL_DELTA_CHI2 = 2**2 #- chi-squared of best fit is too close to that of second best NEGATIVE_MODEL = 2**3 #- synthetic spectrum is negative MANY_OUTLIERS = 2**4 #- fraction of points more than 5 sigma away from best model is too large (>0.05) Z_FITLIMIT = 2**5 #- chi-squared minimum at edge of the redshift fitting range NEGATIVE_EMISSION = 2**6 #- a QSO line exhibits negative emission, triggered only in QSO spectra, if C_IV, C_III, Mg_II, H_beta, or H_alpha has LINEAREA + 3 * LINEAREA_ERR < 0 UNPLUGGED = 2**7 #- the fiber was unplugged/broken, so no spectrum obtained BAD_TARGET = 2**8 #- catastrophically bad targeting data NODATA = 2**9 #- No data for this fiber, e.g. because spectrograph was broken during this exposure (ivar=0 for all pixels) BAD_MINFIT = 2**10 #- Bad parabola fit to the chi2 minimum POORDATA = 2**11 #- Poor input data quality but try fitting anyway @classmethod def flags(cls): flagmask = list() for key, value in cls.__dict__.items(): if not key.startswith('_') and key.isupper(): flagmask.append((key, value)) import numpy as np isort = np.argsort([x[1] for x in flagmask]) flagmask = [flagmask[i] for i in isort] return flagmask
[docs] def mwdust_transmission(ebv, filtername): """Convert SFD E(B-V) to Milky Way dust transmission in a given bandpass. Parameters ---------- ebv : :class:`float` or array-like SFD E(B-V) reddening value(s). filtername : :class:`str` Filter name, e.g. ``'decam2014-r'``. Returns ------- transmission : :class:`float` or :class:`numpy.ndarray` Milky Way dust transmission (0–1), same shape as ``ebv``. Notes ----- Uses tabulated k_X = A(X)/E(B-V) values where transmission = 10^{-0.4 * k_X * ebv}. Based on ``desiutil.dust.mwdust_transmission``. """ k_X = { # From https://desi.lbl.gov/trac/wiki/ImagingStandardBandpass # DECam u 3881.6 3.994 # DECam g 4830.8 3.212 # DECam r 6409.0 2.164 # DECam i 7787.5 1.591 # DECam z 9142.7 1.211 # DECam Y 9854.5 1.063 # BASS g 4772.1 3.258 # BASS r 6383.6 2.176 # MzLS z 9185.1 1.199 # Consistent with the synthetic magnitudes and function dust_transmission 'BASS-g': 3.258, 'BASS-r': 2.176, 'MzLS-z': 1.199, 'decam2014-u': 3.994, 'decam2014-g': 3.212, 'decam2014-r': 2.164, 'decam2014-i': 1.591, 'decam2014-z': 1.211, 'decam2014-Y': 1.063, # Anand Raichoor, private communication 'cfht_megacam-u': 4.01, 'cfht_megacam-ustar': 4.12, 'odin-N419': 4.324, 'odin-N501': 3.540, 'odin-N673': 2.438, 'hsc2017-g': 3.24, 'hsc2017-r': 2.276, 'hsc2017-r2': 2.276, 'hsc2017-i': 1.633, 'hsc2017-i2': 1.633, 'hsc2017-z': 1.263, 'hsc2017-y': 1.075, 'suprime-IB427': 4.202, 'suprime-IB464': 3.894, 'suprime-IB484': 3.694, 'suprime-IB505': 3.490, 'suprime-IB527': 3.304, # Add WISE from # https://github.com/dstndstn/tractor/blob/main/tractor/sfd.py#L23-L35 'wise2010-W1': 0.184, 'wise2010-W2': 0.113, 'wise2010-W3': 0.0241, 'wise2010-W4': 0.00910, } if filtername not in k_X: errmsg = f'Filtername {filtername} is missing from dictionary of known bandpasses!' log.critical(errmsg) raise ValueError(errmsg) A_X = k_X[filtername] * ebv transmission = 10.**(-0.4 * A_X) return transmission
[docs] def var2ivar(var, sigma=False): """Safely convert a scalar variance (or standard deviation) to an inverse variance. Parameters ---------- var : :class:`float` Variance, or standard deviation when ``sigma=True``. sigma : :class:`bool`, optional If ``True``, treat ``var`` as a standard deviation. Default is ``False``. Returns ------- ivar : :class:`float` Inverse variance; 0 when ``var`` is too small to invert safely. """ if sigma: if var > SQTINY: ivar = 1. / var**2 else: ivar = 0. else: if var > TINY: ivar = 1. / var else: ivar = 0. return ivar
[docs] def ivar2var(ivar, clip=1e-8, sigma=False, allmasked_ok=False): """Safely convert an inverse variance array to variance (or sigma). Parameters ---------- ivar : :class:`numpy.ndarray` Inverse variance array. clip : :class:`float`, optional Minimum ``ivar`` value treated as valid. Default is 1e-8. sigma : :class:`bool`, optional If ``True``, return the square root (i.e. standard deviation). Default is ``False``. allmasked_ok : :class:`bool`, optional If ``True``, return zeros rather than raising when all pixels are masked. Default is ``False``. Returns ------- var : :class:`numpy.ndarray` Variance (or sigma) array; zero where ``ivar <= clip``. goodmask : :class:`numpy.ndarray` of bool ``True`` where ``ivar > clip``. """ var = np.zeros_like(ivar) goodmask = ivar > clip # True is good if np.count_nonzero(goodmask) == 0: # Try clipping at zero. goodmask = ivar > 0. # True is good if np.count_nonzero(goodmask) == 0: if allmasked_ok: return var, goodmask errmsg = 'All values are masked!' log.critical(errmsg) raise ValueError(errmsg) var[goodmask] = 1. / ivar[goodmask] if sigma: var = np.sqrt(var) # return a sigma return var, goodmask
# currently unused - JDB
[docs] def air2vac(airwave): """Convert an air wavelength in Angstroms to vacuum wavelength.""" if airwave <= 0: raise ValueError('Input wavelength is not defined.') ss = 1e4 / airwave nn = 1 + 0.00008336624212083 + 0.02408926869968 / (130.1065924522 - ss**2) + 0.0001599740894897 / (38.92568793293 - ss**2) return airwave * nn
[docs] def find_minima(x): """Return indices of local minima of ``x``, including edges. Indices are sorted in ascending order of ``x`` value. Conservative with repeated values: ``find_minima([1,1,1,2,2,2])`` returns ``[0,1,2,4,5]``. Parameters ---------- x : array-like Input data array. Returns ------- ii : :class:`numpy.ndarray` of int Indices of local minima, sorted by ``x`` value (smallest first). """ x = np.asarray(x) ii = np.where(np.r_[True, x[1:]<=x[:-1]] & np.r_[x[:-1]<=x[1:], True])[0] jj = np.argsort(x[ii]) return ii[jj]
[docs] def minfit(x, y, return_coeff=False): """Fit a parabola y = y0 + ((x - x0) / xerr)^2 to find the minimum. Parameters ---------- x : array-like x values. y : array-like y values. return_coeff : :class:`bool`, optional If ``True``, also return the raw polynomial coefficients ``(a, b, c)``. Default is ``False``. Returns ------- x0 : :class:`float` x position of the parabola minimum (-1 on failure). xerr : :class:`float` Half-width of the parabola at unit height (-1 on failure). y0 : :class:`float` Minimum y value (-1 on failure). zwarn : :class:`int` Zero on success; :attr:`ZWarningMask.BAD_MINFIT` on failure. coeff : :class:`tuple`, optional Raw ``(a, b, c)`` polynomial coefficients; only present when ``return_coeff=True``. """ a, b, c = 0., 0., 0. if len(x) < 3: if return_coeff: return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c)) else: return (-1,-1,-1,ZWarningMask.BAD_MINFIT) try: #- y = a x^2 + b x + c a,b,c = np.polyfit(x,y,2) except np.linalg.LinAlgError: if return_coeff: return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c)) else: return (-1,-1,-1,ZWarningMask.BAD_MINFIT) if a == 0.0: if return_coeff: return (-1,-1,-1,ZWarningMask.BAD_MINFIT,(a,b,c)) else: return (-1,-1,-1,ZWarningMask.BAD_MINFIT) #- recast as y = y0 + ((x-x0)/xerr)^2 x0 = -b / (2*a) y0 = -(b**2) / (4*a) + c zwarn = 0 if (x0 <= np.min(x)) or (np.max(x) <= x0): zwarn |= ZWarningMask.BAD_MINFIT if (y0<=0.): zwarn |= ZWarningMask.BAD_MINFIT if a > 0.0: xerr = 1 / np.sqrt(a) else: xerr = 1 / np.sqrt(-a) zwarn |= ZWarningMask.BAD_MINFIT if return_coeff: return (x0, xerr, y0, zwarn,(a,b,c)) else: return (x0, xerr, y0, zwarn)
# # sigma clipping in Numba # Rewrite basic sigma clipping to avoid # array copies and redundant summation # on each iteration # @jit(nopython=True, nogil=True, cache=True) def sigmaclip(c, low=3., high=3.): """Iterative sigma-clipping; returns the clipped array and a boolean mask.""" n = len(c) mask = np.full(n, True, dtype=np.bool_) s = np.sum(c) s2 = np.sum(c*c) delta = 1 while delta > 0: mean = s/n # differences very close to zero can end up negative # due to limited floating-point precision std = np.sqrt(np.maximum(s2/n - mean*mean, 0.)) clo = mean - std * low chi = mean + std * high if std == 0.: # don't mask everything break n0 = n for j, cval in enumerate(c): if mask[j] and (cval < clo or cval > chi): mask[j] = False n -= 1 s -= cval s2 -= cval * cval delta = n0-n return c[mask], mask # Numba's quantile impl is much faster # than Numpy's standard version @jit(nopython=True, nogil=True, cache=True) def quantile(A, q): """Compute quantile(s) ``q`` of array ``A`` (Numba-accelerated).""" return np.quantile(A, q) # Numba's median impl is also faster @jit(nopython=True, nogil=True, cache=True) def median(A): """Compute the median of array ``A`` (Numba-accelerated).""" return np.median(A) # Open-coded Numba trapz is much faster than np.traz @jit(nopython=True, nogil=True, fastmath=True, cache=True) def trapz(y, x): """Trapezoidal integration of ``y`` over ``x`` (Numba-accelerated).""" res = 0. for i in range(len(x) - 1): res += (x[i+1] - x[i]) * (y[i+1] + y[i]) return 0.5 * res
[docs] def trapz_rebin(src_x, src_y, bin_centers, out=None, pre=None): """Resample ``src_y`` into bins centered at ``bin_centers`` (area-conserving). Parameters ---------- src_x : :class:`numpy.ndarray` x coordinates of the input signal. src_y : :class:`numpy.ndarray` Input signal sampled at ``src_x``. bin_centers : :class:`numpy.ndarray` Center x coordinates of the output bins. out : :class:`numpy.ndarray` or None, optional Pre-allocated output array; allocated if ``None``. pre : :class:`tuple` or None, optional Preprocessing data from :func:`trapz_rebin_pre`; computed from ``bin_centers`` when ``None``. Returns ------- out : :class:`numpy.ndarray` Resampled signal at the center of each output bin. """ if pre == None: pre = trapz_rebin_pre(bin_centers) edges, ibw = pre return _trapz_rebin(src_x, src_y, edges, ibw, out)
[docs] def trapz_rebin_pre(bin_centers): """Precompute bin edges and inverse widths for :func:`trapz_rebin`. Parameters ---------- bin_centers : :class:`numpy.ndarray` Center x coordinates of the output bins. Returns ------- pre : :class:`tuple` ``(edges, ibw)`` where ``edges`` are the bin edges and ``ibw`` is the array of inverse bin widths. Pass as the ``pre`` argument to :func:`trapz_rebin` to avoid recomputation. """ edges = centers2edges(bin_centers) ibw = 1. / np.diff(edges) return (edges, ibw)
@jit(nopython=True, nogil=True, fastmath=True, cache=True) def _trapz_rebin(x, y, edges, ibw, out): """Numba-accelerated trapezoidal rebinning core.""" # interpolated value of y at edge_x, which lies between x[j] and x[j+1] def y_at(edge_x, j): # j: largest j s.t. x[j] < edge_x return y[j] + (edge_x - x[j]) * (y[j+1] - y[j]) / (x[j+1] - x[j]) if edges[0] < x[0] or x[-1] < edges[-1]: raise ValueError('edges must lie strictly within x range') nbins = len(edges) - 1 results = np.empty(nbins, dtype=y.dtype) if out is None else out # greatest j s.t. x[j] < edges[0] j = np.searchsorted(x, edges[0], 'right') xlo = edges[0] ylo = y_at(xlo, j) # loop invariant: on entry to iteration i, # x[j] is greatest x < edges[i] # xlo = edges[i] # ylo = y_at(edges[i], j) for i in range(nbins): a = 0. while x[j+1] < edges[i+1]: xhi = x[j+1] yhi = y[j+1] # add area from prev boundary to x[j+1] a += (xhi - xlo) * (yhi + ylo) xlo = xhi ylo = yhi j += 1 # partial area up to edge i+1 xhi = edges[i+1] yhi = y_at(xhi, j) a += (xhi - xlo) * (yhi + ylo) xlo = xhi ylo = yhi results[i] = 0.5 * ibw[i] * a return results @jit(nopython=True, nogil=True, fastmath=True, cache=True) def _trapz_rebin_batch(x, Y, edges, ibw, out): """Apply :func:`_trapz_rebin` to each row of ``Y`` (ntemplates × npix). Parameters ---------- x : :class:`numpy.ndarray`, shape (npix,) Y : :class:`numpy.ndarray`, shape (ntemplates, npix) edges, ibw : precomputed bin edges and inverse widths from :func:`trapz_rebin_pre` out : :class:`numpy.ndarray`, shape (ntemplates, nbins) """ for t in range(Y.shape[0]): _trapz_rebin(x, Y[t], edges, ibw, out[t]) @jit(nopython=True, nogil=True, cache=True) def centers2edges(centers): """Convert bin centers to bin edges by linear extrapolation at the boundaries. Parameters ---------- centers : :class:`numpy.ndarray` Bin center coordinates. Returns ------- edges : :class:`numpy.ndarray` Bin edge coordinates, length ``len(centers) + 1``. """ edges = np.empty(len(centers) + 1, dtype=centers.dtype) #- Interior edges are just points half way between bin centers edges[1:-1] = 0.5 * (centers[:-1] + centers[1:]) #- edge edges are extrapolation of interior bin sizes edges[0] = centers[0] - (centers[1] - edges[1]) edges[-1] = centers[-1] + (centers[-1] - edges[-2]) return edges
[docs] def radec2pix(nside, ra, dec): """Convert RA/Dec to HEALPix nested pixel numbers. Parameters ---------- nside : :class:`int` HEALPix ``nside`` parameter, must be ``2**k`` for 0 < k < 30. ra : :class:`float` or array-like Right ascension in degrees. dec : :class:`float` or array-like Declination in degrees. Returns ------- pix : :class:`numpy.ndarray` of int HEALPix pixel numbers in the nested scheme. """ import healpy as hp theta, phi = np.radians(90-dec), np.radians(ra) if np.isnan(np.sum(theta)) : raise ValueError("some NaN theta values") if np.sum((theta < 0)|(theta > np.pi))>0 : raise ValueError("some theta values are outside [0,pi]: {}".format(theta[(theta < 0)|(theta > np.pi)])) return hp.ang2pix(nside, theta, phi, nest=True)