diff --git a/app/lib/rawdata.py b/app/lib/rawdata.py
index ef085e4..1591df3 100644
--- a/app/lib/rawdata.py
+++ b/app/lib/rawdata.py
@@ -31,7 +31,7 @@ def rawdata_batches(
             break
         total += len(rows)
         log.logger.debug(
-            "processed batch",
+            "read batch",
             rows=len(rows),
             last_id=rows[-1]["hyperleda_internal_id"],
             total=total,
diff --git a/app/structured/photometry/__init__.py b/app/structured/photometry/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/structured/photometry/upload.py b/app/structured/photometry/upload.py
new file mode 100644
index 0000000..5bab216
--- /dev/null
+++ b/app/structured/photometry/upload.py
@@ -0,0 +1,107 @@
+from app import log
+from app.display import print_table
+from app.gen.client import adminapi
+from app.gen.client.adminapi.api.default import save_structured_data
+from app.gen.client.adminapi.models.save_structured_data_request import (
+    SaveStructuredDataRequest,
+)
+from app.lib.rawdata import rawdata_batches
+from app.storage import PgStorage
+from app.upload import handle_call
+
+PHOTOMETRY_COLUMNS = ["band", "mag", "e_mag", "method"]
+
+BANDS = [
+    ("U", "ut", "e_ut"),
+    ("B", "bt", "e_bt"),
+    ("V", "vt", "e_vt"),
+    ("I", "it", "e_it"),
+    ("K", "kt", "e_kt"),
+]
+
+PHOTOMETRY_RAW_COLUMNS = [c for _, mag, err in BANDS for c in (mag, err)]
+
+
+def upload_photometry_hyperleda(
+    storage: PgStorage,
+    table_name: str,
+    batch_size: int,
+    client: adminapi.AuthenticatedClient,
+    *,
+    write: bool = False,
+) -> None:
+    uploaded_objects = 0
+    skipped = 0
+    total_source_rows = 0
+    band_counts: dict[str, int] = {band: 0 for band, _, _ in BANDS}
+    band_mag_sums: dict[str, float] = {band: 0.0 for band, _, _ in BANDS}
+
+    try:
+        for rows in rawdata_batches(storage, table_name, PHOTOMETRY_RAW_COLUMNS, batch_size):
+            total_source_rows += len(rows)
+            batch_ids: list[str] = []
+            batch_data: list[list[str | float]] = []
+
+            for row in rows:
+                internal_id = row["hyperleda_internal_id"]
+                had_any = False
+                for band, mag_col, err_col in BANDS:
+                    mag_val = row.get(mag_col)
+                    err_val = row.get(err_col)
+                    if mag_val is not None and err_val is not None:
+                        batch_ids.append(internal_id)
+                        batch_data.append([band, float(mag_val), float(err_val), "asymptotic"])
+                        band_counts[band] += 1
+                        band_mag_sums[band] += float(mag_val)
+                        had_any = True
+                if had_any:
+                    uploaded_objects += 1
+                else:
+                    skipped += 1
+
+            if write and batch_ids:
+                handle_call(
+                    save_structured_data.sync_detailed(
+                        client=client,
+                        body=SaveStructuredDataRequest(
+                            catalog="photometry",
+                            columns=PHOTOMETRY_COLUMNS,
+                            ids=batch_ids,
+                            data=batch_data,
+                        ),
+                    )
+                )
+
+            uploaded_rows = sum(band_counts.values())
+            log.logger.info(
+                "processed batch",
+                source_rows=len(rows),
+                total_source_rows=total_source_rows,
+                objects=uploaded_objects,
+                photometry_rows=uploaded_rows,
+            )
+    finally:
+        total = uploaded_objects + skipped
+        total_photometry_rows = sum(band_counts.values())
+
+        def pct(n: int, denom: int) -> float:
+            return (100.0 * n / denom) if denom else 0.0
+
+        table_rows: list[tuple[str | int | float, ...]] = [
+            ("Source rows with ≥1 band", uploaded_objects, f"{pct(uploaded_objects, total):.1f}%", "-"),
+            ("Source rows with no band", skipped, f"{pct(skipped, total):.1f}%", "-"),
+            ("Total photometry rows", total_photometry_rows, "-", "-"),
+        ]
+        for band, _, _ in BANDS:
+            count = band_counts[band]
+            avg_mag = (band_mag_sums[band] / count) if count else 0.0
+            pct_str = f"{pct(count, total_photometry_rows):.1f}%" if total_photometry_rows else "-"
+            avg_str = round(avg_mag, 3) if count else "-"
+            table_rows.append((band, count, pct_str, avg_str))
+
+        print_table(
+            ("Status", "Uploaded", "% of total", "Avg mag"),
+            table_rows,
+            title=f"Total source rows: {total}\n",
+            percent_last_column=False,
+        )
diff --git a/main.py b/main.py
index 4ff0da8..de2133c 100644
--- a/main.py
+++ b/main.py
@@ -20,6 +20,9 @@
 from app.structured.designations import upload_designations as run_upload_designations
 from app.structured.icrs import upload_icrs as run_upload_icrs
 from app.structured.nature import upload_nature as run_upload_nature
+from app.structured.photometry.upload import (
+    upload_photometry_hyperleda as run_upload_photometry_hyperleda,
+)
 from app.structured.redshift import upload_redshift as run_upload_redshift
 
 env_map = {
@@ -208,6 +211,34 @@ def upload_structured_redshift(
         )
 
 
+@upload_structured.command(
+    "photometry-hyperleda",
+    help="Upload U/B/V/I/K asymptotic magnitudes from hyperleda_m000 to the photometry catalog.",
+)
+@click.option("--batch-size", default=10000, type=int, help="Source rows per batch")
+@click.option(
+    "--write",
+    is_flag=True,
+    help="Upload results to the API; default is to only print statistics (dry-run)",
+)
+@click.pass_context
+def upload_structured_photometry_hyperleda(
+    ctx: click.Context,
+    batch_size: int,
+    write: bool,
+) -> None:
+    common = ctx.obj.upload_structured_common
+    with connect(common["dsn"]) as conn:
+        storage = PgStorage(conn)
+        run_upload_photometry_hyperleda(
+            storage,
+            common["table_name"],
+            batch_size,
+            common["client"],
+            write=write,
+        )
+
+
 @upload_structured.command("nature", help="Upload object nature/type to the structured level.")
 @click.option(
     "--column-name",