From 06519a8eaba99f929c527df52d5c64bf6a2027ab Mon Sep 17 00:00:00 2001 From: David Rodriguez Date: Tue, 11 Nov 2025 15:52:45 -0500 Subject: [PATCH 1/2] Removing logic for wcs1d-multispec --- .gitignore | 3 + astrodbkit/spectra.py | 112 +------------------------------ astrodbkit/tests/test_spectra.py | 74 +------------------- 3 files changed, 6 insertions(+), 183 deletions(-) diff --git a/.gitignore b/.gitignore index 159a581..382685d 100644 --- a/.gitignore +++ b/.gitignore @@ -64,3 +64,6 @@ pip-wheel-metadata/ # Mac OSX .DS_Store + +.cursor +uv.lock diff --git a/astrodbkit/spectra.py b/astrodbkit/spectra.py index 81f7443..e109289 100644 --- a/astrodbkit/spectra.py +++ b/astrodbkit/spectra.py @@ -2,14 +2,10 @@ import os -import astropy.units as u -import numpy as np from astropy.io import fits from astropy.nddata import StdDevUncertainty from astropy.units import Unit -from astropy.wcs import WCS -from specutils import Spectrum -from specutils.io.parsing_utils import read_fileobj_or_hdulist +from specutils import Spectrum1D as Spectrum from specutils.io.registers import data_loader # pylint: disable=no-member, unused-argument @@ -75,112 +71,6 @@ def spex_prism_loader(filename, **kwargs): return Spectrum(flux=data, spectral_axis=wave, uncertainty=uncertainty, meta=meta) -def identify_wcs1d_multispec(origin, *args, **kwargs): - """ - Identifier for WCS1D multispec - """ - hdu = kwargs.get("hdu", 0) - - # Check if number of axes is one and dimension of WCS is greater than one - with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: - return ( - hdulist[hdu].header.get("WCSDIM", 1) > 1 - and hdulist[hdu].header["NAXIS"] > 1 - and "WAT0_001" in hdulist[hdu].header - and hdulist[hdu].header.get("WCSDIM", 1) == hdulist[hdu].header["NAXIS"] - and "LINEAR" in hdulist[hdu].header.get("CTYPE1", "") - ) - - -@data_loader("wcs1d-multispec", identifier=identify_wcs1d_multispec, extensions=["fits"], dtype=Spectrum, priority=10) -def wcs1d_multispec_loader(file_obj, flux_unit=None, hdu=0, verbose=False, **kwargs): - """ - Loader for multiextension spectra as wcs1d. Adapted from wcs1d_fits_loader - - Parameters - ---------- - file_obj : str, file-like or HDUList - FITS file name, object (provided from name by Astropy I/O Registry), - or HDUList (as resulting from astropy.io.fits.open()). - flux_unit : :class:`~astropy.units.Unit` or str, optional - Units of the flux for this spectrum. If not given (or None), the unit - will be inferred from the BUNIT keyword in the header. Note that this - unit will attempt to convert from BUNIT if BUNIT is present. - hdu : int - The index of the HDU to load into this spectrum. - verbose : bool - Print extra info. - **kwargs - Extra keywords for :func:`~specutils.io.parsing_utils.read_fileobj_or_hdulist`. - - Returns - ------- - :class:`~specutils.Spectrum` - """ - - with read_fileobj_or_hdulist(file_obj, **kwargs) as hdulist: - header = hdulist[hdu].header - wcs = WCS(header) - - # Load data, convert units if BUNIT and flux_unit is provided and not the same - if "BUNIT" in header: - data = u.Quantity(hdulist[hdu].data, unit=header["BUNIT"]) - if u.A in data.unit.bases: - data = data * u.A / u.AA # convert ampere to Angroms - if flux_unit is not None: - data = data.to(flux_unit) - else: - data = u.Quantity(hdulist[hdu].data, unit=flux_unit) - - if wcs.wcs.cunit[0] == "" and "WAT1_001" in header: - # Try to extract from IRAF-style card or use Angstrom as default. - wat_dict = dict((rec.split("=") for rec in header["WAT1_001"].split())) - unit = wat_dict.get("units", "Angstrom") - if hasattr(u, unit): - wcs.wcs.cunit[0] = unit - else: # try with unit name stripped of excess plural 's'... - wcs.wcs.cunit[0] = unit.rstrip("s") - if verbose: - print(f"Extracted spectral axis unit '{unit}' from 'WAT1_001'") - elif wcs.wcs.cunit[0] == "": - wcs.wcs.cunit[0] = "Angstrom" - - # Compatibility attribute for lookup_table (gwcs) WCS - wcs.unit = tuple(wcs.wcs.cunit) - - # Identify the correct parts of the data to store - if len(data.shape) > 1: - flux_data = data[0] - else: - flux_data = data - uncertainty = None - if "NAXIS3" in header: - for i in range(header["NAXIS3"]): - if "spectrum" in header.get(f"BANDID{i+1}", ""): - flux_data = data[i] - if "sigma" in header.get(f"BANDID{i+1}", ""): - uncertainty = data[i] - - # Reshape arrays if needed - if len(flux_data) == 1 and len(flux_data.shape) > 1: - flux_data = np.reshape(flux_data, -1) - if uncertainty is not None: - uncertainty = np.reshape(uncertainty, -1) - - # Convert uncertainty to StdDevUncertainty array - if uncertainty is not None: - uncertainty = StdDevUncertainty(uncertainty) - - # Manually generate spectral axis - pixels = [[i] + [0] * (wcs.naxis - 1) for i in range(wcs.pixel_shape[0])] - spectral_axis = [i[0] for i in wcs.all_pix2world(pixels, 0)] * wcs.wcs.cunit[0] - - # Store header as metadata information - meta = {"header": header} - - return Spectrum(flux=flux_data, spectral_axis=spectral_axis, uncertainty=uncertainty, meta=meta) - - def load_spectrum(filename: str, spectra_format: str = None, raise_error: bool = False): """Attempt to load the filename as a spectrum object diff --git a/astrodbkit/tests/test_spectra.py b/astrodbkit/tests/test_spectra.py index a7e70ac..12939d4 100644 --- a/astrodbkit/tests/test_spectra.py +++ b/astrodbkit/tests/test_spectra.py @@ -1,5 +1,7 @@ # Tests for spectra functions +from unittest import mock + import numpy as np import pytest from astropy.io import fits @@ -8,17 +10,10 @@ from astrodbkit.spectra import ( _identify_spex, identify_spex_prism, - identify_wcs1d_multispec, load_spectrum, spex_prism_loader, - wcs1d_multispec_loader, ) -try: - import mock -except ImportError: - from unittest import mock - @pytest.fixture(scope="module") def good_spex_file(): @@ -47,44 +42,6 @@ def bad_spex_file(): return fits.HDUList([hdu1]) -@pytest.fixture(scope="module") -def good_wcs1dmultispec(): - n = np.empty((2141, 1, 4)) - hdr = fits.Header() - hdr["WCSDIM"] = 3 - hdr["NAXIS"] = 3 - hdr["WAT0_001"] = "system=equispec" - hdr["WAT1_001"] = "wtype=linear label=Wavelength units=angstroms" - hdr["WAT2_001"] = "wtype=linear" - hdr["BANDID1"] = "spectrum - background fit, weights variance, clean no" - hdr["BANDID2"] = "raw - background fit, weights none, clean no" - hdr["BANDID3"] = "background - background fit" - hdr["BANDID4"] = "sigma - background fit, weights variance, clean no" - hdr["CTYPE1"] = "LINEAR " - hdr["BUNIT"] = "erg/cm2/s/A" - hdu1 = fits.PrimaryHDU(n, header=hdr) - return fits.HDUList([hdu1]) - - -@pytest.fixture(scope="module") -def alt_wcs1dmultispec(): - n = np.empty((2141,)) - hdr = fits.Header() - hdr["WCSDIM"] = 1 - hdr["NAXIS"] = 1 - hdr["WAT0_001"] = "system=equispec" - hdr["WAT1_001"] = "wtype=linear label=Wavelength units=angstroms" - hdr["WAT2_001"] = "wtype=linear" - hdr["BANDID1"] = "spectrum - background fit, weights variance, clean no" - hdr["BANDID2"] = "raw - background fit, weights none, clean no" - hdr["BANDID3"] = "background - background fit" - hdr["BANDID4"] = "sigma - background fit, weights variance, clean no" - hdr["CTYPE1"] = "LINEAR " - hdr["BUNIT"] = "erg/cm2/s/A" - hdu1 = fits.PrimaryHDU(n, header=hdr) - return fits.HDUList([hdu1]) - - @mock.patch("astrodbkit.spectra.fits.open") def test_identify_spex_prism(mock_fits_open, good_spex_file): mock_fits_open.return_value = good_spex_file @@ -115,33 +72,6 @@ def test_load_spex_prism(mock_fits_open, good_spex_file, bad_spex_file): assert spectrum.unit == Unit("erg") -@mock.patch("astrodbkit.spectra.read_fileobj_or_hdulist") -def test_identify_wcs1d_multispec(mock_fits_open, good_wcs1dmultispec): - mock_fits_open.return_value = good_wcs1dmultispec - - filename = "https://s3.amazonaws.com/bdnyc/optical_spectra/U10929.fits" - assert identify_wcs1d_multispec("read", filename) - - -@mock.patch("astrodbkit.spectra.read_fileobj_or_hdulist") -def test_wcs1d_multispec_loader(mock_fits_open, good_wcs1dmultispec, alt_wcs1dmultispec): - mock_fits_open.return_value = good_wcs1dmultispec - - spectrum = wcs1d_multispec_loader("filename") - assert spectrum.unit == Unit("erg / (Angstrom cm2 s)") - assert spectrum.wavelength.unit == Unit("Angstrom") - - # Check flux_unit is converted correctly - spectrum = wcs1d_multispec_loader("filename", flux_unit=Unit("erg / (um cm2 s)")) - assert spectrum.unit == Unit("erg / (um cm2 s)") - - # NAXIS=1 example - mock_fits_open.return_value = alt_wcs1dmultispec - spectrum = wcs1d_multispec_loader("filename") - assert spectrum.unit == Unit("erg / (Angstrom cm2 s)") - assert spectrum.wavelength.unit == Unit("Angstrom") - - @mock.patch("astrodbkit.spectra.Spectrum.read") def test_load_spectrum(mock_spectrum1d, monkeypatch): _ = load_spectrum("fake_file.txt") From 4d9e21adb5210a9473b376793e464202709b9076 Mon Sep 17 00:00:00 2001 From: David Rodriguez Date: Mon, 8 Dec 2025 11:36:26 -0500 Subject: [PATCH 2/2] Update astrodbkit/spectra.py Co-authored-by: Kelle Cruz --- astrodbkit/spectra.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/astrodbkit/spectra.py b/astrodbkit/spectra.py index e109289..b78075f 100644 --- a/astrodbkit/spectra.py +++ b/astrodbkit/spectra.py @@ -5,7 +5,7 @@ from astropy.io import fits from astropy.nddata import StdDevUncertainty from astropy.units import Unit -from specutils import Spectrum1D as Spectrum +from specutils import Spectrum from specutils.io.registers import data_loader # pylint: disable=no-member, unused-argument