Refactor streaming sync architecture, preview flow, and packaging #3
Merged
Phase 1: Rewrite package API (Brickbyte -> client()), clean logging, pin dependencies, update CI matrix, replace Makefile with uv targets.

Phase 2: Canonical schema (record_id/extracted_at/data/run_id), stream name sanitization with collision detection, SQL identifier validation, backtick-quoted table names, UTC-aware timestamps, fixed buffer size estimation.

Phase 3: Safe overwrite via staging tables with atomic INSERT OVERWRITE, writer close in finally block, fatal error handling fix, SQL writer hardening (Volume guard, flatten DDL, deterministic filenames, proper cleanup), cooperative timeout, cleanup=False default.

Phase 4: Fix credential scope resolution bug for explicit scope/key, add dotted-key nested mapping, warning logs for unresolved secrets.

Phase 5: Incremental sync state manager, deduplication with positional _dk_N columns, concurrent stream processing with isolated per-thread writers, progress reporting with callback and tqdm support.
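The Phase 2 naming rules (sanitize the stream name, detect collisions, validate each identifier, backtick-quote the result) could look roughly like the sketch below. This is an illustration only, not the code in this PR: the regexes, the leading-underscore rule for names that start with a digit, and the helper name build_table_names are assumptions, and the body of validate_identifier shown here is a guess at its behavior.

```python
import re

# Assumed rule: identifiers are ASCII letters, digits and underscores,
# and do not start with a digit.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def sanitize_stream_name(name: str) -> str:
    """Lowercase the name and replace anything outside [a-z0-9_] with '_'."""
    cleaned = re.sub(r"[^a-z0-9_]", "_", name.strip().lower())
    if not cleaned:
        raise ValueError(f"stream name {name!r} sanitizes to an empty identifier")
    if cleaned[0].isdigit():
        # Hypothetical rule: prefix names that would start with a digit.
        cleaned = f"_{cleaned}"
    return cleaned


def validate_identifier(identifier: str) -> str:
    """Reject anything that is not a plain SQL identifier."""
    if not _IDENTIFIER_RE.fullmatch(identifier):
        raise ValueError(f"unsafe SQL identifier: {identifier!r}")
    return identifier


def build_table_names(streams, catalog, schema):
    """Map each stream to a backtick-quoted table name, failing on collisions."""
    seen = {}    # sanitized name -> original stream name
    tables = {}  # original stream name -> quoted table name
    for stream in streams:
        safe = sanitize_stream_name(stream)
        if safe in seen:
            raise ValueError(
                f"streams {seen[safe]!r} and {stream!r} both map to table {safe!r}"
            )
        seen[safe] = stream
        parts = [validate_identifier(p) for p in (catalog, schema, safe)]
        tables[stream] = ".".join(f"`{p}`" for p in parts)
    return tables
```

Failing on a collision, rather than silently appending a suffix, keeps two differently named streams from ever writing into the same table.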
Preview: add Python-to-Spark type mapping for real type change detection in compare_schemas(). Log skipped records instead of silent pass.

Enrichment: add SQLSemanticEnricher for SQL-based enrichment without Spark. Flatten mode now uses COMMENT ON COLUMN for column-level descriptions. Spark enricher guards for active session with actionable error message. JSON parse errors are logged per-row.
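A minimal sketch of how a Python-to-Spark type mapping can drive type change detection, assuming the target schema is a dict of column name to Spark type string. The mapping contents, the return shape, and the helper infer_spark_type are assumptions for illustration; the real compare_schemas() in this PR may differ.

```python
# Assumed mapping from Python value types to Spark type names.
PYTHON_TO_SPARK = {
    bool: "BooleanType",
    int: "LongType",
    float: "DoubleType",
    str: "StringType",
}


def infer_spark_type(value):
    """Return the Spark type name for a sample value, or None if unmapped."""
    # Lookup by exact type, so True maps to BooleanType rather than LongType.
    return PYTHON_TO_SPARK.get(type(value))


def compare_schemas(sample_record, target_schema):
    """Report added, removed and type-changed columns for one sample record."""
    changes = {"added": [], "removed": [], "type_changed": []}
    for column, value in sample_record.items():
        if column not in target_schema:
            changes["added"].append(column)
            continue
        existing = target_schema[column]
        # Normalize "IntegerType()" to "IntegerType" before comparing.
        if existing.endswith("()"):
            existing = existing[:-2]
        inferred = infer_spark_type(value)
        if inferred is not None and inferred != existing:
            changes["type_changed"].append((column, existing, inferred))
    changes["removed"] = [c for c in target_schema if c not in sample_record]
    return changes
```

Values with no mapping (for example None) are skipped rather than reported as type changes, since a single null sample says nothing about the column type.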
Rewrite all 9 existing test files to use brickbyte.client() API, canonical schema (record_id/extracted_at/data/run_id), run_id param, safe overwrite assertions, and UTC-aware timestamps.

Add 7 new test files:
- test_sanitize.py: stream name sanitization, identifier validation
- test_safe_overwrite.py: staging table lifecycle, atomic overwrite, schema alignment, incompatible type rejection
- test_incremental.py: state table CRUD
- test_concurrent.py: parallel writer isolation, error propagation
- test_dedup.py: key normalization, _dk_N extraction, _dk_missing, null keys, validation errors, MERGE execution
- test_progress.py: callback invocation, event correctness
- test_enrichment_sql.py: SQL enrichment path, column comments

Update all 10 notebooks to use import brickbyte / brickbyte.client(), lowercase naming, pinned dependencies in _setup.py.

Rewrite README.md with Lakeflow positioning, v2 API, canonical schema, and documentation for all new features.

Fix ruff lint issues (unused imports, import sorting).
Dedup:
- List[str] dedup_keys (__all__ sentinel) now expanded to per-stream dict after selected streams are known, fixing silent no-op.
- Dedup runs per-stream with the owning writer as executor, fixing crash in parallel + SQL mode where executor was None.
- _execute_sql(None) raises immediately instead of silently falling back to Spark.
- Key column identifiers validated through validate_identifier() at both normalization and MERGE-build time; unsafe chars rejected early.
- _run_dedup_for_stream always routes through positional _dk_N columns in both flatten and raw modes.

Overwrite:
- SQL _atomic_overwrite_sql now checks types: rejects incompatible changes, CASTs safe narrower→wider, ALTERs target column when staging is wider.
- Spark _atomic_overwrite uses raw dataType objects (not str()) with _type_name()/_sql_type() helpers to normalize parenthesized type strings (e.g. IntegerType()) and emit correct SQL type names for CAST and ALTER COLUMN TYPE. Reverse-widening now widens the target instead of narrowing the staging data.

Preview:
- get_table_name() sanitizes and backtick-quotes stream names, matching writer table naming.
- get_target_schema() strips trailing "()" from Spark type strings so _PYTHON_TO_SPARK comparisons work correctly.

Incremental:
- StateManager supports SQL connector (not just Spark) via _get_connection() with staging_volume/warehouse_id params.
- State loaded before sync via _apply_incremental_state(), which probes set_stream_state / set_state_for_stream / set_state on the source. Raises NotImplementedError when saved state exists but the connector lacks a state API.
- State saved per-stream on success via _save_incremental_state(), which extracts connector-emitted state when available and falls back to a run_id+records dict.

Progress:
- record_processed() called every 5 000 records in both sequential and parallel-oversized paths.
- stream_completed() called in parallel future collection.
- ProgressReporter.close() called in finally block; errors suppressed.

Enrichment:
- enrich_table() in sync() now passes sanitized stream name.
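The state-loading behavior described under Incremental (probe several setter names on the source, raise when saved state exists but no setter is available) could be sketched as follows. The function name apply_incremental_state, the (stream, state) call signature, and the return value are assumptions for illustration; the actual _apply_incremental_state() may call the connector differently.

```python
# Method names probed on the source, in order of preference.
_STATE_SETTERS = ("set_stream_state", "set_state_for_stream", "set_state")


def apply_incremental_state(source, stream, saved_state):
    """Hand saved state to the source via the first setter it exposes.

    Returns the name of the method used, or None when there is no saved
    state to apply. The (stream, state) argument order is an assumption.
    """
    if not saved_state:
        # Nothing saved yet: the sync starts from scratch.
        return None
    for method_name in _STATE_SETTERS:
        setter = getattr(source, method_name, None)
        if callable(setter):
            setter(stream, saved_state)
            return method_name
    # Saved state exists but cannot be applied. Failing loudly avoids a
    # silent full re-sync that the caller did not ask for.
    raise NotImplementedError(
        f"source {type(source).__name__} exposes no state API "
        f"(tried: {', '.join(_STATE_SETTERS)})"
    )
```

Raising only when saved state exists means connectors without a state API still work for first runs and full refreshes.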
Summary
This PR consolidates the recent refactor work across the brickbyte sync pipeline, writer architecture, packaging, preview flow, tests, and CI.
It includes the latest review-driven fixes along with the preceding foundation work that introduced the newer streaming, state, deduplication, and credential-management behavior.
Included in this PR
Core architecture
Preview, tests, and docs
Bug fixes and review follow-up
COPY INTO from the uploaded volume path
CI and release behavior
Why
This PR moves brickbyte toward a cleaner and more reliable ingestion-focused design.
The main goals are:
Testing
uv run pytest tests/ -v -m 'not integration'
uv run ruff check src tests
Notes
park-peter/brickbyte fork main
SyncResult no longer includes enriched_tables