Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,6 @@ pip-wheel-metadata/

# Mac OSX
.DS_Store

.cursor
uv.lock
110 changes: 0 additions & 110 deletions astrodbkit/spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@

import os

import astropy.units as u
import numpy as np
from astropy.io import fits
from astropy.nddata import StdDevUncertainty
from astropy.units import Unit
from astropy.wcs import WCS
from specutils import Spectrum
from specutils.io.parsing_utils import read_fileobj_or_hdulist
from specutils.io.registers import data_loader

# pylint: disable=no-member, unused-argument
Expand Down Expand Up @@ -75,112 +71,6 @@ def spex_prism_loader(filename, **kwargs):
return Spectrum(flux=data, spectral_axis=wave, uncertainty=uncertainty, meta=meta)


def identify_wcs1d_multispec(origin, *args, **kwargs):
"""
Identifier for WCS1D multispec
"""
hdu = kwargs.get("hdu", 0)

# Check if number of axes is one and dimension of WCS is greater than one
with read_fileobj_or_hdulist(*args, **kwargs) as hdulist:
return (
hdulist[hdu].header.get("WCSDIM", 1) > 1
and hdulist[hdu].header["NAXIS"] > 1
and "WAT0_001" in hdulist[hdu].header
and hdulist[hdu].header.get("WCSDIM", 1) == hdulist[hdu].header["NAXIS"]
and "LINEAR" in hdulist[hdu].header.get("CTYPE1", "")
)


@data_loader("wcs1d-multispec", identifier=identify_wcs1d_multispec, extensions=["fits"], dtype=Spectrum, priority=10)
def wcs1d_multispec_loader(file_obj, flux_unit=None, hdu=0, verbose=False, **kwargs):
"""
Loader for multiextension spectra as wcs1d. Adapted from wcs1d_fits_loader

Parameters
----------
file_obj : str, file-like or HDUList
FITS file name, object (provided from name by Astropy I/O Registry),
or HDUList (as resulting from astropy.io.fits.open()).
flux_unit : :class:`~astropy.units.Unit` or str, optional
Units of the flux for this spectrum. If not given (or None), the unit
will be inferred from the BUNIT keyword in the header. Note that this
unit will attempt to convert from BUNIT if BUNIT is present.
hdu : int
The index of the HDU to load into this spectrum.
verbose : bool
Print extra info.
**kwargs
Extra keywords for :func:`~specutils.io.parsing_utils.read_fileobj_or_hdulist`.

Returns
-------
:class:`~specutils.Spectrum`
"""

with read_fileobj_or_hdulist(file_obj, **kwargs) as hdulist:
header = hdulist[hdu].header
wcs = WCS(header)

# Load data, convert units if BUNIT and flux_unit is provided and not the same
if "BUNIT" in header:
data = u.Quantity(hdulist[hdu].data, unit=header["BUNIT"])
if u.A in data.unit.bases:
data = data * u.A / u.AA # convert ampere to Angroms
if flux_unit is not None:
data = data.to(flux_unit)
else:
data = u.Quantity(hdulist[hdu].data, unit=flux_unit)

if wcs.wcs.cunit[0] == "" and "WAT1_001" in header:
# Try to extract from IRAF-style card or use Angstrom as default.
wat_dict = dict((rec.split("=") for rec in header["WAT1_001"].split()))
unit = wat_dict.get("units", "Angstrom")
if hasattr(u, unit):
wcs.wcs.cunit[0] = unit
else: # try with unit name stripped of excess plural 's'...
wcs.wcs.cunit[0] = unit.rstrip("s")
if verbose:
print(f"Extracted spectral axis unit '{unit}' from 'WAT1_001'")
elif wcs.wcs.cunit[0] == "":
wcs.wcs.cunit[0] = "Angstrom"

# Compatibility attribute for lookup_table (gwcs) WCS
wcs.unit = tuple(wcs.wcs.cunit)

# Identify the correct parts of the data to store
if len(data.shape) > 1:
flux_data = data[0]
else:
flux_data = data
uncertainty = None
if "NAXIS3" in header:
for i in range(header["NAXIS3"]):
if "spectrum" in header.get(f"BANDID{i+1}", ""):
flux_data = data[i]
if "sigma" in header.get(f"BANDID{i+1}", ""):
uncertainty = data[i]

# Reshape arrays if needed
if len(flux_data) == 1 and len(flux_data.shape) > 1:
flux_data = np.reshape(flux_data, -1)
if uncertainty is not None:
uncertainty = np.reshape(uncertainty, -1)

# Convert uncertainty to StdDevUncertainty array
if uncertainty is not None:
uncertainty = StdDevUncertainty(uncertainty)

# Manually generate spectral axis
pixels = [[i] + [0] * (wcs.naxis - 1) for i in range(wcs.pixel_shape[0])]
spectral_axis = [i[0] for i in wcs.all_pix2world(pixels, 0)] * wcs.wcs.cunit[0]

# Store header as metadata information
meta = {"header": header}

return Spectrum(flux=flux_data, spectral_axis=spectral_axis, uncertainty=uncertainty, meta=meta)


def load_spectrum(filename: str, spectra_format: str = None, raise_error: bool = False):
"""Attempt to load the filename as a spectrum object

Expand Down
74 changes: 2 additions & 72 deletions astrodbkit/tests/test_spectra.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Tests for spectra functions

from unittest import mock

import numpy as np
import pytest
from astropy.io import fits
Expand All @@ -8,17 +10,10 @@
from astrodbkit.spectra import (
_identify_spex,
identify_spex_prism,
identify_wcs1d_multispec,
load_spectrum,
spex_prism_loader,
wcs1d_multispec_loader,
)

try:
import mock
except ImportError:
from unittest import mock


@pytest.fixture(scope="module")
def good_spex_file():
Expand Down Expand Up @@ -47,44 +42,6 @@ def bad_spex_file():
return fits.HDUList([hdu1])


@pytest.fixture(scope="module")
def good_wcs1dmultispec():
n = np.empty((2141, 1, 4))
hdr = fits.Header()
hdr["WCSDIM"] = 3
hdr["NAXIS"] = 3
hdr["WAT0_001"] = "system=equispec"
hdr["WAT1_001"] = "wtype=linear label=Wavelength units=angstroms"
hdr["WAT2_001"] = "wtype=linear"
hdr["BANDID1"] = "spectrum - background fit, weights variance, clean no"
hdr["BANDID2"] = "raw - background fit, weights none, clean no"
hdr["BANDID3"] = "background - background fit"
hdr["BANDID4"] = "sigma - background fit, weights variance, clean no"
hdr["CTYPE1"] = "LINEAR "
hdr["BUNIT"] = "erg/cm2/s/A"
hdu1 = fits.PrimaryHDU(n, header=hdr)
return fits.HDUList([hdu1])


@pytest.fixture(scope="module")
def alt_wcs1dmultispec():
n = np.empty((2141,))
hdr = fits.Header()
hdr["WCSDIM"] = 1
hdr["NAXIS"] = 1
hdr["WAT0_001"] = "system=equispec"
hdr["WAT1_001"] = "wtype=linear label=Wavelength units=angstroms"
hdr["WAT2_001"] = "wtype=linear"
hdr["BANDID1"] = "spectrum - background fit, weights variance, clean no"
hdr["BANDID2"] = "raw - background fit, weights none, clean no"
hdr["BANDID3"] = "background - background fit"
hdr["BANDID4"] = "sigma - background fit, weights variance, clean no"
hdr["CTYPE1"] = "LINEAR "
hdr["BUNIT"] = "erg/cm2/s/A"
hdu1 = fits.PrimaryHDU(n, header=hdr)
return fits.HDUList([hdu1])


@mock.patch("astrodbkit.spectra.fits.open")
def test_identify_spex_prism(mock_fits_open, good_spex_file):
mock_fits_open.return_value = good_spex_file
Expand Down Expand Up @@ -115,33 +72,6 @@ def test_load_spex_prism(mock_fits_open, good_spex_file, bad_spex_file):
assert spectrum.unit == Unit("erg")


@mock.patch("astrodbkit.spectra.read_fileobj_or_hdulist")
def test_identify_wcs1d_multispec(mock_fits_open, good_wcs1dmultispec):
mock_fits_open.return_value = good_wcs1dmultispec

filename = "https://s3.amazonaws.com/bdnyc/optical_spectra/U10929.fits"
assert identify_wcs1d_multispec("read", filename)


@mock.patch("astrodbkit.spectra.read_fileobj_or_hdulist")
def test_wcs1d_multispec_loader(mock_fits_open, good_wcs1dmultispec, alt_wcs1dmultispec):
mock_fits_open.return_value = good_wcs1dmultispec

spectrum = wcs1d_multispec_loader("filename")
assert spectrum.unit == Unit("erg / (Angstrom cm2 s)")
assert spectrum.wavelength.unit == Unit("Angstrom")

# Check flux_unit is converted correctly
spectrum = wcs1d_multispec_loader("filename", flux_unit=Unit("erg / (um cm2 s)"))
assert spectrum.unit == Unit("erg / (um cm2 s)")

# NAXIS=1 example
mock_fits_open.return_value = alt_wcs1dmultispec
spectrum = wcs1d_multispec_loader("filename")
assert spectrum.unit == Unit("erg / (Angstrom cm2 s)")
assert spectrum.wavelength.unit == Unit("Angstrom")


@mock.patch("astrodbkit.spectra.Spectrum.read")
def test_load_spectrum(mock_spectrum1d, monkeypatch):
_ = load_spectrum("fake_file.txt")
Expand Down