Source code for twinkle.sed

"""
 sed.py by Rahul I. Patel (ri.patel272@gmail.com)

 Various tools to help with spectral energy distribution
 fitting, data managements, model management, physical tools
 spectral response functions

"""

import os, re, sys, operator
import pandas as pd
import json
import glob, string
import numpy as np

from .utils import directories
from .utils import mosaic_tools as mt
import scipy.interpolate as intp
import scipy.integrate as sintp

from astropy.io import fits
from astropy import constants as con

__author__ = 'Rahul I. Patel <ri.patel272@gmail.com>, Joe Trollo'

DIR = directories
opj = os.path.join
intpdir = DIR.SupportFiles()
fRSR = DIR.RSR()
AT = mt.ArrayTools()
FT = mt.FittingTools()

# SET UP CONSTANTS
_CS = con.c.to('cm/s').value
_WIEN = con.b_wien.to('K cm').value
_H = con.h.to('erg s').value
_KB = con.k_B.to('erg/K').value

# SET UP UNIT CONVERSION.
_CM2ANG = 100000000.0
_ANG2CM = 1e-8


[docs] class SEDTools: r""" SEDTools has functions to aid in modeling an SED of an astronomical source creates bandpass objects, converts stellar magnitudes to flux in cgs units, convolves flux with said bandpasses and calculates (for now) simple blackbody and stellar photosphere from Grids. Amongst a bunch of other things. """ def __init__(self): self.create_passbands()
[docs] def create_passbands(self, RSRFile=None, flat=False, waverange=(None, None), cntr=None): r""" Returns passband objects (no input) for all bands in this function. You can serparately enter the RSRFile name along with wavelength units to create your own passband object. Above rules apply. Wavelength is automatically converted to Angstroms. Args: RSRFile (str): File name that contains the RSR information flat (bool) : When set True, will let you create a flat spectrum waverange (tuple): 2 float elements. These need to be initiated to the min and max wave of the flat spectrum cntr (float): center or isophotal wavelength of the flat bandpass. """ units = {'WISE': 'microns', '2MASS': 'microns', 'Johnson': 'angstroms', 'MIPS': 'microns', 'Tycho': 'angstroms', 'IRAS': 'angstroms', 'HPACS': 'angstroms', 'Akari': 'angstroms', 'LNIRC2': 'microns', 'MSX': 'angstroms', 'DENIS': 'angstroms', 'GAIA': 'angstroms'} aspband = Bandpass if RSRFile is None and not flat: # MICRONS u = units['WISE'] self.W1pband = aspband(opj(fRSR, 'W1_WISE.dat'), inputUnits=u) self.W2pband = aspband(opj(fRSR, 'W2_WISE.dat'), inputUnits=u) self.W3pband = aspband(opj(fRSR, 'W3_WISE.dat'), inputUnits=u) self.W4pband = aspband(opj(fRSR, 'W4_WISE.dat'), inputUnits=u) self.W4truncpband = aspband(opj(fRSR, 'W4_WISE_truncated.dat'), inputUnits=u) self.W4stretchpband = aspband(opj(fRSR, 'W4_WISE_stretched.dat'), inputUnits=u) # MICRONS u = units['MIPS'] self.MIPS24pband = aspband(opj(fRSR, 'MIPS24.dat'), inputUnits=u) self.MIPS70pband = aspband(opj(fRSR, 'MIPS70.dat'), inputUnits=u) self.MIPS160pband = aspband(opj(fRSR, 'MIPS160.dat'), inputUnits=u) # MICRONS u = units['2MASS'] self.J2Mpband = aspband(opj(fRSR, 'J_2MASS.dat'), inputUnits=u) self.H2Mpband = aspband(opj(fRSR, 'H_2MASS.dat'), inputUnits=u) self.Ks2Mpband = aspband(opj(fRSR, 'Ks_2MASS.dat'), inputUnits=u) # ANGSTROMS u = units['Johnson'] self.UBpband = aspband(opj(fRSR, 'U_Bessel.dat'), inputUnits=u) self.BBpband = aspband(opj(fRSR, 'B_Bessel.dat'), inputUnits=u) self.VBpband = aspband(opj(fRSR, 'V_Bessel.dat'), inputUnits=u) self.RBpband = aspband(opj(fRSR, 'R_Bessel.dat'), inputUnits=u) self.IBpband = aspband(opj(fRSR, 'I_Bessel.dat'), inputUnits=u) self.UJpband = aspband(opj(fRSR, 'U_Johnson.dat'), inputUnits=u) self.BJpband = aspband(opj(fRSR, 'B_Johnson.dat'), inputUnits=u) self.VJpband = aspband(opj(fRSR, 'V_Johnson.dat'), inputUnits=u) self.RJpband = aspband(opj(fRSR, 'R_Johnson.dat'), inputUnits=u) self.IJpband = aspband(opj(fRSR, 'I_Johnson.dat'), inputUnits=u) u = units['Tycho'] self.VTpband = aspband(opj(fRSR, 'V_Tycho.dat'), inputUnits=u) self.BTpband = aspband(opj(fRSR, 'B_Tycho.dat'), inputUnits=u) self.Hppband = aspband(opj(fRSR, 'Hp.dat'), inputUnits=u) u = units['IRAS'] self.IRAS60pband = aspband(opj(fRSR, 'IRAS60.dat'), inputUnits=u) self.IRAS100pband = aspband(opj(fRSR, 'IRAS100.dat'), inputUnits=u) self.IRAS25pband = aspband(opj(fRSR, 'IRAS25.dat'), inputUnits=u) self.IRAS12pband = aspband(opj(fRSR, 'IRAS12.dat'), inputUnits=u) u = units['HPACS'] self.HPACS70pband = aspband(opj(fRSR, 'HPACS70.dat'), inputUnits=u) self.HPACS100pband = aspband(opj(fRSR, 'HPACS100.dat'), inputUnits=u) self.HPACS160pband = aspband(opj(fRSR, 'HPACS160.dat'), inputUnits=u) u = units['Akari'] self.Akari90pband = aspband(opj(fRSR, 'Akari90.dat'), inputUnits=u) self.Akari9pband = aspband(opj(fRSR, 'Akari_IRCS9W.dat'), inputUnits=u) self.Akari18pband = aspband(opj(fRSR, 'Akari_IRCL18W.dat'), inputUnits=u) u = units['LNIRC2'] self.LpNIRC2pband = aspband(opj(fRSR, 'Lp_NIRC2.dat'), inputUnits=u) self.HNIRC2pband = aspband(opj(fRSR, 'H_NIRC2.dat'), inputUnits=u) self.KpNIRC2pband = aspband(opj(fRSR, 'Kp_NIRC2.dat'), inputUnits=u) u = units['MSX'] self.MSXApband = aspband(opj(fRSR, 'MSXA.dat'), inputUnits=u) self.MSXCpband = aspband(opj(fRSR, 'MSXC.dat'), inputUnits=u) self.MSXDpband = aspband(opj(fRSR, 'MSXD.dat'), inputUnits=u) self.MSXEpband = aspband(opj(fRSR, 'MSXE.dat'), inputUnits=u) u = units['DENIS'] self.IDENISpband = aspband(opj(fRSR, 'DENISI.dat'), inputUnits=u) self.JDENISpband = aspband(opj(fRSR, 'DENISJ.dat'), inputUnits=u) self.KSDENISpband = aspband(opj(fRSR, 'DENISKs.dat'), inputUnits=u) u = units['GAIA'] self.GGAIApband = aspband(opj(fRSR, 'GAIAG.dat'), inputUnits=u) elif RSRFile is not None and not flat: self.pband = aspband(opj(fRSR, RSRFile), inputUnits=u) elif flat: self.pband = Flatbandpass(waverange, cntr)
[docs] def cgs2Jy(self, flux, dflux, band=None, wave=None, nu=None): r""" To convert specific flux from :math:`\mbox{erg}/\mbox{cm}^2/s/A` to Jy. It assumes input wavelength is in Angstroms Args: nu (arr): frequency in Hz to convert to Jansky. wave (arr): wavelength in Angstroms flux (arr): flux in :math:`\mbox{erg}/\mbox{cm}^2/s/A` band (passband obj): any of the :attr:`SEDTools.[band_id]pband` wave (float): wavelength in angstroms(???) nu (float): frequency in ...(??) Returns: (tuple): flux density in Janskies. """ _CS = con.c.to('angstrom/s').value if wave is None: wave = band.isoWavelength() Flux = flux * wave if nu is None: try: nu = band.isoFrequency() except UnboundLocalError: # ERROR IS INCURRED BY CHOICE OF SPEED OF LIGHT -- USE CAREFULLY # IF POSSIBLE TRY TO USE GIVEN FREQUENCIES nu = _CS / wave else: nu = _CS / wave y = (Flux / nu) * 1e23 # DOES NOT INCLUDE UNCERTAINITY IN FREQUENCY OR WAVELENGTH dy = (1e23 / nu) * dflux * wave # + (flux * dwave)**2) return y, dy
[docs] def mag2Jy(self, band, mag, magerr): r""" Convert magnitudes to Janskies Args: band (str): any of the band designations in :meth:`SEDTools.create_passbands` mag (float): photometric measurement in vega mags magerr (float): associated mag uncertainty Returns: tuple - (arr, arr): flux and uncertainty in Jansky. """ fcgs, dfcgs = self.mag2fluxZPLam(eval('self.{}pband'.format(band)), mag, magerr) fjy, efjy = self.cgs2Jy(fcgs, dfcgs, eval('self.{}pband'.format(band))) return fjy, efjy
[docs] def Jy2cgs(self, fluxNu, freq, lam=None): r""" To convert flux in Jansky to :math:`\mbox{erg}/\mbox{cm}^2/s/A` It assumes the wavelength is in Angstroms. Args: fluxNu (tuple(float/array,float/array)): flux and uncertainty in Jansky freq (tuple(float,float)): frequency and uncertainty in freq in Hz lam (tuple(float,float), optional): Wavelength in angstroms and corresponding uncertainty. If freq is None, then this will be used to calculate a frequency. Returns: (tuple): A tuple with the flux and conversion error (Flux, dFlux) in :math:`\mbox{erg}/\mbox{cm}^2/s/A`. """ flux_nu, eflux_nu = fluxNu flux_nu, eflux_nu = np.asarray(flux_nu), np.asarray(eflux_nu) if freq[0] is not None: nu, enu = freq elif freq[0] is None and lam is not None: lam, elam = np.array(lam) * _ANG2CM nu = _CS / lam enu = _CS * elam / lam ** 2 else: print('Did you provide a wavelength?') flux_cgs = (flux_nu * nu) / 1e23 eflux_cgs = np.sqrt((flux_cgs * enu) ** 2 + (nu * eflux_nu) ** 2) / 1e23 return flux_cgs, eflux_cgs
[docs] def get_eff_wavelengths(self, mag_list): r""" Returns dictionary of central wavelength's listed in mag_list Contingent on passband objects being created in create_passbands. Returns isophotal wavelength in RSR headers or calculates effective/pivot wavelength if isowavelength is unavailable (Tokunanga & Vacca 2005). Args: mag_list (list): list of strings with keys corresponding to band from which to extract wavelength. If mag_list = ['mv1','mv2',...,'mvN'], then wave[i] = :attr:`Star.[mv1]pband.isoWavelength()`. Check to see which RSR bands are available. Returns: (dict): dictionary of keys:val such that keys=mvi & val are the iso/pivot wavelengths (Angstroms). """ waveDict = {} mag2use = mag_list for mv in mag2use: temp = mv waveDict[temp] = eval('self.' + mv + 'pband.isoWavelength()') return waveDict
[docs] def vega2AB(self, band, m_vega): r""" Module to convert photometric magnitudes into AB mag system. AB magnitude is defined such that the monochromatic flux is measured in :math:`\mbox{erg}/\mbox{cm}^2/s/Hz`. i.e., :math:`\mbox{AB} = -2.5 \log(f_{cgs}) - 48.6` , such that for any filter, the zero point mag corresponds to a flux density of 3631 Jy. .. note:: 2MASS: TAKEN FROM ODENWALD ET AL. 2003 Analysis of the Diffuse Near-Infrared Emission from 2MASS Deep Integration Data: Foregrounds versus the Cosmic Infrared Background WISE: http://wise2.ipac.caltech.edu/docs/release/allsky/expsup/ Args: band (str): Which pass band. m_vega (float/array): Vega magnitudes Returns: AB converted magntidues as same format as input ``m_vega`` """ if band == 'J2M': mab = m_vega + 0.894 elif band == 'H2M': mab = m_vega + 1.37 elif band == 'Ks2M': mab = m_vega + 1.84 elif band == 'W1': mab = m_vega + 2.699 elif band == 'W2': mab = m_vega + 3.339 elif band == 'W3': mab = m_vega + 5.174 elif band == 'W4': mab = m_vega + 6.620 # TAKEN FROM FREI & GUNN 1995 # http://www.astro.utoronto.ca/~patton/astro/mags.html# conversions # AT THE MOMENT IT DOES NOT INCLUDE STROMGREN FILTERS-- TOO LAZY elif band == 'VJ': mab = m_vega - 0.044 elif band == 'BJ': mab = m_vega - 0.163 elif band == 'RJ': mab = m_vega + 0.055 elif band == 'IJ': mab = m_vega + 0.309 elif band == 'RJ': mab = m_vega + 0.117 elif band == 'Ic': mab = m_vega + 0.342 else: print('Band ', band, ' doesn" exist. Please check bandpass.') mab = None if mab == None: print('No conversion occured. Check variables') return mab
[docs] def batch_vega2AB(self, mag_list, vegaMag, vegaMagErr): r""" Converts all the vega magnitudes in vegaMag and associated errors in vegaMagErr to AB magnitudes. Args: mag_list (list): List of strings with passband name vegaMag,+Err (dict): Vega magnitudes in mag_list Returns: (dict, dict): Dictionaries of converted AB mag and associated errors with keys used from input mag_list suffixed by '_AB' or '_ABerr' """ mag2use = mag_list vegaMagDict, vegaMagErrDict = vegaMag, vegaMagErr ABmag_Dict = {} ABmagerr_Dict = vegaMagErrDict for mv in mag2use: # CONVERT MAG TO AB mag temp = mv + '_AB' # if mv = 'W1', temp = 'W1_AB' try: ABmag_Dict[temp] = self.vega2AB(mv, vegaMagDict[mv]) except: ABmag_Dict[temp] = 0 # CONVERT MAG_ERR TO AB mag err --> no change in error temp = mv + '_ABerr' ABmagerr_Dict[temp] = vegaMagErrDict[mv] return ABmag_Dict, ABmagerr_Dict
[docs] def mag2fluxAB(self, band, abmags, abmagserr): r""" Does the same thing as :meth:`SEDTools.batch_mag2flux` except only takes one band and can be applied to multiple measurements in the same band to produce a flux Args: band (obj): strings with magnitude names (ex: ['mv1','mv2',...,'mvn']) abmags/abmagserr (dict): dictionary of magnitude/error in AB mag format with keys 'mvi_AB' or 'mvi_ABerr' for errors Returns: (dict, dict): dictionaries of converted AB mag to fluxes and separate dict for flux errors :math:`\mbox{erg}/\mbox{cm}^2/s/A`.; keys: ``mvi_flux`` """ fluxJy = (10 ** 23.0) * 10 ** ( -(abmags + 48.6) / 2.5) # converts to AB Mag alam = 3e-13 # converts to erg s-1 cm-2 angstrom-1 with lam in microns (for some stupid reason) errLmic = band.isoWavelength() * (1e-10 / 1e-6) # band.pivotWavelength() fluxWLUnits = alam * fluxJy / errLmic ** 2 fluxJyErr = (10 ** 23.0) * 10 ** (-(abmags - abmagserr + 48.6) / 2.5) fluxWLUnitsErr = alam * fluxJyErr / errLmic ** 2 fluxWLUnitsErr = fluxWLUnitsErr - fluxWLUnits return [fluxWLUnits, fluxWLUnitsErr]
[docs] def batch_mag2fluxAB(self, band_list, abmags, abmagserr): r""" Tools to convert from AB mags to fluxes. returns a Dictionary of fluxes and flux errors with keys from input list. Args: band_list (list): strings with magnitude names (ex: ['mv1','mv2',...,'mvn']) abmags/abmagserr (dict): dictionary of magnitude/error in AB mag format with keys 'mvi_AB' or 'mvi_ABerr' for errors Returns: (dict): dictionaries of converted AB mag to fluxes and separate dict for flux errors :math:`\mbox{erg}/\mbox{cm}^2/s/Hz`.; keys: 'mvi_flux' """ band2use = band_list fluxDict, fluxDict_err = {}, {} # CALUCLATE FLUX AND ERR IN FLUX AND CREATE DICTIONARIES OF EACH for mv in band2use: temp = mv + '_flux' magAB, magABerr = abmags[mv + '_AB'], abmagserr[mv + '_ABerr'] fluxList = self.mag2fluxAB(eval('self.' + mv + 'pband'), magAB, magABerr) fluxDict[temp], fluxDict_err[temp] = fluxList[0], fluxList[1] # THIS CONVERTS AB MAG TO FLUX IN erg/s/cm^2/Angstrom return fluxDict, fluxDict_err
[docs] def fluxZPLam2mag(self, band, flux, fluxErr): r""" Convert input flux which should be in units of :math:`\mbox{erg}/\mbox{cm}^2/s/A` to vega magnitudes using the zero point fluxes for each bandpass listed in the RSR file and hence the passband object Args: band (obj): passband object flux (float): Flux density in :math:`\mbox{erg}/\mbox{cm}^2/s/A` fluxErr (float): error assosciated wiht the input flux Returns: mag, magerr in vega magnitudes """ Fiso, dFiso = band.fluxVegaZeroPointLam() mag_lam = -2.5 * np.log10(flux / Fiso) mag_lam_err = (1. / Fiso) * (2.5 / np.log(10)) * fluxErr / 10 ** ( mag_lam / -2.5) return mag_lam, mag_lam_err
[docs] def mag2fluxZPLam(self, band, Vmag, VmagErr, sysErr=False): r""" Converts the input vega magnitude to specific flux in :math:`\mbox{erg}/\mbox{cm}^2/s/A` using the zero point fluxes obtained from the passband file from literature. Args: band (obj): passband object Vmag (float): Vega magnitude VmagErr (float): error associated with Vega Magnitude) sysErr: (bool) Includes the given bandpasses photospheric calibration systematic uncertainty found in the RSR text file. Set to True if this error needs to be added in quadrature to the total integrated flux uncertainty. Default is `False` Returns: flux, fluxerr in :math:`\mbox{erg}/\mbox{cm}^2/s/A` """ Fiso, dFiso = band.fluxVegaZeroPointLam() Flux_lam = Fiso * (10 ** (Vmag / -2.5)) df_l1 = ((np.log(10) / -2.5) * Fiso * VmagErr) # LOG IS NATURAL LOG if sysErr: df_l2 = dFiso else: df_l2 = 0 dFlux_lam = 10 ** (Vmag / -2.5) * np.sqrt(df_l1 ** 2 + df_l2 ** 2) return Flux_lam, dFlux_lam
[docs] def mag2fluxZPNu(self, band, Vmag, VmagErr, sysErr=False): r""" Converts the input vega magnitude to specific flux in :math:`\mbox{erg}/\mbox{cm}^2/s/Hz` using the zero point fluxes obtained from the passband file from literature. For this module, the zero point fluxes in frequency need to be there otherwise this won't work. Args: band (obj): passband object Vmag (float): Vega magnitude VmagErr (float): error associated with Vega Magnitude) sysErr (bool): Includes the given bandpasses photospheric calibration systematic uncertainty found in the RSR text file. Set to True if this error needs to be added in quadrature to the total integrated flux uncertainty. Default is False Returns: list: flux, and flux uncertainties in :math:`\mbox{erg}/\mbox{cm}^2/s/Hz` """ Fiso, dFiso = band.fluxVegaZeroPointFreq() Flux_nu = Fiso * (10 ** (Vmag / -2.5)) df_l1 = ((np.log(10) / -2.5) * Fiso * VmagErr) if sysErr: df_l2 = dFiso else: df_l2 = 0 dFlux_nu = 10 ** (Vmag / -2.5) * np.sqrt(df_l1 ** 2 + df_l2 ** 2) return Flux_nu, dFlux_nu
[docs] def batch_mag2fluxZPLam(self, band_list, Vmags, Vmagserr): r""" Converts mag to flux using mag2fluxZP using zero point fluxs. for multiple bands all at once. Uses mag2fluxZPLam. Args: band_list (list): list of strings with magnitude names (ex:['mv1','mv2',...,'mvn']) Vmags/Vmagserr (dict): magnitude/error in Vega Mag format with keys 'mvi' for both mag and errors. Returns: (dict, dict): dictionaries of converted Vega magnitude to fluxes and associated errors in units of :math:`\mbox{erg}/\mbox{cm}^2/s/A`. """ band2use = band_list fluxDict, fluxDict_err = {}, {} for mv in band2use: temp = mv + '_flux' magV, magVerr = Vmags[mv], Vmagserr[mv] fluxList = self.mag2fluxZPLam(eval('self.' + mv + 'pband'), magV, magVerr) fluxDict[temp], fluxDict_err[temp] = fluxList[0], fluxList[1] return fluxDict, fluxDict_err
[docs] def rsr_eflux(self, pband, lambda_, flux, flux_up, flux_down, report_type='avg'): r""" Calculates the integrated flux filtered through a particluar passband, as well as the upper and lower limits of the flux to give a 1 sigma uncertainty based on the fit Args uncertainties Args: pband (obj): passband object for a given photometric band created from Bandpass class \lambda_ (arr): Wavelengths in Angstroms range of flux to be calculated. Range should be atleast the extent of pband flux (arr): Fluxes of main fit flux_up (arr): Fluxes of upper limit of fit. flux_down (arr): Fluxes of lower limit of fit. report_type (str): either ``avg`` to report average of uncertainties or ``both`` Returns: flux at the particular band and uncertainties. If ``avg``, returns 2 element array (flux, eflux). If ``both``, returns 3 element array (flux, eflux_low, eflux_high) """ flux = self.rsr_flux(pband, lambda_, flux)[0] flux_low = self.rsr_flux(pband, lambda_, flux_down)[0] flux_up = self.rsr_flux(pband, lambda_, flux_up)[0] fl, fu = flux - flux_low, flux_up - flux if report_type == 'avg': return flux, np.average([fl, fu]) elif report_type == 'both': return flux, fl, fu else: raise Exception('No report_type specified. If you only want ' 'integrated flux, use rsr_flux')
[docs] def rsr_flux(self, pband, lambda_, flux): r""" Calculates integreated flux filtered through a passband object .. math:: F_{full} = \frac{\int F_\lambda\ \mbox{RSR}_\lambda\ \lambda\ \delta\lambda}{\int \mbox{RSR}_\lambda\ \lambda\ \delta\lambda} Args: pband (obj): passband object for a given photometric band created from Bandpass class \lambda_ (arr): Wavelengths in Angstroms range of flux to be calculated. Range should be atleast the extent of pband flux (arr): Fluxes mapped to \lambda_ in :math:`\mbox{erg}/\mbox{cm}^2/s`? Returns: Integrated flux over the specified bandpass as float or array. Integration is done using Simpson's rule for integration such that: """ # FIND THE BOUNDS FOR THE WAVELENGHTS IN THIS PARTICULAR BAND # Bounds can't exceed the limits of the bandpass' limits # OTHERWISE IT WONT' INTERPOLATE CORRECTLY # CHECK TO SEE IF ONLY ONE FLUX OR MULTIPLE FLUXES NEED TO BE CALCULATED # AT THE SAME BANDPASS flux2 = flux.copy() Sn_arr = np.array([]) band_min, band_max = pband.wavelength.min(), pband.wavelength.max() indzero = np.where(pband.transmission <= 0)[0] pband.transmission[indzero] = 1e-18 filter_interp = intp.interp1d(np.log10(pband.wavelength), \ np.log10(pband.transmission)) if lambda_.ndim < 2: RSRminInd, RSRmaxInd = int(np.searchsorted(lambda_, band_min)), \ int(np.searchsorted(lambda_, band_max)) if (RSRmaxInd - RSRminInd) < 10: print('Resolution less than 10.') # Create index array Ind = np.arange(RSRminInd, RSRmaxInd) # COLLECT ALL THE WAVELENGTH AND FLUX VALUES UNDER BANDPASS # FOUND IN MODEL VALUES lam0, flx0 = lambda_[Ind], flux2[Ind] # FOR EACH WAVELENGTH, CALCUALTE A NEW RSR: INTERPOLATE RSRNew = 10 ** filter_interp(np.log10(lam0)) # Use Simpson's approximation to calculate total flux # FIND UNIQUE ELEMENTS IN WAVELENGTH BECAUSE THERE ARE REPEATED VALUES # IN NEXTGEN GRIDS u, indu = np.unique(lam0, return_index=True) lam0, flx0 = lam0[indu], flx0[indu] lam02, RSRNew = lam0, RSRNew RSRNew = RSRNew[indu] Sn = sintp.simps(flx0 * lam0 * RSRNew, lam0) / sintp.simps( lam0 * RSRNew, lam0) Sn_arr = np.append(Sn_arr, Sn) else: # DETERMINES HOW MANY rows (different temperatures) THERE ARE for i in range(len(lambda_)): lambda_i, fluxi = lambda_[i], flux2[i] RSRminInd, RSRmaxInd = int(np.searchsorted(lambda_i, band_min)), \ int(np.searchsorted(lambda_i, band_max)) if (RSRmaxInd - RSRminInd) < 10: print('Resolution less than 10.') # Create index array Ind = np.arange(RSRminInd, RSRmaxInd) # COLLECT ALL THE WAVELENGTH AND FLUX VALUES UNDER BANDPASS # FOUND IN MODEL VALUES lam0i, flx0i = lambda_i[Ind], fluxi[Ind] # FOR EACH WAVELENGTH, CALCUALTE A NEW RSR: INTERPOLATE RSRNew = 10 ** filter_interp(np.log10(lam0i)) # Use Simpson approximation to calculate total flux # FIND UNIQUE ELEMENTS IN WAVELENGTH BECAUSE THERE ARE REPEATED VALUES # IN NEXTGEN GRIDS u, indu = np.unique(lam0i, return_index=True) lam0i, flx0i = lam0i[indu], flx0i[indu] lam02i, RSRNew = lam0i, RSRNew # dset2Newi RSRNew = RSRNew[indu] Sn = sintp.simps(flx0i * lam0i * RSRNew, lam0i) / sintp.simps( lam0i * RSRNew, lam0i) if np.isnan(np.sum(Sn)): print('Is nan', np.isnan(np.sum(Sn))) Sn_arr = np.append(Sn_arr, Sn) return Sn_arr
[docs] def indSandwhich(self, t0, arr): """ Gives the indices of the array (``arr``) surrounding the target temperature ``t0`` to within 3 grid spaces for effective interpolation. I mean, this can be used for any array -- not just SED stuff. Args: t0 (float): Target temperature in Kelvin arr (arr): array that contains the target temperature. Returns: +/- 3 indexes to the left and right of the target value. """ ind = np.searchsorted(arr, t0) indRight = ind + 3 indLeft = ind - 3 if indRight > len(arr): indRight = int(len(arr)) if indLeft < 0: indLeft = 0 return indLeft, indRight
[docs] def calc_grids(self, lambda_, p0, su2ea1=1, griddata=None, tempArr=None, mag2use=None, resample=True, **kwargs): r""" Returns array of flux values at input wavelength array interpolated at T0. Args: \lambda_ (arr/float): wavelength values where flux will be calculated T0 (float): Temperature in Kelvins to determine best grid flux su2ea1 (float): factor that includes the :math:`\mbox{distance}^2` to convert from surface to earth flux: :math:`1/\mbox{radius}^2 \times (\mbox{radius}/\mbox{dist})^2 griddata (tuple): wavelength and flux arr in a 2-element tuple created from :meth:`DataLogistics.get_[model]Grids`, where `model` is either ATLAS9 or NextGen. tempArr (arr): Array of increasing temperature values mapped to those found in ``griddata``. mag2use (arr): Array of strings with names of photometric magnitude bands where fluxes are calculated. Required to call passband object. Returns: Array of fluxes calculated at argument wavelengths in units of :math:`\mbox{erg}/\mbox{cm}^2/s/A` """ temp0 = p0[0] try: su2ea = (p0[1] ** 2) * su2ea1 except: su2ea = su2ea1 try: resample = kwargs['resample'] except: resample = resample grids, TempsArr = griddata, tempArr indLeft, indRight = self.indSandwhich(temp0, TempsArr) lam_arr_all = grids[0][indLeft:indRight + 1] flux_arr_all = grids[1][indLeft:indRight + 1] xin = TempsArr[indLeft:indRight + 1] flux_arr_all = flux_arr_all * su2ea GFluxArr = np.array([]) # TO BE USED IF NO RSR IS NEEDED... INDIVIDUAL FLUX # CALUCLATION AT SINGLE WAVELENGTH if mag2use is None: fnew_all_temp = np.array( []) # WILL BE USED TO STORE CALCULATED FLUXES (lam_arr_all, flux_arr_all) = (np.log10(lam_arr_all), np.log10(flux_arr_all)) lambda_ = np.log10(lambda_) # GATHER NEW FLUXES FOR EACH TEMPERATURE AT GIVEN g and met for i in range(len(lam_arr_all)): ipolate = intp.interp1d(lam_arr_all[i], flux_arr_all[i]) fnew = 10 ** ipolate(lambda_) if len(fnew_all_temp) == 0: fnew_all_temp = np.array([fnew]) else: fnew_all_temp = np.append(fnew_all_temp, [fnew], axis=0) # COLUMN: INPUT WAVELENGTH, ROW: AVAILABLE TEMPERATURE # LOOP OVER THE INPUT WAVELENGTHS AND CALCULATE A NEW FLUX # IF LEN(LAMBDA_) ==1, SPECIAL CARE IS TAKEN for j in range(len(lambda_)): if fnew_all_temp.ndim < 2: yin = fnew_all_temp else: yin = fnew_all_temp[:, j] intpObj = intp.interp1d(xin, np.log10(yin)) m = 10 ** intpObj(temp0) GFluxArr = np.append(GFluxArr, m) else: for band in mag2use: pband = eval(f'self.{band}pband') if pband.isoWavelength() > 110000 and resample: (lam_arr_all, flux_arr_all) = FT.resample_model(lam_arr_all, flux_arr_all, min(pband.wavelength), max(pband.wavelength), pband=pband) Mflux = self.rsr_flux(pband, lam_arr_all, flux_arr_all) yin = Mflux # Interpolation object intpObj = intp.interp1d(xin, np.log10(yin)) # Interpolated flux points m = 10 ** intpObj(temp0) GFluxArr = np.append(GFluxArr, m) return GFluxArr
[docs] def wienTEMP(self, lambda_, units='micron'): r""" Calculates the maximum temperature of a blackbody at a given wavelength using Wien's law. To calculate the blackbody irradiance in units of cgs Flux :math:`\mbox{erg}/\mbox{cm}^2/s/A`. This module assumes projected emission from a source, so solid angle = pi. All physical constants are in units to facilitate this. Wavelength must be in units of ``angstrom``, ``microns``, ``cm``, or ``meters``. They will be converted to cm to carry out the calculation. Args: \lambda_ (float/arr): reference wavelength(s) to calculate blackbody temperature. units (str): describes reference wavelength unit. All values in \lambda_ have to be the same. Allowed units are ``angstrom``, ``microns``, ``cm``, ``meters``. Units will be converted to cm for ease of calculation. Returns: (float): Temperature in Kelvin. """ # CHECK UNITS x = self.scale_wavelength(lambda_, units) return _WIEN / x
[docs] def scale_wavelength(self, lambda_, units='micron'): """ Rescales the wavelength \lambda_ to cm Args: \lambda_ (float/arr): wavelength value(s) units (str): units of input \lambda_ Returns: wavelength in centimeters. """ units_scale_dict = {'angstrom': 1e-8, 'microns': 1e-4, 'meters': 1e-2, 'cm': 1} scale = units_scale_dict.get(units, None) try: lambda_ *= scale return lambda_ except TypeError: print(f'Unit {units} was not recognized')
[docs] def blackbody(self, lambda_, p0, su2ea1=1, bands=None, units='angstrom', bulk=False, **kwargs): r""" To calculate the blackbody irradiance in units of cgs Flux :math:`\mbox{erg}/\mbox{cm}^2/s/A`. This module assumes projected emission from a source, so solid angle = pi. All physical constants are in units to facilitate this. Wavelength must be in units of angstrom, microns, cm, or meters. They will be converted to cm to carry out the calculation. Args: \lambda_ (arr): wavelengths p0 (arr/list): Td, Rd * Td - dust temperature in Kelvin * Rd - dust radius in AU su2ea1: Partial flux normalization. Incorporates distance to the star, and assumes 1 AU radius: :math:`R_d[AU]/dist[pc]` in natural units - :math:`\mbox{AU2cm}^2/(\mbox{distance}\times\mbox{pc2cm})^2` units (str): input units ``angstrom``, ``micron``, ``meters``, ``cm`` bands : bulk (bool) Returns: (arr): returns flux of blackbody at fitted temperature. """ # CHECK UNITS x = lambda_.copy().astype('float32') x = self.scale_wavelength(x,units) temp0 = p0[0] # Check Args and normalization try: su2ea = (p0[1] ** 2) * su2ea1 except IndexError: su2ea = su2ea1 try: bulk = kwargs['bulk'] except KeyError: bulk = False # IF CONVOLVING TO FILTER TRANSMISSION, X SHOULD CONFORM TO LENGTH # TRANSMISSION CURVE if bulk: const = su2ea * ((2 * _H * _CS ** 2)) const2 = ((x ** 4 * ((np.exp(_H * _CS / (_KB * np.outer(x,temp0))) - 1).transpose())).transpose()) fluxbb = const / const2 else: const = su2ea * ((2 * _H * _CS ** 2) / x ** 4) fluxbb = const / (np.exp(_H * _CS / (x * _KB * temp0)) - 1) # IF FILTER IS GIVEN, CALCULATES FLUX FROM THROUGH GIVEN # BANDPASS. OTHERWISE RETURNS FLUX ARRAY FROM ALL WAVELENGHTS # GIVEN IN lambda_ flux_arr = np.array([]) if bands is not None: for band in bands: # print(band,'bbcalc') pband = eval(f'self.{band}pband') xmin, xmax = x.min() * _CM2ANG, x.max() * _CM2ANG # lambda_.min(), lambda_.max() pb_xmin, pb_xmax = pband.wavelength.min(), pband.wavelength.max() if xmin > pb_xmin: raise ValueError(f'Sample size too small. Need more on ' f'blue end for {band} band.') elif xmax < pb_xmax: raise ValueError(f'Sample size too small. Need more on ' f'red end {band} band.') else: pass flux = np.array(self.rsr_flux(pband, x * _CM2ANG, fluxbb)) flux_arr = np.append(flux_arr, flux / pband.pivotWavelength()) else: flux_arr = fluxbb / (x * _CM2ANG) return flux_arr
[docs] def calcRJ_spectrum(self, xarr, yarr): r""" Calculate Rayleigh-Jeans extension of the input spectrum array Args: xarr (arr): wavelength array yarr (arr): flux array Returns: slope and y-intercept in the log-space of the RJ tail. """ slope = (yarr[1] - yarr[0]) / (xarr[1] - xarr[0]) yint = yarr[0] - slope * xarr[0] return slope, yint
[docs] def modifiedBB(self, lambda_, p0, su2ea1=1, bands=None, units='angstrom', **kwargs): r""" Calculates a modified blackbody function given a powerlaw in conjunction with :meth:`SEDTools.blackbody` Args: lam0 = wavelength at which emission peaks. Returns: modified blackbody flux """ try: pwr = kwargs['beta'] except: # pwr = p0[2]# PASSED AS ARGUMENT TO FIT INSTEAD pwr = p0[2] try: lam0 = kwargs['lam0'] except: lam0 = self.W3pband.isoWavelength() x = self.scale_wavelength(lambda_.copy().astype('float32'), units) x0 = self.scale_wavelength(float(lam0), units) # CHECK Args AND NORMALIZATION try: su2ea = (p0[1] ** 2) * su2ea1 except: su2ea = su2ea1 p0 = np.array([p0[0]]) bbflux = self.blackbody(x, p0, su2ea1=su2ea, bands=bands, units='cm') mod_arr = np.array([]) if bands is not None: for band in bands: pband = eval(f'self.{band}pband') wav = pband.isoWavelength() * 1e-8 mod_arr = np.append(mod_arr, (x0 / wav)) else: mod_arr = (x0 / x) # mod_arr = (mod_arr<1).choose(mod_arr,1) # FOR REALLY INEFFICIENT SCATTERING DUST mod_arr = mod_arr ** pwr flux = bbflux * mod_arr return flux
[docs] def NBlackBody(self, lambda_, p0, su2ea1=1, bands=None, units='angstrom', bulk=False, **kwargs): r""" Computes the total emission from multiple blackbody spectra. This function calculates the combined blackbody radiation spectrum for multiple blackbody components, each defined by its temperature and scaling factor. The input parameters ``p0`` should contain pairs of values representing each blackbody's temperature and normalization factor. Raises SystemExit if the length of `p0` is not even, indicating incomplete blackbody parameter pairs. Args: \lambda_ (arr): Wavelength array over which the blackbody radiation is computed. p0 (list/arr): A flat list containing pairs of values, where each pair represents [temperature, normalization factor] for one blackbody component. The length of ``p0`` must be even, as each blackbody requires two parameters. su2ea1 (float, optional): Scaling factor for the unit conversion. Default is 1. bands (arr, optional): Spectral bands over which to integrate the blackbody spectrum. Default is None. units (str, optional): Wavelength units ('angstrom', 'nm', etc.). Default is 'angstrom'. bulk (bool, optional): If True, performs bulk processing for efficiency. Default is False. kwargs (dict): Additional keyword arguments passed to the :meth:`SEDTools.blackbody` function. Returns: Combined blackbody spectrum calculated as the sum of individual blackbody components. """ if (len(p0) % 2) == 0: B_total = [] for i in range(int(len(p0) / 2)): p0i = p0[i:i + 2] B_total.append( self.blackbody(lambda_, p0i, su2ea1=su2ea1, bands=bands, units=units, bulk=bulk, **kwargs)) B_total = np.sum(np.array(B_total), axis=0) return B_total else: sys.exit("You should have two free Args for each blackbody" " you'd like to fit")
[docs] def doubleBB(self, lambda_, p0, su2ea1=1, bands=None, units='angstrom', bulk=False, **kwargs): r""" Computes the combined spectrum of two blackbody components. This function calculates the total emission spectrum resulting from the sum of two blackbody radiation spectra. Each blackbody component is defined by its temperature and scaling factor, provided in the input parameter `p0`. .. note:: This function assumes exactly two blackbody components and requires four parameters in `p0`. Args: \lambda_ (arr): Wavelength array over which the blackbody radiation is computed. p0 (arr/list): A list containing four values; [temperature1, normalization1, temperature2, normalization2]. The first pair is for the first blackbody, and the second pair is for the second blackbody. su2ea1 (float, optional):Scaling factor for the unit conversion. Default is 1. bands (arr, optional): Spectral bands over which to integrate the blackbody spectrum. Default is None. units (str, optional): Wavelength units ('angstrom', 'nm', etc.). Default is 'angstrom'. bulk (bool, optional): If True, performs bulk processing for efficiency. Default is False. kwargs (dict): Additional keyword arguments passed to the :meth:`SEDTools.blackbody` function. Returns: The combined spectrum of the two blackbody components. """ p01, p02 = p0[0:2], p0[2:] B1 = self.blackbody(lambda_, p01, su2ea1=su2ea1, bands=bands, units=units, bulk=bulk, **kwargs) B2 = self.blackbody(lambda_, p02, su2ea1=su2ea1, bands=bands, units=units, bulk=bulk, **kwargs) return B1 + B2
[docs] def calcBBTemp(self, T0, lamArr, flxArr): """ Calculates the temperature of the blackbody flux Assumes Flux in units of :math:`\mbox{erg}/\mbox{cm}^2/s/A` and returns temperature in kelvin. To be used in conjunction with Newton-Raphson algorithm Args: T0: lamArr: flxArr: Returns: """ lA, fA = lamArr.copy(), flxArr.copy() l1, l2 = lA[0], lA[1] # lam1,lam2 Flux1, Flux2 = fA[0], fA[1] Ratio = (Flux1 / Flux2) * (l1 / l2) ** 5 gc = _H * _CS / _KB func = np.exp(gc / (l2 * T0)) - Ratio * np.exp(gc / (l1 * T0)) - 1 + Ratio return func
[docs] def calcModTemp(self, T0, lam0, lamArr, flxArr): r""" Calculates the temperature of a modified blackbody with a power law assuming the power index has been parametrized and the ratio of two fluxes are taken. The Flux is in :math:`\mbox{erg}/\mbox{cm}^2/s/A` and wavelength in cm. This is to be used in conjunction with a Newton-Raphson/Bisecting algorithm to find zeros for the temperature equation. Args: T0 (float): temperature in kelvin lam0 (float): target wavelength lamArr (arr): wavelength array flxArr (arr): flux array Returns: function """ lam0 = lam0 # 0.00115608 # W3 [cm] lA, fA = lamArr.copy(), flxArr.copy() l1, l2 = lA[0], lA[1] Flux1, Flux2 = fA[0], fA[1] Ratio = (Flux1 / Flux2) * (l1 / l2) ** 5 gc = _H * _CS / _KB func1 = np.log10(((np.exp(gc / (l2 * T0)) - 1) / ( np.exp(gc / (l1 * T0)) - 1)) * Ratio ** (-1)) func2 = (gc / (T0 * lam0)) * np.exp(gc / (lam0 * T0)) / ( np.exp(gc / (lam0 * T0)) - 1) func = func1 / (np.log10(l2 / lam0)) + func2 - 5. return func
[docs] def photosphere(self, p0, su2ea2, modelinfo, MegaGridModels, wave=(2000, 1e7), gridpts=10000,): r""" Calculates the photospheric emission line from grid models given relevant Args and griddata and temperature array, and range of wavelength to calculate emission for. Usually this should not exceed the limits for the grid models. If limits are exceeded flux is extrapolated linearly in log-log space, following Rayleigh-Jeans law. Args: p0 (arr): Args to be passed to :meth:`SEDTools.calc_grids`. su2ea2 (float): Normalization to be passed to :meth:`SEDTools.calc_grids`. modelinfo (arr): Multi-dimensional array to be passed to :meth:`SEDTools.calc_grids` wave (tuple/arr): tuple or array of 2 elements; min and max of wavelength for flux to be calculated. Be consistent with units. (angstroms) gridPts (int): Integer value for how many grid points you want returned (aka resolution) MegaGridModels (dict): all atmospheric grid models with metallicity, grav, and temperature keys. Returns: (tuple): wavelength and photospheric flux arrays """ MegaGrid = MegaGridModels tempStar = p0[0] * 1000. try: su2eaRJ = (p0[1] ** 2) * su2ea2 except: su2eaRJ = su2ea2 gdat = MegaGrid[modelinfo] tempArr = gdat[-1] lam_arr_all, flux_arr_all = gdat[0], gdat[1] griddata = (lam_arr_all, flux_arr_all) # DETERMINE PHOTOSPHERIC CALCULATION CUT OFF WAVELENGTH BEFORE # LINEAR INTERPOLATIN BEGINS xlim = np.max(lam_arr_all, axis=1).min() lamcut = xlim # CREATE SAMPLING POINTS FOR GRID xphot = np.logspace(np.log10(wave[0]), np.log10(wave[1]), gridpts) # angstroms # This is only for Kurucz: xphot[angstrom], yphot[erg s-1 cm-2 A-1] # CHECK TO SEE IF EXTRAPOLATION IS NECESSARY GIVEN INPUT LIMITS if wave[1] > xlim: # CUT UP X ARRAY -- grid part and extroplation part ind_cut1, ind_cut2 = np.where(xphot < lamcut)[0], \ np.where(xphot >= lamcut)[0] xphot1, xphot2 = xphot[ind_cut1], xphot[ind_cut2] # yphot1 USES GRIDS, WHILE yphot2 USES RAYLEIGH-JEANS yphot1 = self.calc_grids(xphot1, p0, su2ea2, griddata, tempArr) # USE RAYLEIGH-JEANS FOR EXTRAPOLATION yphot2 = (np.pi * 1.4) * su2eaRJ * _CS * _KB * tempStar / ( xphot2 * _ANG2CM) ** 3 yphot2 = yphot2 / xphot2 # PUT IT ALL TOGETHER xphot = np.concatenate([xphot1, xphot2]) yphot = np.concatenate([yphot1, yphot2]) else: # slope,yint=0,0 yphot = self.calc_grids(xphot, p0, su2ea2, griddata, tempArr) # # for Kurucz, yphot is in erg s-1 cm-2, xphot is in micron # return [xphot, yphot, slope, yint] return xphot, yphot
[docs] def scaleSED2bands(self, scbdlist, usebdlist, yphot,fluxm, fluxme, synflux): r""" Use this to scale the fitted stellar SED to a set of particular bands for whatever reason. Args: scbdlist (arr) : strings of band identifiers to scale the fitted photosphere. usebdlist (arr) : all the bands that are used. yphot (float) : fitted photospheric flux fluxm (arr): broad band photometric data for all bands in usebdlist fluxme (arr): uncertainties in fluxm synflux (arr): synthetic photospehric flux at each band in usebdlist. Returns: phot_norm_fac: factor calculated to scale the SED yphot : scaled flux array yphot_unsc: pre-scaled flux RJ_On: boolean to tell you whether this was done -- not sure why I still have this. """ nirflux, nirbands = [], [] yphot_unsc = yphot norm_wise_nir = 1. for band in scbdlist: if np.any(band == np.array(usebdlist)): nirbands.append(band) nirbands = np.array(nirbands) if len(nirbands) != 0: RJ_On = True flux_nirbands = AT.dict2list(fluxm, nirbands, '_flux') eflux_nirbands = AT.dict2list(fluxme, nirbands, '_flux') for band in nirbands: nirflux.append(synflux[band]) nirflux = np.array(nirflux) wts = (flux_nirbands / eflux_nirbands) ** 2 norm_wise_nir = np.average(flux_nirbands / nirflux, weights=wts) # CHANGE NORMALIZATION FOR SYNTHETIC FLUXES for key in synflux: synflux[key] = synflux[key] * norm_wise_nir yphot *= norm_wise_nir else: RJ_On = False print('Unable to scale SED to W1 and W2') return norm_wise_nir, yphot, yphot_unsc, RJ_On
[docs] def fit_photosphere(self, xlam, yfluxdat, p0, su2ea2, modinfo, magfit, func, MegaGridModels): r""" Find the best fit photospheric model to the photometric data. Fits are done using mpfit (Levenberg-Marquardt algorithm). Args: xlam (arr): Strings of wavelength identifiers of the bands that are going to be used to fit. yfluxdat (arr): Flux of the data at wavelengths in xlam to be fit. p0 (arr): Initial Args for temperature and stellar radius. Temperature is required, Radius is not. su2ea2 (float): Scaling factor :math:`R_{\mbox{sun}}^2/d_{\mbox{star}}^2` modinfo (str): Which model type to use based on (model name, log(g), metallicity) to be used as keys in MegaGrid. magfit (dict): String arrays of band identifiers keyed by ``photmags`` (bands used to fit the model) and 'scalemags' (which bands to use to scale the raw stellar flux to observed flux to first order). func (obj): Function to use to fit the data. MegaGridModels (dict): all atmospheric grid models with metallicity, grav, and temperature keys. Returns: Fitted stellar radius, temperature and mpfit object. """ MegaGrid = MegaGridModels gdat = MegaGrid[modinfo] mg4phot = magfit['photmags'] mg4scale = magfit['scalemags'] yflux, yfluxerr = yfluxdat tempArr = gdat[-1] lam_arr_all, flux_arr_all = gdat[0], gdat[1] griddata = (lam_arr_all, flux_arr_all) Flx2Fit = AT.dict2list(yflux, mg4phot, '_flux') Flx2Fiterr = AT.dict2list(yfluxerr, mg4phot, '_flux') lam2Fit = AT.dict2list(xlam, mg4phot) Flx2scale = AT.dict2list(yflux, mg4scale, '_flux') Flx2scaleErr = AT.dict2list(yfluxerr, mg4scale, '_flux') FluxSED = func(lam2Fit, p0, 1, griddata, tempArr, mg4scale) print('Bands used to fit photosphere: {}'.format(', '.join(mg4phot))) print('Bands used to scale photosphere: {}'.format(', '.join(mg4scale))) # SURFACE TO OBSERVED FLUX NORMALIZATION WEIGHTED FROM ERRORS weights = 1. / np.array(Flx2scaleErr) FluxNorm = np.average(Flx2scale / FluxSED, weights=weights) Rad = np.sqrt(FluxNorm / su2ea2) p0 = np.array([p0[0], Rad]) # radius squared # DEFINE INPUT Args IN ORDER TO FIT TEMPERATURE self.fa = {'x': lam2Fit, 'y': Flx2Fit, 'err': Flx2Fiterr, 'func': func, 'griddata': griddata, 'tempArr': tempArr, 'mag2use': mg4phot, 'su2ea1': su2ea2} parinfo = [ {'value': 0., 'relstep': 0, 'limits': [0, 0], 'limited': [0, 0]} for l in range(2)] for j in range(2): parinfo[j]['value'] = p0[j] parinfo[0]['relstep'] = 0.1 parinfo[1]['relstep'] = 0.3 parinfo[0]['limited'] = [1, 1] parinfo[0]['limits'] = [tempArr[0], tempArr[-1]] mf = mt.mpfit(FT.deviates_from_model, parinfo=parinfo, functkw=self.fa, quiet=1, maxiter=200000, xtol=1e-16, ftol=1e-16, gtol=1e-16) p0, errors = mf.params, mf.perror try: self.chi2 = mf.fnorm / mf.dof except ZeroDivisionError: self.chi2 = -1 print('Degrees of freedom = 0') print('chi2 = %.2f' % self.chi2) radius = p0[1] tempnew = p0[0] * 1000. erad = errors[1] etemp = errors[0] * 1000. # print(r'Fitted Stellar Radius: %.3f $\pm$ Rsun' %radius) # print(r'Fitted Stellar Temperature: %i K' %tempnew) print(r'Fitted Stellar Radius: {:.3f} +/- {:.3f} Rsun'.format(radius, erad)) print( r'Fitted Stellar Temperature: {:d} +/- {:d} K'.format(int(tempnew), int(etemp))) return radius, tempnew, mf
[docs] def calc_temp(self, y, yerr, x, temparr, kw_cor): r""" Calculate the residuals between the input fluxes and predicted fluxes... i think Args: y (arr): fluxes yerr (arr): uncertaintiy in fluxes x (arr): string array denoting bands temparr (arr): arrays of temperatures kw_cor (dict): stuff from load_wfcorrection Returns: residuals, flux normalized to blackbody fluxes, and array of chi2 calculations. """ fluxNormed = [] kcorList = [] # x should be list of strings with band info for bd in x: fluxNormed.append(np.array(kw_cor['F_cor_%s' % bd])) kcorList.append(np.array(kw_cor['kcor_%s' % bd])) kcorList = np.array(kcorList).transpose() Predicted = np.array(fluxNormed) a, b = Predicted[0], Predicted[1] x0, y0 = y[0], y[1] sigx, sigy = yerr[0], yerr[1] alpha = ((a * x0 / sigx ** 2) + (b * y0 / sigy ** 2)) / ( (a / sigx) ** 2 + (b / sigy) ** 2) FluxNormed = (alpha * ( Predicted.transpose() * kcorList).transpose()).transpose() res = np.subtract(FluxNormed, y) return res, FluxNormed, alpha
[docs] class DataLogistics: r""" Set of tools to help with the logistical aspect of loading and managing the user input data, empirical stellar colors, and atmospheric grid files. """ def __init__(self, jfile=None): r""" Instantiate the :class:`sed.DataLogistics` class. It will load all the neceesary data according to the JOSN file Args: jfile (str/path): JSON Parameterfile path """ # PROCESS JSON PARAMETERFILE script = open(jfile).read() self.specs = json.loads(script) # INSTANTIATE DIRECTORY PATHS AND FILE OBJECTS supportdir = DIR.SupportFiles() starfile = opj(supportdir, 'StellarInputFiles', self.specs['files']['input_user_starfile']) empfile = opj(supportdir, self.specs['files']['bv_colorfile']) sheet_name = self.specs['files']['input_stars_sheet'] print(f'Empirical Color File: {empfile}') print(f'Input User File {starfile} using sheet {sheet_name}') # DATAFRAME THAT CONTAINS ALL DATA FROM A DATA INPUT FILE self.StarsDat = self.loadAllStarsExcel(starfile, sheet_name, self.specs['changekeys']) # DICTIONARY THAT CONTAINS ALL THE EMPIRICAL STELLAR COLOR DATA self.EmpDat = self.loadEmpiricalData(empfile) # LOAD ALL RELEVANT ATMOSPHERIC GRID MODELS # DICTIONARY TO HOUSE ALL THE PHOTOSPHERIC MODELS EVENTUALLY self.MegaGrid = self.loadAllModels()
[docs] def show_parameterfile(self, indent=3): r""" Show the contents of the JSON parameterfile Args: indent (int): indentation level. Default is 3. """ specs_formatted_str = json.dumps(self.specs, indent=indent) print(specs_formatted_str)
[docs] def print_input_file(self): """ Prints the user input file. """ print(self.StarsDat)
[docs] def loadAllStarsExcel(self, starfile, sheet_name=1, changekeys=True): r""" Loads all data into a dictionary from starfile. If changekeys is active, it will try to replace the saturated corrected photometry keywords to standard WISE keywords. Data is stored in the :attr:`DataLogistics.StarsDat` class attribute. Unsaturated WISE photometry can be corrected using procedures found in `Patel,+2014 <https://iopscience.iop.org/article/10.1088/0067-0049/212/1/10>`_. Need to add in part about self-correcting the photometry instead of relying on input. Args: starfile (str/pathfile): file name and path sheet_name (str): Which sheet in the file to use. Default is the first one changekeys (bool): Whether to replace the WISE saturated corrected photometry keywords to standard WISE keywords. """ StarsData = pd.read_excel(starfile, sheet_name=sheet_name) StarsData.set_index('MainName', inplace=True) colnames = StarsData.columns if changekeys: if 'W1mC' in colnames: StarsData.rename(columns={'W1m': 'W1mC', 'W1me': 'W1meC'}) if 'W2mC' in colnames: StarsData.rename(columns={'W2m': 'W2mC', 'W2me': 'W2meC'}) return StarsData
[docs] def loadAllStars(self, starfile, changekeys): r""" .. deprecated:: 1.1 Use :meth:`loadAllStarsExcel` instead. Loads all data into a dictionary from starfile. If changekeys is active, it will try to replace the saturated corrected photometry keywords to standard WISE keywords. Unsaturated WISE photometry can be corrected using procedures found in Patel,+2014. Need to add in part about self-correcting the photometry instead of relying on input. Args: starfile (str): file name and path changekeys (): """ if len(StarsDat) == 0: # todo: change this to pandas dataframe # dat = pd.read_csv(starfile, sep=None) # dat = np.genfromtxt(starfile, names=True, dtype=None) dat = np.genfromtxt(starfile, names=True, dtype=None, encoding='utf-8') dat = dat.flatten() colnames = list(dat.dtype.names) for name in colnames: StarsDat[name] = dat[name] if changekeys: if 'W1mC' in colnames: # np.any(colnames == 'W1mC'): StarsDat['W1m'] = StarsDat.pop('W1mC') StarsDat['W1me'] = StarsDat.pop('W1meC') if 'W2mC' in colnames: # np.any(colnames == 'W2mC'): StarsDat['W2m'] = StarsDat.pop('W2mC') StarsDat['W2me'] = StarsDat.pop('W2meC') else: print('Star"s data already loaded')
[docs] def loadEmpiricalData(self, filename): r""" Loads the empirically derived stellar color data from Pecaut & Mamajek (2012). *I think this is still incomplete* Data is stored in the :attr:`DataLogistics.EmpDat` class attribute. Args: filename (str) : file location of the empirical data. """ empirical_data = pd.read_csv(filename, sep='\t') return empirical_data
# if len(EmpDat) == 0: # is not None: # dfemp = ascii.read(filename, comment='#') # EmpDat['dat'] = dfemp # # dat = EmpDat['test']
[docs] def loadAllModels(self): r""" Load all the unique stellar photospheric models based on the input star file and all the options that are needed. Everything is sorted in a dictionary with keys of (model, log(g), metallicity) in the :attr:DataLogistics.MegaGrid class variable. """ MegaGrid = {} allg = np.unique(self.StarsDat['grav']) allmet = np.unique(self.StarsDat['met']) allmod = np.unique(self.StarsDat['model']) print('-------------------------------') print('Loading All Gridmodels...\n') for mod in allmod: for z in allmet: for g in allg: if (mod, g, z) not in MegaGrid: MegaGrid[(mod, g, z)] = getattr(GMod, f'get_{mod}Grids')(g, z) print(f'Loaded {mod} of g={g}, met={z}') print('\nDone Loading Models.') print('-------------------------------') return MegaGrid
[docs] class GridModels:
[docs] @staticmethod def get_NextGenGrids(grav=None, met=0, model='NextGen', ext='.txt'): r""" Args: grav, met (str,str): gravity and metallity values associated with files for given model --> must match format for the models as indicated in their respective readme files. model (str): Model name. Should be same name as the directory all grids are located. ext (str): Grid file extensions. Returns: (tuple): (lam_arr_all, flux_arr_all, temparr) lam_arr_all: :math:`(1 \times N \times M)`-D numpy array with N-wavelength points for each M temperature in the specified grav and met range. flux_arr_all (arr): (1xNxM)-D numpy array with N flux points from grid model to each wavelength point in lam_arr_all for each M temperature for the specified met and grav. temparr (arr): (1xM) numpy array of sorted temperatures from grid models with values divided by 1000. [wavelength] (arr): angstroms [flux] (arr): :math:`\mbox{erg}/\mbox{cm}^2/s/A` [temperature] (arr): Kelvin/1000. The grids are unevenly spaced, so filler 'nan's are placed in both the wavelength and flux points to make things run smoothly. """ # CHECK FORMAT OF GRAV AND METALLICITY gdir = opj(intpdir, 'StellarGridModels', model) if grav is None: # COLLECT ALL FILES OF ANY GRAV -- MEANT TO BE USED TO KEEP GRAV AS FREE Args filesGrid = glob.glob(opj(gdir, 'lteNextGen*_%.1f%s' % (met, ext))) else: grav = grav / 10. filesGrid = glob.glob(opj(gdir, 'lteNextGen*_%.1f_%.1f%s' % (grav, met, ext))) if len(filesGrid) < 1: raise ValueError('There were no files matching your criteria. Try again.') ddat = {} tempArr = np.array([]) gravArr = np.array([]) for f in filesGrid: lam, flux = np.loadtxt(f, unpack=True) gridInfo = os.path.basename(f).split('_') # print gridInfo ti = float(gridInfo[1]) dat = np.array([lam, flux]) ddat[str(ti / 1000.)] = dat tempArr = np.append(tempArr, ti) tempArr.sort() tempArr = tempArr / 1000. # CREATE 3D MATRIX: INDEXED BY [TEMP][WAVELENGTH][FLUX] gridnp = np.array([]) i = 0 si = len(tempArr) while i < si: # START OFF BY CREATING THE FIRST STACK and JUMP TWO INDICES gridi = ddat[str(tempArr[i])] if len(gridnp) == 0: try: grid_i1 = ddat[str(tempArr[i + 1])] gridnp = np.vstack(([gridi], [grid_i1])) except ValueError: diff = len(gridi[0]) - len(ddat[str(tempArr[i + 1])]) zr = np.zeros(abs(diff)).astype(int) if diff < 0: gridi = np.insert(gridi, zr, zr, axis=1) gridnp = np.vstack(([gridi], [grid_i1])) elif diff > 0: grid_i1 = np.insert(grid_i1, zr, zr, axis=1) gridnp = np.vstack(([gridi], [grid_i1])) else: print(f'Grids are not matched between {tempArr[i]},' f'{tempArr[i+1]}') i += 2 else: try: gridnp = np.vstack((gridnp, [gridi])) except ValueError: diff = len(gridnp[0][0]) - len(gridi[0]) zr = np.zeros(abs(diff)).astype(int) if diff < 0: gridnp = np.insert(gridnp, zr, zr, axis=2) gridnp = np.vstack((gridnp, [gridi])) elif diff > 0: gridi = np.insert(gridi, zr, zr, axis=1) gridnp = np.vstack((gridnp, [gridi])) else: print('Grids are not matched between %d, %d' % ( tempArr[i], tempArr[i - 1])) i += 1 lam_arr_all = gridnp[:, 0] flux_arr_all = gridnp[:, 1] return (lam_arr_all, flux_arr_all, tempArr)
[docs] @staticmethod def get_ATLAS9Grids(grav, met, model='ATLAS9', ext='.fits'): r""" Args: grav, met (str,str): gravity and metallity values associated with files for given model --> must match format for the models as indicated in their respective readme files. model (str): Model name. Should be same name as the directory all grids are located. ext (str): Grid file extensions. Returns: (tuple); (lam_arr_all, flux_arr_all, temparr) lam_arr_all: (1X N x M)-D numpy array with N wavelength pointsfor each M temperature in the specified grav and met range. flux_arr_all: (1xNxM)-D numpy array with N flux points from grid model to each wavelength point in lam_arr_all for each M temperature for the specified met and grav. temparr: (1xM) numpy array of sorted temperatures from grid models with values divided by 1000. [wavelength]: angstroms [flux] : :math:`\mbox{erg}/\mbox{cm}^2/s/A` [temperature]: Kelvin/1000. """ # CHECK TO MAKE SURE GRAV AND MET ARE IN CORRECT FORMAT # grav: g# # , met: p# # grav, met = str(grav), str(met) if grav.find('g') == -1: grav = 'g' + grav if met.find('p') == -1: met = 'p0' + met # todo: add stellargridmodels to directories # CHANGE DIRECTORY TO NEEDED METALLICITY FILE dirmod = opj(intpdir, 'StellarGridModels', model, 'k' + met) # THIS SELECTS OUT ONLY THE FILES THAT MEET THE METALLICITY # CRITERIA filesGrid = glob.glob(opj(dirmod, 'k' + met + '*.fits')) if len(filesGrid) < 1: raise ValueError('There were no files matching your criteria.' ' Try again.') ddat = {} tempArr = np.array([]) for f in filesGrid: try: hdui = fits.open(f) ti = hdui[0].header['TEFF'] except: print(f"Something's wrong with {f}") tbdata = hdui[1].data colnames = np.array(hdui[1].columns.names) # CHECK TO SEE IF GRAVITY SELECTED IS IN FITS FILE isGrav = np.where(grav == colnames)[0] if len(isGrav) != 0: tempArr = np.append(tempArr, ti) lam, flux = tbdata['WAVELENGTH'], tbdata[grav] lam, flux = np.array(lam), np.array(flux) else: print("Gravity %s was not found in file %s" % (grav, f)) dat = np.array([lam, flux]) ddat[str(ti / 1000.)] = dat # ddat[str(ti)] = dat tempArr.sort() tempArr = tempArr / 1000. gridnp = np.array([]) # CREATE 3D MATRIX: INDEXED BY [TEMP][WAVELEN][FLUX] i = 0 # for te in tempsArr: si = len(tempArr) while i < si: # START OFF BY CREATING THE FIRST STACK and JUMP TWO INDICES if len(gridnp) == 0: gridnp = np.vstack(([ddat[str(tempArr[i])]], [ddat[str(tempArr[i + 1])]])) i += 2 else: gridnp = np.vstack((gridnp, [ddat[str(tempArr[i])]])) i += 1 lam_arr_all = gridnp[:, 0] flux_arr_all = gridnp[:, 1] return (lam_arr_all, flux_arr_all, tempArr)
[docs] def treat_NextGenSpecModels(self, met=0, grav='all', ext='.spec'): r""" Use this in order to convert the raw files of the NextGen atmospheric models (i.e. ".spec" files) from France Allard's page. The raw files are listed as such: low-res: low-resolution (100-25,000A at 2A) spectra (directly from the model iterations) Obtained from: ftp://ftp.hs.uni-hamburg.de/pub/outgoing/phoenix Spectrum files: Wavelengths are in vacuum, fluxes and BB(Teff) are in cgs (erg/s/cm^2/cm) (yes, that is correct. NOT in erg/s/cm^2/A). Also note that the fluxes and BB's are given directly, NOT as log10 as in previous versions of the model grid. line1: Teff logg [M/H] of the model Wavelength, flux and bbflux are stacked. This is not a column wise file. This module assumes all files have been unzipped. It does not discriminate with respect to temperature, although selections can be made to transform grids of a given solar metallicity and gravity. .. warning:: THERE EXIST POINTS IN THE GRIDS THAT CORRESPOND TO THE SAME DATA POINT IN BOTH LAM AND FLUX SPACE -- PROCEED WITH CAUTION IF NAN'S OCCUR Args: met (str/float): "All" will choose files for all metallicities. Input of a number from the models will select files of that metallicity. grav: Same as met ext: What the original extension of the raw files are. I suggest to keep them as .'spec' Returns: Individual ascii files of the grids, two column of wavelength and F_lam as in the raw files. File names: 'lte_temp_grav_met.txt' Flam units: :math:`\mbox{erg}/\mbox{cm}^2/s/A` Wavelength: Angstrom """ conv2ang = 1e8 # todo: change direcortory here newdir = opj(intpdir, 'NextGen2') # os.mkdir(newdir) dir = '~/Desktop/PHOENIX' if (met == 'all') and (grav == 'all'): filesGrid = glob.glob(opj(dir, 'lte*.NextGen%s' % ext)) elif (met == 'all') and (grav != 'all'): filesGrid = glob.glob( opj(dir, 'lte*-%.1f-*.NextGen%s' % (grav, ext))) elif (met != 'all') and (grav == 'all'): filesGrid = glob.glob(opj(dir, 'lte*-%.1f.NextGen%s' % (met, ext))) elif (met != 'all') and (grav != 'all'): filesGrid = glob.glob( opj(dir, 'lte*-%.1f-%.1f.NextGen%s' % (grav, met, ext))) else: pass if len(filesGrid) < 1: raise ValueError( 'There were no files matching your criteria. Try again.') else: for file in filesGrid: # f = open(file,'r').readlines() f = open(file, 'r') fread = f.read() elements = fread.strip() elements2 = elements.split(' ') fstrip = list(map(string.strip, elements2)) datarr = np.array(fstrip) ind = np.where(datarr == '')[0] dat = np.delete(datarr, ind) temp, grav, met, nwave = float(dat[0]), float(dat[1]), \ float(dat[2]), int(dat[6]) if grav <= 5.5: dat = np.delete(dat, np.arange(0, 11)).astype('float32') wave = dat[np.arange(nwave)] flux = dat[np.arange(nwave, nwave * 2)] # indkeep = np.where(wave<=30000.)[0] # wavKeep, fluxKeep = wave[indkeep],flux[indkeep] wavKeep, fluxKeep = wave, flux # units of erg/s/cm^2/Angstrom DataOut = np.column_stack( (wavKeep, ((fluxKeep) / conv2ang))) fileout = opj(newdir, 'lteNextGen_%.1f_%.1f_%.1f.txt' % ( temp, grav, met)) np.savetxt(fileout, DataOut) else: pass
GMod = GridModels()
[docs] class Bandpass: def __init__(self, fileName, normalise=True, inputUnits='angstroms'): r"""This code loads a passband file with wavelength in the first column and the RSR transmission data for that particular bandpass. The first line should be in the form of #!NNNNN.NN[0,f]/ where NNNN.NN is the isophotal wavelength in angstroms for this particular bandpass. Even if no isophotal wavelength is listed still have the !# part in there. The second line should have the column headings "wav" "trans". Converts the input file wavelength to angstroms. You can tell the code which units the file is in and it will convert to angstroms. Options are: nanometers, microns, mm, GHz inputted as a string. """ self.file = fileName df = np.genfromtxt(self.file, skip_header=1, names=True) self.wavelength = df['wav'] self.transmission = df['trans'] if inputUnits == 'angstroms': pass elif inputUnits == 'nanometers': self.wavelength *= 10.0 elif inputUnits == 'microns': self.wavelength *= 10000.0 elif inputUnits == 'mm': self.wavelength *= 1e7 elif inputUnits == 'GHz': self.wavelength = 3e8 / (self.wavelength * 1e9) self.wavelength *= 1e10 else: raise Exception("DAFUQ? Units no make sense") # Sort into ascending order of wavelength otherwise normalization will be wrong merged = np.array([self.wavelength, self.transmission]).transpose() sortedMerged = np.array(sorted(merged, key=operator.itemgetter(0))) self.wavelength = sortedMerged[:, 0] self.transmission = sortedMerged[:, 1] if normalise: self.transmission = self.transmission / np.trapz(self.transmission, self.wavelength) self.interpolator = intp.interp1d(self.wavelength, self.transmission, kind='linear') self.zmdata = self.metaPassDat(self.file)
[docs] def metaPassDat(self, dfile): r""" This will extract the first line in the RSR data files that contains the zero point wavelengths, frequencies and fluxes, denoted by the string '#!' Args: dfile : string that points to the file name of the RSR bandpass. Returns: list of zero point data. Check readme file in RSR file for more information. """ f = open(dfile) first = f.readline() self.S = re.compile(r"^#\!|\[[0-9]+\]\/|\[ *\w *, *\w+ *\]\/") header = re.split(self.S, first)[1:-1] # zmdata = map(string.strip, header) zmdata = np.array(header, dtype='float') return zmdata
[docs] def rescale(self, maxTransmission): r""" Rescales passband so that maximum value of the transmission is equal to maxTransmission maxTransmission: float - max value to rescale transmission curve to. """ self.transmission /= maxTransmission # self.trans
[docs] def pivotWavelength(self): r"""Calculates pivot wavelength for the passband. This is the same as equation (3) of Carter et al. 2009, and equation A11 in Tokunaga & Vacca 2005. """ a = np.trapz(self.transmission * self.wavelength, self.wavelength) b = np.trapz(self.transmission / self.wavelength, self.wavelength) pivWavelength = np.sqrt(a / b) return pivWavelength
[docs] def isoWavelength(self): r"""Extracts the isophotal wavelength from the bandpass file if it's there. Otherwise it uses the calculated effective wavelength of the bandpass. The isophotal wavelength is kept under # !NNNNN.NN[0,f]/ in the first line of the bandpass file. """ try: isowavelength = float(self.zmdata[0]) # h.header[0] except IndexError: isowavelength = self.pivotWavelength() print('Error in extracting isowavelength') print('Using pivotWavelength instead for at %.5f' % isowavelength) return isowavelength
[docs] def fluxVegaZeroPointLam(self): r"""Extracts the zero point fluxes and related uncertainties from the bandpass if it's there. Otherwise it throws and error and lets you know you are now attached to another object by an inclined plane, wrapped helically around an axis. The information is kept in the header file that looks like # !N1[0,f]/ N2[1,f]/ N3[2,f]/ where N1 is the isophotal wavelength and the N2, N3 are the zero point fluxes respectively. """ try: zp = self.zmdata[1] zperr = self.zmdata[2] except IndexError: print('No zero point flux available. Check RSR file {}'.format( self.file)) zpflx, zpflxErr = float(zp), float(zperr) return zpflx, zpflxErr
[docs] def isoFrequency(self): r""" Returns the iso frequency of the bandpass in the Vega photometric system. """ try: isofrequency = float(self.zmdata[3]) except IndexError: print('No other option for frequency available.' ' Place freq in header file of {}.'.format(self.file)) return isofrequency
[docs] def fluxVegaZeroPointFreq(self): r""" Returns Zero point frequency in the Vega photometric system """ try: zp = self.zmdata[4] zperr = self.zmdata[5] except IndexError: print('No zero point flux available. Check RSR file {}'.format( self.file)) zpflx, zpflxErr = float(zp), float(zperr) return zpflx, zpflxErr
[docs] class Flatbandpass: def __init__(self, waverange=(None, None), cntr=None, normalize=True, inputUnits='angstroms'): r""" Creates a passband object with a flat response curve Args: waverange (tuple): cntr (float): normalize (float): inputUnits (str): """ if waverange[0] is None or waverange[1] is None: raise ValueError( 'No wavelength range defined for the flat spectrum') else: self.isoWavelength = cntr self.pivotWavelength = cntr self.wavelength = np.linspace(waverange[0], waverange[1], 200) self.transmission = np.ones(len(self.wavelength)) if inputUnits == 'angstroms': pass elif inputUnits == 'nanometers': self.wavelength *= 10.0 self.pivotWavelength *= 10.0 elif inputUnits == 'microns': self.wavelength *= 10000.0 self.pivotWavelength *= 10000.0 elif inputUnits == 'mm': self.wavelength *= 1e7 self.pivotWavelength *= 1.7 elif inputUnits == 'GHz': self.wavelength = 3e8 / (self.wavelength * 1e9) self.wavelength *= 1e10 self.pivotWavelength = 3e8 / (self.pivotWavelength * 1e9) self.pivotWavelength *= 1e10 else: raise Exception("DAFUQ? Units no make sense") self.isoFrequency = _CS / self.isoWavelength