Feature/67 simple blob storage db scan query #68
Conversation
Pull request overview
Adds a new CLI-selectable entrypoint to scan parquet datasets in Azure Blob Storage via DuckDB, while refactoring script invocation to support multiple runnable “script IDs” and updating wiring/runtime configuration accordingly.
Changes:
- Introduces a `blob_storage_db_scan` entrypoint and wires it into the DI container and entrypoints module.
- Refactors `main.py` to dispatch execution based on an argparse-provided script ID.
- Updates `docker-compose.yml` to run the default pipeline via a script ID and adds a new scan service; adjusts Windows-only dependencies via environment markers.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `src/presentation/entrypoints/blob_storage_db_scan.py` | Adds a new entrypoint intended to scan parquet files in blob storage using DuckDB + virtual filesystem paths. |
| `src/presentation/entrypoints/__init__.py` | Exposes the new entrypoint for simpler imports. |
| `src/presentation/configuration/app_config.py` | Wires the new entrypoint module into Dependency Injector. |
| `src/application/common/monitor.py` | Updates threading initialization type annotation. |
| `main.py` | Adds argparse-based script selection and dispatch between entrypoints. |
| `docker-compose.yml` | Runs the main service with a script ID and adds a new scan service. |
| `requirements.txt` | Restricts pywin32/pywinpty installation to Windows via environment markers. |
```diff
-def monitor_cpu_and_ram(run_id: str, query_id: str, interval: float = 0.05):
+def monitor_cpu_and_ram(run_id: str, query_id: str, interval: float = 0.00005):
```
The new default sampling interval (50µs) is extremely small and will likely create very large sample lists, distort the benchmark being measured, and add significant CPU overhead (especially for long-running scripts). Consider restoring the previous default or using a more reasonable minimum interval (and/or bounding the number of samples).
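A minimal sketch of the bounded approach, assuming `psutil` is available for sampling; the `stop_event` parameter, the cap constants, and the return shape are illustrative assumptions, not the repository's actual monitor:

```python
import threading
import time

import psutil

MIN_INTERVAL_S = 0.05   # refuse sub-50 ms sampling; finer granularity mostly measures the monitor itself
MAX_SAMPLES = 100_000   # hard cap so long-running scripts cannot grow the sample list unbounded


def monitor_cpu_and_ram(stop_event: threading.Event, interval: float = 0.05) -> list[tuple[float, float, float]]:
    interval = max(interval, MIN_INTERVAL_S)  # clamp rather than trust the caller
    samples: list[tuple[float, float, float]] = []
    while not stop_event.is_set() and len(samples) < MAX_SAMPLES:
        samples.append((time.monotonic(), psutil.cpu_percent(), psutil.virtual_memory().percent))
        stop_event.wait(interval)  # doubles as the sleep and wakes early on stop
    return samples
```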
```diff
 def main() -> None:
     initialize_dependencies()
-    run_pipeline()
+    script_id = get_script_id()
```
initialize_dependencies() is called before parsing CLI args. If the user passes no/invalid args, the process will still perform dependency wiring/initialization (and potentially create external connections) before exiting. Parse/validate arguments first, then initialize dependencies for the selected script.
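A sketch of the suggested ordering, reusing the names visible in this diff (`get_script_id`, `initialize_dependencies`, `run_pipeline`, `blob_storage_db_scan`); the dispatch body is abbreviated:

```python
def main() -> None:
    # Parse/validate CLI arguments first: bad input exits with a usage
    # message before any wiring or external connections are created.
    script_id = get_script_id()

    # Only initialize dependencies once the requested script is known.
    initialize_dependencies()

    match script_id:
        case "conflation-pipeline":
            run_pipeline()
        case "blob-storage-db-scan":
            blob_storage_db_scan()
```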
```python
            blob_storage_db_scan()
            return
        case _:
            raise ValueError("Script ID is invalid")
```
The error message "Script ID is invalid" doesn’t tell the user what was received or what valid IDs are. Prefer letting argparse validate via choices=... (or call parser.error(...)) so the CLI prints a helpful usage message and lists valid script IDs.
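If the `match` fallback is kept as a defensive check alongside `choices=...`, a sketch of a message that names the offending value and the accepted IDs (assuming the matched variable is called `script_id`):

```python
        case _:
            # Unreachable once argparse enforces choices, but useful as a safety net.
            raise ValueError(
                f"Unknown script ID {script_id!r}; expected one of: "
                "conflation-pipeline, blob-storage-db-scan"
            )
```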
| parser.add_argument("id", help="ID of script to run") | ||
| args = parser.parse_args() | ||
| return args.id |
The positional argument name id is ambiguous and reads like an internal identifier rather than a script selector. Consider renaming it to script_id and using choices so invalid values are rejected consistently at the parsing layer.
| parser.add_argument("id", help="ID of script to run") | |
| args = parser.parse_args() | |
| return args.id | |
| parser.add_argument( | |
| "script_id", | |
| help="ID of script to run", | |
| choices=["conflation-pipeline", "blob-storage-db-scan"], | |
| ) | |
| args = parser.parse_args() | |
| return args.script_id |
```python
path = path_service.create_virtual_filesystem_path(
    storage_scheme="az",
    release="2026-02-16.3",
    container=StorageContainer.DATA,
    theme=Theme.BUILDINGS,
    region="*",
```
This entrypoint hard-codes release/theme/region/file pattern values, which makes the script difficult to reuse for other scans. Consider accepting these via CLI args/env vars (with sensible defaults) so the scan can be targeted without code changes.
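A sketch of one way to lift these values into CLI arguments; `parse_scan_args` is a hypothetical helper, and the defaults mirror today's hard-coded values so current behaviour is preserved:

```python
import argparse


def parse_scan_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Scan parquet files in blob storage")
    # Defaults reproduce the currently hard-coded scan target.
    parser.add_argument("--release", default="2026-02-16.3")
    parser.add_argument("--theme", default="buildings", help="mapped onto the Theme enum by the caller")
    parser.add_argument("--region", default="*")
    parser.add_argument("--file-name", default="*.parquet")
    return parser.parse_args()
```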
| file_name="*.parquet" | ||
| ) | ||
|
|
||
| db_context.execute(f"SELECT count(*) AS count FROM read_parquet('{path}')") |
The query result is not fetched/logged, so the script produces no observable output (and it’s easy to miss whether the scan actually succeeded). Consider fetching the count and logging/printing it (or otherwise surfacing scan completion).
Suggested change:

```diff
-db_context.execute(f"SELECT count(*) AS count FROM read_parquet('{path}')")
+result = db_context.execute(f"SELECT count(*) AS count FROM read_parquet('{path}')")
+row_count = result.fetchone()[0]
+print(f"Blob storage DB scan completed; row count: {row_count}")
```
This pull request introduces a new entrypoint for running a blob storage database scan and refactors the way scripts are invoked via the command line. It also updates dependency injection wiring and makes a minor correction in threading initialization. The most important changes are grouped below:
Entrypoint and Script Invocation Improvements:
- Added a new entrypoint `blob_storage_db_scan` in `src/presentation/entrypoints/blob_storage_db_scan.py`, which scans parquet files in blob storage using DuckDB and a virtual filesystem path service.
- Refactored `main.py` to use `argparse` for selecting which script to run (`conflation-pipeline` or `blob-storage-db-scan`), improving extensibility and error handling for script IDs.
- Updated the `docker-compose.yml` file to add a new service `blob_storage_db_scan` and changed the command for the main service to accept script IDs as arguments.

Dependency Injection and Wiring:
- Updated `src/presentation/configuration/app_config.py` to include the new `blob_storage_db_scan` entrypoint.
- Added `blob_storage_db_scan` to the `__init__.py` of the entrypoints module for easier imports.

Minor Technical Fixes:
- Changed the `target` parameter in `_initialize_threading` in `monitor.py` to accept `object | None` instead of a callable, aligning with how it's used.