From 1bfab4e5673b1a53a3acffad81ad6df0a7f4130f Mon Sep 17 00:00:00 2001
From: Dan Keeling
Date: Mon, 15 Jun 2026 20:02:39 +0100
Subject: [PATCH] Real-Time Mode: bug fixes + two new references
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes incorrect RTM facts in two existing references and adds two new
references covering RTM essentials and the Lakebase foreach sink — the
canonical sink for a realtime app whose serving layer is Lakebase.

Bug fixes (experimental/databricks-spark-structured-streaming/references/):

- trigger-and-cost-optimization.md: the Real-Time Mode section claimed
  .trigger(realTime=True) (the boolean form doesn't exist; PySpark
  requires the duration), "Photon enabled" (Photon must actually be OFF
  for RTM), "Databricks 13.3+" (current minimum is DBR 16.4 LTS), and
  "Latency: < 800ms" (current docs cite "as low as 5 ms"). Five
  occurrences of these claims across the file are corrected, including
  the Trigger Selection Guide table and the "RTM not working" diagnostic
  row.
- kafka-streaming.md: Patterns 4-6 used .trigger(realTime=True)
  (boolean), and Patterns 5-6 combined foreachBatch with RTM, directly
  contradicting the correct note in the same file's Pattern 3 that
  "forEachBatch is NOT supported in RTM". Pattern 4 now uses
  .trigger(realTime="5 minutes") with outputMode("update") and a
  broadcast join; Patterns 5-6 switch to processingTime triggers with
  explanatory comments. Updated the "RTM enabled only if latency < 800ms
  required" Production Checklist row and the corresponding Common Issues
  row.

New references:

- real-time-mode.md (175 lines): cluster setup constraints (DBR 16.4
  LTS+, Classic compute, autoscaling/Photon/spot OFF, the
  realTimeMode.enabled flag), slot math (worker vCPUs >= sum of
  partitions across stages), supported sources/sinks matrix,
  transformWithState per-row behavior (vs micro-batch
  per-key-per-batch), observability (RealTimeStreamScan physical-plan
  check; the three built-in latency metrics with the sink-write caveat;
  diagnose-by-metric decision tree), delivery semantics (exactly-once
  within Spark, at-least-once for Kafka and foreach), error class table.
- lakebase-sink-python.md (257 lines): a duck-typed Python foreach sink
  that writes streaming records into Lakebase Postgres. Cross-links to
  skills/databricks-lakebase/references/connectivity.md for general auth
  methods and computes-and-scaling.md for the CU-to-connections cap;
  adds the streaming-specific rationale for native-password (not OAuth)
  on executors. Documents the LakebaseSink class with time-based commits
  (max_dwell_ms), INSERT ... ON CONFLICT idempotent upserts, and the
  connection-budget formula tying app-pool size + executor connections
  to the CU cap.

SKILL.md: two new nav rows in the Core Patterns table.

manifest.json: regenerated by scripts/skills.py generate to include the
two new reference files.

Signed-off-by: Dan Keeling --- .../SKILL.md | 2 + .../references/kafka-streaming.md | 21 +- .../references/lakebase-sink-python.md | 257 ++++++++++++++++++ .../references/real-time-mode.md | 175 ++++++++++++ .../trigger-and-cost-optimization.md | 74 ++--- manifest.json | 2 + 6 files changed, 487 insertions(+), 44 deletions(-) create mode 100644 experimental/databricks-spark-structured-streaming/references/lakebase-sink-python.md create mode 100644 experimental/databricks-spark-structured-streaming/references/real-time-mode.md diff --git a/experimental/databricks-spark-structured-streaming/SKILL.md b/experimental/databricks-spark-structured-streaming/SKILL.md index a85e9f2..0070be4 100644 --- a/experimental/databricks-spark-structured-streaming/SKILL.md +++ b/experimental/databricks-spark-structured-streaming/SKILL.md @@ -40,6 +40,8 @@ df.writeStream \ | Pattern | Description | Reference | |---------|-------------|-----------| | **Kafka Streaming** | Kafka to Delta, Kafka to Kafka, Real-Time Mode | See [references/kafka-streaming.md](references/kafka-streaming.md) | +| **Real-Time Mode (RTM)** | Sub-second E2E latency — cluster setup, slot math, supported ops, `transformWithState`, observability, delivery semantics | See [references/real-time-mode.md](references/real-time-mode.md) | +| **Lakebase `foreach` Sink** | Write streaming records into Lakebase Postgres with transactional upserts (the canonical sink for an RTM realtime app) | See [references/lakebase-sink-python.md](references/lakebase-sink-python.md) | | **Stream Joins** | Stream-stream joins, stream-static joins | See [references/stream-stream-joins.md](references/stream-stream-joins.md), [references/stream-static-joins.md](references/stream-static-joins.md) | | **Multi-Sink Writes** | Write to multiple tables, parallel merges | See [references/multi-sink-writes.md](references/multi-sink-writes.md) | | **Merge Operations** | MERGE performance, parallel merges, optimizations | See 
[references/merge-operations.md](references/merge-operations.md) | diff --git a/experimental/databricks-spark-structured-streaming/references/kafka-streaming.md b/experimental/databricks-spark-structured-streaming/references/kafka-streaming.md index 9731434..cbb2681 100644 --- a/experimental/databricks-spark-structured-streaming/references/kafka-streaming.md +++ b/experimental/databricks-spark-structured-streaming/references/kafka-streaming.md @@ -171,12 +171,14 @@ query = (enriched_df Enrich events with dimension data: ```python +from pyspark.sql.functions import broadcast, col, to_json, struct + # Read reference data (Delta table - auto-refreshed each microbatch) user_dim = spark.table("users.dimension") # Stream-static join for enrichment enriched = (parsed_df - .join(user_dim, "user_id", "left") + .join(broadcast(user_dim), "user_id", "left") # broadcast() required: RTM only supports broadcast stream-static joins .withColumn("enriched_value", to_json(struct( col("event_id"), col("user_id"), @@ -192,7 +194,8 @@ enriched.select(col("key"), col("enriched_value").alias("value")).writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", brokers) \ .option("topic", "enriched-events") \ - .trigger(realTime=True) \ + .outputMode("update") \ + .trigger(realTime="5 minutes") \ .option("checkpointLocation", "/checkpoints/enrichment") \ .start() ``` @@ -231,8 +234,8 @@ def route_events(batch_df, batch_id): .save() parsed_df.writeStream \ .foreachBatch(route_events) \ - .trigger(realTime=True) \ + .trigger(processingTime="30 seconds") \ .option("checkpointLocation", "/checkpoints/routing") \ - .start() + .start() # foreachBatch is NOT supported in RTM, so this pattern uses a processingTime trigger ``` @@ -281,8 +284,8 @@ def validate_and_route(batch_df, batch_id): .save() source_df.writeStream \ .foreachBatch(validate_and_route) \ - .trigger(realTime=True) \ + 
.trigger(processingTime="30 seconds") \ .option("checkpointLocation", "/checkpoints/validation") \ - .start() + .start() # foreachBatch is NOT supported in RTM, so this pattern uses a processingTime trigger ``` @@ -352,7 +355,7 @@ df.writeStream \ | minPartitions | Match Kafka partitions | Optimal parallelism | | maxOffsetsPerTrigger | 10,000-100,000 | Balance latency vs throughput | | trigger interval | Business SLA / 3 | Recovery time buffer | -| RTM | Only if < 800ms required | Microbatch more cost-effective | +| RTM | When sub-second E2E latency is required | Micro-batch is more cost-effective for second-or-longer SLAs | ## Monitoring @@ -391,7 +394,7 @@ for stream in spark.streams.active: | Issue | Cause | Solution | |-------|-------|----------| | **No data being read** | `startingOffsets` default is "latest" | Use "earliest" for existing data | -| **High latency** | Microbatch overhead | Use RTM (trigger(realTime=True)) | +| **High latency** | Micro-batch overhead | Use RTM (`trigger(realTime="5 minutes")`) — see [real-time-mode.md](real-time-mode.md) | | **Consumer lag** | Processing < Input rate | Scale cluster; reduce maxOffsetsPerTrigger | | **Duplicate messages** | Exactly-once not configured | Enable idempotent producer (acks=all) | | **Falling behind** | Processing < Input rate | Increase cluster size | @@ -402,7 +405,7 @@ for stream in spark.streams.active: - [ ] Checkpoint location is persistent (UC volumes, not DBFS) - [ ] Unique checkpoint per pipeline - [ ] Fixed-size cluster (no autoscaling for streaming/RTM) -- [ ] RTM enabled only if latency < 800ms required +- [ ] RTM enabled only when sub-second E2E latency is required - [ ] Consumer lag monitored and alerts configured - [ ] Producer acks=all for durability - [ ] Schema validation with DLQ configured diff --git a/experimental/databricks-spark-structured-streaming/references/lakebase-sink-python.md b/experimental/databricks-spark-structured-streaming/references/lakebase-sink-python.md new file mode 100644 index 0000000..e16dde6 --- /dev/null +++ 
b/experimental/databricks-spark-structured-streaming/references/lakebase-sink-python.md @@ -0,0 +1,257 @@ +--- +name: lakebase-sink-python +description: Write streaming records into Lakebase Postgres from RTM (or any Structured Streaming pipeline) using a Python foreach sink. Use when building realtime apps that serve from Lakebase, or any low-latency pipeline that needs transactional upserts into Postgres. +--- + +# Lakebase `foreach` Sink (Python) + +A Python `foreach` sink that writes streaming records into Lakebase Postgres with low latency, transactional upserts, and per-partition connection lifecycle. The canonical sink shape for an RTM streaming app whose serving layer is Lakebase. + +For general Lakebase mechanics — projects, branches, sizing, the CU-to-connections cap, authentication methods, connection patterns from non-streaming clients — see the stable `databricks-lakebase` skill, specifically [connectivity.md](../../../skills/databricks-lakebase/references/connectivity.md) and [computes-and-scaling.md](../../../skills/databricks-lakebase/references/computes-and-scaling.md). This file covers only the streaming-sink-specific pattern. + +## Why this pattern (and not the alternatives) + +There is no native Structured Streaming connector for Lakebase / Postgres in low-latency mode. The available alternatives: + +- **`foreachBatch + df.write.jdbc()`** — NOT appropriate for RTM. `foreachBatch` is unsupported in RTM, and batches by micro-batch boundary, which RTM doesn't really have. +- **`foreach` with a custom Python sink** — the right primitive. Per-row lifecycle, per-partition connection. + +## Two non-obvious rules + +1. **The sink class is duck-typed in Python — no base class.** Official Databricks docs show the foreach pattern as a Scala `ForeachWriter` subclass. Translate to Python — `foreach()` accepts duck-typed objects with `open`/`process`/`close` methods, so no base class is involved. 
`ForeachWriter` exists only on the Scala/Java side (`org.apache.spark.sql.ForeachWriter`); PySpark exports no equivalent base class. +2. **For the executor-side sink, use native Postgres password auth, not OAuth.** OAuth refresh requires Databricks SDK context that executors don't have — executors are separate Python processes with no SDK state. Use the Postgres role's password, pulled from a Databricks secret scope at driver startup and serialized into the sink instance. (Driver-side admin setup — `CREATE ROLE`, `CREATE DATABASE`, `GRANT` — is different; that runs on the driver where the SDK is available, so use an SDK-minted JWT for those.) See [connectivity.md](../../../skills/databricks-lakebase/references/connectivity.md) for the two auth methods. + +## Prerequisites + +- A Lakebase Autoscaling project, branch, and endpoint. Default database is `databricks_postgres`. +- The Lakebase instance must have **native Postgres password login enabled** at create time. Without it, `CREATE ROLE … PASSWORD` succeeds but the Postgres wire-protocol login from executors is rejected. +- A Postgres role with a password (created via driver-side admin connection), and the password stored in a Databricks secret scope. + +## The `LakebaseSink` class + +```python +import time + +import psycopg + + +# PySpark's `foreach()` accepts any object that implements the +# `open(partition_id, epoch_id) -> bool`, `process(row)`, `close(error)` +# lifecycle. There is no exported base class to extend: `ForeachWriter` +# exists only in Scala/Java (`org.apache.spark.sql.ForeachWriter`), not in +# PySpark. Duck-typing is the documented Python pattern. +class LakebaseSink: + """ + Streaming sink that upserts rows into a Lakebase Postgres table. + + Buffers in-memory and commits in time-based windows. Each process(row) + appends to the buffer and flushes only if at least max_dwell_ms have + passed since the last flush. 
Effective Lakebase TX/s ceiling per + partition is 1000 / max_dwell_ms; at higher source rates more rows + accumulate per flush. + + Uses INSERT ... ON CONFLICT for upsert semantics. One transaction per + flush. Authenticates with Postgres password — OAuth is incompatible + with executor-side custom sinks. + """ + + def __init__(self, host: str, database: str, user: str, password: str, + table: str, port: int = 5432, key_col: str = "id", + max_dwell_ms: int = 10): + self.host = host + self.database = database + self.user = user + self.password = password + self.table = table + self.port = port + self.key_col = key_col + self.max_dwell_ms = max_dwell_ms + + def open(self, partition_id, epoch_id): + self.conn = psycopg.connect( + host=self.host, port=self.port, dbname=self.database, + user=self.user, password=self.password, + sslmode="require", autocommit=False, + ) + self.buffer = [] + self.last_flush_ms = self._now_ms() + return True + + def process(self, row): + self.buffer.append(row) + if self._now_ms() - self.last_flush_ms >= self.max_dwell_ms: + self._flush() + + def close(self, error): + try: + if self.buffer: + self._flush() + finally: + if self.conn is not None: + self.conn.close() + self.conn = None + + def _flush(self): + if not self.buffer: + return + sample = self.buffer[0].asDict() + cols = list(sample.keys()) + placeholders = ", ".join(["%s"] * len(cols)) + col_list = ", ".join(f'"{c}"' for c in cols) + update_set = ", ".join( + f'"{c}" = EXCLUDED."{c}"' for c in cols if c != self.key_col + ) + sql = ( + f'INSERT INTO "{self.table}" ({col_list}) ' + f'VALUES ({placeholders}) ' + f'ON CONFLICT ("{self.key_col}") DO UPDATE SET {update_set}' + ) + try: + with self.conn.cursor() as cur: + rows = [tuple(r.asDict()[c] for c in cols) for r in self.buffer] + cur.executemany(sql, rows) + self.conn.commit() + except Exception: + self.conn.rollback() + raise + finally: + self.buffer.clear() + self.last_flush_ms = self._now_ms() + + @staticmethod + def _now_ms(): 
+ return int(time.monotonic() * 1000) +``` + +## Wiring into a stream + +```python +# Set defensively at the top — required before any .repartition(N) in RTM +spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false") + +sink = LakebaseSink( + host=dbutils.secrets.get("lakebase", "host"), + database="databricks_postgres", + user=dbutils.secrets.get("lakebase", "user"), + password=dbutils.secrets.get("lakebase", "password"), + table="events", + key_col="id", + max_dwell_ms=10, +) + +stream = ( + spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", brokers) + .option("subscribe", input_topic) + .load() + .selectExpr("CAST(value AS STRING) AS id", "timestamp") + .writeStream + .foreach(sink) + .outputMode("update") + .option("checkpointLocation", checkpoint_path) + .trigger(realTime="5 minutes") # checkpoint interval, not trigger interval + .start() +) +``` + +See [real-time-mode.md](real-time-mode.md) for the RTM cluster setup and the `outputMode("update")` requirement; see [kafka-streaming.md](kafka-streaming.md) for source-specific Kafka options. + +## Delivery semantics + +The `foreach` sink is **at-least-once** — see "Delivery semantics" in [real-time-mode.md](real-time-mode.md) for why and when duplicates occur. + +The supplied `LakebaseSink` handles this correctly because the upsert is idempotent — `INSERT … ON CONFLICT (id) DO UPDATE` produces the same final state regardless of how many times the same row arrives. If you customize the sink, **preserve idempotency**: keep the `ON CONFLICT` upsert (or use `MERGE`), never plain `INSERT`. If you swap in append-only inserts (event log style), include a deduplication key the consumer can use. + +## Tuning `max_dwell_ms` + +`max_dwell_ms = 10` is the default. It controls the minimum interval between Postgres commits. 
+ +| `max_dwell_ms` | Latency to Postgres | Max sustained TX/s per partition | When to use | +|---|---|---|---| +| 1 | Lowest (~1 ms) | ~1000 | Tightest demos; cheap workloads only | +| 10 (default) | Low (~10 ms) | ~100 | Most realtime apps | +| 100 | Moderate | ~10 | High source rates where per-commit overhead matters more than per-row latency | +| 1000 | Visible (~1 s) | ~1 | Effectively micro-batch — only if Lakebase is the bottleneck | + +The effective Lakebase TX/s ceiling per partition is `1000 / max_dwell_ms`. With N source partitions writing to Lakebase, your cluster can drive up to `N × 1000 / max_dwell_ms` transactions per second. + +**Why time-based and not count-based?** A count-based buffer (flush every K rows) has unpredictable latency: at low source rates, K rows can take seconds to accumulate. Time-based dwell bounds worst-case latency directly. + +**Caveat:** the dwell check only fires on row arrival. If no rows arrive for a long stretch, the last buffered row stays in memory until either the next row arrives or `close()` is called. Fine for steady streams, less ideal for bursty sources with long quiet gaps. + +## Connection budget + +Each Lakebase CU has a max-connections cap (see the CU-to-connections table in [computes-and-scaling.md](../../../skills/databricks-lakebase/references/computes-and-scaling.md)). For a realtime app, total Postgres connections at peak = + +``` +(app's connection pool max × app replicas) + (streaming sink partitions) +``` + +The streaming sink opens one Postgres connection per partition writing to Lakebase. Both terms must fit under the CU's max-connections cap. At CU_1 the cap is ~209; at CU_4, ~839. + +## Target table + +Create the Lakebase table once, outside the stream: + +```sql +CREATE TABLE IF NOT EXISTS events ( + id BIGINT PRIMARY KEY, + timestamp TIMESTAMPTZ NOT NULL +); + +CREATE INDEX IF NOT EXISTS events_ts_idx ON events (timestamp DESC); +``` + +The primary key is what `ON CONFLICT` upserts against. 
The descending timestamp index supports the "give me the latest N" query a downstream app will run. Add business columns (metric, category, payload JSON) as your app needs. + +## Performance & longevity + +The sink performs well for moderate rates and short-lived or append-mostly workloads. The case that needs care is the one it's most often used for — **continuously upserting a small set of hot rows** to serve a live app — where dead-tuple churn can degrade it over a long run. + +### Dead-tuple bloat on hot-key upserts + +`INSERT … ON CONFLICT DO UPDATE` rewrites a row on every flush, and every UPDATE leaves a dead tuple behind. Against a handful of live rows updated several times a second, that's thousands of dead tuples a minute. Autovacuum reclaims them, but its defaults are tuned for large tables, not tiny hot ones (`autovacuum_naptime` is 60s; the scale factor waits for 20% of the table to change). Set aggressive **per-table** autovacuum so cleanup keeps pace: + +```sql +ALTER TABLE events SET ( + autovacuum_vacuum_scale_factor = 0, -- don't wait for 20% of the table + autovacuum_vacuum_threshold = 200, -- vacuum after N dead tuples + autovacuum_vacuum_cost_delay = 0 -- don't throttle on a hot table +); +``` + +Watch `n_dead_tup` in `pg_stat_user_tables` and the table's on-disk size from `pg_total_relation_size('events')`; a few-row table holding tens of MB is the symptom, and it slows both this sink's writes and the app's reads. + +### Do NOT put Change Data Feed on a high-churn upsert table + +Lakehouse Sync / CDF holds a logical **replication slot that pins the xmin horizon**; if the slot lags even slightly, autovacuum can reclaim *nothing* and the upsert table bloats without bound (and `REPLICA IDENTITY FULL`, which CDF requires, doubles the WAL per update). The result is unbounded growth that degrades the whole serving path. 
+ +If you need the operational data in Delta, capture it from an **append-only event-log table**, never the upsert table: + +- Keep the serving (upsert) tables `REPLICA IDENTITY DEFAULT` and **out of CDF** — autovacuum then reclaims them freely. +- Have the sink *also* insert each row into an append-only `*_events` table and point CDF only at that. Inserts create no dead tuples, so a lagging slot can never bloat it. + +For Lakehouse Sync setup itself (CDF from Lakebase → Delta), see [lakehouse-sync.md](../../../skills/databricks-lakebase/references/lakehouse-sync.md). + +### Reconnect instead of failing the query + +`open()` holds one connection per partition for the epoch, and the default `_flush` re-raises on any error — so a transient disconnect (an autoscaling endpoint bounce, a CDF slot being created, a brief network blip) **kills the whole RTM query** and forces a restart (minutes of downtime plus checkpoint replay). Pull the connect into a helper and reconnect-then-skip instead of letting it propagate: + +```python +def _connect(self): + self.conn = psycopg.connect( + host=self.host, port=self.port, dbname=self.database, + user=self.user, password=self.password, + sslmode="require", autocommit=False, + ) + +# in _flush()'s except block: +except (psycopg.OperationalError, psycopg.InterfaceError): + try: + self.conn.close() + except Exception: + pass + self._connect() # one tick dropped; the stream survives +``` + +This trades one flush window of buffered rows on each transient blip for stream availability. The strict at-least-once alternative is to keep the default `raise` and let RTM restart from checkpoint — see "Delivery semantics" above. 
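
As a closing sanity check on the connection-budget and `max_dwell_ms` formulas given earlier in this file, the arithmetic can be sketched as below. The function names are hypothetical helpers, not part of any Databricks or Lakebase API, and the cap of 209 is only the approximate CU_1 figure quoted in this file; confirm the real cap for your compute size before relying on it.

```python
def total_connections(pool_max: int, app_replicas: int, sink_partitions: int) -> int:
    """Peak Postgres connections: app pools plus one connection per sink partition."""
    return pool_max * app_replicas + sink_partitions


def within_cap(total: int, cap: int) -> bool:
    """True when the combined connection count fits under the CU's max-connections cap."""
    return total <= cap


def max_tx_per_second(sink_partitions: int, max_dwell_ms: int) -> float:
    """Upper bound on commits per second: N partitions x (1000 / max_dwell_ms)."""
    if max_dwell_ms <= 0:
        raise ValueError("max_dwell_ms must be positive")
    return sink_partitions * 1000 / max_dwell_ms


# Illustrative numbers only: 4 app replicas with a pool of 20, 8 sink partitions.
peak = total_connections(pool_max=20, app_replicas=4, sink_partitions=8)
print(peak, within_cap(peak, cap=209))
print(max_tx_per_second(sink_partitions=8, max_dwell_ms=10))
```

Running the check when the app's replica count or the stream's partition count changes is cheaper than discovering the cap through rejected connections in production.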
diff --git a/experimental/databricks-spark-structured-streaming/references/real-time-mode.md b/experimental/databricks-spark-structured-streaming/references/real-time-mode.md new file mode 100644 index 0000000..12febe7 --- /dev/null +++ b/experimental/databricks-spark-structured-streaming/references/real-time-mode.md @@ -0,0 +1,175 @@ +--- +name: real-time-mode +description: Real-Time Mode (RTM) for Spark Structured Streaming on Databricks — sub-second end-to-end latency. Use when building realtime apps, low-latency operational pipelines, or any streaming workload with SLAs measured in milliseconds rather than seconds. +--- + +# Real-Time Mode (RTM) + +RTM is a Structured Streaming execution mode that processes records continuously instead of in micro-batches. It is the only Spark Streaming surface that achieves sub-second end-to-end latency (as low as 5 ms). + +For broader streaming topics (checkpoints, watermarks, stream-stream joins, micro-batch tuning), see the other references in this skill. This file covers only what is RTM-specific. + +## Cluster setup + +RTM has hard cluster requirements. Get any of these wrong and the stream either won't start or won't be low-latency. + +| Setting | Required value | Notes | +|---|---|---| +| DBR | **16.4 LTS minimum, 18.x recommended** | 18.2+ specifically resolves a known latency floor with Python `transformWithState` at <5 rec/sec. | +| Compute type | **Classic compute** (Dedicated or Standard access mode) | Standard supports Python only. Serverless is NOT supported. RTM also exists as a configuration mode inside Lakeflow Spark Declarative Pipelines, with a different authoring API — out of scope here. | +| Autoscaling | **Off** | Streaming clusters must be fixed-size. | +| Photon | **Off** | Incompatible with RTM. | +| Spot instances | **Off** | Interruptions break the stream. | +| Spark conf | `spark.databricks.streaming.realTimeMode.enabled = true` | Required to enable RTM at all. 
Set in cluster Advanced Options → Spark config. | + +For latency-sensitive Python UDFs, use **Dedicated** access mode — Standard's security isolation adds overhead. + +## Trigger and output mode + +**Python:** +```python +.trigger(realTime="5 minutes") +.outputMode("update") +``` + +**Scala:** +```scala +import org.apache.spark.sql.execution.streaming.RealTimeTrigger +import org.apache.spark.sql.streaming.OutputMode + +.trigger(RealTimeTrigger.apply()) // or RealTimeTrigger.apply("5 minutes") +.outputMode(OutputMode.Update()) +``` + +Two things people get wrong: + +1. **The `"5 minutes"` is the checkpoint interval, not the trigger interval.** RTM is continuous; it doesn't "fire" on a schedule. The duration controls how often the engine pauses to checkpoint. Set it to **at least 5 minutes** — shorter intervals cause frequent multi-second pauses; longer intervals mean more potential reprocessing on restart. Inter-batch time should stay ≤3 seconds (≤1% of the batch duration) or P99 latency rises. +2. **Output mode must be `update`.** RTM does not support `append` or `complete`. + +## Slot math + +RTM runs **all pipeline stages simultaneously** (unlike micro-batch, which can reuse slots stage by stage). The cluster's **total worker vCPUs must be ≥ the sum of partitions across every stage** — slots and CPU cores are equivalent for RTM sizing. + +| Pipeline shape | Slots / vCPUs needed | +|---|---| +| Stateless: Kafka source (`maxPartitions=8`) → Kafka sink | 8 | +| + one shuffle (group by, dedup) with `spark.sql.shuffle.partitions=20` | 8 + 20 = 28 | +| + one explicit `.repartition(20)` | 8 + 20 + 20 = 48 | + +If `maxPartitions` is unset, the source partition count equals the Kafka topic's partition count. If under-sized, the query throws `[CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT]` and stalls or fails. 
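
The slot-math rule above can be checked before a cluster is sized. The sketch below is a hypothetical helper (`required_slots` and `cluster_fits` are illustrative names, not Spark or Databricks APIs) that sums partitions across stages the same way the table does; it assumes you already know the partition count of every stage.

```python
from typing import Sequence


def required_slots(source_partitions: int, downstream_partitions: Sequence[int] = ()) -> int:
    """Sum partitions across every stage, since RTM runs all stages at once."""
    stages = [source_partitions, *downstream_partitions]
    if any(p <= 0 for p in stages):
        raise ValueError("partition counts must be positive")
    return sum(stages)


def cluster_fits(total_worker_vcpus: int, source_partitions: int,
                 downstream_partitions: Sequence[int] = ()) -> bool:
    """True when total worker vCPUs cover the pipeline's sum of partitions."""
    return total_worker_vcpus >= required_slots(source_partitions, downstream_partitions)


# The three rows of the table: stateless, one shuffle, shuffle plus repartition.
print(required_slots(8))
print(required_slots(8, [20]))
print(required_slots(8, [20, 20]))
print(cluster_fits(32, 8, [20, 20]))
```

A `False` from `cluster_fits` corresponds to the case where the query would hit `[CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT]`.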
+ +## Supported sources and sinks + +| Source / sink | As source | As sink | +|---|---|---| +| Kafka | ✓ | ✓ | +| Event Hubs (via Kafka connector) | ✓ | ✓ | +| Kinesis (EFO mode only) | ✓ | ✗ | +| AWS MSK | ✓ | ✗ | +| Rate (demos) | ✓ | — | +| Delta | ✗ | ✗ | +| Auto Loader / `cloudFiles` | ✗ | — | +| Files / object storage directly | ✗ | — | +| Google Pub/Sub | ✗ | ✗ | +| Apache Pulsar | ✗ | ✗ | +| Custom sink via `foreach` (Python class or Scala `ForeachWriter`) | — | ✓ | +| `foreachBatch` | — | ✗ | + +**File-based sources (Auto Loader, direct file reads, Delta) are NOT supported in RTM.** They belong to micro-batch streaming. If your data lives in files and you need sub-second latency, ingest into Kafka / Event Hubs first. + +For the `foreach` sink to Lakebase Postgres, see [lakebase-sink-python.md](lakebase-sink-python.md). + +## Supported operations + +### Stateless (lower cost, lower latency) + +- Projections (`select`, `withColumn`), filters +- `union` of multiple streams +- **`repartition(N)`** — requires `spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")` set first; without it, repartition inserts a sort that destroys low-latency behavior with no warning. +- **Stream-static join** — broadcast-only. Wrap the static side in `broadcast(spark.read...)` and ensure it fits in memory. + +### Stateful (higher cost, requires more slots) + +- `dropDuplicates` for deduplication +- Simple aggregations: `groupBy(...).count()`, `sum`, `avg`, etc. 
+- `transformWithState` for custom state (see below) + +### Not supported in RTM + +- Watermark-based windowed aggregations +- Stream-stream joins +- `flatMapGroupsWithState` (older API) +- `foreachBatch` and `mapPartitions` +- Output modes `append` and `complete` + +## `transformWithState` behavior change + +This is the most important semantic difference vs micro-batch: + +**In RTM, `handleInputRows` is called once per row.** In micro-batch, it's called once per key per batch, with the iterator carrying all values for that key. + +If you write a `StatefulProcessor` assuming "I get a batch of rows for one key," that logic breaks in RTM. Each row arrives separately. + +Other RTM-specific rules: +- **Processing-time timers only.** Event-time timers are not supported. +- **No `transformWithStateInPandas`.** Use the row-based Python API. +- Timer firing is delayed by data arrival: a timer scheduled for 10:00:00 will not fire at 10:00:00 if no data arrives — it fires when the next row arrives. Termination paths fire pending timers before exit. +- DBR 18.1 and below show "up to a few hundred ms" latency with Python at <5 rec/sec. Use DBR 18.2 or later (or Scala) to avoid this. + +## Verifying and observing RTM + +### Confirm the query is actually in RTM + +A common mistake is wiring up the trigger correctly but landing on a code path that silently runs in micro-batch (e.g. a source that doesn't yet support RTM). Confirm by inspecting the streaming query's physical plan — the leaf nodes should be `RealTimeStreamScan` (or `RealTimeScanExec`): + +``` +== Physical Plan == +WriteToDataSourceV2 ++- * Project + +- RealTimeStreamScan ... +``` + +If you see `MicroBatchScan` instead, the query is not running in RTM — check that the source is supported and the cluster Spark conf is set. + +### Built-in latency metrics + +Every RTM batch emits three latency metrics in `StreamingQueryProgress` under the `latencies` field. 
Each is reported as a per-batch percentile distribution (P0, P50, P90, P95, P99): + +| Metric | What it measures | +|---|---| +| `processingLatencyMs` | Read-to-write inside the query — how long the pipeline takes to process a record | +| `sourceQueuingLatencyMs` | Source-append-time (e.g. Kafka log append) to first read by the query. Captures inter-batch time, message-bus queuing, upstream batching. | +| `e2eLatencyMs` | Source-append-time to processed downstream. End-to-end. | + +**Caveat: `e2eLatencyMs` does not currently include the sink write time.** If perceived latency is higher than `e2eLatencyMs` suggests, the gap is in the sink. + +Read these metrics via a `StreamingQueryListener` or by inspecting `query.lastProgress`. + +### Diagnose-by-metric + +| Symptom | Likely cause | Where to look | +|---|---|---| +| High `processingLatencyMs` | Slow operator inside the query | Per-stage metrics (set `spark.databricks.streaming.execution.enableDebugMetrics = true`); CPU profile | +| High `sourceQueuingLatencyMs` | Inter-batch time too long, or upstream source latency | Inter-batch time in driver logs; Kafka `kafka.fetch.max.wait.ms` (default 500 ms — drop to 50 for low latency); upstream batching | +| `e2eLatencyMs` looks fine but the app feels slow | Sink write time (not in `e2eLatencyMs`) | Measure sink flush duration directly | +| Latency climbing over time | Memory pressure or GC growing | Executor stdout for Full GC events (long `user` CPU times); cluster restart as immediate mitigation | + +## Delivery semantics + +RTM preserves Structured Streaming's standard fault-tolerance guarantees: + +- **Exactly-once within Spark.** Operators and state stores are exactly-once; the guarantee stops at the sink boundary. +- **At-least-once for Kafka and `foreach` sinks.** This is the boundary case to watch: anywhere data leaves Spark via `foreach` (custom writers, side effects) or is written to Kafka, the same record may be delivered more than once on restart or task retry. Design custom sinks to be idempotent. 
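
To make the idempotency advice concrete, here is a minimal in-memory sketch of why an upsert tolerates at-least-once delivery while a plain append does not. The dict and list stand in for a Postgres table and an event log; `upsert` and `append` are hypothetical illustration names, not Spark or psycopg APIs.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]  # (key, value)


def upsert(table: Dict[int, str], rows: List[Row]) -> None:
    """Idempotent write: one value per key, like INSERT ... ON CONFLICT DO UPDATE."""
    for key, value in rows:
        table[key] = value


def append(log: List[Row], rows: List[Row]) -> None:
    """Non-idempotent write: every delivery adds rows, like a plain INSERT."""
    log.extend(rows)


batch = [(1, "a"), (2, "b")]

upsert_table: Dict[int, str] = {}
upsert(upsert_table, batch)
upsert(upsert_table, batch)  # simulated redelivery after a restart or task retry

append_log: List[Row] = []
append(append_log, batch)
append(append_log, batch)  # the same redelivery

print(upsert_table)     # final state is unaffected by the duplicate delivery
print(len(append_log))  # the duplicate delivery is visible as extra rows
```

If the sink must be append-only, the rows need a deduplication key that the consumer can collapse on, since the log itself will contain the repeats.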
+ +## Common error classes + +| Error | Cause / fix | +|---|---| +| `[CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT]` | Cluster has fewer vCPUs than the pipeline's sum-of-partitions. Increase cluster size to match the slot-math table above. | +| `[STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED]` | RTM only supports `outputMode("update")`. Replace `append` or `complete`. | +| `[STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST]` | The query uses an operator or sink that RTM doesn't support (e.g. `foreachBatch`, watermarked windows, stream-stream join). Refactor to a supported equivalent. | +| `[STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED]` | The source isn't supported in RTM (e.g. Delta, Auto Loader, Pub/Sub). No override — ingest via Kafka/Event Hubs/Kinesis-EFO. | + +## Worker memory and GC + +RTM executors process data continuously; the driver is only active at batch boundaries. **GC pauses on executors disrupt processing and show up as latency spikes** — more so than driver-side GC. For stateful pipelines (`transformWithState`, `dropDuplicates`, in-stream aggregation), plan worker memory with headroom above the state store's working size. Watch executor GC logs in `stdout` — long Full GC events (multi-second `user` CPU times) indicate undersized memory. 
diff --git a/experimental/databricks-spark-structured-streaming/references/trigger-and-cost-optimization.md b/experimental/databricks-spark-structured-streaming/references/trigger-and-cost-optimization.md
index 92ba4cb..c09130a 100644
--- a/experimental/databricks-spark-structured-streaming/references/trigger-and-cost-optimization.md
+++ b/experimental/databricks-spark-structured-streaming/references/trigger-and-cost-optimization.md
@@ -56,25 +56,32 @@ Process all available data, then stop:
 
 ### Real-Time Mode (RTM)
 
-Sub-second latency with Photon:
+Sub-second end-to-end latency (as low as 5 ms) — for deep RTM coverage see [real-time-mode.md](real-time-mode.md):
 
 ```python
-# Real-Time Mode (Databricks 13.3+)
-.trigger(realTime=True)
+# Real-Time Mode
+# PySpark requires specifying the checkpoint interval; "5 minutes" is the
+# checkpoint cadence, NOT the trigger interval (RTM runs continuously between
+# checkpoints).
+.trigger(realTime="5 minutes")
 
 # Requirements:
-# - Photon enabled
-# - Fixed-size cluster (no autoscaling)
-# - Latency: < 800ms
-
-# Cost: Continuous cluster with Photon
+# - DBR 16.4 LTS or later (18.x recommended)
+# - Classic compute (Dedicated or Standard access mode); NOT Serverless
+# - Fixed-size cluster (autoscaling OFF)
+# - Photon OFF (incompatible with RTM)
+# - Spot instances OFF (interruptions break the stream)
+# - spark.databricks.streaming.realTimeMode.enabled = true (cluster Spark config)
+# - outputMode("update") — RTM only supports update mode
+
+# Cost: Continuous cluster (Photon disabled)
 ```
 
 ## Trigger Selection Guide
 
 | Latency Requirement | Trigger | Cost | Use Case |
 |---------------------|---------|------|----------|
-| < 800ms | RTM | $$$ | Real-time analytics, alerts |
+| Sub-second (as low as 5ms) | RTM | $$$ | Real-time analytics, alerts, operational apps |
 | 1-30 seconds | processingTime | $$ | Near real-time dashboards |
 | 15-60 minutes | availableNow (scheduled) | $ | Batch-style SLA |
 | > 1 hour | availableNow (scheduled) | $ | ETL pipelines |
@@ -110,7 +117,7 @@ trigger = sla / 3 # 5 minutes
 .trigger(processingTime="5 minutes")
 
 # Example 3: Real-time requirement
-.trigger(realTime=True) # < 800ms
+.trigger(realTime="5 minutes") # sub-second E2E latency; "5 minutes" = checkpoint cadence
 ```
 
 ## Cost Optimization Strategies
@@ -276,23 +283,18 @@ def start_all_streams():
 
 ### Pattern 3: RTM for Sub-Second Latency
 
-Use RTM for real-time requirements:
+Use RTM for real-time requirements. See [real-time-mode.md](real-time-mode.md) for the deep treatment.
 
 ```python
-# Real-Time Mode for sub-second latency
+# Real-Time Mode — sub-second E2E latency (as low as 5 ms)
 df.writeStream \
-    .format("kafka")
-    .option("topic", "output")
-    .trigger(realTime=True) \
+    .format("kafka") \
+    .option("topic", "output") \
+    .outputMode("update") \
+    .trigger(realTime="5 minutes") \
     .start()
 
-# Required configurations:
-spark.conf.set("spark.databricks.photon.enabled", "true")
-spark.conf.set("spark.sql.streaming.stateStore.providerClass",
-               "com.databricks.sql.streaming.state.RocksDBStateProvider")
-
-# Latency: < 800ms
-# Cost: Continuous cluster with Photon
+# Cost: Continuous cluster (Photon disabled)
 ```
 
 ## Real-Time Mode (RTM) Configuration
@@ -300,18 +302,20 @@ spark.conf.set("spark.sql.streaming.stateStore.providerClass",
 ### Enable RTM
 
 ```python
-# Enable Real-Time Mode
-.trigger(realTime=True)
+# Streaming code
+.outputMode("update")
+.trigger(realTime="5 minutes") # checkpoint interval, not trigger interval
 
-# Required configurations:
-spark.conf.set("spark.databricks.photon.enabled", "true")
-spark.conf.set("spark.sql.streaming.stateStore.providerClass",
-               "com.databricks.sql.streaming.state.RocksDBStateProvider")
+# Cluster Spark config (cluster Advanced Options → Spark config)
+spark.databricks.streaming.realTimeMode.enabled = true
 
 # Cluster requirements:
-# - Fixed-size cluster (no autoscaling)
-# - Photon enabled
-# - Driver: Minimum 4 cores
+# - DBR 16.4 LTS or later (18.x recommended)
+# - Classic compute (Dedicated or Standard access mode); NOT Serverless
+# - Fixed-size cluster (autoscaling OFF)
+# - Photon OFF (incompatible with RTM)
+# - Spot instances OFF
+# - Total worker vCPUs ≥ sum of partitions across all pipeline stages
 ```
 
 ### RTM Use Cases
@@ -422,10 +426,10 @@ job_tags = {
 
 ```python
 # Highest cost, lowest latency
-.trigger(realTime=True)
+.trigger(realTime="5 minutes") # "5 minutes" = checkpoint interval
 
-# Cost: Continuous cluster with Photon
-# Latency: < 800ms
+# Cost: Continuous cluster (Photon disabled)
+# Latency: Sub-second E2E (as low as 5 ms)
 # Use when: Sub-second latency required
 ```
 
@@ -436,7 +440,7 @@ job_tags = {
 | **High latency** | Trigger interval too long | Decrease trigger interval or use RTM |
 | **High cost** | Continuous processing | Use scheduled (availableNow) |
 | **Batch duration > trigger** | Processing too slow | Optimize processing or increase trigger |
-| **RTM not working** | Photon not enabled | Enable Photon and configure cluster |
+| **RTM not working** | Cluster misconfigured | Verify: DBR 16.4 LTS+, Classic compute, autoscaling/Photon/spot OFF, `spark.databricks.streaming.realTimeMode.enabled = true`. See [real-time-mode.md](real-time-mode.md). |
 
 ## Quick Wins
 
diff --git a/manifest.json b/manifest.json
index 58d1c69..7af75b9 100644
--- a/manifest.json
+++ b/manifest.json
@@ -413,8 +413,10 @@
         "assets/databricks.svg",
         "references/checkpoint-best-practices.md",
         "references/kafka-streaming.md",
+        "references/lakebase-sink-python.md",
         "references/merge-operations.md",
         "references/multi-sink-writes.md",
+        "references/real-time-mode.md",
         "references/stateful-operations.md",
         "references/stream-static-joins.md",
         "references/stream-stream-joins.md",