"""
fastspecfit.mpi
===============
MPI tools.
"""
import os, time
import numpy as np
from glob import glob
import multiprocessing
import fitsio
from astropy.table import Table, vstack
from fastspecfit.io import get_qa_filename
from fastspecfit.logger import log
from fastspecfit.util import fsftime
def _get_ntargets_one(args):
return get_ntargets_one(*args)
[docs]
def get_ntargets_one(specfile, htmldir_root, outdir_root, coadd_type='healpix',
makeqa=False, overwrite=False, fastphot=False):
"""Count the number of targets to process in a single output file."""
if makeqa:
if overwrite:
ntargets = fitsio.FITS(specfile)[1].get_nrows()
else:
outdir = os.path.dirname(specfile).replace(outdir_root, htmldir_root)
meta = fitsio.read(specfile, 'METADATA', columns=['SURVEY', 'PROGRAM', 'TARGETID', 'HEALPIX'])
ntargets = 0
for meta1 in meta:
pngfile = get_qa_filename(meta1, coadd_type, outdir=outdir, fastphot=fastphot)
#print(pngfile)
if not os.path.isfile(pngfile):
ntargets += 1
else:
from fastspecfit.util import ZWarningMask
zb = fitsio.read(specfile, 'REDSHIFTS', columns=['Z', 'ZWARN'])
fm = fitsio.read(specfile, 'FIBERMAP', columns=['TARGETID', 'OBJTYPE'])
J = ((zb['Z'] > 0.001) * (fm['OBJTYPE'] == 'TGT') * (zb['ZWARN'] & ZWarningMask.NODATA == 0))
ntargets = np.sum(J)
return ntargets
[docs]
def findfiles(filedir, prefix='redrock', coadd_type=None, survey=None,
program=None, healpix=None, tile=None, night=None,
gzip=False, sample=None):
"""Find all DESI spectral files matching the given selection criteria.
Parameters
----------
filedir : :class:`str`
Root directory to search for files.
prefix : :class:`str`, optional
Filename prefix. Default is ``'redrock'``.
coadd_type : :class:`str` or None, optional
Coadd type: ``'healpix'``, ``'cumulative'``, ``'pernight'``, or
``'perexp'``.
survey : :class:`str` or array-like of str, optional
DESI survey name(s), e.g. ``'main'``.
program : :class:`str` or array-like of str, optional
DESI program name(s), e.g. ``'dark'``.
healpix : array-like or None, optional
Specific HEALPix pixel(s) to include.
tile : array-like or None, optional
Specific tile ID(s) to include.
night : array-like or None, optional
Specific observing night(s) to include.
gzip : :class:`bool`, optional
If ``True``, look for ``.fits.gz`` files. Default is ``False``.
sample : :class:`astropy.table.Table` or None, optional
Input sample catalog; when provided, file paths are built directly
from ``SURVEY``, ``PROGRAM``, and ``HEALPIX`` columns.
Returns
-------
thesefiles : :class:`numpy.ndarray` of str
Sorted array of matching file paths.
"""
if gzip:
fitssuffix = 'fits.gz'
else:
fitssuffix = 'fits'
if sample is not None: # special case of an input catalog
thesefiles, ntargets = [], []
for onesurvey in sorted(set(sample['SURVEY'].data)):
S = np.where(onesurvey == sample['SURVEY'])[0]
for oneprogram in sorted(set(sample['PROGRAM'][S].data)):
log.info(f'Building file list for survey={onesurvey} and program={oneprogram}')
P = np.where(oneprogram == sample['PROGRAM'][S])[0]
uhealpix, _ntargets = np.unique(sample['HEALPIX'][S][P].astype(str), return_counts=True)
ntargets.append(_ntargets)
for onepix in uhealpix:
#ntargets.append(np.sum(onepix == sample['HEALPIX'][S][P].astype(str)))
thesefiles.append(os.path.join(filedir, onesurvey, oneprogram, str(int(onepix)//100), onepix,
f'{prefix}-{onesurvey}-{oneprogram}-{onepix}.{fitssuffix}'))
if len(thesefiles) > 0:
#thesefiles = np.array(sorted(np.unique(np.hstack(thesefiles))))
thesefiles = np.hstack(thesefiles)
ntargets = np.hstack(ntargets)
return thesefiles, ntargets
elif coadd_type == 'healpix':
thesefiles = []
for onesurvey in np.atleast_1d(survey):
for oneprogram in np.atleast_1d(program):
log.info(f'Building file list for survey={onesurvey} and program={oneprogram}')
if healpix is not None:
for onepix in healpix:
_thesefiles = os.path.join(filedir, onesurvey, oneprogram, str(int(onepix)//100), onepix,
f'{prefix}-{onesurvey}-{oneprogram}-{onepix}.{fitssuffix}')
thesefiles.append(glob(_thesefiles))
else:
allpix = os.path.join(filedir, onesurvey, oneprogram, '*')
for onepix in glob(allpix):
_thesefiles = os.path.join(onepix, '*', f'{prefix}-{onesurvey}-{oneprogram}-*.{fitssuffix}')
thesefiles.append(glob(_thesefiles))
if len(thesefiles) > 0:
thesefiles = np.array(sorted(np.unique(np.hstack(thesefiles))))
elif coadd_type == 'cumulative':
# Scrape the disk to get the tiles, but since we read the csv file I don't think this ever happens.
if tile is None:
tiledirs = np.array(sorted(set(glob(os.path.join(filedir, 'cumulative', '?????')))))
if len(tiledirs) > 0:
tile = [int(os.path.basename(tiledir)) for tiledir in tiledirs]
if tile is not None:
thesefiles = []
for onetile in tile:
nightdirs = np.array(sorted(set(glob(os.path.join(filedir, 'cumulative', str(onetile), '????????')))))
if len(nightdirs) > 0:
# for a given tile, take just the most recent night
thisnightdir = nightdirs[-1]
thesefiles.append(glob(os.path.join(thisnightdir, f'{prefix}-[0-9]-{onetile}-thru????????.{fitssuffix}')))
if len(thesefiles) > 0:
thesefiles = np.array(sorted(set(np.hstack(thesefiles))))
elif coadd_type == 'pernight':
if tile is not None and night is not None:
thesefiles = []
for onetile in tile:
for onenight in night:
thesefiles.append(glob(os.path.join(
filedir, 'pernight', str(onetile), str(onenight), f'{prefix}-[0-9]-{onetile}-{onenight}.{fitssuffix}')))
if len(thesefiles) > 0:
thesefiles = np.array(sorted(set(np.hstack(thesefiles))))
elif tile is not None and night is None:
thesefiles = np.array(sorted(set(np.hstack([glob(os.path.join(
filedir, 'pernight', str(onetile), '????????', f'{prefix}-[0-9]-{onetile}-????????.{fitssuffix}')) for onetile in tile]))))
elif tile is None and night is not None:
thesefiles = np.array(sorted(set(np.hstack([glob(os.path.join(
filedir, 'pernight', '?????', str(onenight), f'{prefix}-[0-9]-?????-{onenight}.{fitssuffix}')) for onenight in night]))))
else:
thesefiles = np.array(sorted(set(glob(os.path.join(
filedir, '?????', '????????', f'{prefix}-[0-9]-?????-????????.{fitssuffix}')))))
elif coadd_type == 'perexp':
if tile is not None:
thesefiles = np.array(sorted(set(np.hstack([glob(os.path.join(
filedir, 'perexp', str(onetile), '????????', f'{prefix}-[0-9]-{onetile}-exp????????.{fitssuffix}')) for onetile in tile]))))
else:
thesefiles = np.array(sorted(set(glob(os.path.join(
filedir, 'perexp', '?????', '????????', f'{prefix}-[0-9]-?????-exp????????.{fitssuffix}')))))
else:
pass
return thesefiles
[docs]
def plan_merge(outdir, outprefix, coadd_type, survey, program, healpix,
tile, night, sample=None, gzip=False):
"""Build the list of output files to be merged."""
redrockfiles = None
if sample is not None: # special case of an input catalog
outfiles, _ = findfiles(outdir, prefix=outprefix, sample=sample)
else:
outfiles = findfiles(outdir, prefix=outprefix, coadd_type=coadd_type,
survey=survey, program=program, healpix=healpix,
tile=tile, night=night, gzip=gzip)
log.info(f'Found {len(outfiles)} {outprefix} files to be merged.')
return redrockfiles, outfiles
[docs]
def plan_makeqa(outdir, htmldir, outprefix, coadd_type, survey, program,
healpix, tile, night, sample=None, gzip=False):
"""Build the list of output files and HTML directories for QA generation."""
outfiles = findfiles(outdir, prefix=outprefix, coadd_type=coadd_type,
survey=survey, program=program, healpix=healpix,
tile=tile, night=night, gzip=gzip)
log.info(f'Found {len(outfiles)} {outprefix} files for QA.')
# Hack!--build the output directories and pass them in the 'redrockfiles'
# position! for coadd_type=='cumulative', strip out the 'lastnight'
# argument.
if len(outfiles) == 0:
return _, outfiles
if coadd_type == 'cumulative':
redrockfiles = np.array([os.path.dirname(os.path.dirname(outfile)).replace(outdir, htmldir)
for outfile in outfiles])
else:
redrockfiles = np.array([os.path.dirname(outfile).replace(outdir, htmldir)
for outfile in outfiles])
return redrockfiles, outfiles
[docs]
def plan(comm=None, specprod=None, specprod_dir=None, coadd_type='healpix',
survey=None, program=None, healpix=None, tile=None, night=None,
sample=None, outdir_data='.', mp=1, merge=False, makeqa=False,
fastphot=False, overwrite=False):
"""Determine which files still need to be processed.
Parameters
----------
comm : MPI communicator or None, optional
MPI communicator for distributed execution. When ``None``, runs
single-process.
specprod : :class:`str` or None, optional
DESI spectroscopic production name (e.g. ``'iron'``).
specprod_dir : :class:`str` or None, optional
Override the standard specprod directory path.
coadd_type : :class:`str`, optional
Coadd type: ``'healpix'``, ``'cumulative'``, ``'pernight'``, or
``'perexp'``. Default is ``'healpix'``.
survey : :class:`str` or array-like of str, optional
DESI survey name(s).
program : :class:`str` or array-like of str, optional
DESI program name(s).
healpix : array-like or None, optional
Specific HEALPix pixel(s) to process.
tile : array-like or None, optional
Specific tile ID(s) to process.
night : array-like or None, optional
Specific observing night(s) to process.
sample : :class:`astropy.table.Table` or None, optional
Input sample catalog; overrides file-system discovery.
outdir_data : :class:`str`, optional
Root output directory. Default is ``'.'``.
mp : :class:`int`, optional
Number of multiprocessing workers. Default is 1.
merge : :class:`bool`, optional
If ``True``, plan for catalog merging rather than fitting. Default is
``False``.
makeqa : :class:`bool`, optional
If ``True``, plan for QA generation rather than fitting. Default is
``False``.
fastphot : :class:`bool`, optional
If ``True``, use ``fastphot`` output filenames. Default is ``False``.
overwrite : :class:`bool`, optional
If ``True``, include files that already have output. Default is
``False``.
Returns
-------
outdir : :class:`str`
Root output directory.
redrockfiles : :class:`list` or :class:`numpy.ndarray`
Input redrock files to process (or ``None``/``[]`` when merging/QA).
outfiles : :class:`list` or :class:`numpy.ndarray`
Corresponding output file paths.
ntargets : :class:`numpy.ndarray` or None
Number of targets per file; ``None`` when merging.
"""
if comm:
rank, size = comm.rank, comm.size
else:
rank, size = 0, 1
if rank > 0:
outdir = None
redrockfiles = None
outfiles = None
ntargets = None
else:
t0 = time.time()
if fastphot:
outprefix = 'fastphot'
gzip = False
else:
outprefix = 'fastspec'
gzip = True
redux_dir = os.path.expandvars(os.environ.get('DESI_SPECTRO_REDUX'))
# look for data in the standard location
if coadd_type == 'healpix':
subdir = 'healpix'
else:
subdir = 'tiles'
# figure out which tiles belong to the SV programs
if tile is None:
tilefile = os.path.join(redux_dir, specprod, f'tiles-{specprod}.csv')
alltileinfo = Table.read(tilefile)
tileinfo = alltileinfo[['sv' in survey for survey in alltileinfo['SURVEY']]]
log.info('Retrieved a list of {} {} tiles from {}'.format(
len(tileinfo), ','.join(sorted(set(tileinfo['SURVEY']))), tilefile))
tile = np.array(list(set(tileinfo['TILEID'])))
if specprod_dir is None:
specprod_dir = os.path.join(redux_dir, specprod, subdir)
outdir = os.path.join(outdir_data, specprod, subdir)
htmldir = os.path.join(outdir_data, specprod, 'html', subdir)
if merge:
redrockfiles, outfiles = plan_merge(
outdir, outprefix, coadd_type, survey, program,
healpix, tile, night, sample=sample, gzip=gzip)
if len(outfiles) == 0:
log.debug(f'No {outprefix} files in {outdir} found!')
return '', list(), list(), None
return outdir, redrockfiles, outfiles, None
elif makeqa:
redrockfiles, outfiles = plan_makeqa(
outdir, htmldir, outprefix, coadd_type, survey, program,
healpix, tile, night, sample=sample, gzip=gzip)
if len(outfiles) == 0:
log.debug(f'No {outprefix} files in {outdir} left to do!')
return '', list(), list(), None
ntargs = [(outfile, htmldir, outdir, coadd_type, True, overwrite, fastphot)
for outfile in outfiles]
else:
if sample is not None: # special case of an input catalog
redrockfiles, ntargets = findfiles(specprod_dir, prefix='redrock', sample=sample)
else:
redrockfiles = findfiles(specprod_dir, prefix='redrock', coadd_type=coadd_type,
survey=survey, program=program,
healpix=healpix, tile=tile, night=night)
# In principle, we could parallelize this piece of code...
nfile = len(redrockfiles)
outfiles = []
for redrockfile in redrockfiles:
outfile = redrockfile.replace(specprod_dir, outdir).replace('redrock-', f'{outprefix}-')
if gzip:
outfile = outfile.replace('.fits', '.fits.gz')
outfiles.append(outfile)
outfiles = np.array(outfiles)
todo = np.ones(len(redrockfiles), bool)
for ii, outfile in enumerate(outfiles):
if os.path.isfile(outfile) and not overwrite:
todo[ii] = False
if np.count_nonzero(todo) > 0:
redrockfiles = redrockfiles[todo]
outfiles = outfiles[todo]
if sample is not None:
ntargets = ntargets[todo]
else: # all done!
redrockfiles = []
outfiles = []
if sample is not None:
ntargets = np.array([])
log.info(f'Found {len(redrockfiles):,d}/{nfile:,d} redrockfiles left.')
# we already counted ntargets
if sample is None:
ntargs = [(redrockfile, None, None, None, False, False, False)
for redrockfile in redrockfiles]
# Count the number of targets in each file. Distribute work to the other
# ranks, either via MPI or multiprocessing. Note that if `sample` exists,
# we already counted targets.
if sample is None:
if comm:
if rank == 0:
ntargs_byrank = np.array_split(ntargs, size)
for onerank in range(1, size):
comm.send(ntargs_byrank[onerank], dest=onerank)
ntargs_onerank = ntargs_byrank[rank]
else:
ntargs_onerank = comm.recv(source=0)
ntargets = []
for ntarg_onerank in ntargs_onerank:
ntargets.append(get_ntargets_one(*ntarg_onerank))
if rank > 0:
comm.send(ntargets, dest=0)
else:
for onerank in range(1, size):
ntargets.extend(comm.recv(source=onerank))
if len(ntargets) > 0:
ntargets = np.hstack(ntargets)
else:
if mp > 1:
with multiprocessing.Pool(mp) as P:
ntargets = P.map(_get_ntargets_one, ntargs)
else:
ntargets = [get_ntargets_one(*_ntargs) for _ntargs in ntargs]
ntargets = np.array(ntargets)
if rank == 0:
if len(ntargets) == 0:
if redrockfiles is not None:
redrockfiles = []
if outfiles is not None:
outfiles = []
else:
iempty = np.where(ntargets == 0)[0]
if len(iempty) > 0:
log.info(f'Skipping {len(iempty):,d} redrockfiles with no targets.')
itodo = np.where(ntargets > 0)[0]
if len(itodo) > 0:
ntargets = ntargets[itodo]
log.info(f'Number of targets left: {np.sum(ntargets):,d}.')
if redrockfiles is not None:
redrockfiles = redrockfiles[itodo]
if coadd_type == 'healpix':
maxlen = str(len(max(np.atleast_1d(survey), key=len)) +
len(max(np.atleast_1d(program), key=len)))
for onesurvey in np.atleast_1d(survey):
for oneprogram in np.atleast_1d(program):
I = np.logical_and(np.char.find(redrockfiles, onesurvey) != -1,
np.char.find(redrockfiles, oneprogram) != -1)
if np.any(I):
one = f'{onesurvey}:{oneprogram}'
log.info(f' {one:>{maxlen}}: {np.sum(I):,d} ' + \
f'redrockfiles, {np.sum(ntargets[I]):,d} targets')
if outfiles is not None:
outfiles = outfiles[itodo]
else:
if redrockfiles is not None:
redrockfiles = []
if outfiles is not None:
outfiles = []
if len(redrockfiles) == 0:
log.info('All files have been processed!')
return '', list(), list(), None
return outdir, redrockfiles, outfiles, ntargets
def _read_to_merge_one(args):
return read_to_merge_one(*args)
[docs]
def read_to_merge_one(filename, fastphot):
"""Read metadata, specphot, and fastfit tables from one output file."""
info = fitsio.FITS(filename)
meta = Table(info['METADATA'].read())
specphot = Table(info['SPECPHOT'].read())
if fastphot:
fastfit = None
else:
fastfit = Table(info['FASTSPEC'].read())
return meta, specphot, fastfit
[docs]
def _domerge(outfiles, outprefix=None, specprod=None, coadd_type=None,
mergefile=None, fastphot=False, split_hdu=False,
nside_main=None, mp=1):
"""Read and concatenate a set of per-healpix/tile output files into one catalog."""
from astropy.table import vstack
from desiutil.depend import getdep, hasdep
from fastspecfit.io import write_fastspecfit
t0 = time.time()
meta, specphot, fastfit = [], [], []
mpargs = [[outfile, fastphot] for outfile in outfiles]
if mp > 1:
with multiprocessing.Pool(mp) as P:
out = P.map(_read_to_merge_one, mpargs)
else:
out = [read_to_merge_one(*mparg) for mparg in mpargs]
out = list(zip(*out))
meta = vstack(out[0])
specphot = vstack(out[1])
if not fastphot:
fastfit = vstack(out[2])
del out
_ctx = f'nobj={len(meta):,d}, nfiles={len(outfiles)}'
if outprefix:
_ctx += f', prefix={outprefix}'
log.info(fsftime('merge_catalogs', time.time()-t0, context=_ctx))
hdr = fitsio.read_header(outfiles[0])
# sensible defaults
deps = {}
deps['INPUTZ'] = False
deps['INPUTS'] = False
deps['CONSAGE'] = False
deps['USEQNET'] = True
deps['NMONTE'] = 50
deps['SEED'] = 1
deps['NOSCORR'] = False
deps['NOPHOTO'] = False
deps['BRDLFIT'] = True
deps['UFLOOR'] = 0.01
deps['SNRBBALM'] = 2.5
for key in deps.keys():
if key in hdr:
deps[key] = hdr[key]
deps2 = {}
deps2['FPHOTO_FILE'] = None
deps2['FTEMPLATES_FILE'] = None
deps2['EMLINES_FILE'] = None
for key in deps2.keys():
if hasdep(hdr, key):
deps2[key] = getdep(hdr, key)
write_fastspecfit(meta, specphot, fastfit, modelspectra=None, outfile=mergefile,
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot,
fphotofile=deps2['FPHOTO_FILE'], template_file=deps2['FTEMPLATES_FILE'],
emlinesfile=deps2['EMLINES_FILE'], inputz=deps['INPUTZ'],
ignore_photometry=deps['NOPHOTO'], broadlinefit=deps['BRDLFIT'],
constrain_age=deps['CONSAGE'], use_quasarnet=deps['USEQNET'],
no_smooth_continuum=deps['NOSCORR'], split_hdu=split_hdu,
nside=nside_main)
[docs]
def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None,
healpix=None, tile=None, night=None, sample=None, outsuffix=None,
fastphot=False, specprod_dir=None, outdir_data='.',
split_hdu=False, fastfiles_to_merge=None, merge_suffix=None,
mergedir=None, supermerge=False, overwrite=False, nside_main=None,
mp=1):
"""Merge per-healpix or per-tile output catalogs into a single catalog.
Runs only on MPI rank 0. When ``supermerge=True``, merges previously
merged per-survey/program catalogs into one master file instead.
Parameters
----------
specprod : :class:`str` or None, optional
DESI spectroscopic production name.
coadd_type : :class:`str` or None, optional
Coadd type: ``'healpix'``, ``'cumulative'``, ``'pernight'``, etc.
survey : :class:`str` or array-like of str, optional
DESI survey name(s).
program : :class:`str` or array-like of str, optional
DESI program name(s).
healpix : array-like or None, optional
Specific HEALPix pixel(s) to merge.
tile : array-like or None, optional
Specific tile ID(s) to merge.
night : array-like or None, optional
Specific observing night(s) to merge.
sample : :class:`astropy.table.Table` or None, optional
Input sample catalog; overrides file-system discovery.
outsuffix : :class:`str` or None, optional
Suffix for the merged output filename; defaults to ``specprod``.
fastphot : :class:`bool`, optional
If ``True``, merge ``fastphot`` output files. Default is ``False``.
specprod_dir : :class:`str` or None, optional
Override the standard specprod directory path.
outdir_data : :class:`str`, optional
Root output directory. Default is ``'.'``.
split_hdu : :class:`bool`, optional
If ``True``, write main-survey outputs as separate HDUs by
HEALPix nside. Default is ``False``.
fastfiles_to_merge : :class:`list` or None, optional
Explicit list of files to merge when ``supermerge=True``.
merge_suffix : :class:`str` or None, optional
Filename suffix for sample-based merges. Default is ``'sample'``.
mergedir : :class:`str` or None, optional
Directory for merged output files; defaults to
``<outdir_data>/<specprod>/catalogs``.
supermerge : :class:`bool`, optional
If ``True``, merge previously merged per-survey/program catalogs
into one master file. Default is ``False``.
overwrite : :class:`bool`, optional
If ``True``, overwrite existing merged output files. Default is
``False``.
nside_main : :class:`int` or None, optional
HEALPix nside for splitting main-survey output by healpixel.
mp : :class:`int`, optional
Number of multiprocessing workers. Default is 1.
"""
import fitsio
from copy import copy
from astropy.io import fits
from astropy.table import Table, vstack
from fastspecfit.mpi import plan
if split_hdu and nside_main:
errmsg = 'Using split_hdu=True and nside_main!=None is not supported.'
log.critical(errmsg)
raise ValueError(errmsg)
if fastphot:
outprefix = 'fastphot'
else:
outprefix = 'fastspec'
if mergedir is None:
mergedir = os.path.join(outdir_data, specprod, 'catalogs')
if not os.path.isdir(mergedir):
os.makedirs(mergedir, exist_ok=True)
if outsuffix is None:
outsuffix = specprod
# merge previously merged catalogs into one big catalog (and then return)
if supermerge:
if fastfiles_to_merge is None:
_outfiles = os.path.join(mergedir, f'{outprefix}-{outsuffix}-*.fits*')
outfiles = glob(_outfiles)
else:
_outfiles = fastfiles_to_merge
outfiles = _outfiles
if len(outfiles) > 0:
log.info(f'Merging {len(outfiles):,d} catalogs')
mergefile = os.path.join(mergedir, f'{outprefix}-{outsuffix}.fits')
_domerge(outfiles, mergefile=mergefile, outprefix=outprefix,
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot,
split_hdu=split_hdu, nside_main=nside_main, mp=mp)
else:
log.info(f'No catalogs found: {_outfiles}')
return
if sample is not None:
if merge_suffix is None:
merge_suffix = 'sample'
if nside_main:
print('WRITE ME!')
else:
mergefile = os.path.join(mergedir, f'{outprefix}-{specprod}-{merge_suffix}.fits')
if os.path.isfile(mergefile) and not overwrite:
log.info(f'Merged output file {mergefile} exists!')
return
_, _, outfiles, _ = plan(specprod=specprod, sample=sample, merge=True,
fastphot=fastphot, specprod_dir=specprod_dir,
outdir_data=outdir_data, overwrite=overwrite)
if len(outfiles) > 0:
_domerge(outfiles, mergefile=mergefile, outprefix=outprefix,
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot,
split_hdu=split_hdu, nside_main=nside_main, mp=mp)
elif coadd_type == 'healpix' and sample is None:
if survey is None or program is None:
log.warning(f'coadd_type={coadd_type} requires survey and program inputs.')
return
surveys = copy(survey)
programs = copy(program)
for survey in surveys:
for program in programs:
mergefile = os.path.join(mergedir, f'{outprefix}-{specprod}-{survey}-{program}.fits')
if os.path.isfile(mergefile) and not overwrite:
log.info(f'Merged output file {mergefile} exists!')
continue
_, _, outfiles, _ = plan(specprod=specprod, survey=survey, program=program, healpix=healpix,
merge=True, fastphot=fastphot, specprod_dir=specprod_dir,
outdir_data=outdir_data, overwrite=overwrite)
if len(outfiles) > 0:
# split main/dark and main/bright merge files
# either by HDU or nside_main healpixels
do_split_hdu = False
use_nside_main = None
if survey == 'main' and ((program == 'dark') or (program == 'bright')):
if nside_main:
use_nside_main = nside_main
elif split_hdu:
do_split_hdu = True
_domerge(outfiles, mergefile=mergefile, outprefix=outprefix, specprod=specprod,
coadd_type=coadd_type, fastphot=fastphot, split_hdu=do_split_hdu,
nside_main=use_nside_main, mp=mp)
else:
mergefile = os.path.join(mergedir, f'{outprefix}-{specprod}-{coadd_type}.fits')
if os.path.isfile(mergefile) and not overwrite:
log.info(f'Merged output file {mergefile} exists!')
return
_, _, outfiles, _ = plan(specprod=specprod, coadd_type=coadd_type, tile=tile, night=night,
merge=True, fastphot=fastphot, specprod_dir=specprod_dir,
outdir_data=outdir_data, overwrite=overwrite)
if len(outfiles) > 0:
_domerge(outfiles, mergefile=mergefile, outprefix=outprefix,
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot,
split_hdu=split_hdu, nside_main=nside_main, mp=mp)
# ---------------------------------------------------------------------------
# build_cmdargs: originally in bin/mpi-fastspecfit, moved here so that the
# per-file worker (_perfile_run) can call it from the fastspecfit.mpi module.
# ---------------------------------------------------------------------------
[docs]
def build_cmdargs(args, redrockfile, outfile, sample=None, fastphot=False,
input_redshifts=False):
"""Build the command-line argument string for fastspec, fastphot, or fastqa.
Parameters
----------
args : :class:`argparse.Namespace`
Parsed command-line arguments.
redrockfile : :class:`str`
Path to the input redrock file (or QA output directory for ``--makeqa``).
outfile : :class:`str`
Path to the output FITS file (or input file for ``--makeqa``).
sample : :class:`astropy.table.Table` or None, optional
Optional target sample table with columns ``{SURVEY, PROGRAM, HEALPIX, TARGETID}``.
fastphot : :class:`bool`, optional
If ``True``, build arguments for ``fastphot`` instead of ``fastspec``.
input_redshifts : :class:`bool`, optional
If ``True``, include input redshifts from ``sample['Z']``.
Returns
-------
cmd : :class:`str`
Executable name (``'fastspec'``, ``'fastphot'``, or ``'fastqa'``).
cmdargs : :class:`str`
Command-line argument string.
logfile : :class:`str`
Path to the log file.
"""
if args.makeqa:
cmd = 'fastqa'
cmdargs = f'{outfile} -o={redrockfile} --mp={args.mp}'
else:
cmd = 'fastphot' if fastphot else 'fastspec'
cmdargs = f'{redrockfile} -o={outfile} --mp={args.mp}'
if args.ignore_quasarnet:
cmdargs += ' --ignore-quasarnet'
if args.ignore_photometry:
cmdargs += ' --ignore-photometry'
if args.no_smooth_continuum:
cmdargs += ' --no-smooth-continuum'
if args.templates:
cmdargs += f' --templates={args.templates}'
if args.templateversion:
cmdargs += f' --templateversion={args.templateversion}'
if args.fphotodir:
cmdargs += f' --fphotodir={args.fphotodir}'
if args.fphotofile:
cmdargs += f' --fphotofile={args.fphotofile}'
if args.emlinesfile:
cmdargs += f' --emlinesfile={args.emlinesfile}'
if args.constraintsfile:
cmdargs += f' --constraintsfile={args.constraintsfile}'
if args.nmonte:
cmdargs += f' --nmonte={args.nmonte}'
if args.vdisp_nominal:
cmdargs += f' --vdisp-nominal={args.vdisp_nominal}'
if args.vdisp_bounds:
cmdargs += f' --vdisp-bounds {float(args.vdisp_bounds[0])} {float(args.vdisp_bounds[1])}'
if args.seed:
cmdargs += f' --seed={args.seed}'
if sample is not None:
_, survey, program, healpix = os.path.basename(redrockfile).split('-')
healpix = int(healpix.split('.')[0])
I = ((sample['SURVEY'] == survey) * (sample['PROGRAM'] == program) *
(sample['HEALPIX'] == healpix))
targetids = ','.join(sample[I]['TARGETID'].astype(str))
cmdargs += f' --targetids={targetids}'
if input_redshifts:
inputz = ','.join(sample[I]['Z'].astype(str))
cmdargs += f' --input-redshifts={inputz}'
else:
if args.targetids is not None:
cmdargs += f' --targetids={args.targetids}'
if args.verbose:
cmdargs += ' --verbose'
if args.ntargets is not None:
cmdargs += f' --ntargets={args.ntargets}'
if args.firsttarget is not None:
cmdargs += f' --firsttarget={args.firsttarget}'
if args.makeqa:
logfile = os.path.join(redrockfile, os.path.basename(outfile).replace('.gz', '').replace('.fits', '.log'))
else:
logfile = outfile.replace('.gz', '').replace('.fits', '.log')
return cmd, cmdargs, logfile
# ---------------------------------------------------------------------------
# Per-file parallel worker state and functions (used by MPPool when
# --mp-per-file is set). Placing these in the fastspecfit.mpi module (rather
# than in bin/mpi-fastspecfit) makes them picklable by fully-qualified name,
# which is required when multiprocessing uses the 'spawn' start method.
#
# _perfile_init() is the pool initializer: called once per worker at startup.
# _perfile_run() is the per-task function: called once per healpix file.
# ---------------------------------------------------------------------------
_perfile_sample = None
_perfile_rank = None
_perfile_fastphot = False
_perfile_input_redshifts = False
[docs]
def _perfile_init(init_sc_args, sample, rank, fastphot, input_redshifts):
"""Pool initializer for --mp-per-file mode.
Loads sc_data once per worker process and stores per-run state in
module-level globals so that _perfile_run() can access them without
per-task pickling overhead.
"""
global _perfile_sample, _perfile_rank, _perfile_fastphot, _perfile_input_redshifts
from fastspecfit.singlecopy import sc_data
sc_data.initialize(**init_sc_args)
_perfile_sample = sample
_perfile_rank = rank
_perfile_fastphot = fastphot
_perfile_input_redshifts = input_redshifts
[docs]
def _perfile_run(redrockfile, outfile, ntarget, args):
"""Per-file worker task for --mp-per-file mode.
Reads worker state from module-level globals set by _perfile_init(),
builds per-file command args (forcing --mp=1 so no inner pool is
created), and calls fastspec/fastphot for this file.
"""
import copy
import fastspecfit.mpi as _mpi
from desispec.parallel import stdouterr_redirected
if _mpi._perfile_fastphot:
from fastspecfit.fastspecfit import fastphot as fast
else:
from fastspecfit.fastspecfit import fastspec as fast
# Force mp=1: all parallelism is at the file level; no inner pool needed.
args_inner = copy.copy(args)
args_inner.mp = 1
cmd, cmdargs, logfile = build_cmdargs(
args_inner, redrockfile, outfile,
sample=_mpi._perfile_sample,
fastphot=_mpi._perfile_fastphot,
input_redshifts=_mpi._perfile_input_redshifts,
)
if _mpi._perfile_rank == 0:
log.info(f'Rank {_mpi._perfile_rank}: ntargets={ntarget}: {cmd} {cmdargs}')
if getattr(args, 'dry_run', False):
return
try:
t1 = time.time()
outdir = os.path.dirname(logfile)
if not os.path.isdir(outdir):
os.makedirs(outdir, exist_ok=True)
if getattr(args, 'nolog', False):
err = fast(args=cmdargs.split(), comm=None)
else:
with stdouterr_redirected(to=logfile, overwrite=args.overwrite, comm=None):
err = fast(args=cmdargs.split(), comm=None)
log.info(fsftime('process_file', time.time()-t1,
context=f'rank={_mpi._perfile_rank}'))
if err != 0:
if not os.path.exists(outfile):
log.warning(f'Rank {_mpi._perfile_rank} missing {outfile}')
raise IOError
except:
log.warning(f'Rank {_mpi._perfile_rank} raised an exception')
import traceback
traceback.print_exc()