skills(experimental): Real-Time Mode bug fixes + two new references#157
skills(experimental): Real-Time Mode bug fixes + two new references#157DanJKeeling wants to merge 1 commit into
Conversation
Fixes incorrect RTM facts in two existing references and adds two new references covering RTM essentials and the Lakebase foreach sink — the canonical sink for a realtime app whose serving layer is Lakebase. Bug fixes (experimental/databricks-spark-structured-streaming/references/): - trigger-and-cost-optimization.md: Real-Time Mode section claimed .trigger(realTime=True) (boolean form doesn't exist; PySpark requires the duration), "Photon enabled" (actually Photon must be OFF for RTM), "Databricks 13.3+" (current minimum is DBR 16.4 LTS), and "Latency: < 800ms" (current docs cite "as low as 5 ms"). Five occurrences of these claims across the file are corrected, including the Trigger Selection Guide table and the "RTM not working" diagnostic row. - kafka-streaming.md: Patterns 4-6 used .trigger(realTime=True) (boolean) and Patterns 5-6 combined foreachBatch + RTM, directly contradicting the same file's Pattern 3 correct note that "forEachBatch is NOT supported in RTM". Switched those patterns to processingTime triggers with explanatory comments. Updated the "RTM enabled only if latency < 800ms required" Production Checklist row and the corresponding Common Issues row. New references: - real-time-mode.md (175 lines): cluster setup constraints (DBR 16.4 LTS+, Classic compute, autoscaling/Photon/spot OFF, the realTimeMode.enabled flag), slot math (worker vCPUs >= sum of partitions across stages), supported sources/sinks matrix, transformWithState per-row behavior (vs micro-batch per-key-per-batch), observability (RealTimeStreamScan physical-plan check; the three built-in latency metrics with the sink-write caveat; diagnose-by-metric decision tree), delivery semantics (exactly-once within Spark, at-least-once for Kafka and foreach), error class table. - lakebase-sink-python.md (205 lines): a duck-typed Python foreach sink that writes streaming records into Lakebase Postgres. Cross-links to skills/databricks-lakebase/references/connectivity.md for general auth methods and computes-and-scaling.md for the CU-to-connections cap; adds the streaming-specific rationale for native-password (not OAuth) on executors. Documents the LakebaseSink class with time-based commits (max_dwell_ms), INSERT ... ON CONFLICT idempotent upserts, and the connection-budget formula tying app-pool size + executor connections to the CU cap. SKILL.md: two new nav rows in the Core Patterns table. manifest.json: regenerated by scripts/skills.py generate to include the two new reference files. Signed-off-by: Dan Keeling <dan.keeling@databricks.com>
peterst28
left a comment
There was a problem hiding this comment.
Fantastic improvement for realtime mode. The existing skills for realtime are anemic and often just plain wrong. My comments are mostly nitpicks and suggestions for future improvements.
| @@ -171,12 +171,14 @@ query = (enriched_df | |||
| Enrich events with dimension data: | |||
There was a problem hiding this comment.
Shouldn't we recommend RTM for kafka->kafka streams? I think it's better than microbatch for this case.
| parsed_df.writeStream \ | ||
| .foreachBatch(route_events) \ | ||
| .trigger(realTime=True) \ | ||
| .foreachBatch(route_events) \ # foreachBatch is NOT supported in RTM — use processingTime |
There was a problem hiding this comment.
Comment is probably not necessary here, although it's useful for the reviewer. :)
| source_df.writeStream \ | ||
| .foreachBatch(validate_and_route) \ | ||
| .trigger(realTime=True) \ | ||
| .foreachBatch(validate_and_route) \ # foreachBatch is NOT supported in RTM — use processingTime |
| | maxOffsetsPerTrigger | 10,000-100,000 | Balance latency vs throughput | | ||
| | trigger interval | Business SLA / 3 | Recovery time buffer | | ||
| | RTM | Only if < 800ms required | Microbatch more cost-effective | | ||
| | RTM | When sub-second E2E latency is required | Micro-batch is more cost-effective for second-or-longer SLAs | |
| - [ ] Unique checkpoint per pipeline | ||
| - [ ] Fixed-size cluster (no autoscaling for streaming/RTM) | ||
| - [ ] RTM enabled only if latency < 800ms required | ||
| - [ ] RTM enabled only when sub-second E2E latency is required |
| | Metric | What it measures | | ||
| |---|---| | ||
| | `processingLatencyMs` | Read-to-write inside the query — how long the pipeline takes to process a record | | ||
| | `sourceQueuingLatencyMs` | Source-append-time (e.g. Kafka log append) to first read by the query. Captures inter-batch time, message-bus queuing, upstream batching. | |
There was a problem hiding this comment.
Caution: This number and e2eLatencyMs will be long if you're processing records that have been sitting in Kafka. This is an easy mistake to make, I think.
|
|
||
| **Caveat: `e2eLatencyMs` does not currently include the sink write time.** If perceived latency is higher than `e2eLatencyMs` suggests, the gap is in the sink. | ||
|
|
||
| Read these metrics via a `StreamingQueryListener` or by inspecting `query.lastProgress`. |
There was a problem hiding this comment.
Users can see it in the UI. Worth mentioning?
| | High `processingLatencyMs` | Slow operator inside the query | Per-stage metrics (set `spark.databricks.streaming.execution.enableDebugMetrics = true`); CPU profile | | ||
| | High `sourceQueuingLatencyMs` | Inter-batch time too long, or upstream source latency | Inter-batch time in driver logs; Kafka `kafka.fetch.max.wait.ms` (default 500 ms — drop to 50 for low latency); upstream batching | | ||
| | `e2eLatencyMs` looks fine but the app feels slow | Sink write time (not in `e2eLatencyMs`) | Measure sink flush duration directly | | ||
| | Latency climbing over time | Memory pressure or GC growing | Executor stdout for Full GC events (long `user` CPU times); cluster restart as immediate mitigation | |
There was a problem hiding this comment.
Oh boy. This is a complicated topic. We did a whole talk on it!
| RTM preserves Structured Streaming's standard fault-tolerance guarantees: | ||
|
|
||
| - **Exactly-once within Spark.** Operators, state stores, and supported sinks are all exactly-once. | ||
| - **At-least-once for Kafka and `foreach` sinks.** This is the boundary case to watch — anywhere data leaves Spark via `foreach` (custom writers, side effects) or is written to Kafka, the same record may be delivered more than once on restart or task retry. Design custom sinks to be idempotent. |
There was a problem hiding this comment.
Kafka is at least once? Is that documented somewhere?
|
|
||
| ## Performance & longevity | ||
|
|
||
| The sink performs well for moderate rates and short-lived or append-mostly workloads. The case that needs care is the one it's most often used for — **continuously upserting a small set of hot rows** to serve a live app — where dead-tuple churn can degrade it over a long run. |
There was a problem hiding this comment.
Do you want to discuss the upsert approach that uses Lakebase as a state store, or do you consider that bad practice?
Summary
Fixes incorrect RTM facts in two existing references and adds two new references covering RTM essentials and the Lakebase
foreachsink — the canonical sink shape for a realtime app whose serving layer is Lakebase.Scope:
experimental/databricks-spark-structured-streaming/only. No changes toskills/.Bug fixes (existing references)
references/trigger-and-cost-optimization.mdThe RTM coverage made several claims that don't match current docs and wouldn't run as written:
.trigger(realTime=True)— boolean form doesn't exist in PySpark; the duration string is requiredFive instances of these claims across the file are corrected, including the Trigger Selection Guide table, the "Real-Time Mode (RTM) Configuration" section, the cost-strategy block, and the "RTM not working" Common Issues row.
references/kafka-streaming.mdThree patterns were broken as written:
.trigger(realTime=True)(boolean) AND.join(user_dim, ...)withoutbroadcast()— RTM only supports broadcast stream-static joins, so it wouldn't have run under RTM. Fixed to wrap withbroadcast(), addoutputMode("update"), use the correctrealTime="5 minutes"form, and importbroadcastin the pattern's code block.foreachBatch(...)combined with.trigger(realTime=True), directly contradicting the same file's Pattern 3 correct note that "forEachBatch is NOT supported in RTM". Switched both to.trigger(processingTime="30 seconds")(consistent with the file's Quick Start) with an inline comment on theforeachBatchline.Also fixed: Performance Tuning table row, Common Issues "High latency" row, and Production Checklist row that repeated the
< 800msframing.New references
references/real-time-mode.md(175 lines)The must-know RTM facts in one place:
spark.databricks.streaming.realTimeMode.enabledflag, Dedicated mode for UDFs)[CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT]error)"5 minutes"is checkpoint interval, not trigger interval)transformWithStateper-row behavior (vs micro-batch per-key-per-batch) — the most commonly-missed RTM semantic differenceRealTimeStreamScan; the three built-in latency metrics with the sink-write caveat; diagnose-by-metric decision tree)foreach)references/lakebase-sink-python.md(255 lines)A duck-typed Python
foreachsink that writes streaming records into Lakebase Postgres — the canonical sink shape for an RTM realtime app whose serving layer is Lakebase. Cross-links to the stabledatabricks-lakebaseskill for general Lakebase mechanics (auth methods, CU-to-connections cap, scaling) rather than restating them:foreachBatch + df.write.jdbc()ForeachWriterPython base class — that symbol is Scala-only), and native Postgres password auth (not OAuth) on executorsLakebaseSinkclass with time-based commits (max_dwell_ms),INSERT … ON CONFLICTidempotent upserts, per-partition lifecyclereal-time-mode.md),max_dwell_mstuning, connection budget formula tying app-pool size + executor connections to the CU capSKILL.md
Two new nav rows in the Core Patterns table pointing at the new references.
Manifest
manifest.jsonregenerated bypython3 scripts/skills.py generateto include the two new reference files. Only thedatabricks-spark-structured-streamingskill's file list changes.Validation
python3 scripts/skills.py validate→ "Everything is up to date."Orthogonality
stream-static-joins.md(general broadcast pattern stays there; RTM file states the RTM-specific constraint only)stateful-operations.md(general watermarks/state stores stay there; mytransformWithStatecontent is RTM-specific per-row behavior)streaming-best-practices.md(general checklist stays there; my RTM file states stricter hard requirements)multi-sink-writes.md(that'sforeachBatch; my file isforeach— explicitly contrasts)skills/databricks-lakebase/references/connectivity.md,computes-and-scaling.md, andlakehouse-sync.mdrather than restating their content