Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ timeout = "30s"
# TCP connect timeout (default "5s"). Independent of timeout; raise it for
# cross-region or cold-start FEs that are slow to accept the connection.
# connect_timeout = "5s"
# In-request retry for transient Stream Load failures (5xx/408/429, transport
# error, Doris "Publish Timeout"). Retries re-PUT under the same label, which
# Doris dedupes. max_retries is the total attempt count (1 disables retries);
# keep max_retries * max_retry_delay inside Doris's label_keep_max_second.
# max_retries = 3
# retry_delay = "200ms"
# max_retry_delay = "5s"
# Stream Load redirect security. Doris's FE redirects (307) to a BE on another
# host; credentials are re-attached across that hop. By default a redirect that
# downgrades https -> http is refused (it would leak credentials in cleartext).
Expand Down
18 changes: 11 additions & 7 deletions core/connectors/sinks/doris_sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ The Doris sink connector consumes JSON messages from Iggy streams and writes the
2. It computes a deterministic Stream Load `label` of the form `{label_prefix}-{stream_san}-{topic_san}-{hash16}-{partition}-{first_offset}-{last_offset}`.
- `hash16` is a single 64-bit blake3 hash computed over the *raw* (un-sanitized), length-prefixed `(label_prefix, stream, topic)` triple. So identities that sanitize to the same string get distinct labels — whether the collision is in the names (`events.v1` vs `events_v1`) or in two tenants' prefixes that truncate alike (`prod_events_us_east_1` vs `..._2`) — and no boundary-shift aliasing is possible (`("ab","c")` ≠ `("a","bc")`).
- The total label is bounded under Doris's 128-char cap regardless of input length (worst case 120 chars).
- Doris dedupes loads by label inside its `label_keep_max_second` window. The deterministic label is **forward-compatible scaffolding**: if the runtime ever gains retry/redrive, a duplicate load would be absorbed, not doubled. **Today it protects no production scenario** — there is no retry loop, `consume()` runs once per poll, and the runtime discards its return value. Delivery is at-most-once: the offset is committed before `consume()` runs, so a failed load is never replayed.
- Doris dedupes loads by label inside its `label_keep_max_second` window. The in-request retry (step 6) re-PUTs a transiently-failed batch under the same label, so a prior attempt that actually landed (e.g. a `2xx` whose body we couldn't read) is absorbed, not doubled. This protects **in-request retry only**: the runtime commits the offset before `consume()` runs and discards its return, so a failure outliving the retry budget or a crash mid-load is **at-most-once**.
3. It `PUT`s the batch to `{fe_url}/api/{database}/{table}/_stream_load` with HTTP Basic auth and the headers `Expect: 100-continue`, `format: json`, `strip_outer_array: true`, `label: <label>`. (`Expect: 100-continue` is required by Doris's Stream Load endpoint, which rejects PUTs that omit it. Where the HTTP stack negotiates the handshake it also lets Doris reject auth/4xx before the body uploads — a secondary benefit, not relied on for correctness.)
4. The Doris frontend (FE) responds with a `307 Temporary Redirect` to a backend (BE). The connector follows the redirect manually so that the `Authorization` header is preserved across the hop (`reqwest`'s default policy strips it on cross-host redirects).
`308 Permanent Redirect` is also followed as a defensive measure; redirects beyond a hard cap of 5 (or a redirect with no usable `Location`) are rejected as a permanent `PermanentHttpError`, since retrying a malformed/looping redirect cannot help.
5. The HTTP body is parsed as JSON and the `Status` field decides the outcome:
- `Success` → batch accepted.
- `Label Already Exists` → idempotent replay, treated as success.
- `Publish Timeout` or HTTP `5xx`/`408`/`429` → classified as a transient error (`Error::CannotStoreData`) — retryable in principle, but per the at-most-once note above the runtime does not currently act on it.
- `Fail`, any other `4xx`, or an unparsable response body → permanent error (`Error::PermanentHttpError`); retrying would not help even if the runtime did redrive.
- `Publish Timeout` or HTTP `5xx`/`408`/`429` → transient error (`Error::CannotStoreData`): retried in-request up to `max_retries` attempts (exponential backoff + jitter) under the same label before being surfaced.
- `Fail`, any other `4xx`, or an unparsable response body → permanent error (`Error::PermanentHttpError`); never retried — re-PUTing bad data would just hammer the FE.
6. A *transient* failure (the classifications above, plus a transport-level error) is retried in-request: the same batch is re-`PUT` under the same label, up to `max_retries` attempts with backoff and ±20% jitter (`iggy_connector_sdk::retry`). Since the runtime commits the offset at poll time, this is the connector's only redelivery path; once the budget is exhausted the worst error is surfaced and the batch is not retried again — **at-most-once** across polls.

## Configuration

Expand All @@ -38,6 +39,9 @@ The Doris sink connector consumes JSON messages from Iggy streams and writes the
| `batch_size` | no | `1000` | Maximum number of messages per Stream Load request. |
| `timeout` | no | `30s` | Per-request HTTP timeout (total request budget), as a human-readable duration (e.g. `30s`, `1m`). |
| `connect_timeout` | no | `5s` | TCP connect timeout, independent of `timeout`, as a human-readable duration. Raise it for cross-region or cold-start FEs. |
| `max_retries` | no | `3` | Total Stream Load attempts per batch on a *transient* failure (`1` disables retries). Each retry re-PUTs under the same label, which Doris dedupes. |
| `retry_delay` | no | `200ms` | Base backoff before the first retry; doubles each attempt up to `max_retry_delay`, with ±20% jitter. |
| `max_retry_delay` | no | `5s` | Upper bound on a single retry backoff. Keep `max_retries × max_retry_delay` inside `label_keep_max_second`. |
| `max_filter_ratio` | no | unset | Forwarded as the `max_filter_ratio` Stream Load header. Must be a finite value in `[0.0, 1.0]`; an out-of-range value fails `open()`. |
| `columns` | no | unset | Forwarded as the `columns` Stream Load header. Validated at startup; an invalid value fails `open()`. |
| `where` | no | unset | Forwarded as the `where` Stream Load header. Validated at startup; an invalid value fails `open()`. |
Expand Down Expand Up @@ -84,16 +88,16 @@ timeout = "30s"

## Operational guidance

- **`label_keep_max_second`.** Idempotent replay relies on Doris retaining each label for at least as long as it could take the Iggy runtime to redrive a failed batch. The Doris default is 3 days, which is conservative. If you set this lower on the Doris side, make sure your runtime retry budget fits inside the window — once a label expires, a replay re-loads instead of deduping, producing duplicate rows.
- **`label_keep_max_second`.** The connector's in-request retry re-PUTs a transiently-failed batch under the same label, so Doris must retain that label for at least the connector's full retry budget for the replay to dedupe. The Doris default is 3 days, which is conservative. If you set this lower on the Doris side, make sure `max_retries × max_retry_delay` fits inside the window — once a label expires, a retry re-loads instead of deduping, producing duplicate rows.
- **Keep `batch_size` stable across a redrive.** The label includes the chunk's `first_offset` and `last_offset`, which are a function of `batch_size`. If you change `batch_size` between a failed load and its redrive, the chunk boundaries shift, the offsets differ, and the new label no longer matches the old one — so Doris re-loads instead of deduping, producing duplicate rows.
- **Filtered-row alerts.** When Doris reports `number_filtered_rows > 0`, the connector emits a `warn!`. This is your signal that upstream message shapes have drifted from the table schema; alert on it.
- **Multi-chunk batches are best-effort for operational failures.** A poll larger than `batch_size` is split into chunks, each loaded as its own labelled Stream Load. If a chunk fails *operationally* (serialize, HTTP, or status-classification error), the connector still attempts the remaining chunks and then returns the worst error — it does **not** stop at the first such failure.
The runtime commits the consumer offset for the whole poll before `consume()` runs, so a failed chunk is not replayed regardless; pushing the other chunks through maximizes delivered data, and the worst error is surfaced at the end (logged at `error!` for observability — the runtime currently discards `consume()`'s return value, so there is no retry or DLQ).
- **Multi-chunk batches are best-effort for operational failures.** A poll larger than `batch_size` is split into chunks, each loaded as its own labelled Stream Load (with its own in-request retry budget for transient failures). If a chunk still fails after its retries (serialize, HTTP, or status-classification error), the connector keeps the worst error, attempts the remaining chunks, and returns that error at the end — it does **not** stop at the first such failure.
The runtime commits the consumer offset for the whole poll before `consume()` runs, so a chunk that exhausts its in-request retries is not replayed across polls; pushing the other chunks through maximizes delivered data, and the worst error is surfaced at the end (logged at `error!` for observability — the runtime currently discards `consume()`'s return value, so there is no cross-poll redrive or DLQ).
The one deliberate exception is a **non-JSON payload**, which is treated as a schema-contract violation and aborts the whole poll immediately (see the Requirements note above). Under `schema = "json"` this is unreachable, so it is a defensive guard rather than a normal path.

## Limitations

- JSON payload only. CSV and raw-text payloads are not supported yet.
- HTTP Basic auth only.
- No automatic table creation.
- No built-in retry middleware or circuit breaker — the runtime decides whether to redrive a failing batch. A hardening pass with `iggy_connector_sdk::retry::*` is planned as a follow-up.
- In-request retry only. Transient backend failures are retried within a single `consume()` call (step 6), but the runtime commits the consumer offset at poll time and discards `consume()`'s return value, so there is no cross-poll redrive or DLQ — delivery is at-most-once under a crash or a failure that outlives the retry budget.
7 changes: 7 additions & 0 deletions core/connectors/sinks/doris_sink/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ timeout = "30s"
# TCP connect timeout (default "5s"). Independent of timeout; raise it for
# cross-region or cold-start FEs that are slow to accept the connection.
# connect_timeout = "5s"
# In-request retry for transient Stream Load failures (5xx/408/429, transport
# error, Doris "Publish Timeout"). Retries re-PUT under the same label, which
# Doris dedupes. max_retries is the total attempt count (1 disables retries);
# keep max_retries * max_retry_delay inside Doris's label_keep_max_second.
# max_retries = 3
# retry_delay = "200ms"
# max_retry_delay = "5s"
# Stream Load redirect security. Doris's FE redirects (307) to a BE on another
# host; credentials are re-attached across that hop. By default a redirect that
# downgrades https -> http is refused (it would leak credentials in cleartext).
Expand Down
Loading
Loading