Source code for fastspecfit.fastspecfit

"""
fastspecfit.fastspecfit
=======================

See sandbox/running-fastspecfit for examples.

"""
import os, sys, time
import numpy as np

from astropy.table import Table

from fastspecfit.logger import log
from fastspecfit.singlecopy import sc_data, _initialize_sc_data
from fastspecfit.util import BoxedScalar, MPPool, NMONTE_DEFAULT, fsftime
from fastspecfit.templates import VDISP_NOMINAL, VDISP_BOUNDS

[docs] def make_init_sc_args(args, fastphot=False, fitstack=False): """Build the sc_data.initialize() kwargs from a parsed args Namespace. Parameters ---------- args : :class:`argparse.Namespace` Parsed command-line arguments (from :func:`parse` or from ``mpi-fastspecfit``). fastphot : bool, optional Passed through to :meth:`~fastspecfit.singlecopy.Singletons.initialize`. fitstack : bool, optional Passed through to :meth:`~fastspecfit.singlecopy.Singletons.initialize`. Returns ------- dict Keyword arguments suitable for ``sc_data.initialize(**...)``. """ return { 'emlines_file': getattr(args, 'emlinesfile', None), 'constraints_file': getattr(args, 'constraintsfile', None), 'fphotofile': getattr(args, 'fphotofile', None), 'fastphot': fastphot, 'fitstack': fitstack, 'ignore_photometry': getattr(args, 'ignore_photometry', False), 'template_file': getattr(args, 'templates', None), 'template_version': getattr(args, 'templateversion', None), 'template_imf': getattr(args, 'imf', None), 'log_verbose': getattr(args, 'verbose', False), 'vdisp_nominal': getattr(args, 'vdisp_nominal', VDISP_NOMINAL), 'vdisp_bounds': getattr(args, 'vdisp_bounds', VDISP_BOUNDS), }
[docs] def parse(options=None, rank=0): """Parse input arguments to fastspec and fastphot scripts. Parameters ---------- options : list of str or None, optional Command-line argument strings. If ``None``, reads from ``sys.argv``. rank : int, optional MPI rank of the calling process, used to suppress log output on non-zero ranks. Defaults to 0. Returns ------- args : :class:`argparse.Namespace` Parsed command-line arguments. """ import argparse, sys from fastspecfit.templates import Templates parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('redrockfiles', nargs='+', help='Full path to input redrock file(s).') parser.add_argument('-o', '--outfile', type=str, required=True, help='Full path to output filename (required).') parser.add_argument('--mp', type=int, default=1, help='Number of multiprocessing threads per MPI rank.') parser.add_argument('-n', '--ntargets', type=int, help='Number of targets to process in each file.') parser.add_argument('--firsttarget', type=int, default=0, help='Index of first object to to process in each file, zero-indexed.') parser.add_argument('--targetids', type=str, default=None, help='Comma-separated list of TARGETIDs to process.') parser.add_argument('--input-redshifts', type=str, default=None, help='Comma-separated list of input redshifts corresponding to the (required) --targetids input.') parser.add_argument('--input-seeds', type=str, default=None, help='Comma-separated list of input random-number seeds corresponding to the (required) --targetids input.') parser.add_argument('--seed', type=int, default=1, help='Random seed for Monte Carlo reproducibility; ignored if --input-seeds is passed.') parser.add_argument('--nmonte', type=int, default=NMONTE_DEFAULT, help='Number of Monte Carlo realizations.') parser.add_argument('--vdisp-nominal', type=float, default=VDISP_NOMINAL, help='Nominal (default) velocity dispersion in km/s.') parser.add_argument('--vdisp-bounds', type=float, default=VDISP_BOUNDS, nargs=2, help='Nominal (default) velocity dispersion in km/s.') parser.add_argument('--zmin', type=float, default=None, help='Override the default minimum redshift required for modeling.') parser.add_argument('--no-broadlinefit', default=True, action='store_false', dest='broadlinefit', help='Do not model broad Balmer and helium line-emission.') parser.add_argument('--ignore-photometry', default=False, action='store_true', help='Ignore the broadband photometry during model fitting.') parser.add_argument('--ignore-quasarnet', dest='use_quasarnet', default=True, action='store_false', help='Do not use QuasarNet to improve QSO redshifts.') parser.add_argument('--constrain-age', action='store_true', help='Constrain the age of the SED.') parser.add_argument('--no-smooth-continuum', action='store_true', help='Do not fit the smooth continuum.') parser.add_argument('--imf', type=str, default=Templates.DEFAULT_IMF, help='Initial mass function.') parser.add_argument('--templateversion', type=str, default=Templates.DEFAULT_TEMPLATEVERSION, help='Template version number.') parser.add_argument('--templates', type=str, default=None, help='Optional full path and filename to the templates.') parser.add_argument('--redrockfile-prefix', type=str, default='redrock-', help='Prefix of the input Redrock file name(s).') parser.add_argument('--specfile-prefix', type=str, default='coadd-', help='Prefix of the spectral file(s).') parser.add_argument('--qnfile-prefix', type=str, default='qso_qn-', help='Prefix of the QuasarNet afterburner file(s).') parser.add_argument('--mapdir', type=str, default=None, help='Optional directory name for the dust maps.') parser.add_argument('--fphotodir', type=str, default=None, help='Top-level location of the source photometry.') parser.add_argument('--fphotofile', type=str, default=None, help='Photometric information file.') parser.add_argument('--emlinesfile', type=str, default=None, help='Emission line parameter file.') parser.add_argument('--constraintsfile', type=str, default=None, help='Emission-line kinematic constraint YAML file.') parser.add_argument('--redux_dir', type=str, default=None, help='Optional full path $DESI_SPECTRO_REDUX.') parser.add_argument('--specproddir', type=str, default=None, help='Optional directory name for the spectroscopic production.') parser.add_argument('--uncertainty-floor', type=float, default=0.01, help='Minimum fractional uncertainty to add in quadrature to the formal inverse variance spectrum.') parser.add_argument('--minsnr-balmer-broad', type=float, default=2.5, help='Minimum broad Balmer S/N to force broad+narrow-line model.') parser.add_argument('--debug-plots', action='store_true', help='Generate a variety of debugging plots (written to $PWD).') parser.add_argument('--verbose', action='store_true', help='Be verbose (for debugging purposes).') if options is None: options = sys.argv[1:] if rank == 0: log.info(f'fastspec {" ".join(options)}') return parser.parse_args(options)
[docs] def fastspec_one(iobj, data, meta, fastfit_dtype, specphot_dtype, broadlinefit=True, fastphot=False, fitstack=False, constrain_age=False, no_smooth_continuum=False, debug_plots=False, uncertainty_floor=0.01, minsnr_balmer_broad=2.5, nmonte=NMONTE_DEFAULT, seed=1): """Fit the continuum and emission lines for a single DESI object. Parameters ---------- iobj : int Index of the object in the input list, used for log messages. data : dict Per-object data dictionary from :class:`fastspecfit.io.DESISpectra`. meta : :class:`astropy.table.Row` Metadata row for this object. fastfit_dtype : :class:`numpy.dtype` NumPy dtype for the emission-line fitting output table. specphot_dtype : :class:`numpy.dtype` NumPy dtype for the spectrophotometric output table. broadlinefit : bool, optional Attempt to fit broad Balmer components. Defaults to ``True``. fastphot : bool, optional Fit broadband photometry only (no spectra). Defaults to ``False``. fitstack : bool, optional Treat input as stacked spectra. Defaults to ``False``. constrain_age : bool, optional Constrain the stellar population age during fitting. Defaults to ``False``. no_smooth_continuum : bool, optional Skip smooth continuum fitting. Defaults to ``False``. debug_plots : bool, optional Write diagnostic QA plots to the working directory. Defaults to ``False``. uncertainty_floor : float, optional Minimum fractional uncertainty added in quadrature to the formal inverse variance spectrum. Defaults to 0.01. minsnr_balmer_broad : float, optional Minimum S/N required to adopt the broad Balmer component. Defaults to 2.5. nmonte : int, optional Number of Monte Carlo realizations for uncertainty estimation. seed : int, optional Random seed for Monte Carlo reproducibility. Defaults to 1. Returns ------- meta : :class:`astropy.table.Row` Updated metadata row with observed photometry filled in. specphot : :class:`numpy.ndarray` Spectrophotometric output row. fastfit : :class:`numpy.ndarray` Emission-line fitting output row. emmodel : :class:`astropy.table.Table` or None Coadded model spectra table, or ``None`` if ``fastphot=True``. """ from fastspecfit.io import one_spectrum, one_stacked_spectrum from fastspecfit.emlines import emline_specfit from fastspecfit.continuum import continuum_specfit igm = sc_data.igm phot = sc_data.photometry emline_table = sc_data.emlines.table templates = sc_data.templates t0 = time.time() if fastphot: log.info(f'Continuum fitting object {iobj} [{phot.uniqueid_col.lower()} ' + \ f'{data["uniqueid"]}, seed {seed}, z={data["redshift"]:.6f}].') else: log.info(f'Continuum- and emission-line fitting object {iobj} [{phot.uniqueid_col.lower()} ' + \ f'{data["uniqueid"]}, seed {seed}, z={data["redshift"]:.6f}].') if fitstack: one_stacked_spectrum(data, meta, synthphot=True, debug_plots=debug_plots) else: one_spectrum(data, meta, uncertainty_floor=uncertainty_floor, fastphot=fastphot, synthphot=True, debug_plots=debug_plots) # Copy parsed photometry from the 'data' dictionary to the 'meta' table. if not fitstack: flux = data['photometry']['nanomaggies'] fluxivar = data['photometry']['nanomaggies_ivar'] for iband, band in enumerate(phot.bands): meta[f'FLUX_{band.upper()}'] = flux[iband] meta[f'FLUX_IVAR_{band.upper()}'] = fluxivar[iband] if hasattr(phot, 'fiber_bands'): fiberflux = data['fiberphot']['nanomaggies'] fibertotflux = data['fibertotphot']['nanomaggies'] for iband, band in enumerate(phot.fiber_bands): meta[f'FIBERFLUX_{band.upper()}'] = fiberflux[iband] meta[f'FIBERTOTFLUX_{band.upper()}'] = fibertotflux[iband] # output structures fastfit = BoxedScalar(fastfit_dtype) specphot = BoxedScalar(specphot_dtype) continuummodel, smooth_continuum, continuummodel_monte, specflux_monte = \ continuum_specfit(data, fastfit, specphot, templates, igm, phot, constrain_age=constrain_age, no_smooth_continuum=no_smooth_continuum, fastphot=fastphot, fitstack=fitstack, debug_plots=debug_plots, nmonte=nmonte, seed=seed) # Optionally fit the emission-line spectrum. if fastphot: emmodel = None else: emmodel = emline_specfit(data, fastfit, specphot, continuummodel, smooth_continuum, phot, emline_table, sc_data.constraints, broadlinefit=broadlinefit, minsnr_balmer_broad=minsnr_balmer_broad, debug_plots=debug_plots, specflux_monte=specflux_monte, continuummodel_monte=continuummodel_monte) log.info(fsftime('fastspec_one', time.time()-t0, context=f'{phot.uniqueid_col.lower()}={data["uniqueid"]}')) return meta, specphot.value, fastfit.value, emmodel
[docs] def fastspec(fastphot=False, fitstack=False, args=None, comm=None, verbose=False, mp_pool=None): """Main fastspec engine: read, fit, and write results for one or more DESI spectra. Parameters ---------- fastphot : bool, optional Fit broadband photometry only (no spectra). Defaults to ``False``. fitstack : bool, optional Treat input as stacked spectra. Defaults to ``False``. args : :class:`argparse.Namespace` or list of str or None, optional Pre-parsed arguments or raw argument list. If ``None``, reads from ``sys.argv``. comm : :class:`mpi4py.MPI.Comm` or None, optional MPI intracommunicator for parallel execution, or ``None`` for single-process mode. verbose : bool, optional Enable verbose (debug-level) logging. Defaults to ``False``. Returns ------- int Exit code (0 on success). """ from astropy.table import vstack from fastspecfit.io import (DESISpectra, get_output_dtype, create_output_meta, create_output_table, write_fastspecfit) if comm: rank, size = comm.rank, comm.size else: rank, size = 0, 1 if isinstance(args, (list, tuple, type(None))): args = parse(args, rank=rank) if fitstack: args.ignore_photometry = True # check for mandatory environment variables envlist = [] if not fitstack and args.redux_dir is None: envlist += ['DESI_SPECTRO_REDUX'] if not fitstack and args.mapdir is None: envlist += ['DUST_DIR'] if not fitstack and args.fphotodir is None: envlist += ['FPHOTO_DIR'] if args.templates is None: envlist += ['FTEMPLATES_DIR'] for env in envlist: if not env in os.environ: errmsg = f'Mandatory environment variable {env} missing.' log.critical(errmsg) raise KeyError(errmsg) if verbose: args.verbose = True # --debug-plots implies --verbose: diagnostic plots are only useful # alongside debug-level log output. if getattr(args, 'debug_plots', False): args.verbose = True targetids = None input_redshifts = None input_seeds = None if args.targetids is not None: targetids = [int(x) for x in args.targetids.split(',')] if args.input_redshifts is not None: input_redshifts = [float(x) for x in args.input_redshifts.split(',')] if len(input_redshifts) != len(targetids): errmsg = 'targetids and input_redshifts must have the same number of elements.' log.critical(errmsg) raise ValueError(errmsg) if args.input_seeds is not None: input_seeds = [np.int64(x) for x in args.input_seeds.split(',')] if len(input_seeds) != len(targetids): errmsg = 'targetids and input_seeds must have the same number of elements.' log.critical(errmsg) raise ValueError(errmsg) init_sc_args = make_init_sc_args(args, fastphot=fastphot, fitstack=fitstack) t0 = time.time() sc_data.initialize(**init_sc_args) log.info(fsftime('sc_data_init', time.time()-t0)) _own_pool = False if rank == 0: t0 = time.time() if comm is None and mp_pool is None: mp_pool = MPPool(args.mp, initializer=_initialize_sc_data, init_argdict=init_sc_args) _own_pool = True log.debug(fsftime('init_workers', time.time()-t0, context=f'nworkers={args.mp}')) log.info(f'Cached stellar templates {sc_data.templates.file}') log.info(f'Cached emission-line table {sc_data.emlines.file}') log.info(f'Cached photometric filters and parameters {sc_data.photometry.fphotofile}') log.info(f'Cached cosmology table {sc_data.cosmology.file}') log.info(f'Cached {sc_data.igm.reference} IGM attenuation parameters.') # Read the data. Spec = DESISpectra(phot=sc_data.photometry, cosmo=sc_data.cosmology, fphotodir=args.fphotodir, mapdir=args.mapdir, redux_dir=args.redux_dir) if fitstack: data, meta = Spec.read_stacked(args.redrockfiles, firsttarget=args.firsttarget, stackids=targetids, ntargets=args.ntargets, constrain_age=args.constrain_age) else: Spec.gather_metadata(args.redrockfiles, firsttarget=args.firsttarget, targetids=targetids, input_redshifts=input_redshifts, ntargets=args.ntargets, zmin=args.zmin, redrockfile_prefix=args.redrockfile_prefix, specfile_prefix=args.specfile_prefix, qnfile_prefix=args.qnfile_prefix, use_quasarnet=args.use_quasarnet, specprod_dir=args.specproddir) if len(Spec.specfiles) == 0: return 0 data, meta = Spec.read(sc_data.photometry, fastphot=fastphot, constrain_age=args.constrain_age) ntargets = len(meta) ncoeff = sc_data.templates.ntemplates if fastphot: cameras = None else: cameras = data[0]['cameras'] fastfit_dtype, fastfit_units = get_output_dtype( Spec.specprod, phot=sc_data.photometry, linetable=sc_data.emlines.table, ncoeff=ncoeff, cameras=cameras, fastphot=fastphot, fitstack=fitstack) specphot_dtype, specphot_units = get_output_dtype( Spec.specprod, phot=sc_data.photometry, linetable=sc_data.emlines.table, ncoeff=ncoeff, cameras=cameras, fastphot=fastphot, fitstack=fitstack, specphot=True) # If using Monte Carlo, generate the random seed(s). if args.nmonte > 0: if input_seeds is not None: seeds = input_seeds else: rng = np.random.default_rng(seed=args.seed) seeds = rng.integers(2**32, size=ntargets, dtype=np.int64) else: seeds = [1] * ntargets nobj = len(meta) if args.outfile: _outbase = os.path.basename(args.outfile).replace('.fits.gz', '').replace('.fits', '') for d in data: d['outfile_base'] = _outbase fitargs = [{ 'iobj': iobj, 'data': data[iobj], 'meta': meta[iobj], 'fastfit_dtype': fastfit_dtype, 'specphot_dtype': specphot_dtype, 'broadlinefit': args.broadlinefit, 'fastphot': fastphot, 'fitstack': fitstack, 'constrain_age': args.constrain_age, 'no_smooth_continuum': args.no_smooth_continuum, 'debug_plots': args.debug_plots, 'uncertainty_floor': args.uncertainty_floor, 'minsnr_balmer_broad': args.minsnr_balmer_broad, 'nmonte': args.nmonte, 'seed': seeds[iobj], } for iobj in range(nobj)] # Fit in parallel t0 = time.time() if comm is not None: # Rank=0 sends work to all the other ranks (and also does work itself). if rank == 0: log.info(f'Rank {rank}: distributing {nobj:,d} objects to {comm.size:,d} ranks.') fitargs_byrank = np.array_split(fitargs, size) for onerank in range(1, size): #log.debug(f'Rank 0 sending data on {len(fitargs_byrank[onerank])}/{len(meta)} objects to rank {onerank}') comm.send(fitargs_byrank[onerank], dest=onerank) fitargs_onerank = fitargs_byrank[rank] else: fitargs_onerank = comm.recv(source=0) #log.debug(f'Rank {rank} received data on {len(fitargs_onerank)} objects from rank 0') # Each rank, including rank 0, iterates over each object and then sends # the results to rank 0. log.info(f'Rank {rank}: fitting {len(fitargs_onerank):,d} objects.') t1 = time.time() out = [] for fitarg_onerank in fitargs_onerank: out.append(fastspec_one(**fitarg_onerank)) log.info(fsftime('fit_rank', time.time()-t1, context=f'rank={rank}, nobj={len(out):,d}')) if rank > 0: #log.debug(f'Rank {rank} sending data on {len(out)} objects to rank 0.') comm.send(out, dest=0) else: for onerank in range(1, size): out.extend(comm.recv(source=onerank)) #log.debug(f'Rank 0 received data on {len(out)} objects.') log.info(f'Rank {rank}: collected fitting results for {len(out):,d} ' + \ f'objects from {comm.size:,d} ranks.') else: out = mp_pool.starmap(fastspec_one, fitargs) out = list(zip(*out)) if rank == 0: meta = create_output_meta(vstack(out[0]), phot=sc_data.photometry, fastphot=fastphot, fitstack=fitstack) specphot = create_output_table(out[1], meta, specphot_units, fitstack=fitstack) if fastphot: fastfit = None modelspectra = None else: fastfit = create_output_table(out[2], meta, fastfit_units, fitstack=fitstack) modelspectra = vstack(out[3], join_type='exact', metadata_conflicts='error') if comm is None and _own_pool: mp_pool.close() _elapsed = time.time() - t0 _ncore = max(args.mp, 1) _per_obj = _elapsed * min(ntargets, _ncore) / ntargets log.info(fsftime('fit_all', _elapsed, context=f'nobj={ntargets},{_per_obj:.2f}s/obj/core')) write_fastspecfit( meta, specphot, fastfit, modelspectra=modelspectra, outfile=args.outfile, specprod=Spec.specprod, coadd_type=Spec.coadd_type, fphotofile=sc_data.photometry.fphotofile, template_file=sc_data.templates.file, emlinesfile=sc_data.emlines.file, constraintsfile=sc_data.constraints.file, fastphot=fastphot, inputz=input_redshifts is not None, nmonte=args.nmonte, vdisp_nominal=args.vdisp_nominal, vdisp_bounds=args.vdisp_bounds, seed=args.seed, inputseeds=input_seeds is not None, uncertainty_floor=args.uncertainty_floor, minsnr_balmer_broad=args.minsnr_balmer_broad, ignore_photometry=args.ignore_photometry, broadlinefit=args.broadlinefit, constrain_age=args.constrain_age, use_quasarnet=args.use_quasarnet, no_smooth_continuum=args.no_smooth_continuum) return 0
[docs] def fastphot(args=None, comm=None, verbose=False, mp_pool=None): """Main fastphot entry point: fit broadband photometry for DESI objects. Parameters ---------- args : :class:`argparse.Namespace` or list of str or None, optional Pre-parsed arguments or raw argument list. If ``None``, reads from ``sys.argv``. comm : :class:`mpi4py.MPI.Comm` or None, optional MPI intracommunicator for parallel execution, or ``None`` for single-process mode. verbose : bool, optional Enable verbose (debug-level) logging. Defaults to ``False``. mp_pool : :class:`fastspecfit.util.MPPool` or None, optional Pre-created worker pool to reuse; a new pool is created and closed when ``None``. Returns ------- int Exit code (0 on success). """ return fastspec(fastphot=True, args=args, comm=comm, verbose=verbose, mp_pool=mp_pool)
[docs] def stackfit(args=None, comm=None, verbose=False): """Entry point to fit generic stacked DESI spectra. Parameters ---------- args : :class:`argparse.Namespace` or list of str or None, optional Pre-parsed arguments or raw argument list. If ``None``, reads from ``sys.argv``. comm : :class:`mpi4py.MPI.Comm` or None, optional MPI intracommunicator for parallel execution, or ``None`` for single-process mode. verbose : bool, optional Enable verbose (debug-level) logging. Defaults to ``False``. Returns ------- int Exit code (0 on success). """ return fastspec(fitstack=True, args=args, comm=comm, verbose=verbose, mp_pool=None)