import numpy as np
from math import erf, erfc
from numba import jit
from .params_mapping import ParamsMapping
from .sparse_rep import EMLineJacobian
from .utils import (
MAX_SDEV,
norm_pdf,
norm_cdf,
max_buffer_width,
)
from .model import (
emline_model,
emline_perline_models,
)
from .jacobian import (
emline_model_jacobian,
patch_jacobian,
)
[docs]
class EMLine_Objective(object):
"""Objective function for emission-line least-squares fitting.
Parameters
----------
obs_bin_centers : :class:`numpy.ndarray`
Center wavelength of each observed wavelength bin.
obs_fluxes : :class:`numpy.ndarray`
Observed flux in each wavelength bin.
obs_weights : :class:`numpy.ndarray`
Weighting factors for each wavelength bin in the residual.
redshift : float
Redshift of the observed spectrum.
line_wavelengths : :class:`numpy.ndarray`
Nominal rest-frame wavelengths of all fitted lines in Angstroms.
resolution_matrices : tuple of :class:`fastspecfit.resolution.Resolution`
Resolution matrices for each camera.
camerapix : :class:`numpy.ndarray` of int
Start and end wavelength bin indices for each camera.
params_mapping : :class:`~fastspecfit.emline_fit.params_mapping.ParamsMapping`
Mapping from free to full line parameters.
continuum_patches : dict or None, optional
Patch pedestal parameters, or ``None`` if not used.
"""
def __init__(self,
obs_bin_centers,
obs_fluxes,
obs_weights,
redshift,
line_wavelengths,
resolution_matrices,
camerapix,
params_mapping,
continuum_patches=None):
self.dtype = obs_fluxes.dtype
self.obs_fluxes = obs_fluxes
self.obs_weights = obs_weights
self.redshift = redshift
self.line_wavelengths = line_wavelengths
self.resolution_matrices = resolution_matrices
self.camerapix = camerapix
self.params_mapping = params_mapping
self.obs_bin_centers = obs_bin_centers
if continuum_patches is not None:
self.nPatches = len(continuum_patches)
self.patch_endpts = continuum_patches['endpts'].value
self.patch_pivotwave = continuum_patches['pivotwave'].value
self.J_P = patch_jacobian(obs_bin_centers,
obs_weights,
self.patch_endpts,
self.patch_pivotwave)
else:
self.nPatches = 0
self.J_P = None
self.log_obs_bin_edges, self.ibin_widths = \
_prepare_bins(obs_bin_centers, camerapix)
# temporary storage to prevent allocation in params_mapping
# on every call to objective/jacobian
self.line_parameters = \
np.empty(self.params_mapping.nParms, dtype=self.dtype)
[docs]
def objective(self, free_parameters):
"""Compute the weighted residuals between the emission-line model and observations.
Parameters
----------
free_parameters : :class:`numpy.ndarray`
Values of all free parameters.
Returns
-------
residuals : :class:`numpy.ndarray`
Weighted residuals for each wavelength bin.
"""
nLineParameters = len(free_parameters) - 2 * self.nPatches
line_free_parameters = free_parameters[:nLineParameters]
#
# expand line free parameters into complete
# line parameter array, handling tied params
# and doublets
#
line_parameters = self.params_mapping.mapFreeToFull(line_free_parameters,
out=self.line_parameters)
model_fluxes = np.empty_like(self.obs_fluxes, dtype=self.dtype)
_build_model_core(line_parameters,
self.line_wavelengths,
self.redshift,
self.log_obs_bin_edges,
self.ibin_widths,
self.resolution_matrices,
self.camerapix,
model_fluxes)
if self.nPatches > 0:
slopes = free_parameters[nLineParameters:nLineParameters + self.nPatches]
intercepts = free_parameters[nLineParameters+self.nPatches:]
# add patch pedestals to line model
_add_patches(self.obs_bin_centers, model_fluxes,
self.patch_endpts,
self.patch_pivotwave,
slopes,
intercepts)
# turn model fluxes into residuals in-place to avoid
# unwanted memory allocation
residuals = model_fluxes
residuals -= self.obs_fluxes
residuals *= self.obs_weights
return residuals
[docs]
def jacobian(self, free_parameters):
"""Compute the sparse Jacobian of the objective function.
Parameters
----------
free_parameters : :class:`numpy.ndarray`
Values of all free parameters.
Returns
-------
J : :class:`~fastspecfit.emline_fit.sparse_rep.EMLineJacobian`
Sparse Jacobian at the given parameter values.
"""
nLineFreeParms = len(free_parameters) - 2 * self.nPatches
line_free_parameters = free_parameters[:nLineFreeParms]
#
# expand free paramters into complete
# parameter array, handling tied params
# and doublets
#
line_parameters = self.params_mapping.mapFreeToFull(line_free_parameters,
out=self.line_parameters)
J_S = self.params_mapping.getJacobian(line_free_parameters)
jacs = []
for icam, campix in enumerate(self.camerapix):
s, e = campix
# Actual inverse bin widths are in ibin_widths[s+1:e+2].
# Setup guarantees that at least one more array entry
# exists to either side of this range, so we can pass
# those in as dummies.
ibw = self.ibin_widths[s:e+3]
idealJac = \
emline_model_jacobian(line_parameters,
self.log_obs_bin_edges[s+icam:e+icam+1],
ibw,
self.redshift,
self.line_wavelengths,
self.resolution_matrices[icam].ndiag)
# ignore any columns corresponding to fixed parameters
endpts = idealJac[0]
endpts[self.params_mapping.fixedMask(), :] = 0
jacs.append( mulWMJ(self.obs_weights[s:e],
self.resolution_matrices[icam].rowdata(),
idealJac) )
nBins = np.sum(np.diff(self.camerapix))
nFreeParms = len(free_parameters)
J = EMLineJacobian((nBins, nFreeParms), nLineFreeParms,
self.camerapix, jacs, J_S,
self.J_P)
return J
[docs]
def build_model(redshift,
line_parameters,
line_wavelengths,
obs_bin_centers,
resolution_matrices,
camerapix,
continuum_patches=None):
"""Compute the resolution-convolved emission-line model fluxes.
Parameters
----------
redshift : float
Redshift of the observed spectrum.
line_parameters : :class:`numpy.ndarray`
Concatenated array of amplitudes, velocity shifts, and sigmas for
all lines.
line_wavelengths : :class:`numpy.ndarray`
Nominal rest-frame wavelengths of all fitted lines in Angstroms.
obs_bin_centers : :class:`numpy.ndarray`
Center wavelength of each observed wavelength bin.
resolution_matrices : tuple of :class:`fastspecfit.resolution.Resolution`
Resolution matrices for each camera.
camerapix : :class:`numpy.ndarray` of int
Start and end wavelength bin indices for each camera.
continuum_patches : dict or None, optional
Patch pedestal parameters, or ``None`` if not used.
Returns
-------
model_fluxes : :class:`numpy.ndarray`
Modeled flux in each observed wavelength bin.
"""
log_obs_bin_edges, ibin_widths = _prepare_bins(obs_bin_centers, camerapix)
model_fluxes = np.empty_like(obs_bin_centers, dtype=obs_bin_centers.dtype)
_build_model_core(line_parameters,
line_wavelengths,
redshift,
log_obs_bin_edges,
ibin_widths,
resolution_matrices,
camerapix,
model_fluxes)
# suppress negative pixels arising from resolution matrix
model_fluxes[model_fluxes < 0.] = 0.
if continuum_patches is not None:
# add patch pedestals to model fluxes
_add_patches(obs_bin_centers, model_fluxes,
continuum_patches['endpts'].value,
continuum_patches['pivotwave'].value,
continuum_patches['slope'].value,
continuum_patches['intercept'].value)
return model_fluxes
[docs]
class MultiLines(object):
"""Per-line emission-line flux models across one or more cameras.
Builds and stores individual resolution-convolved line profiles so
that the caller can retrieve a sparse model for any single line.
Parameters
----------
line_parameters : :class:`numpy.ndarray`
Concatenated array of amplitudes, velocity shifts, and sigmas for
all lines.
obs_bin_centers : :class:`numpy.ndarray`
Center wavelength of each observed wavelength bin.
redshift : float
Redshift of the observed spectrum.
line_wavelengths : :class:`numpy.ndarray`
Nominal rest-frame wavelengths of all fitted lines in Angstroms.
resolution_matrices : tuple of :class:`fastspecfit.resolution.Resolution`
Resolution matrices for each camera.
camerapix : :class:`numpy.ndarray` of int
Start and end wavelength bin indices for each camera.
"""
def __init__(self,
line_parameters,
obs_bin_centers,
redshift,
line_wavelengths,
resolution_matrices,
camerapix):
@jit(nopython=True, nogil=True, cache=True)
def _suppress_negative_fluxes(endpts, M):
"""Clamp negative per-line fluxes to zero."""
for i in range(M.shape[0]):
s, e = endpts[i]
for j in range(e-s):
M[i,j] = np.maximum(M[i,j], 0.)
if resolution_matrices is None:
# create trivial diagonal resolution matrices
from fastspecfit.resolution import Resolution
rm = [ Resolution(np.ones((1, e - s))) for (s, e) in camerapix ]
resolution_matrices = tuple(rm)
self.line_models = []
_build_multimodel_core(line_parameters,
obs_bin_centers,
redshift,
line_wavelengths,
resolution_matrices,
camerapix,
lambda m: self.line_models.append(m))
self.line_models = tuple(self.line_models)
for endpts, M in self.line_models:
_suppress_negative_fluxes(endpts, M)
[docs]
def getLine(self, line):
"""Return the model for one emission line as a sparse array.
Parameters
----------
line : int
Index of the line to return.
Returns
-------
range : tuple of int
``(s, e)`` giving the half-open bin range ``[s, e)`` in the
combined observed wavelength array with nonzero flux.
data : :class:`numpy.ndarray`
Flux values for bins ``obs_bin_centers[s:e]``.
"""
s = 1000000000
e =-1
# Determine which cameras' multiline models contain
# bins with nonzero flux for this line, and compute
# the lowest and highest such bins in the combined
# observed flux array.
live_models = []
for i, line_model in enumerate(self.line_models):
ls, le = line_model[0][line]
if ls < le: # line has nonzero flux bins
s = np.minimum(s, ls)
e = np.maximum(e, le)
live_models.append(i)
if len(live_models) == 0:
# line has no nonzero flux bins
return (0, 0), np.empty((0), dtype=np.float64)
elif len(live_models) == 1:
# line appears on only one camera
i = live_models[0]
return (s, e), self.line_models[i][1][line][:e-s]
else:
# build combind array across multiple cameras,
# allowing for arbitrary overlap. This array
# represents the range obs_bins[s:e]
data = np.zeros(e-s, self.line_models[0][1].dtype)
# copy the live bins for each camera to the
# combind array.
for i in live_models:
ls, le = self.line_models[i][0][line]
ldata = self.line_models[i][1][line]
data[ls-s:le-s] = ldata[:le-ls]
return (s, e), data
[docs]
def find_peak_amplitudes(line_parameters,
obs_bin_centers,
redshift,
line_wavelengths,
resolution_matrices,
camerapix):
"""Return the maximum flux contributed by each line to any observed bin.
Parameters
----------
line_parameters : :class:`numpy.ndarray`
Concatenated array of amplitudes, velocity shifts, and sigmas for
all lines.
obs_bin_centers : :class:`numpy.ndarray`
Center wavelength of each observed wavelength bin.
redshift : float
Redshift of the observed spectrum.
line_wavelengths : :class:`numpy.ndarray`
Nominal rest-frame wavelengths of all fitted lines in Angstroms.
resolution_matrices : tuple of :class:`fastspecfit.resolution.Resolution`
Resolution matrices for each camera.
camerapix : :class:`numpy.ndarray` of int
Start and end wavelength bin indices for each camera.
Returns
-------
max_amps : :class:`numpy.ndarray`
Maximum flux in any observed bin for each line.
"""
@jit(nopython=True, nogil=True, cache=True)
def _update_line_maxima(max_amps, line_models):
"""Update per-line maximum amplitudes from a camera's line models."""
endpts, vals = line_models
# find the highest flux for each peak; if it's
# bigger than any seen so far, update global max
for i in range(vals.shape[0]):
ps, pe = endpts[i]
if pe > ps:
max_amps[i] = np.maximum(max_amps[i],
np.max(vals[i,:pe-ps]))
max_amps = np.zeros_like(line_wavelengths, dtype=line_parameters.dtype)
_build_multimodel_core(line_parameters,
obs_bin_centers,
redshift,
line_wavelengths,
resolution_matrices,
camerapix,
lambda m: _update_line_maxima(max_amps, m))
return max_amps
[docs]
def find_peak_amplitudes_and_fluxes(line_parameters,
obs_bin_centers,
redshift,
line_wavelengths,
resolution_matrices,
camerapix):
"""Return the peak amplitude and integrated flux for each emission line.
Parameters
----------
line_parameters : :class:`numpy.ndarray`
Concatenated array of amplitudes, velocity shifts, and sigmas for
all lines.
obs_bin_centers : :class:`numpy.ndarray`
Center wavelength of each observed wavelength bin.
redshift : float
Redshift of the observed spectrum.
line_wavelengths : :class:`numpy.ndarray`
Nominal rest-frame wavelengths of all fitted lines in Angstroms.
resolution_matrices : tuple of :class:`fastspecfit.resolution.Resolution`
Resolution matrices for each camera.
camerapix : :class:`numpy.ndarray` of int
Start and end wavelength bin indices for each camera.
Returns
-------
max_amps : :class:`numpy.ndarray`
Maximum flux in any observed bin for each line.
line_fluxes : :class:`numpy.ndarray`
Wavelength-integrated flux of the convolved line profile for each line.
"""
@jit(nopython=True, nogil=True, cache=True)
def _update_line_maxima_and_fluxes(max_amps, line_fluxes, line_models, wave_weight):
endpts, vals = line_models
for i in range(vals.shape[0]):
ps, pe = endpts[i]
if pe > ps:
max_amps[i] = np.maximum(max_amps[i], np.max(vals[i, :pe-ps]))
for k in range(pe - ps):
line_fluxes[i] += vals[i, k] * wave_weight[ps + k]
nlines = len(line_wavelengths)
nbins = len(obs_bin_centers)
# Build per-pixel integration weights accounting for wavelength overlap
wave_weight = np.empty(nbins)
for s, e in camerapix:
wave_weight[s:e] = np.gradient(obs_bin_centers[s:e])
# For each camera pair, find wavelength overlap and halve weights there.
for i, (s1, e1) in enumerate(camerapix):
for j, (s2, e2) in enumerate(camerapix):
if j <= i:
continue
wmin = max(obs_bin_centers[s1:e1].min(), obs_bin_centers[s2:e2].min())
wmax = min(obs_bin_centers[s1:e1].max(), obs_bin_centers[s2:e2].max())
if wmax > wmin:
mask1 = np.zeros(nbins, dtype=bool)
mask2 = np.zeros(nbins, dtype=bool)
mask1[s1:e1] = (obs_bin_centers[s1:e1] >= wmin) & \
(obs_bin_centers[s1:e1] <= wmax)
mask2[s2:e2] = (obs_bin_centers[s2:e2] >= wmin) & \
(obs_bin_centers[s2:e2] <= wmax)
wave_weight[mask1] /= 2.
wave_weight[mask2] /= 2.
max_amps = np.zeros(nlines, dtype=line_parameters.dtype)
line_fluxes = np.zeros(nlines, dtype=line_parameters.dtype)
_build_multimodel_core(line_parameters,
obs_bin_centers,
redshift,
line_wavelengths,
resolution_matrices,
camerapix,
lambda m: _update_line_maxima_and_fluxes(max_amps, line_fluxes, m, wave_weight))
return max_amps, line_fluxes
[docs]
def _build_model_core(line_parameters,
line_wavelengths,
redshift,
log_obs_bin_edges,
ibin_widths,
resolution_matrices,
camerapix,
model_fluxes):
"""Compute resolution-convolved combined model fluxes, writing into ``model_fluxes``."""
for icam, campix in enumerate(camerapix):
# start and end for obs fluxes of camera icam
s, e = campix
# Actual inverse bin widths are in ibin_widths[s+1:e+2].
# Setup guarantees that at least one more array entry
# exists to either side of this range, so we can pass
# those in as dummies.
ibw = ibin_widths[s:e+3]
mf = emline_model(line_wavelengths,
line_parameters,
log_obs_bin_edges[s+icam:e+icam+1],
redshift,
ibw)
# convolve model with resolution matrix and store in
# this camera's subrange of model_fluxes
resolution_matrices[icam].dot(mf, model_fluxes[s:e])
[docs]
def _build_multimodel_core(line_parameters,
obs_bin_centers,
redshift,
line_wavelengths,
resolution_matrices,
camerapix,
consumer_fun):
"""Compute sparse per-line models for each camera and pass each to ``consumer_fun``."""
log_obs_bin_edges, ibin_widths = _prepare_bins(obs_bin_centers,
camerapix)
for icam, campix in enumerate(camerapix):
# start and end for obs fluxes of camera icam
s, e = campix
# Actual inverse bin widths are in ibin_widths[s+1:e+2].
# Setup guarantees that at least one more array entry
# exists to either side of this range, so we can pass
# those in as dummies.
ibw = ibin_widths[s:e+3]
# compute model waveform for each spectral line
line_models = emline_perline_models(line_wavelengths,
line_parameters,
log_obs_bin_edges[s+icam:e+icam+1],
redshift,
ibw,
resolution_matrices[icam].ndiag)
# convolve each line's waveform with resolution matrix
endpts, M = mulWMJ(np.ones(e - s),
resolution_matrices[icam].rowdata(),
line_models)
# adjust endpoints to reflect camera range
endpts += s
consumer_fun((endpts, M))
@jit(nopython=True, nogil=True, cache=True)
def _add_patches(obs_bin_centers,
model_fluxes,
patch_endpts,
patch_pivotwaves,
slopes,
intercepts):
"""Add affine patch pedestals to ``model_fluxes`` in-place."""
# add patch pedestals to line model
nPatches = len(slopes)
for ipatch in range(nPatches):
s, e = patch_endpts[ipatch]
pivotwave = patch_pivotwaves[ipatch]
slope = slopes[ipatch]
intercept = intercepts[ipatch]
for j in range(s,e):
model_fluxes[j] += \
slope * (obs_bin_centers[j] - pivotwave) + intercept
@jit(nopython=True, nogil=True, cache=True)
def mulWMJ(w, M, Jsp):
"""Compute the sparse matrix product ``P = W @ M @ J``.
``W`` is a diagonal weight matrix, ``M`` is a resolution matrix, and
``J`` is a column-sparse matrix with one contiguous nonzero range per
column. The result is written back into ``Jsp`` in-place.
Parameters
----------
w : :class:`numpy.ndarray`
Diagonal of the weight matrix ``W``.
M : :class:`numpy.ndarray`, shape (nbins, ndiag)
Resolution matrix in sparse row form.
Jsp : tuple
Column-sparse matrix ``(endpts, J)``, where ``endpts[j] = (s, e)``
gives the half-open nonzero range for column ``j`` and
``J[j, :e-s]`` holds the values.
Returns
-------
result : tuple
Product ``W @ M @ J`` in the same column-sparse form as ``Jsp``.
Notes
-----
The input ``Jsp`` must have sufficient padding so that each column of
``J`` can be overwritten with the corresponding column of ``P``.
"""
endpts, J = Jsp
nbins, ndiag = M.shape
ncol, maxColSize = J.shape
hdiag = ndiag//2
# temporary buffer for each column of WMJ
buf = np.empty(maxColSize, dtype=J.dtype)
for j in range(ncol):
# boundaries of nonzero entries
# in jth column of J
s, e = endpts[j]
if s == e: # no nonzero values in column j
continue
# boundaries of entries in jth column of P
# impacted by matrix multiply
imin = np.maximum(s - hdiag, 0)
imax = np.minimum(e + hdiag, nbins) # one past last impacted entry
for i in range(imin, imax):
# boundaries of interval of k where both
# M[i, k] and J[k, j] are nonzero.
kmin = np.maximum(i - hdiag, s)
kmax = np.minimum(i + hdiag, e - 1)
acc = 0.
for k in range(kmin, kmax + 1):
acc += M[i, k - i + hdiag] * J[j, k - s]
buf[i - imin] = acc * w[i]
# write result back to J and update endpts for col
newS = np.maximum(imin, 0)
newE = np.minimum(imax, nbins)
J[j, :newE - newS] = buf[:newE - newS]
endpts[j] = np.array([newS, newE])
return (endpts, J)
@jit(nopython=True, nogil=True, cache=True)
def _prepare_bins(centers, camerapix):
"""Convert observed bin centers to log bin edges and inverse bin widths.
Parameters
----------
centers : :class:`numpy.ndarray`
Center wavelength of each observed wavelength bin.
camerapix : :class:`numpy.ndarray` of int
Start and end wavelength bin indices for each camera.
Returns
-------
log_obs_bin_edges : :class:`numpy.ndarray`
Natural log of each bin edge wavelength. Edges are placed
halfway between adjacent centers, with extrapolation at the ends.
One extra edge per camera gap is included.
ibin_widths : :class:`numpy.ndarray`
Inverse bin widths, zero-padded by one entry on each end.
"""
ncameras = camerapix.shape[0]
edges = np.empty(len(centers) + ncameras, dtype=centers.dtype)
ibin_widths = np.empty(len(centers) + 2, dtype=centers.dtype)
for icam, campix in enumerate(camerapix):
s, e = campix
icenters = centers[s:e]
#- interior edges are just points half way between bin centers
int_edges = 0.5 * (icenters[:-1] + icenters[1:])
#- exterior edges are extrapolation of interior bin sizes
edge_l = icenters[ 0] - (icenters[ 1] - int_edges[ 0])
edge_r = icenters[-1] + (icenters[-1] - int_edges[-1])
edges[s + icam] = edge_l
edges[s + icam + 1:e + icam] = int_edges
edges[e + icam] = edge_r
# add 1 to indices i ibin_widths to skip dummy at 0
ibin_widths[s+1:e+1] = 1. / np.diff(edges[s+icam : e+icam+1])
# dummies before and after widths are needed
# for corner cases in edge -> bin computation
ibin_widths[0] = 0.
ibin_widths[-1] = 0.
return (np.log(edges), ibin_widths)