
Conversation


@mitchhs12 mitchhs12 commented Jan 7, 2026

Summary

  • Added GET /datasets/{namespace}/{name}/versions/{revision}/sync-progress endpoint.
  • Returns per-table sync progress including current_block, start_block, job_status, and file stats.
  • Uses TableSnapshot::synced_range() with canonical_chain logic to accurately report sync progress, handling gaps and reorgs.

Tests

  • Endpoint returns correct structure for valid dataset
  • Returns 404 for non-existent dataset
  • Verifies RUNNING status while job is actively syncing
  • Verifies COMPLETED status when end block is reached

Response format:

{
  "dataset_namespace": "ethereum",
  "dataset_name": "mainnet",
  "revision": "0.0.0",
  "manifest_hash": "2dbf16e8a4d1c526e3893341d1945040d51ea1b68d1c420e402be59b0646fcfa",
  "tables": [
    {
      "table_name": "blocks",
      "current_block": 950000,
      "start_block": 0,
      "job_id": 1,
      "job_status": "RUNNING",
      "files_count": 47,
      "total_size_bytes": 2147483648
    }
  ]
}
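
For reference, a client could deserialize a response of this shape with serde types along these lines (a sketch: the field names mirror the example above, but the concrete Rust types are assumptions, not the server's actual definitions):

use serde::Deserialize;

// Sketch of client-side types matching the example response above.
// Field names mirror the JSON; the concrete types (e.g. u64 for
// block numbers) are assumptions.
#[derive(Debug, Deserialize)]
struct SyncProgressResponse {
    dataset_namespace: String,
    dataset_name: String,
    revision: String,
    manifest_hash: String,
    tables: Vec<TableProgress>,
}

#[derive(Debug, Deserialize)]
struct TableProgress {
    table_name: String,
    // Null until the table has synced at least one block.
    current_block: Option<u64>,
    start_block: Option<u64>,
    job_id: i64,
    job_status: String,
    files_count: i64,
    total_size_bytes: i64,
}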

@mitchhs12 mitchhs12 requested review from LNSD and leoyvens January 7, 2026 23:00
@mitchhs12 mitchhs12 self-assigned this Jan 7, 2026
@mitchhs12 mitchhs12 linked an issue Jan 7, 2026 that may be closed by this pull request

@LNSD LNSD left a comment


In Slack, I suggested using a PUSH model (instead of a POLL model) to expose the table sync progress updates to all the clients (e.g., via a Kafka broker).

This is a secondary priority, but there is value for the engine management use case.

Please review my comments.
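
For illustration, the push model suggested above might publish per-table progress events like the following (a sketch only; the event schema and every name in it are assumptions, not anything specified in this PR):

use serde::Serialize;

// Hypothetical event a worker could publish to a broker topic
// whenever a table's synced range advances. All fields here are
// illustrative assumptions.
#[derive(Debug, Serialize)]
struct TableSyncProgressEvent {
    dataset_namespace: String,
    dataset_name: String,
    revision: String,
    table_name: String,
    current_block: u64,
    job_status: String,
}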

})?;

// Query active tables info from metadata database (job_id, status)
let writer_infos = metadata_db::sync_progress::get_active_tables_with_writer_info(
Contributor

We should not query this information directly from the metadata_db. Actually, no direct metadata_db interactions should happen in the admin API handlers. This should be part of the amp-data-store component.

Comment on lines 142 to 208
for resolved_table in dataset.resolved_tables(partial_ref) {
    let table_name = resolved_table.name().clone();
    let writer_info = writer_info_map.get(table_name.as_str());

    // Get the active physical table if it exists
    let physical_table =
        PhysicalTable::get_active(ctx.data_store.clone(), resolved_table.clone())
            .await
            .map_err(|err| {
                tracing::error!(
                    table = %table_name,
                    error = %err,
                    error_source = logging::error_source(&*err),
                    "failed to get active physical table"
                );
                Error::PhysicalTable(err)
            })?;

    let (current_block, start_block, files_count, total_size_bytes) =
        if let Some(pt) = physical_table {
            // Take a snapshot to get accurate synced range
            let snapshot = pt
                .snapshot(false, ctx.data_store.clone())
                .await
                .map_err(|err| {
                    tracing::error!(
                        table = %table_name,
                        error = %err,
                        error_source = logging::error_source(&*err),
                        "failed to snapshot physical table"
                    );
                    Error::PhysicalTable(err)
                })?;

            let synced_range = snapshot.synced_range();
            let canonical_segments = snapshot.canonical_segments();

            let files_count = canonical_segments.len() as i64;
            let total_size_bytes = canonical_segments
                .iter()
                .map(|s| s.object.size as i64)
                .sum();

            let (start, end) = match synced_range {
                Some(range) => (
                    Some(range.start().try_into().unwrap_or(0)),
                    Some(range.end().try_into().unwrap_or(0)),
                ),
                None => (None, None),
            };

            (end, start, files_count, total_size_bytes)
        } else {
            // Table hasn't been created/synced yet
            (None, None, 0, 0)
        };
Contributor

All this logic seems too ad hoc to be included in the admin API handler. It belongs somewhere else in the data plane, not in the admin API.


LNSD commented Jan 14, 2026

In addition to a /datasets progress endpoint, we should add a per-table /datasets/{namespace}/{name}/version/{revision}/tables/{table_name}/progress endpoint.

@mitchhs12 (Contributor, Author) commented:

I've added an RFC documenting the proposed changes based on all the feedback.

@LNSD @leoyvens @Chriswhited
I'd appreciate your thoughts before I proceed with the implementation. In particular, Section 6:

  • DataStore API Design - Do the proposed get_table_progress() and get_tables_writer_info() method signatures look correct (see the sketch after this list)? Are there any concerns with adding these to the existing DataStore struct, or should they be added to the amp-data-store crate or somewhere else entirely?

  • Bulk Endpoint - I've noted in Future Considerations that a bulk GET /datasets/progress endpoint may be needed for Platform to monitor many datasets efficiently. Should this be part of the initial implementation?

  • Future Considerations - Are there other scenarios that should be documented (multi-chain, non-block progress, etc.)?
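
For concreteness, the signatures under discussion might look roughly like this (a hypothetical sketch: the parameter and return types, and the TableProgress/TableWriterInfo names, are assumptions rather than the RFC's actual definitions):

// Hypothetical sketch of the two DataStore methods named above.
// All parameter and return types are assumptions for illustration.
impl DataStore {
    /// Synced range and file stats for one table of a dataset revision.
    pub async fn get_table_progress(
        &self,
        namespace: &str,
        name: &str,
        revision: &str,
        table_name: &str,
    ) -> Result<TableProgress, Error> {
        todo!("resolve the physical table, snapshot it, report synced_range and file stats")
    }

    /// Writer info (job id, status) for all active tables of a revision.
    pub async fn get_tables_writer_info(
        &self,
        namespace: &str,
        name: &str,
        revision: &str,
    ) -> Result<Vec<TableWriterInfo>, Error> {
        todo!("delegate to the metadata DB's physical-table queries")
    }
}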

@mitchhs12 mitchhs12 force-pushed the feature/admin-api-sync-progress branch from 27cb3ef to 359a8ac on January 15, 2026 at 14:55
@mitchhs12 (Contributor, Author) commented:

> In addition to a /datasets progress endpoint, we should add a per-table /datasets/{namespace}/{name}/version/{revision}/tables/{table_name}/progress endpoint.

This has been added.

@mitchhs12 mitchhs12 force-pushed the feature/admin-api-sync-progress branch from 16f4c38 to 408b46c on January 15, 2026 at 20:56

@LNSD LNSD left a comment


Please check my comments 🙂

I am still checking the implementation.

Contributor

I think you should add this feature doc when you implement the Kafka-based worker events emitter.

Contributor Author

Ok, I will remove it from this PR for now.

Contributor

👀

Contributor

This spec is unrelated to the PR changes.

@mitchhs12 mitchhs12 force-pushed the feature/admin-api-sync-progress branch from 4d1d468 to ed2b026 on January 16, 2026 at 21:24
@mitchhs12 mitchhs12 requested a review from LNSD January 19, 2026 14:46

@LNSD LNSD left a comment


Why under the datasets/ resource and not under jobs/?

Please check my comments 🙂

Contributor

This spec is unrelated to the PR changes.

@mitchhs12 mitchhs12 force-pushed the feature/admin-api-sync-progress branch 2 times, most recently from c72d1d8 to 3d433ab on January 20, 2026 at 20:26
@mitchhs12 mitchhs12 force-pushed the feature/admin-api-sync-progress branch from 3d433ab to e9bb77d on January 20, 2026 at 22:07

@LNSD LNSD left a comment


Please check my comments 🙂

I am reviewing the metadata DB API changes.

Contributor

These APIs belong to the domain of physical table revisions. You should integrate these APIs into that module; please do not add arbitrary new modules to the metadata-db crate.

Contributor Author

Ok, but do you mean crates/core/metadata-db/src/physical_table.rs or crates/core/common/src/catalog/physical.rs? physical.rs contains the PhysicalTable struct, while physical_table.rs contains the database operations that query the physical_tables table.

@LNSD LNSD commented Jan 22, 2026

I mean under the metadata_db::physical_tables::* module. You can put it inside the physical_tables.rs module. In both cases, respect the sql.rs separation (SQL statement wrappers go into the sql.rs file).

It is okay-ish that we add SQL operations that JOIN other tables (to filter or enrich the returned data).
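
Roughly, that convention looks like this (an illustrative sketch; the function names, SQL, and schema below are made up for the example, not the actual metadata-db code):

// physical_tables.rs - the public, typed API (illustrative names only).
pub async fn tables_written_by_writer(
    pool: &sqlx::PgPool,
    writer_id: i64,
) -> Result<Vec<(String, String)>, sqlx::Error> {
    sql::select_tables_by_writer(pool, writer_id).await
}

// physical_tables/sql.rs - the raw SQL statement wrappers live here.
mod sql {
    pub(super) async fn select_tables_by_writer(
        pool: &sqlx::PgPool,
        writer_id: i64,
    ) -> Result<Vec<(String, String)>, sqlx::Error> {
        // A JOIN that enriches the returned data is acceptable here,
        // per the comment above.
        sqlx::query_as(
            "SELECT t.table_name, d.name \
             FROM physical_tables t \
             JOIN datasets d ON d.id = t.dataset_id \
             WHERE t.writer_id = $1",
        )
        .bind(writer_id)
        .fetch_all(pool)
        .await
    }
}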

///
/// Returns a list of active tables where the specified job is the writer,
/// along with metadata about each table including dataset information.
pub async fn get_tables_written_by_job(
Contributor

Nit: No references to "jobs" in the data store. We should refer to "writers". Check the other method names (the ones passing a writer ID).

Comment on lines +142 to +162
#[derive(sqlx::FromRow)]
struct Row {
    table_name: TableNameOwned,
    manifest_hash: ManifestHashOwned,
    dataset_namespace: DatasetNamespaceOwned,
    dataset_name: DatasetNameOwned,
    job_status: JobStatus,
}

let rows: Vec<Row> = sqlx::query_as(query).bind(job_id).fetch_all(exe).await?;

Ok(rows
    .into_iter()
    .map(|row| JobTableInfo {
        table_name: row.table_name,
        manifest_hash: row.manifest_hash,
        dataset_namespace: row.dataset_namespace,
        dataset_name: row.dataset_name,
        job_status: row.job_status,
    })
    .collect())
Contributor

Why this conversion between types and not just derive sqlx::FromRow for the JobTableInfo type?
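
That is, something like this (a sketch of the suggested simplification, assuming JobTableInfo's fields line up one-to-one with the query's columns):

// Deriving FromRow on the public type removes the intermediate
// Row struct and the field-by-field copy.
#[derive(sqlx::FromRow)]
pub struct JobTableInfo {
    pub table_name: TableNameOwned,
    pub manifest_hash: ManifestHashOwned,
    pub dataset_namespace: DatasetNamespaceOwned,
    pub dataset_name: DatasetNameOwned,
    pub job_status: JobStatus,
}

let rows: Vec<JobTableInfo> = sqlx::query_as(query).bind(job_id).fetch_all(exe).await?;
Ok(rows)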



Development

Successfully merging this pull request may close these issues.

Job progress reporting
