diff --git a/Cargo.lock b/Cargo.lock index 900761b1..50f4efbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2255,6 +2255,25 @@ dependencies = [ "web-sys", ] +[[package]] +name = "rmp" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ba8be72d372b2c9b35542551678538b562e7cf86c3315773cae48dfbfe7790c" +dependencies = [ + "num-traits", +] + +[[package]] +name = "rmp-serde" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155" +dependencies = [ + "rmp", + "serde", +] + [[package]] name = "rsa" version = "0.9.10" @@ -2991,6 +3010,7 @@ dependencies = [ "prost-types", "rand 0.8.5", "rdkafka", + "rmp-serde", "rstest", "sentry", "sentry_protos", diff --git a/Cargo.toml b/Cargo.toml index 1ad2b992..c2e0bd92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ prost = "0.14" prost-types = "0.14" rand = "0.8.5" rdkafka = { version = "0.37.0", features = ["cmake-build", "ssl"] } +rmp-serde = "1.3" sentry = { version = "0.41.0", default-features = false, features = [ # default features, except `release-health` is disabled "backtrace", diff --git a/src/config.rs b/src/config.rs index 523c0286..fcee5fb9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -301,6 +301,22 @@ pub struct Config { /// Maps every application to its worker endpoint, both represented as strings. pub worker_map: BTreeMap, + + /// Enable passthrough mode for consuming raw bytes from legacy topics. + /// In passthrough mode, raw Kafka message bytes are wrapped into TaskActivation. + pub passthrough_mode: bool, + + /// The namespace to assign to passthrough activations. + pub passthrough_namespace: Option, + + /// The application to assign to passthrough activations. + pub passthrough_application: Option, + + /// The taskname to assign to passthrough activations. + pub passthrough_taskname: Option, + + /// Processing deadline duration in seconds for passthrough activations. + pub passthrough_processing_deadline_duration: u64, } impl Default for Config { @@ -386,6 +402,11 @@ impl Default for Config { callback_addr: "0.0.0.0".into(), callback_port: 50051, worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(), + passthrough_mode: false, + passthrough_namespace: None, + passthrough_application: None, + passthrough_taskname: None, + passthrough_processing_deadline_duration: 30, } } } diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs new file mode 100644 index 00000000..7e650b27 --- /dev/null +++ b/src/kafka/deserialize.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use anyhow::Error; +use rdkafka::message::OwnedMessage; + +use crate::config::Config; +use crate::store::activation::InflightActivation; + +use super::deserialize_activation::{self, DeserializeActivationConfig}; +use super::deserialize_passthrough::{self, PassthroughConfig}; + +pub struct DeserializeConfig { + activation_config: DeserializeActivationConfig, + passthrough_config: Option, +} + +impl DeserializeConfig { + pub fn from_config(config: &Config) -> Self { + Self { + activation_config: DeserializeActivationConfig::from_config(config), + passthrough_config: PassthroughConfig::from_config(config), + } + } +} + +/// Create a unified deserializer that handles both normal and passthrough modes. +/// In passthrough mode, raw Kafka bytes are wrapped into a TaskActivation. +/// In normal mode, Kafka messages are expected to contain encoded TaskActivation protos. +pub fn new( + config: DeserializeConfig, +) -> impl Fn(Arc) -> Result { + let passthrough_deserializer = config.passthrough_config.map(deserialize_passthrough::new); + let activation_deserializer = deserialize_activation::new(config.activation_config); + + move |msg: Arc| { + if let Some(ref pt_deserializer) = passthrough_deserializer { + pt_deserializer(msg) + } else { + activation_deserializer(msg) + } + } +} diff --git a/src/kafka/deserialize_passthrough.rs b/src/kafka/deserialize_passthrough.rs new file mode 100644 index 00000000..3d270994 --- /dev/null +++ b/src/kafka/deserialize_passthrough.rs @@ -0,0 +1,227 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::{Error, anyhow}; +use chrono::Utc; +use prost::Message as _; +use rdkafka::Message; +use rdkafka::message::OwnedMessage; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; +use uuid::Uuid; + +use crate::config::Config; +use crate::store::activation::{InflightActivation, InflightActivationStatus}; + +use super::deserialize_activation::bucket_from_id; + +pub struct PassthroughConfig { + pub namespace: String, + pub application: String, + pub taskname: String, + pub processing_deadline_duration: u64, +} + +impl PassthroughConfig { + pub fn from_config(config: &Config) -> Option { + if !config.passthrough_mode { + return None; + } + Some(Self { + namespace: config + .passthrough_namespace + .clone() + .expect("passthrough_namespace required when passthrough_mode is enabled"), + application: config + .passthrough_application + .clone() + .expect("passthrough_application required when passthrough_mode is enabled"), + taskname: config + .passthrough_taskname + .clone() + .expect("passthrough_taskname required when passthrough_mode is enabled"), + processing_deadline_duration: config.passthrough_processing_deadline_duration, + }) + } +} + +/// Encode raw bytes into msgpack format: {"args": [raw_bytes], "kwargs": {}} +fn encode_passthrough_params(raw_bytes: &[u8]) -> Result, Error> { + use serde::Serialize; + + #[derive(Serialize)] + struct Params<'a> { + args: (&'a [u8],), + kwargs: HashMap<(), ()>, + } + + let params = Params { + args: (raw_bytes,), + kwargs: HashMap::new(), + }; + + rmp_serde::to_vec_named(¶ms).map_err(|e| anyhow!("Failed to encode msgpack: {}", e)) +} + +/// Create a deserializer closure for passthrough mode. +/// Wraps raw Kafka message bytes into a TaskActivation with msgpack-encoded parameters_bytes. +pub fn new( + config: PassthroughConfig, +) -> impl Fn(Arc) -> Result { + move |msg: Arc| { + let Some(payload) = msg.payload() else { + return Err(anyhow!("Message has no payload")); + }; + + let id = Uuid::new_v4().to_string(); + let parameters_bytes = encode_passthrough_params(payload)?; + let now = Utc::now(); + let received_at = prost_types::Timestamp { + seconds: now.timestamp(), + nanos: 0, + }; + + let activation = TaskActivation { + id: id.clone(), + application: Some(config.application.clone()), + namespace: config.namespace.clone(), + taskname: config.taskname.clone(), + #[allow(deprecated)] + parameters: String::new(), + parameters_bytes, + headers: HashMap::new(), + received_at: Some(received_at), + retry_state: None, + processing_deadline_duration: config.processing_deadline_duration, + expires: None, + delay: None, + }; + + let activation_bytes = activation.encode_to_vec(); + let bucket = bucket_from_id(&id); + + metrics::histogram!( + "consumer.passthrough.payload_size_bytes", + "namespace" => config.namespace.clone(), + "taskname" => config.taskname.clone() + ) + .record(payload.len() as f64); + + Ok(InflightActivation { + id, + activation: activation_bytes, + status: InflightActivationStatus::Pending, + partition: msg.partition(), + offset: msg.offset(), + added_at: now, + received_at: now, + processing_deadline: None, + claim_expires_at: None, + processing_deadline_duration: config.processing_deadline_duration as i32, + processing_attempts: 0, + expires_at: None, + delay_until: None, + at_most_once: false, + application: config.application.clone(), + namespace: config.namespace.clone(), + taskname: config.taskname.clone(), + on_attempts_exceeded: OnAttemptsExceeded::Discard, + bucket, + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use rdkafka::Timestamp; + use rdkafka::message::OwnedMessage; + + use super::*; + + #[test] + fn test_encode_passthrough_params() { + use serde::Deserialize; + + #[derive(Deserialize, Debug)] + struct Params { + args: (Vec,), + kwargs: HashMap<(), ()>, + } + + let raw_bytes = b"hello world"; + let encoded = encode_passthrough_params(raw_bytes).unwrap(); + + // Decode and verify + let decoded: Params = rmp_serde::from_slice(&encoded).unwrap(); + assert_eq!(decoded.args.0, raw_bytes); + assert!(decoded.kwargs.is_empty()); + } + + #[test] + fn test_passthrough_deserializer() { + let config = PassthroughConfig { + namespace: "test-namespace".to_string(), + application: "test-app".to_string(), + taskname: "test-task".to_string(), + processing_deadline_duration: 60, + }; + + let deserializer = new(config); + + let raw_payload = b"raw kafka message bytes"; + let message = OwnedMessage::new( + Some(raw_payload.to_vec()), + None, + "legacy-topic".into(), + Timestamp::now(), + 0, + 42, + None, + ); + + let result = deserializer(Arc::new(message)); + assert!(result.is_ok()); + + let inflight = result.unwrap(); + assert_eq!(inflight.namespace, "test-namespace"); + assert_eq!(inflight.application, "test-app"); + assert_eq!(inflight.taskname, "test-task"); + assert_eq!(inflight.processing_deadline_duration, 60); + assert_eq!(inflight.offset, 42); + assert_eq!(inflight.status, InflightActivationStatus::Pending); + + // Verify the activation can be decoded + let activation = TaskActivation::decode(inflight.activation.as_slice()).unwrap(); + assert_eq!(activation.namespace, "test-namespace"); + assert_eq!(activation.application, Some("test-app".to_string())); + assert_eq!(activation.taskname, "test-task"); + assert!(!activation.parameters_bytes.is_empty()); + } + + #[test] + fn test_passthrough_deserializer_empty_payload() { + let config = PassthroughConfig { + namespace: "test-namespace".to_string(), + application: "test-app".to_string(), + taskname: "test-task".to_string(), + processing_deadline_duration: 60, + }; + + let deserializer = new(config); + + let message = OwnedMessage::new( + None, // No payload + None, + "legacy-topic".into(), + Timestamp::now(), + 0, + 0, + None, + ); + + let result = deserializer(Arc::new(message)); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("no payload")); + } +} diff --git a/src/kafka/mod.rs b/src/kafka/mod.rs index ae4122ac..87b8c156 100644 --- a/src/kafka/mod.rs +++ b/src/kafka/mod.rs @@ -1,6 +1,8 @@ pub mod admin; pub mod consumer; +pub mod deserialize; pub mod deserialize_activation; +pub mod deserialize_passthrough; pub mod inflight_activation_batcher; pub mod inflight_activation_writer; pub mod os_stream_writer; diff --git a/src/main.rs b/src/main.rs index 4c174420..c91a766f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,8 +20,7 @@ use taskbroker::grpc::metrics_middleware::MetricsLayer; use taskbroker::grpc::server::TaskbrokerServer; use taskbroker::kafka::admin::create_missing_topics; use taskbroker::kafka::consumer::start_consumer; -use taskbroker::kafka::deserialize_activation; -use taskbroker::kafka::deserialize_activation::DeserializeActivationConfig; +use taskbroker::kafka::deserialize::{self, DeserializeConfig}; use taskbroker::kafka::inflight_activation_batcher::{ ActivationBatcherConfig, InflightActivationBatcher, }; @@ -174,7 +173,7 @@ async fn main() -> Result<(), Error> { ), map: - deserialize_activation::new(DeserializeActivationConfig::from_config(&consumer_config)), + deserialize::new(DeserializeConfig::from_config(&consumer_config)), reduce: InflightActivationBatcher::new(