feat(admin-api): add sync progress endpoint #1528
Conversation
Force-pushed 7723067 to f21b0eb
LNSD left a comment:
In Slack, I suggested using a PUSH model (instead of a POLL model) to expose table sync progress updates to all clients (e.g., via a Kafka broker). This is a secondary priority, but there is value for the engine management use case.
Please review my comments.
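For concreteness, here is a minimal sketch of what the worker side of such a push model could look like, assuming rdkafka and serde_json as dependencies. The topic name, event struct, and producer setup are all illustrative, not part of this PR:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::Serialize;

/// Hypothetical event emitted whenever a table's synced range advances.
#[derive(Serialize)]
struct SyncProgressEvent<'a> {
    dataset_namespace: &'a str,
    dataset_name: &'a str,
    table_name: &'a str,
    current_block: u64,
}

fn make_producer(brokers: &str) -> rdkafka::error::KafkaResult<FutureProducer> {
    ClientConfig::new().set("bootstrap.servers", brokers).create()
}

async fn emit_progress(
    producer: &FutureProducer,
    event: &SyncProgressEvent<'_>,
) -> anyhow::Result<()> {
    let payload = serde_json::to_vec(event)?;
    // Key by dataset so consumers see per-dataset ordering.
    let key = format!("{}/{}", event.dataset_namespace, event.dataset_name);
    producer
        .send(
            FutureRecord::to("table-sync-progress") // illustrative topic name
                .key(&key)
                .payload(&payload),
            Duration::from_secs(5),
        )
        .await
        .map_err(|(err, _msg)| anyhow::Error::from(err))?;
    Ok(())
}
```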
crates/services/admin-api/src/handlers/datasets/sync_progress.rs
```rust
    })?;

// Query active tables info from metadata database (job_id, status)
let writer_infos = metadata_db::sync_progress::get_active_tables_with_writer_info(
```
We should not query this information directly from the metadata_db. Actually, no direct metadata_db interactions should happen in the admin API handlers. This should be part of the amp-data-store component.
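One possible shape for that, sketched with invented names (`DataStore`, `metadata_pool`, `WriterInfo`, and `DataStoreError` are hypothetical stand-ins; the comment only establishes that the query belongs in the amp-data-store component):

```rust
impl DataStore {
    /// Hypothetical wrapper: the handler calls this instead of metadata_db,
    /// so direct metadata_db access stays out of the admin API layer.
    pub async fn active_tables_with_writer_info(
        &self,
        dataset: &DatasetRef, // assumed parameter type
    ) -> Result<Vec<WriterInfo>, DataStoreError> {
        metadata_db::sync_progress::get_active_tables_with_writer_info(
            self.metadata_pool(), // assumed accessor for the metadata DB pool
            dataset,
        )
        .await
        .map_err(DataStoreError::from)
    }
}
```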
```rust
for resolved_table in dataset.resolved_tables(partial_ref) {
    let table_name = resolved_table.name().clone();
    let writer_info = writer_info_map.get(table_name.as_str());

    // Get the active physical table if it exists
    let physical_table =
        PhysicalTable::get_active(ctx.data_store.clone(), resolved_table.clone())
            .await
            .map_err(|err| {
                tracing::error!(
                    table = %table_name,
                    error = %err,
                    error_source = logging::error_source(&*err),
                    "failed to get active physical table"
                );
                Error::PhysicalTable(err)
            })?;

    let (current_block, start_block, files_count, total_size_bytes) =
        if let Some(pt) = physical_table {
            // Take a snapshot to get accurate synced range
            let snapshot = pt
                .snapshot(false, ctx.data_store.clone())
                .await
                .map_err(|err| {
                    tracing::error!(
                        table = %table_name,
                        error = %err,
                        error_source = logging::error_source(&*err),
                        "failed to snapshot physical table"
                    );
                    Error::PhysicalTable(err)
                })?;

            let synced_range = snapshot.synced_range();
            let canonical_segments = snapshot.canonical_segments();

            let files_count = canonical_segments.len() as i64;
            let total_size_bytes = canonical_segments
                .iter()
                .map(|s| s.object.size as i64)
                .sum();

            let (start, end) = match synced_range {
                Some(range) => (
                    Some(range.start().try_into().unwrap_or(0)),
                    Some(range.end().try_into().unwrap_or(0)),
                ),
                None => (None, None),
            };

            (end, start, files_count, total_size_bytes)
        } else {
            // Table hasn't been created/synced yet
            (None, None, 0, 0)
        };
```
All this logic seems too ad hoc to be included in the admin API handler. It belongs somewhere else in the data plane, not in the admin API.
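Read as a refactor sketch (the `table_sync_stats` helper and `TableSyncStats` struct are invented; the snapshot calls and error type mirror the handler code above), the suggestion could look like this, with the handler shrinking to a single call per table:

```rust
/// Hypothetical per-table stats type returned to the handler.
pub struct TableSyncStats {
    pub current_block: Option<u64>,
    pub start_block: Option<u64>,
    pub files_count: i64,
    pub total_size_bytes: i64,
}

/// Sketch of a data-plane helper wrapping the logic currently inlined
/// in the handler; it uses the same snapshot APIs as the diff above.
pub async fn table_sync_stats(
    pt: &PhysicalTable,
    data_store: DataStore, // assumed type, as in the handler code
) -> Result<TableSyncStats, Error> {
    let snapshot = pt
        .snapshot(false, data_store)
        .await
        .map_err(Error::PhysicalTable)?;
    let segments = snapshot.canonical_segments();
    let (start_block, current_block) = match snapshot.synced_range() {
        Some(range) => (
            Some(range.start().try_into().unwrap_or(0)),
            Some(range.end().try_into().unwrap_or(0)),
        ),
        None => (None, None),
    };
    Ok(TableSyncStats {
        current_block,
        start_block,
        files_count: segments.len() as i64,
        total_size_bytes: segments.iter().map(|s| s.object.size as i64).sum(),
    })
}
```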
In addition to a …
I've added an RFC documenting the proposed changes based on all the feedback. @LNSD @leoyvens @Chriswhited
Force-pushed 27cb3ef to 359a8ac
This has been added.
Force-pushed 16f4c38 to 408b46c
LNSD left a comment:
Please check my comments 🙂
I am still reviewing the implementation.
I think you should add this feature doc when you implement the Kafka-based worker events emitter.
Ok, I will remove it from this PR for now.
👀
This spec is unrelated to the PR changes.
Force-pushed 4d1d468 to ed2b026
LNSD left a comment:
Why under the datasets/ resource and not under jobs/?
Please check my comments 🙂
This spec is unrelated to the PR changes.
Force-pushed c72d1d8 to 3d433ab
Commit messages (truncated):
…p + reorg handling via canonical_chain logic
… for a specific table within a dataset
Force-pushed 3d433ab to e9bb77d
Please check my comments 🙂
I am reviewing the metadata DB API changes.
These APIs belong to the domain of physical table revisions. You should integrate these APIs into that module; please do not add arbitrary new modules to the metadata-db crate.
Ok, but do you mean crates/core/metadata-db/src/physical_table.rs or crates/core/common/src/catalog/physical.rs? I ask because physical.rs contains the PhysicalTable struct, while physical_table.rs contains the database operations that query the physical_tables table.
I mean under the metadata_db::physical_tables::* module. You can put it inside the physical_tables.rs module. In both cases, respect the sql.rs separation (SQL statement wrappers go into the sql.rs file).
It is okay-ish that we add SQL operations that JOIN other tables (to filter or enrich the returned data).
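As a sketch of that layout, with an invented schema and invented names apart from the module split itself: the raw statement goes into `physical_tables/sql.rs`, and the public function in `physical_tables.rs` delegates to it.

```rust
// physical_tables/sql.rs — SQL statement wrappers only.
#[derive(sqlx::FromRow)]
pub struct WriterTableRow {
    pub table_name: String, // illustrative column types
    pub job_status: String,
}

pub(super) async fn select_tables_for_writer(
    exe: impl sqlx::PgExecutor<'_>,
    writer_id: i64,
) -> sqlx::Result<Vec<WriterTableRow>> {
    // An enriching JOIN like this is acceptable here per the review;
    // the table and column names are made up for illustration.
    sqlx::query_as(
        "SELECT pt.table_name, j.status AS job_status \
         FROM physical_tables pt \
         JOIN jobs j ON j.id = pt.writer_id \
         WHERE pt.writer_id = $1",
    )
    .bind(writer_id)
    .fetch_all(exe)
    .await
}

// physical_tables.rs — the public API delegates to sql.rs.
pub async fn get_active_tables_for_writer(
    exe: impl sqlx::PgExecutor<'_>,
    writer_id: i64,
) -> sqlx::Result<Vec<sql::WriterTableRow>> {
    sql::select_tables_for_writer(exe, writer_id).await
}
```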
```rust
///
/// Returns a list of active tables where the specified job is the writer,
/// along with metadata about each table including dataset information.
pub async fn get_tables_written_by_job(
```
Nit: No references to "jobs" in the data store. We should refer to "writers". Check the other method names (the ones passing a writer ID).
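The rename being asked for might look like this; only the job-to-writer naming change comes from the review, while `WriterId` and `WriterTableInfo` are assumed types:

```rust
// Before: get_tables_written_by_job(exe, job_id)
// After: the data store speaks of writers, not jobs.
pub async fn get_tables_written_by_writer(
    exe: impl sqlx::PgExecutor<'_>,
    writer_id: WriterId, // assumed newtype for the writer ID
) -> sqlx::Result<Vec<WriterTableInfo>> {
    todo!("body unchanged; only the name and parameter naming change")
}
```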
```rust
#[derive(sqlx::FromRow)]
struct Row {
    table_name: TableNameOwned,
    manifest_hash: ManifestHashOwned,
    dataset_namespace: DatasetNamespaceOwned,
    dataset_name: DatasetNameOwned,
    job_status: JobStatus,
}

let rows: Vec<Row> = sqlx::query_as(query).bind(job_id).fetch_all(exe).await?;

Ok(rows
    .into_iter()
    .map(|row| JobTableInfo {
        table_name: row.table_name,
        manifest_hash: row.manifest_hash,
        dataset_namespace: row.dataset_namespace,
        dataset_name: row.dataset_name,
        job_status: row.job_status,
    })
    .collect())
```
Why this conversion between types and not just derive sqlx::FromRow for the JobTableInfo type?
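What the suggestion amounts to, using the field types already present in the diff above: derive `sqlx::FromRow` on `JobTableInfo` itself, so the intermediate `Row` struct and the field-by-field copy disappear.

```rust
#[derive(sqlx::FromRow)]
pub struct JobTableInfo {
    pub table_name: TableNameOwned,
    pub manifest_hash: ManifestHashOwned,
    pub dataset_namespace: DatasetNamespaceOwned,
    pub dataset_name: DatasetNameOwned,
    pub job_status: JobStatus,
}

// The query then maps straight into the public type:
// let infos: Vec<JobTableInfo> =
//     sqlx::query_as(query).bind(job_id).fetch_all(exe).await?;
```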
Summary
- Adds a `GET /datasets/{namespace}/{name}/versions/{revision}/sync-progress` endpoint.
- Reports `current_block`, `start_block`, `job_status`, and file stats for each table.
- Uses `TableSnapshot::synced_range()` with `canonical_chain` logic to accurately report sync progress, handling gaps and reorgs.

Tests
Response format:
{ "dataset_namespace": "ethereum", "dataset_name": "mainnet", "revision": "0.0.0", "manifest_hash": "2dbf16e8a4d1c526e3893341d1945040d51ea1b68d1c420e402be59b0646fcfa", "tables": [ { "table_name": "blocks", "current_block": 950000, "start_block": 0, "job_id": 1, "job_status": "RUNNING", "files_count": 47, "total_size_bytes": 2147483648 } ] }