From 9c196eaccbbbacef91a3336a67d5e7b8e655b854 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 30 Apr 2026 18:13:10 +0200 Subject: [PATCH] feat(taskbroker): Passthrough mode ref STREAM-882 Introduce passthrough mode, so that a broker can be used to spawn tasks from any topic with any type of message format. This will make it easier to migrate existing consumers to be tasks instead, without changing data layout in prod. For more information refer to the ticket above. --- Cargo.lock | 20 +++ Cargo.toml | 1 + src/config.rs | 21 +++ src/kafka/deserialize.rs | 42 +++++ src/kafka/deserialize_passthrough.rs | 227 +++++++++++++++++++++++++++ src/kafka/mod.rs | 2 + src/main.rs | 5 +- 7 files changed, 315 insertions(+), 3 deletions(-) create mode 100644 src/kafka/deserialize.rs create mode 100644 src/kafka/deserialize_passthrough.rs diff --git a/Cargo.lock b/Cargo.lock index 900761b1..50f4efbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2255,6 +2255,25 @@ dependencies = [ "web-sys", ] +[[package]] +name = "rmp" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ba8be72d372b2c9b35542551678538b562e7cf86c3315773cae48dfbfe7790c" +dependencies = [ + "num-traits", +] + +[[package]] +name = "rmp-serde" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155" +dependencies = [ + "rmp", + "serde", +] + [[package]] name = "rsa" version = "0.9.10" @@ -2991,6 +3010,7 @@ dependencies = [ "prost-types", "rand 0.8.5", "rdkafka", + "rmp-serde", "rstest", "sentry", "sentry_protos", diff --git a/Cargo.toml b/Cargo.toml index 1ad2b992..c2e0bd92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ prost = "0.14" prost-types = "0.14" rand = "0.8.5" rdkafka = { version = "0.37.0", features = ["cmake-build", "ssl"] } +rmp-serde = "1.3" sentry = { version = "0.41.0", default-features = false, features = [ # default features, except `release-health` is disabled "backtrace", diff --git a/src/config.rs b/src/config.rs index 523c0286..fcee5fb9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -301,6 +301,22 @@ pub struct Config { /// Maps every application to its worker endpoint, both represented as strings. pub worker_map: BTreeMap, + + /// Enable passthrough mode for consuming raw bytes from legacy topics. + /// In passthrough mode, raw Kafka message bytes are wrapped into TaskActivation. + pub passthrough_mode: bool, + + /// The namespace to assign to passthrough activations. + pub passthrough_namespace: Option, + + /// The application to assign to passthrough activations. + pub passthrough_application: Option, + + /// The taskname to assign to passthrough activations. + pub passthrough_taskname: Option, + + /// Processing deadline duration in seconds for passthrough activations. + pub passthrough_processing_deadline_duration: u64, } impl Default for Config { @@ -386,6 +402,11 @@ impl Default for Config { callback_addr: "0.0.0.0".into(), callback_port: 50051, worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(), + passthrough_mode: false, + passthrough_namespace: None, + passthrough_application: None, + passthrough_taskname: None, + passthrough_processing_deadline_duration: 30, } } } diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs new file mode 100644 index 00000000..7e650b27 --- /dev/null +++ b/src/kafka/deserialize.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use anyhow::Error; +use rdkafka::message::OwnedMessage; + +use crate::config::Config; +use crate::store::activation::InflightActivation; + +use super::deserialize_activation::{self, DeserializeActivationConfig}; +use super::deserialize_passthrough::{self, PassthroughConfig}; + +pub struct DeserializeConfig { + activation_config: DeserializeActivationConfig, + passthrough_config: Option, +} + +impl DeserializeConfig { + pub fn from_config(config: &Config) -> Self { + Self { + activation_config: DeserializeActivationConfig::from_config(config), + passthrough_config: PassthroughConfig::from_config(config), + } + } +} + +/// Create a unified deserializer that handles both normal and passthrough modes. +/// In passthrough mode, raw Kafka bytes are wrapped into a TaskActivation. +/// In normal mode, Kafka messages are expected to contain encoded TaskActivation protos. +pub fn new( + config: DeserializeConfig, +) -> impl Fn(Arc) -> Result { + let passthrough_deserializer = config.passthrough_config.map(deserialize_passthrough::new); + let activation_deserializer = deserialize_activation::new(config.activation_config); + + move |msg: Arc| { + if let Some(ref pt_deserializer) = passthrough_deserializer { + pt_deserializer(msg) + } else { + activation_deserializer(msg) + } + } +} diff --git a/src/kafka/deserialize_passthrough.rs b/src/kafka/deserialize_passthrough.rs new file mode 100644 index 00000000..3d270994 --- /dev/null +++ b/src/kafka/deserialize_passthrough.rs @@ -0,0 +1,227 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::{Error, anyhow}; +use chrono::Utc; +use prost::Message as _; +use rdkafka::Message; +use rdkafka::message::OwnedMessage; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; +use uuid::Uuid; + +use crate::config::Config; +use crate::store::activation::{InflightActivation, InflightActivationStatus}; + +use super::deserialize_activation::bucket_from_id; + +pub struct PassthroughConfig { + pub namespace: String, + pub application: String, + pub taskname: String, + pub processing_deadline_duration: u64, +} + +impl PassthroughConfig { + pub fn from_config(config: &Config) -> Option { + if !config.passthrough_mode { + return None; + } + Some(Self { + namespace: config + .passthrough_namespace + .clone() + .expect("passthrough_namespace required when passthrough_mode is enabled"), + application: config + .passthrough_application + .clone() + .expect("passthrough_application required when passthrough_mode is enabled"), + taskname: config + .passthrough_taskname + .clone() + .expect("passthrough_taskname required when passthrough_mode is enabled"), + processing_deadline_duration: config.passthrough_processing_deadline_duration, + }) + } +} + +/// Encode raw bytes into msgpack format: {"args": [raw_bytes], "kwargs": {}} +fn encode_passthrough_params(raw_bytes: &[u8]) -> Result, Error> { + use serde::Serialize; + + #[derive(Serialize)] + struct Params<'a> { + args: (&'a [u8],), + kwargs: HashMap<(), ()>, + } + + let params = Params { + args: (raw_bytes,), + kwargs: HashMap::new(), + }; + + rmp_serde::to_vec_named(¶ms).map_err(|e| anyhow!("Failed to encode msgpack: {}", e)) +} + +/// Create a deserializer closure for passthrough mode. +/// Wraps raw Kafka message bytes into a TaskActivation with msgpack-encoded parameters_bytes. +pub fn new( + config: PassthroughConfig, +) -> impl Fn(Arc) -> Result { + move |msg: Arc| { + let Some(payload) = msg.payload() else { + return Err(anyhow!("Message has no payload")); + }; + + let id = Uuid::new_v4().to_string(); + let parameters_bytes = encode_passthrough_params(payload)?; + let now = Utc::now(); + let received_at = prost_types::Timestamp { + seconds: now.timestamp(), + nanos: 0, + }; + + let activation = TaskActivation { + id: id.clone(), + application: Some(config.application.clone()), + namespace: config.namespace.clone(), + taskname: config.taskname.clone(), + #[allow(deprecated)] + parameters: String::new(), + parameters_bytes, + headers: HashMap::new(), + received_at: Some(received_at), + retry_state: None, + processing_deadline_duration: config.processing_deadline_duration, + expires: None, + delay: None, + }; + + let activation_bytes = activation.encode_to_vec(); + let bucket = bucket_from_id(&id); + + metrics::histogram!( + "consumer.passthrough.payload_size_bytes", + "namespace" => config.namespace.clone(), + "taskname" => config.taskname.clone() + ) + .record(payload.len() as f64); + + Ok(InflightActivation { + id, + activation: activation_bytes, + status: InflightActivationStatus::Pending, + partition: msg.partition(), + offset: msg.offset(), + added_at: now, + received_at: now, + processing_deadline: None, + claim_expires_at: None, + processing_deadline_duration: config.processing_deadline_duration as i32, + processing_attempts: 0, + expires_at: None, + delay_until: None, + at_most_once: false, + application: config.application.clone(), + namespace: config.namespace.clone(), + taskname: config.taskname.clone(), + on_attempts_exceeded: OnAttemptsExceeded::Discard, + bucket, + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use rdkafka::Timestamp; + use rdkafka::message::OwnedMessage; + + use super::*; + + #[test] + fn test_encode_passthrough_params() { + use serde::Deserialize; + + #[derive(Deserialize, Debug)] + struct Params { + args: (Vec,), + kwargs: HashMap<(), ()>, + } + + let raw_bytes = b"hello world"; + let encoded = encode_passthrough_params(raw_bytes).unwrap(); + + // Decode and verify + let decoded: Params = rmp_serde::from_slice(&encoded).unwrap(); + assert_eq!(decoded.args.0, raw_bytes); + assert!(decoded.kwargs.is_empty()); + } + + #[test] + fn test_passthrough_deserializer() { + let config = PassthroughConfig { + namespace: "test-namespace".to_string(), + application: "test-app".to_string(), + taskname: "test-task".to_string(), + processing_deadline_duration: 60, + }; + + let deserializer = new(config); + + let raw_payload = b"raw kafka message bytes"; + let message = OwnedMessage::new( + Some(raw_payload.to_vec()), + None, + "legacy-topic".into(), + Timestamp::now(), + 0, + 42, + None, + ); + + let result = deserializer(Arc::new(message)); + assert!(result.is_ok()); + + let inflight = result.unwrap(); + assert_eq!(inflight.namespace, "test-namespace"); + assert_eq!(inflight.application, "test-app"); + assert_eq!(inflight.taskname, "test-task"); + assert_eq!(inflight.processing_deadline_duration, 60); + assert_eq!(inflight.offset, 42); + assert_eq!(inflight.status, InflightActivationStatus::Pending); + + // Verify the activation can be decoded + let activation = TaskActivation::decode(inflight.activation.as_slice()).unwrap(); + assert_eq!(activation.namespace, "test-namespace"); + assert_eq!(activation.application, Some("test-app".to_string())); + assert_eq!(activation.taskname, "test-task"); + assert!(!activation.parameters_bytes.is_empty()); + } + + #[test] + fn test_passthrough_deserializer_empty_payload() { + let config = PassthroughConfig { + namespace: "test-namespace".to_string(), + application: "test-app".to_string(), + taskname: "test-task".to_string(), + processing_deadline_duration: 60, + }; + + let deserializer = new(config); + + let message = OwnedMessage::new( + None, // No payload + None, + "legacy-topic".into(), + Timestamp::now(), + 0, + 0, + None, + ); + + let result = deserializer(Arc::new(message)); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("no payload")); + } +} diff --git a/src/kafka/mod.rs b/src/kafka/mod.rs index ae4122ac..87b8c156 100644 --- a/src/kafka/mod.rs +++ b/src/kafka/mod.rs @@ -1,6 +1,8 @@ pub mod admin; pub mod consumer; +pub mod deserialize; pub mod deserialize_activation; +pub mod deserialize_passthrough; pub mod inflight_activation_batcher; pub mod inflight_activation_writer; pub mod os_stream_writer; diff --git a/src/main.rs b/src/main.rs index 4c174420..c91a766f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,8 +20,7 @@ use taskbroker::grpc::metrics_middleware::MetricsLayer; use taskbroker::grpc::server::TaskbrokerServer; use taskbroker::kafka::admin::create_missing_topics; use taskbroker::kafka::consumer::start_consumer; -use taskbroker::kafka::deserialize_activation; -use taskbroker::kafka::deserialize_activation::DeserializeActivationConfig; +use taskbroker::kafka::deserialize::{self, DeserializeConfig}; use taskbroker::kafka::inflight_activation_batcher::{ ActivationBatcherConfig, InflightActivationBatcher, }; @@ -174,7 +173,7 @@ async fn main() -> Result<(), Error> { ), map: - deserialize_activation::new(DeserializeActivationConfig::from_config(&consumer_config)), + deserialize::new(DeserializeConfig::from_config(&consumer_config)), reduce: InflightActivationBatcher::new(