Source code for fastspecfit.mpi

"""
fastspecfit.mpi
===============

MPI tools.

"""
import os, time
import numpy as np
from glob import glob
import multiprocessing
import fitsio
from astropy.table import Table, vstack

from fastspecfit.io import get_qa_filename
from fastspecfit.logger import log


def _get_ntargets_one(args):
    return get_ntargets_one(*args)


def get_ntargets_one(specfile, htmldir_root, outdir_root, coadd_type='healpix',
                     makeqa=False, overwrite=False, fastphot=False):
    if makeqa:
        if overwrite:
            ntargets = fitsio.FITS(specfile)[1].get_nrows()
        else:
            outdir = os.path.dirname(specfile).replace(outdir_root, htmldir_root)
            meta = fitsio.read(specfile, 'METADATA', columns=['SURVEY', 'PROGRAM', 'TARGETID', 'HEALPIX'])
            ntargets = 0
            for meta1 in meta:
                pngfile = get_qa_filename(meta1, coadd_type, outdir=outdir, fastphot=fastphot)
                #print(pngfile)
                if not os.path.isfile(pngfile):
                    ntargets += 1
    else:
        from fastspecfit.util import ZWarningMask
        zb = fitsio.read(specfile, 'REDSHIFTS', columns=['Z', 'ZWARN'])
        fm = fitsio.read(specfile, 'FIBERMAP', columns=['TARGETID', 'OBJTYPE'])
        J = ((zb['Z'] > 0.001) * (fm['OBJTYPE'] == 'TGT') * (zb['ZWARN'] & ZWarningMask.NODATA == 0))
        ntargets = np.sum(J)
    return ntargets


def findfiles(filedir, prefix='redrock', coadd_type=None, survey=None,
              program=None, healpix=None, tile=None, night=None,
              gzip=False, sample=None):
    if gzip:
        fitssuffix = 'fits.gz'
    else:
        fitssuffix = 'fits'

    if sample is not None: # special case of an input catalog
        thesefiles, ntargets = [], []
        for onesurvey in sorted(set(sample['SURVEY'].data)):
            S = np.where(onesurvey == sample['SURVEY'])[0]
            for oneprogram in sorted(set(sample['PROGRAM'][S].data)):
                log.info(f'Building file list for survey={onesurvey} and program={oneprogram}')
                P = np.where(oneprogram == sample['PROGRAM'][S])[0]
                uhealpix, _ntargets = np.unique(sample['HEALPIX'][S][P].astype(str), return_counts=True)
                ntargets.append(_ntargets)
                for onepix in uhealpix:
                    #ntargets.append(np.sum(onepix == sample['HEALPIX'][S][P].astype(str)))
                    thesefiles.append(os.path.join(filedir, onesurvey, oneprogram, str(int(onepix)//100), onepix,
                                                   f'{prefix}-{onesurvey}-{oneprogram}-{onepix}.{fitssuffix}'))
        if len(thesefiles) > 0:
            #thesefiles = np.array(sorted(np.unique(np.hstack(thesefiles))))
            thesefiles = np.hstack(thesefiles)
            ntargets = np.hstack(ntargets)
        return thesefiles, ntargets
    elif coadd_type == 'healpix':
        thesefiles = []
        for onesurvey in np.atleast_1d(survey):
            for oneprogram in np.atleast_1d(program):
                log.info(f'Building file list for survey={onesurvey} and program={oneprogram}')
                if healpix is not None:
                    for onepix in healpix:
                        _thesefiles = os.path.join(filedir, onesurvey, oneprogram, str(int(onepix)//100), onepix,
                                                   f'{prefix}-{onesurvey}-{oneprogram}-{onepix}.{fitssuffix}')
                        thesefiles.append(glob(_thesefiles))
                else:
                    allpix = os.path.join(filedir, onesurvey, oneprogram, '*')
                    for onepix in glob(allpix):
                        _thesefiles = os.path.join(onepix, '*', f'{prefix}-{onesurvey}-{oneprogram}-*.{fitssuffix}')
                        thesefiles.append(glob(_thesefiles))
        if len(thesefiles) > 0:
            thesefiles = np.array(sorted(np.unique(np.hstack(thesefiles))))
    elif coadd_type == 'cumulative':
        # Scrape the disk to get the tiles, but since we read the csv file I don't think this ever happens.
        if tile is None:
            tiledirs = np.array(sorted(set(glob(os.path.join(filedir, 'cumulative', '?????')))))
            if len(tiledirs) > 0:
                tile = [int(os.path.basename(tiledir)) for tiledir in tiledirs]
        if tile is not None:
            thesefiles = []
            for onetile in tile:
                nightdirs = np.array(sorted(set(glob(os.path.join(filedir, 'cumulative', str(onetile), '????????')))))
                if len(nightdirs) > 0:
                    # for a given tile, take just the most recent night
                    thisnightdir = nightdirs[-1]
                    thesefiles.append(glob(os.path.join(thisnightdir, f'{prefix}-[0-9]-{onetile}-thru????????.{fitssuffix}')))
            if len(thesefiles) > 0:
                thesefiles = np.array(sorted(set(np.hstack(thesefiles))))
    elif coadd_type == 'pernight':
        if tile is not None and night is not None:
            thesefiles = []
            for onetile in tile:
                for onenight in night:
                    thesefiles.append(glob(os.path.join(
                        filedir, 'pernight', str(onetile), str(onenight), f'{prefix}-[0-9]-{onetile}-{onenight}.{fitssuffix}')))
            if len(thesefiles) > 0:
                thesefiles = np.array(sorted(set(np.hstack(thesefiles))))
        elif tile is not None and night is None:
            thesefiles = np.array(sorted(set(np.hstack([glob(os.path.join(
                filedir, 'pernight', str(onetile), '????????', f'{prefix}-[0-9]-{onetile}-????????.{fitssuffix}')) for onetile in tile]))))
        elif tile is None and night is not None:
            thesefiles = np.array(sorted(set(np.hstack([glob(os.path.join(
                filedir, 'pernight', '?????', str(onenight), f'{prefix}-[0-9]-?????-{onenight}.{fitssuffix}')) for onenight in night]))))
        else:
            thesefiles = np.array(sorted(set(glob(os.path.join(
                filedir, '?????', '????????', f'{prefix}-[0-9]-?????-????????.{fitssuffix}')))))
    elif coadd_type == 'perexp':
        if tile is not None:
            thesefiles = np.array(sorted(set(np.hstack([glob(os.path.join(
                filedir, 'perexp', str(onetile), '????????', f'{prefix}-[0-9]-{onetile}-exp????????.{fitssuffix}')) for onetile in tile]))))
        else:
            thesefiles = np.array(sorted(set(glob(os.path.join(
                filedir, 'perexp', '?????', '????????', f'{prefix}-[0-9]-?????-exp????????.{fitssuffix}')))))
    else:
        pass
    return thesefiles


[docs] def plan_merge(outdir, outprefix, coadd_type, survey, program, healpix, tile, night, sample=None, gzip=False): """Simple support routine to build the input and output filenames when merging. """ redrockfiles = None if sample is not None: # special case of an input catalog outfiles, _ = findfiles(outdir, prefix=outprefix, sample=sample) else: outfiles = findfiles(outdir, prefix=outprefix, coadd_type=coadd_type, survey=survey, program=program, healpix=healpix, tile=tile, night=night, gzip=gzip) log.info(f'Found {len(outfiles)} {outprefix} files to be merged.') return redrockfiles, outfiles
[docs] def plan_makeqa(outdir, htmldir, outprefix, coadd_type, survey, program, healpix, tile, night, sample=None, gzip=False): """Simple support routine to build the input and output filenames when building QA. """ outfiles = findfiles(outdir, prefix=outprefix, coadd_type=coadd_type, survey=survey, program=program, healpix=healpix, tile=tile, night=night, gzip=gzip) log.info(f'Found {len(outfiles)} {outprefix} files for QA.') # Hack!--build the output directories and pass them in the 'redrockfiles' # position! for coadd_type=='cumulative', strip out the 'lastnight' # argument. if len(outfiles) == 0: return _, outfiles if coadd_type == 'cumulative': redrockfiles = np.array([os.path.dirname(os.path.dirname(outfile)).replace(outdir, htmldir) for outfile in outfiles]) else: redrockfiles = np.array([os.path.dirname(outfile).replace(outdir, htmldir) for outfile in outfiles]) return redrockfiles, outfiles
[docs] def plan(comm=None, specprod=None, specprod_dir=None, coadd_type='healpix', survey=None, program=None, healpix=None, tile=None, night=None, sample=None, outdir_data='.', mp=1, merge=False, makeqa=False, fastphot=False, overwrite=False): """Function which determines what files have---and have not---been processed. """ if comm: rank, size = comm.rank, comm.size else: rank, size = 0, 1 if rank > 0: outdir = None redrockfiles = None outfiles = None ntargets = None else: t0 = time.time() if fastphot: outprefix = 'fastphot' gzip = False else: outprefix = 'fastspec' gzip = True redux_dir = os.path.expandvars(os.environ.get('DESI_SPECTRO_REDUX')) # look for data in the standard location if coadd_type == 'healpix': subdir = 'healpix' else: subdir = 'tiles' # figure out which tiles belong to the SV programs if tile is None: tilefile = os.path.join(redux_dir, specprod, f'tiles-{specprod}.csv') alltileinfo = Table.read(tilefile) tileinfo = alltileinfo[['sv' in survey for survey in alltileinfo['SURVEY']]] log.info('Retrieved a list of {} {} tiles from {}'.format( len(tileinfo), ','.join(sorted(set(tileinfo['SURVEY']))), tilefile)) tile = np.array(list(set(tileinfo['TILEID']))) if specprod_dir is None: specprod_dir = os.path.join(redux_dir, specprod, subdir) outdir = os.path.join(outdir_data, specprod, subdir) htmldir = os.path.join(outdir_data, specprod, 'html', subdir) if merge: redrockfiles, outfiles = plan_merge( outdir, outprefix, coadd_type, survey, program, healpix, tile, night, sample=sample, gzip=gzip) if len(outfiles) == 0: log.debug(f'No {outprefix} files in {outdir} found!') return '', list(), list(), None return outdir, redrockfiles, outfiles, None elif makeqa: redrockfiles, outfiles = plan_makeqa( outdir, htmldir, outprefix, coadd_type, survey, program, healpix, tile, night, sample=sample, gzip=gzip) if len(outfiles) == 0: log.debug(f'No {outprefix} files in {outdir} left to do!') return '', list(), list(), None ntargs = [(outfile, htmldir, outdir, coadd_type, True, overwrite, fastphot) for outfile in outfiles] else: if sample is not None: # special case of an input catalog redrockfiles, ntargets = findfiles(specprod_dir, prefix='redrock', sample=sample) else: redrockfiles = findfiles(specprod_dir, prefix='redrock', coadd_type=coadd_type, survey=survey, program=program, healpix=healpix, tile=tile, night=night) # In principle, we could parallelize this piece of code... nfile = len(redrockfiles) outfiles = [] for redrockfile in redrockfiles: outfile = redrockfile.replace(specprod_dir, outdir).replace('redrock-', f'{outprefix}-') if gzip: outfile = outfile.replace('.fits', '.fits.gz') outfiles.append(outfile) outfiles = np.array(outfiles) todo = np.ones(len(redrockfiles), bool) for ii, outfile in enumerate(outfiles): if os.path.isfile(outfile) and not overwrite: todo[ii] = False if np.count_nonzero(todo) > 0: redrockfiles = redrockfiles[todo] outfiles = outfiles[todo] if sample is not None: ntargets = ntargets[todo] else: # all done! redrockfiles = [] outfiles = [] if sample is not None: ntargets = np.array([]) log.info(f'Found {len(redrockfiles):,d}/{nfile:,d} redrockfiles left.') # we already counted ntargets if sample is None: ntargs = [(redrockfile, None, None, None, False, False, False) for redrockfile in redrockfiles] # Count the number of targets in each file. Distribute work to the other # ranks, either via MPI or multiprocessing. Note that if `sample` exists, # we already counted targets. if sample is None: if comm: if rank == 0: ntargs_byrank = np.array_split(ntargs, size) for onerank in range(1, size): comm.send(ntargs_byrank[onerank], dest=onerank) ntargs_onerank = ntargs_byrank[rank] else: ntargs_onerank = comm.recv(source=0) ntargets = [] for ntarg_onerank in ntargs_onerank: ntargets.append(get_ntargets_one(*ntarg_onerank)) if rank > 0: comm.send(ntargets, dest=0) else: for onerank in range(1, size): ntargets.extend(comm.recv(source=onerank)) if len(ntargets) > 0: ntargets = np.hstack(ntargets) else: if mp > 1: with multiprocessing.Pool(mp) as P: ntargets = P.map(_get_ntargets_one, ntargs) else: ntargets = [get_ntargets_one(*_ntargs) for _ntargs in ntargs] ntargets = np.array(ntargets) if rank == 0: if len(ntargets) == 0: if redrockfiles is not None: redrockfiles = [] if outfiles is not None: outfiles = [] else: iempty = np.where(ntargets == 0)[0] if len(iempty) > 0: log.info(f'Skipping {len(iempty):,d} redrockfiles with no targets.') itodo = np.where(ntargets > 0)[0] if len(itodo) > 0: ntargets = ntargets[itodo] log.info(f'Number of targets left: {np.sum(ntargets):,d}.') if redrockfiles is not None: redrockfiles = redrockfiles[itodo] if coadd_type == 'healpix': maxlen = str(len(max(np.atleast_1d(survey), key=len)) + len(max(np.atleast_1d(program), key=len))) for onesurvey in np.atleast_1d(survey): for oneprogram in np.atleast_1d(program): I = np.logical_and(np.char.find(redrockfiles, onesurvey) != -1, np.char.find(redrockfiles, oneprogram) != -1) if np.any(I): one = f'{onesurvey}:{oneprogram}' log.info(f' {one:>{maxlen}}: {np.sum(I):,d} ' + \ f'redrockfiles, {np.sum(ntargets[I]):,d} targets') if outfiles is not None: outfiles = outfiles[itodo] else: if redrockfiles is not None: redrockfiles = [] if outfiles is not None: outfiles = [] if len(redrockfiles) == 0: log.info('All files have been processed!') return '', list(), list(), None return outdir, redrockfiles, outfiles, ntargets
def _read_to_merge_one(args): return read_to_merge_one(*args) def read_to_merge_one(filename, fastphot): info = fitsio.FITS(filename) meta = Table(info['METADATA'].read()) specphot = Table(info['SPECPHOT'].read()) if fastphot: fastfit = None else: fastfit = Table(info['FASTSPEC'].read()) return meta, specphot, fastfit
[docs] def _domerge(outfiles, outprefix=None, specprod=None, coadd_type=None, mergefile=None, fastphot=False, split_hdu=False, nside_main=None, mp=1): """Support routine to merge a set of input files. """ from astropy.table import vstack from desiutil.depend import getdep, hasdep from fastspecfit.io import write_fastspecfit t0 = time.time() meta, specphot, fastfit = [], [], [] mpargs = [[outfile, fastphot] for outfile in outfiles] if mp > 1: with multiprocessing.Pool(mp) as P: out = P.map(_read_to_merge_one, mpargs) else: out = [read_to_merge_one(*mparg) for mparg in mpargs] out = list(zip(*out)) meta = vstack(out[0]) specphot = vstack(out[1]) if not fastphot: fastfit = vstack(out[2]) del out if outprefix: log.info(f'Merging {len(meta):,d} objects from {len(outfiles)} {outprefix} ' + \ f'files took {(time.time()-t0)/60.:.2f} min.') else: log.info(f'Merging {len(meta):,d} objects from {len(outfiles)} ' + \ f'files took {(time.time()-t0)/60.:.2f} min.') hdr = fitsio.read_header(outfiles[0]) # sensible defaults deps = {} deps['INPUTZ'] = False deps['INPUTS'] = False deps['CONSAGE'] = False deps['USEQNET'] = True deps['NMONTE'] = 50 deps['SEED'] = 1 deps['NOSCORR'] = False deps['NOPHOTO'] = False deps['BRDLFIT'] = True deps['UFLOOR'] = 0.01 deps['SNRBBALM'] = 2.5 for key in deps.keys(): if key in hdr: deps[key] = hdr[key] deps2 = {} deps2['FPHOTO_FILE'] = None deps2['FTEMPLATES_FILE'] = None deps2['EMLINES_FILE'] = None for key in deps2.keys(): if hasdep(hdr, key): deps2[key] = getdep(hdr, key) write_fastspecfit(meta, specphot, fastfit, modelspectra=None, outfile=mergefile, specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, fphotofile=deps2['FPHOTO_FILE'], template_file=deps2['FTEMPLATES_FILE'], emlinesfile=deps2['EMLINES_FILE'], inputz=deps['INPUTZ'], ignore_photometry=deps['NOPHOTO'], broadlinefit=deps['BRDLFIT'], constrain_age=deps['CONSAGE'], use_quasarnet=deps['USEQNET'], no_smooth_continuum=deps['NOSCORR'], split_hdu=split_hdu, nside=nside_main)
[docs] def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None, healpix=None, tile=None, night=None, sample=None, outsuffix=None, fastphot=False, specprod_dir=None, outdir_data='.', split_hdu=False, fastfiles_to_merge=None, merge_suffix=None, mergedir=None, supermerge=False, overwrite=False, nside_main=None, mp=1): """Merge all the individual catalogs into a single large catalog. Runs only on rank 0. supermerge - merge previously merged catalogs """ import fitsio from copy import copy from astropy.io import fits from astropy.table import Table, vstack from fastspecfit.mpi import plan if split_hdu and nside_main: errmsg = 'Using split_hdu=True and nside_main!=None is not supported.' log.critical(errmsg) raise ValueError(errmsg) if fastphot: outprefix = 'fastphot' else: outprefix = 'fastspec' if mergedir is None: mergedir = os.path.join(outdir_data, specprod, 'catalogs') if not os.path.isdir(mergedir): os.makedirs(mergedir, exist_ok=True) if outsuffix is None: outsuffix = specprod # merge previously merged catalogs into one big catalog (and then return) if supermerge: if fastfiles_to_merge is None: _outfiles = os.path.join(mergedir, f'{outprefix}-{outsuffix}-*.fits*') outfiles = glob(_outfiles) else: _outfiles = fastfiles_to_merge outfiles = _outfiles if len(outfiles) > 0: log.info(f'Merging {len(outfiles):,d} catalogs') mergefile = os.path.join(mergedir, f'{outprefix}-{outsuffix}.fits') _domerge(outfiles, mergefile=mergefile, outprefix=outprefix, specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, split_hdu=split_hdu, nside_main=nside_main, mp=mp) else: log.info(f'No catalogs found: {_outfiles}') return if sample is not None: if merge_suffix is None: merge_suffix = 'sample' if nside_main: print('WRITE ME!') else: mergefile = os.path.join(mergedir, f'{outprefix}-{specprod}-{merge_suffix}.fits') if os.path.isfile(mergefile) and not overwrite: log.info(f'Merged output file {mergefile} exists!') return _, _, outfiles, _ = plan(specprod=specprod, sample=sample, merge=True, fastphot=fastphot, specprod_dir=specprod_dir, outdir_data=outdir_data, overwrite=overwrite) if len(outfiles) > 0: _domerge(outfiles, mergefile=mergefile, outprefix=outprefix, specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, split_hdu=split_hdu, nside_main=nside_main, mp=mp) elif coadd_type == 'healpix' and sample is None: if survey is None or program is None: log.warning(f'coadd_type={coadd_type} requires survey and program inputs.') return surveys = copy(survey) programs = copy(program) for survey in surveys: for program in programs: mergefile = os.path.join(mergedir, f'{outprefix}-{specprod}-{survey}-{program}.fits') if os.path.isfile(mergefile) and not overwrite: log.info(f'Merged output file {mergefile} exists!') continue _, _, outfiles, _ = plan(specprod=specprod, survey=survey, program=program, healpix=healpix, merge=True, fastphot=fastphot, specprod_dir=specprod_dir, outdir_data=outdir_data, overwrite=overwrite) if len(outfiles) > 0: # split main/dark and main/bright merge files # either by HDU or nside_main healpixels do_split_hdu = False use_nside_main = None if survey == 'main' and ((program == 'dark') or (program == 'bright')): if nside_main: use_nside_main = nside_main elif split_hdu: do_split_hdu = True _domerge(outfiles, mergefile=mergefile, outprefix=outprefix, specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, split_hdu=do_split_hdu, nside_main=use_nside_main, mp=mp) else: mergefile = os.path.join(mergedir, f'{outprefix}-{specprod}-{coadd_type}.fits') if os.path.isfile(mergefile) and not overwrite: log.info(f'Merged output file {mergefile} exists!') return _, _, outfiles, _ = plan(specprod=specprod, coadd_type=coadd_type, tile=tile, night=night, merge=True, fastphot=fastphot, specprod_dir=specprod_dir, outdir_data=outdir_data, overwrite=overwrite) if len(outfiles) > 0: _domerge(outfiles, mergefile=mergefile, outprefix=outprefix, specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, split_hdu=split_hdu, nside_main=nside_main, mp=mp)