From 549e171a4b454bd0a0e6edee7519a728b1ab45fa Mon Sep 17 00:00:00 2001 From: ryankert01 Date: Sun, 28 Jun 2026 03:36:45 +0800 Subject: [PATCH 1/2] feat(connectors): retry transient Doris Stream Load failures in-request The Doris sink classified transient Stream Load outcomes (5xx/408/429, transport errors, Publish Timeout) as retryable but never acted on them: the runtime commits the consumer offset at poll time before consume() runs and discards its return value, so a transient backend blip silently dropped the batch under at-most-once delivery. consume() now retries a transiently-failed batch in-request, re-PUTing under the same deterministic label so Doris dedupes a prior attempt that actually landed (e.g. a 2xx whose body could not be read). Permanent failures are never retried. Backoff and jitter come from iggy_connector_sdk::retry, bounded by new max_retries/retry_delay/max_retry_delay config (defaults 3/200ms/5s). This shrinks the at-most-once window within a single poll; cross-poll and crash delivery remain a runtime concern, not something a sink can fix. Relates to #3215. --- .../example_config/connectors/doris_sink.toml | 7 + core/connectors/sinks/doris_sink/README.md | 18 +- core/connectors/sinks/doris_sink/config.toml | 7 + core/connectors/sinks/doris_sink/src/lib.rs | 292 ++++++++++++++++-- 4 files changed, 285 insertions(+), 39 deletions(-) diff --git a/core/connectors/runtime/example_config/connectors/doris_sink.toml b/core/connectors/runtime/example_config/connectors/doris_sink.toml index 656b0c2b4f..20f4e7663e 100644 --- a/core/connectors/runtime/example_config/connectors/doris_sink.toml +++ b/core/connectors/runtime/example_config/connectors/doris_sink.toml @@ -44,6 +44,13 @@ timeout = "30s" # TCP connect timeout (default "5s"). Independent of timeout; raise it for # cross-region or cold-start FEs that are slow to accept the connection. # connect_timeout = "5s" +# In-request retry for transient Stream Load failures (5xx/408/429, transport +# error, Doris "Publish Timeout"). Retries re-PUT under the same label, which +# Doris dedupes. max_retries is the total attempt count (1 disables retries); +# keep max_retries * max_retry_delay inside Doris's label_keep_max_second. +# max_retries = 3 +# retry_delay = "200ms" +# max_retry_delay = "5s" # Stream Load redirect security. Doris's FE redirects (307) to a BE on another # host; credentials are re-attached across that hop. By default a redirect that # downgrades https -> http is refused (it would leak credentials in cleartext). diff --git a/core/connectors/sinks/doris_sink/README.md b/core/connectors/sinks/doris_sink/README.md index 6da9937be0..b949a50b8e 100644 --- a/core/connectors/sinks/doris_sink/README.md +++ b/core/connectors/sinks/doris_sink/README.md @@ -15,15 +15,16 @@ The Doris sink connector consumes JSON messages from Iggy streams and writes the 2. It computes a deterministic Stream Load `label` of the form `{label_prefix}-{stream_san}-{topic_san}-{hash16}-{partition}-{first_offset}-{last_offset}`. - `hash16` is a single 64-bit blake3 hash computed over the *raw* (un-sanitized), length-prefixed `(label_prefix, stream, topic)` triple. So identities that sanitize to the same string get distinct labels — whether the collision is in the names (`events.v1` vs `events_v1`) or in two tenants' prefixes that truncate alike (`prod_events_us_east_1` vs `..._2`) — and no boundary-shift aliasing is possible (`("ab","c")` ≠ `("a","bc")`). - The total label is bounded under Doris's 128-char cap regardless of input length (worst case 120 chars). - - Doris dedupes loads by label inside its `label_keep_max_second` window. The deterministic label is **forward-compatible scaffolding**: if the runtime ever gains retry/redrive, a duplicate load would be absorbed, not doubled. **Today it protects no production scenario** — there is no retry loop, `consume()` runs once per poll, and the runtime discards its return value. Delivery is at-most-once: the offset is committed before `consume()` runs, so a failed load is never replayed. + - Doris dedupes loads by label inside its `label_keep_max_second` window. The in-request retry (step 6) re-PUTs a transiently-failed batch under the same label, so a prior attempt that actually landed (e.g. a `2xx` whose body we couldn't read) is absorbed, not doubled. This protects **in-request retry only**: the runtime commits the offset before `consume()` runs and discards its return, so a failure outliving the retry budget or a crash mid-load is **at-most-once**. 3. It `PUT`s the batch to `{fe_url}/api/{database}/{table}/_stream_load` with HTTP Basic auth and the headers `Expect: 100-continue`, `format: json`, `strip_outer_array: true`, `label: