diff --git a/core/connectors/runtime/example_config/connectors/doris_sink.toml b/core/connectors/runtime/example_config/connectors/doris_sink.toml index 656b0c2b4f..4f7118068e 100644 --- a/core/connectors/runtime/example_config/connectors/doris_sink.toml +++ b/core/connectors/runtime/example_config/connectors/doris_sink.toml @@ -41,9 +41,21 @@ password = "replace_with_secret" label_prefix = "iggy" batch_size = 1000 timeout = "30s" +# Output format: "json" (default) or "csv". CSV is opt-in for throughput and is +# positional, so it REQUIRES `columns` to pin the column order (JSON is +# name-mapped). open() fails if output_format="csv" without columns. +# output_format = "json" +# columns = "id, name, count, amount, active, timestamp" # TCP connect timeout (default "5s"). Independent of timeout; raise it for # cross-region or cold-start FEs that are slow to accept the connection. # connect_timeout = "5s" +# In-request retry for transient Stream Load failures (5xx/408/429, transport +# error, Doris "Publish Timeout"). Retries re-PUT under the same label, which +# Doris dedupes. max_retries is the total attempt count (1 disables retries); +# keep max_retries * max_retry_delay inside Doris's label_keep_max_second. +# max_retries = 3 +# retry_delay = "200ms" +# max_retry_delay = "5s" # Stream Load redirect security. Doris's FE redirects (307) to a BE on another # host; credentials are re-attached across that hop. By default a redirect that # downgrades https -> http is refused (it would leak credentials in cleartext). diff --git a/core/connectors/sinks/doris_sink/README.md b/core/connectors/sinks/doris_sink/README.md index 6da9937be0..2689fa97c4 100644 --- a/core/connectors/sinks/doris_sink/README.md +++ b/core/connectors/sinks/doris_sink/README.md @@ -11,19 +11,20 @@ The Doris sink connector consumes JSON messages from Iggy streams and writes the ## How it works -1. For each batch of messages, the connector serializes the JSON payloads into a JSON array. +1. For each batch of messages, the connector serializes the JSON payloads into the configured output format: a JSON array by default, or CSV when `output_format = "csv"` (see Configuration). 2. It computes a deterministic Stream Load `label` of the form `{label_prefix}-{stream_san}-{topic_san}-{hash16}-{partition}-{first_offset}-{last_offset}`. - `hash16` is a single 64-bit blake3 hash computed over the *raw* (un-sanitized), length-prefixed `(label_prefix, stream, topic)` triple. So identities that sanitize to the same string get distinct labels — whether the collision is in the names (`events.v1` vs `events_v1`) or in two tenants' prefixes that truncate alike (`prod_events_us_east_1` vs `..._2`) — and no boundary-shift aliasing is possible (`("ab","c")` ≠ `("a","bc")`). - The total label is bounded under Doris's 128-char cap regardless of input length (worst case 120 chars). - - Doris dedupes loads by label inside its `label_keep_max_second` window. The deterministic label is **forward-compatible scaffolding**: if the runtime ever gains retry/redrive, a duplicate load would be absorbed, not doubled. **Today it protects no production scenario** — there is no retry loop, `consume()` runs once per poll, and the runtime discards its return value. Delivery is at-most-once: the offset is committed before `consume()` runs, so a failed load is never replayed. -3. It `PUT`s the batch to `{fe_url}/api/{database}/{table}/_stream_load` with HTTP Basic auth and the headers `Expect: 100-continue`, `format: json`, `strip_outer_array: true`, `label: