-
Notifications
You must be signed in to change notification settings - Fork 51
skills(experimental): Real-Time Mode bug fixes + two new references #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -171,12 +171,14 @@ query = (enriched_df | |
| Enrich events with dimension data: | ||
|
|
||
| ```python | ||
| from pyspark.sql.functions import broadcast, col, to_json, struct | ||
|
|
||
| # Read reference data (Delta table - auto-refreshed each microbatch) | ||
| user_dim = spark.table("users.dimension") | ||
|
|
||
| # Stream-static join for enrichment | ||
| enriched = (parsed_df | ||
| .join(user_dim, "user_id", "left") | ||
| .join(broadcast(user_dim), "user_id", "left") # broadcast() required: RTM only supports broadcast stream-static joins | ||
| .withColumn("enriched_value", to_json(struct( | ||
| col("event_id"), | ||
| col("user_id"), | ||
|
|
@@ -192,7 +194,8 @@ enriched.select(col("key"), col("enriched_value").alias("value")).writeStream \ | |
| .format("kafka") \ | ||
| .option("kafka.bootstrap.servers", brokers) \ | ||
| .option("topic", "enriched-events") \ | ||
| .trigger(realTime=True) \ | ||
| .outputMode("update") \ | ||
| .trigger(realTime="5 minutes") \ | ||
| .option("checkpointLocation", "/checkpoints/enrichment") \ | ||
| .start() | ||
| ``` | ||
|
|
@@ -231,8 +234,8 @@ def route_events(batch_df, batch_id): | |
| .save() | ||
|
|
||
| parsed_df.writeStream \ | ||
| .foreachBatch(route_events) \ | ||
| .trigger(realTime=True) \ | ||
| .foreachBatch(route_events) \ # foreachBatch is NOT supported in RTM — use processingTime | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment is probably not necessary here, although it's useful for the reviewer. :) |
||
| .trigger(processingTime="30 seconds") \ | ||
| .option("checkpointLocation", "/checkpoints/routing") \ | ||
| .start() | ||
| ``` | ||
|
|
@@ -281,8 +284,8 @@ def validate_and_route(batch_df, batch_id): | |
| .save() | ||
|
|
||
| source_df.writeStream \ | ||
| .foreachBatch(validate_and_route) \ | ||
| .trigger(realTime=True) \ | ||
| .foreachBatch(validate_and_route) \ # foreachBatch is NOT supported in RTM — use processingTime | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same. Comment not necessary. |
||
| .trigger(processingTime="30 seconds") \ | ||
| .option("checkpointLocation", "/checkpoints/validation") \ | ||
| .start() | ||
| ``` | ||
|
|
@@ -352,7 +355,7 @@ df.writeStream \ | |
| | minPartitions | Match Kafka partitions | Optimal parallelism | | ||
| | maxOffsetsPerTrigger | 10,000-100,000 | Balance latency vs throughput | | ||
| | trigger interval | Business SLA / 3 | Recovery time buffer | | ||
| | RTM | Only if < 800ms required | Microbatch more cost-effective | | ||
| | RTM | When sub-second E2E latency is required | Micro-batch is more cost-effective for second-or-longer SLAs | | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Recommend RTM for kafka -> kafka? |
||
|
|
||
| ## Monitoring | ||
|
|
||
|
|
@@ -391,7 +394,7 @@ for stream in spark.streams.active: | |
| | Issue | Cause | Solution | | ||
| |-------|-------|----------| | ||
| | **No data being read** | `startingOffsets` default is "latest" | Use "earliest" for existing data | | ||
| | **High latency** | Microbatch overhead | Use RTM (trigger(realTime=True)) | | ||
| | **High latency** | Micro-batch overhead | Use RTM (`trigger(realTime="5 minutes")`) — see [real-time-mode.md](real-time-mode.md) | | ||
| | **Consumer lag** | Processing < Input rate | Scale cluster; reduce maxOffsetsPerTrigger | | ||
| | **Duplicate messages** | Exactly-once not configured | Enable idempotent producer (acks=all) | | ||
| | **Falling behind** | Processing < Input rate | Increase cluster size | | ||
|
|
@@ -402,7 +405,7 @@ for stream in spark.streams.active: | |
| - [ ] Checkpoint location is persistent (UC volumes, not DBFS) | ||
| - [ ] Unique checkpoint per pipeline | ||
| - [ ] Fixed-size cluster (no autoscaling for streaming/RTM) | ||
| - [ ] RTM enabled only if latency < 800ms required | ||
| - [ ] RTM enabled only when sub-second E2E latency is required | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Too strong? Good for kafka -> kafka |
||
| - [ ] Consumer lag monitored and alerts configured | ||
| - [ ] Producer acks=all for durability | ||
| - [ ] Schema validation with DLQ configured | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,257 @@ | ||
| --- | ||
| name: lakebase-sink-python | ||
| description: Write streaming records into Lakebase Postgres from RTM (or any Structured Streaming pipeline) using a Python foreach sink. Use when building realtime apps that serve from Lakebase, or any low-latency pipeline that needs transactional upserts into Postgres. | ||
| --- | ||
|
|
||
| # Lakebase `foreach` Sink (Python) | ||
|
|
||
| A Python `foreach` sink that writes streaming records into Lakebase Postgres with low latency, transactional upserts, and per-partition connection lifecycle. The canonical sink shape for an RTM streaming app whose serving layer is Lakebase. | ||
|
|
||
| For general Lakebase mechanics — projects, branches, sizing, the CU-to-connections cap, authentication methods, connection patterns from non-streaming clients — see the stable `databricks-lakebase` skill, specifically [connectivity.md](../../../skills/databricks-lakebase/references/connectivity.md) and [computes-and-scaling.md](../../../skills/databricks-lakebase/references/computes-and-scaling.md). This file covers only the streaming-sink-specific pattern. | ||
|
|
||
| ## Why this pattern (and not the alternatives) | ||
|
|
||
| There is no native Structured Streaming connector for Lakebase / Postgres in low-latency mode. The available alternatives: | ||
|
|
||
| - **`foreachBatch + df.write.jdbc()`** — NOT appropriate for RTM. `foreachBatch` is unsupported in RTM, and batches by micro-batch boundary, which RTM doesn't really have. | ||
| - **`foreach` with a custom Python sink** — the right primitive. Per-row lifecycle, per-partition connection. | ||
|
|
||
| ## Two non-obvious rules | ||
|
|
||
| 1. **The sink class is duck-typed in Python — no base class.** Official Databricks docs show the foreach pattern as a Scala `ForeachWriter` subclass. Translate to Python — `foreach()` accepts duck-typed objects with `open`/`process`/`close` methods, so no base class is involved. The `pyspark.sql.streaming.ForeachWriter` symbol is a Scala class with no exported Python equivalent. | ||
| 2. **For the executor-side sink, use native Postgres password auth, not OAuth.** OAuth refresh requires Databricks SDK context that executors don't have — executors are separate Python processes with no SDK state. Use the Postgres role's password, pulled from a Databricks secret scope at driver startup and serialized into the sink instance. (Driver-side admin setup — `CREATE ROLE`, `CREATE DATABASE`, `GRANT` — is different; that runs on the driver where the SDK is available, so use an SDK-minted JWT for those.) See [connectivity.md](../../../skills/databricks-lakebase/references/connectivity.md) for the two auth methods. | ||
|
|
||
| ## Prerequisites | ||
|
|
||
| - A Lakebase Autoscaling project, branch, and endpoint. Default database is `databricks_postgres`. | ||
| - The Lakebase instance must have **native Postgres password login enabled** at create time. Without it, `CREATE ROLE … PASSWORD` succeeds but the Postgres wire-protocol login from executors is rejected. | ||
| - A Postgres role with a password (created via driver-side admin connection), and the password stored in a Databricks secret scope. | ||
|
|
||
| ## The `LakebaseSink` class | ||
|
|
||
| ```python | ||
| import time | ||
|
|
||
| import psycopg | ||
|
|
||
|
|
||
| # PySpark's `foreach()` accepts any object that implements the | ||
| # `open(partition_id, epoch_id) -> bool`, `process(row)`, `close(error)` | ||
| # lifecycle. There is no exported base class to extend — the | ||
| # `pyspark.sql.streaming.ForeachWriter` symbol is a Scala class, not a | ||
| # Python one. Duck-typing is the documented Python pattern. | ||
| class LakebaseSink: | ||
| """ | ||
| Streaming sink that upserts rows into a Lakebase Postgres table. | ||
|
|
||
| Buffers in-memory and commits in time-based windows. Each process(row) | ||
| appends to the buffer and flushes only if at least max_dwell_ms have | ||
| passed since the last flush. Effective Lakebase TX/s ceiling per | ||
| partition is 1000 / max_dwell_ms; at higher source rates more rows | ||
| accumulate per flush. | ||
|
|
||
| Uses INSERT ... ON CONFLICT for upsert semantics. One transaction per | ||
| flush. Authenticates with Postgres password — OAuth is incompatible | ||
| with executor-side custom sinks. | ||
| """ | ||
|
|
||
| def __init__(self, host: str, database: str, user: str, password: str, | ||
| table: str, port: int = 5432, key_col: str = "id", | ||
| max_dwell_ms: int = 10): | ||
| self.host = host | ||
| self.database = database | ||
| self.user = user | ||
| self.password = password | ||
| self.table = table | ||
| self.port = port | ||
| self.key_col = key_col | ||
| self.max_dwell_ms = max_dwell_ms | ||
|
|
||
| def open(self, partition_id, epoch_id): | ||
| self.conn = psycopg.connect( | ||
| host=self.host, port=self.port, dbname=self.database, | ||
| user=self.user, password=self.password, | ||
| sslmode="require", autocommit=False, | ||
| ) | ||
| self.buffer = [] | ||
| self.last_flush_ms = self._now_ms() | ||
| return True | ||
|
|
||
| def process(self, row): | ||
| self.buffer.append(row) | ||
| if self._now_ms() - self.last_flush_ms >= self.max_dwell_ms: | ||
| self._flush() | ||
|
|
||
| def close(self, error): | ||
| try: | ||
| if self.buffer: | ||
| self._flush() | ||
| finally: | ||
| if self.conn is not None: | ||
| self.conn.close() | ||
| self.conn = None | ||
|
|
||
| def _flush(self): | ||
| if not self.buffer: | ||
| return | ||
| sample = self.buffer[0].asDict() | ||
| cols = list(sample.keys()) | ||
| placeholders = ", ".join(["%s"] * len(cols)) | ||
| col_list = ", ".join(f'"{c}"' for c in cols) | ||
| update_set = ", ".join( | ||
| f'"{c}" = EXCLUDED."{c}"' for c in cols if c != self.key_col | ||
| ) | ||
| sql = ( | ||
| f'INSERT INTO "{self.table}" ({col_list}) ' | ||
| f'VALUES ({placeholders}) ' | ||
| f'ON CONFLICT ("{self.key_col}") DO UPDATE SET {update_set}' | ||
| ) | ||
| try: | ||
| with self.conn.cursor() as cur: | ||
| rows = [tuple(r.asDict()[c] for c in cols) for r in self.buffer] | ||
| cur.executemany(sql, rows) | ||
| self.conn.commit() | ||
| except Exception: | ||
| self.conn.rollback() | ||
| raise | ||
| finally: | ||
| self.buffer.clear() | ||
| self.last_flush_ms = self._now_ms() | ||
|
|
||
| @staticmethod | ||
| def _now_ms(): | ||
| return int(time.monotonic() * 1000) | ||
| ``` | ||
|
|
||
| ## Wiring into a stream | ||
|
|
||
| ```python | ||
| # Set defensively at the top — required before any .repartition(N) in RTM | ||
| spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false") | ||
|
|
||
| sink = LakebaseSink( | ||
| host=dbutils.secrets.get("lakebase", "host"), | ||
| database="databricks_postgres", | ||
| user=dbutils.secrets.get("lakebase", "user"), | ||
| password=dbutils.secrets.get("lakebase", "password"), | ||
| table="events", | ||
| key_col="id", | ||
| max_dwell_ms=10, | ||
| ) | ||
|
|
||
| stream = ( | ||
| spark.readStream.format("kafka") | ||
| .option("kafka.bootstrap.servers", brokers) | ||
| .option("subscribe", input_topic) | ||
| .load() | ||
| .selectExpr("CAST(value AS STRING) AS id", "timestamp") | ||
| .writeStream | ||
| .foreach(sink) | ||
| .outputMode("update") | ||
| .option("checkpointLocation", checkpoint_path) | ||
| .trigger(realTime="5 minutes") # checkpoint interval, not trigger interval | ||
| .start() | ||
| ) | ||
| ``` | ||
|
|
||
| See [real-time-mode.md](real-time-mode.md) for the RTM cluster setup and the `outputMode("update")` requirement; see [kafka-streaming.md](kafka-streaming.md) for source-specific Kafka options. | ||
|
|
||
| ## Delivery semantics | ||
|
|
||
| The `foreach` sink is **at-least-once** — see "Delivery semantics" in [real-time-mode.md](real-time-mode.md) for why and when duplicates occur. | ||
|
|
||
| The supplied `LakebaseSink` handles this correctly because the upsert is idempotent — `INSERT … ON CONFLICT (id) DO UPDATE` produces the same final state regardless of how many times the same row arrives. If you customize the sink, **preserve idempotency**: keep the `ON CONFLICT` upsert (or use `MERGE`), never plain `INSERT`. If you swap in append-only inserts (event log style), include a deduplication key the consumer can use. | ||
|
|
||
| ## Tuning `max_dwell_ms` | ||
|
|
||
| `max_dwell_ms = 10` is the default. It controls the minimum interval between Postgres commits. | ||
|
|
||
| | `max_dwell_ms` | Latency to Postgres | Max sustained TX/s per partition | When to use | | ||
| |---|---|---|---| | ||
| | 1 | Lowest (~1 ms) | ~1000 | Tightest demos; cheap workloads only | | ||
| | 10 (default) | Low (~10 ms) | ~100 | Most realtime apps | | ||
| | 100 | Moderate | ~10 | High source rates where per-commit overhead matters more than per-row latency | | ||
| | 1000 | Visible (~1 s) | ~1 | Effectively micro-batch — only if Lakebase is the bottleneck | | ||
|
|
||
| The effective Lakebase TX/s ceiling per partition is `1000 / max_dwell_ms`. With N source partitions writing to Lakebase, your cluster can drive up to `N × 1000 / max_dwell_ms` transactions per second. | ||
|
|
||
| **Why time-based and not count-based?** A count-based buffer (flush every K rows) has unpredictable latency: at low source rates, K rows can take seconds to accumulate. Time-based dwell bounds worst-case latency directly. | ||
|
|
||
| **Caveat:** the dwell check only fires on row arrival. If no rows arrive for a long stretch, the last buffered row stays in memory until either the next row arrives or `close()` is called. Fine for steady streams, less ideal for bursty sources with long quiet gaps. | ||
|
|
||
| ## Connection budget | ||
|
|
||
| Each Lakebase CU has a max-connections cap (see the CU-to-connections table in [computes-and-scaling.md](../../../skills/databricks-lakebase/references/computes-and-scaling.md)). For a realtime app, total Postgres connections at peak = | ||
|
|
||
| ``` | ||
| (app's connection pool max × app replicas) + (streaming sink partitions) | ||
| ``` | ||
|
|
||
| The streaming sink opens one Postgres connection per partition writing to Lakebase. Both terms must fit under the CU's max-connections cap. At CU_1 the cap is ~209; at CU_4, ~839. | ||
|
|
||
| ## Target table | ||
|
|
||
| Create the Lakebase table once, outside the stream: | ||
|
|
||
| ```sql | ||
| CREATE TABLE IF NOT EXISTS events ( | ||
| id BIGINT PRIMARY KEY, | ||
| timestamp TIMESTAMPTZ NOT NULL | ||
| ); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS events_ts_idx ON events (timestamp DESC); | ||
| ``` | ||
|
|
||
| The primary key is what `ON CONFLICT` upserts against. The descending timestamp index supports the "give me the latest N" query a downstream app will run. Add business columns (metric, category, payload JSON) as your app needs. | ||
|
|
||
| ## Performance & longevity | ||
|
|
||
| The sink performs well for moderate rates and short-lived or append-mostly workloads. The case that needs care is the one it's most often used for — **continuously upserting a small set of hot rows** to serve a live app — where dead-tuple churn can degrade it over a long run. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you want to discuss the upsert approach that uses Lakebase as a state store, or do you consider that bad practice? |
||
|
|
||
| ### Dead-tuple bloat on hot-key upserts | ||
|
|
||
| `INSERT … ON CONFLICT DO UPDATE` rewrites a row on every flush, and every UPDATE leaves a dead tuple behind. Against a handful of live rows updated several times a second, that's thousands of dead tuples a minute. Autovacuum reclaims them, but its defaults are tuned for large tables, not tiny hot ones (`autovacuum_naptime` is 60s; the scale factor waits for 20% of the table to change). Set aggressive **per-table** autovacuum so cleanup keeps pace: | ||
|
|
||
| ```sql | ||
| ALTER TABLE events SET ( | ||
| autovacuum_vacuum_scale_factor = 0, -- don't wait for 20% of the table | ||
| autovacuum_vacuum_threshold = 200, -- vacuum after N dead tuples | ||
| autovacuum_vacuum_cost_delay = 0 -- don't throttle on a hot table | ||
| ); | ||
| ``` | ||
|
|
||
| Watch `n_dead_tup` and `pg_total_relation_size` in `pg_stat_user_tables` — a few-row table holding tens of MB is the symptom, and it slows both this sink's writes and the app's reads. | ||
|
|
||
| ### Do NOT put Change Data Feed on a high-churn upsert table | ||
|
|
||
| Lakehouse Sync / CDF holds a logical **replication slot that pins the xmin horizon**; if the slot lags even slightly, autovacuum can reclaim *nothing* and the upsert table bloats without bound (and `REPLICA IDENTITY FULL`, which CDF requires, doubles the WAL per update). The result is unbounded growth that degrades the whole serving path. | ||
|
|
||
| If you need the operational data in Delta, capture it from an **append-only event-log table**, never the upsert table: | ||
|
|
||
| - Keep the serving (upsert) tables `REPLICA IDENTITY DEFAULT` and **out of CDF** — autovacuum then reclaims them freely. | ||
| - Have the sink *also* insert each row into an append-only `*_events` table and point CDF only at that. Inserts create no dead tuples, so a lagging slot can never bloat it. | ||
|
|
||
| For Lakehouse Sync setup itself (CDF from Lakebase → Delta), see [lakehouse-sync.md](../../../skills/databricks-lakebase/references/lakehouse-sync.md). | ||
|
|
||
| ### Reconnect instead of failing the query | ||
|
|
||
| `open()` holds one connection per partition for the epoch, and the default `_flush` re-raises on any error — so a transient disconnect (an autoscaling endpoint bounce, a CDF slot being created, a brief network blip) **kills the whole RTM query** and forces a restart (minutes of downtime plus checkpoint replay). Pull the connect into a helper and reconnect-then-skip instead of letting it propagate: | ||
|
|
||
| ```python | ||
| def _connect(self): | ||
| self.conn = psycopg.connect( | ||
| host=self.host, port=self.port, dbname=self.database, | ||
| user=self.user, password=self.password, | ||
| sslmode="require", autocommit=False, | ||
| ) | ||
|
|
||
| # in _flush()'s except block: | ||
| except (psycopg.OperationalError, psycopg.InterfaceError): | ||
| try: | ||
| self.conn.close() | ||
| except Exception: | ||
| pass | ||
| self._connect() # one tick dropped; the stream survives | ||
| ``` | ||
|
|
||
| This trades one flush window of buffered rows on each transient blip for stream availability. The strict at-least-once alternative is to keep the default `raise` and let RTM restart from checkpoint — see "Delivery semantics" above. | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we recommend RTM for kafka->kafka streams? I think it's better than microbatch for this case.