From 5a18b2ef30e500a80f63eb890a0a509e52088176 Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 6 Feb 2026 01:36:59 +0800 Subject: [PATCH] feat: add support for OAUTHBEARER token refresh callback --- docs/oauth_bearer_callback.md | 211 +++++++++++++++++++++++++++++++ examples/oauth_example.cpp | 79 ++++++++++++ include/cppkafka/configuration.h | 17 +++ src/configuration.cpp | 19 +++ 4 files changed, 326 insertions(+) create mode 100644 docs/oauth_bearer_callback.md create mode 100644 examples/oauth_example.cpp diff --git a/docs/oauth_bearer_callback.md b/docs/oauth_bearer_callback.md new file mode 100644 index 00000000..c791ea64 --- /dev/null +++ b/docs/oauth_bearer_callback.md @@ -0,0 +1,211 @@ +# OAuth Bearer Token Refresh Callback + +## Overview + +The OAuth bearer token refresh callback allows you to implement custom OAuth bearer token generation and refresh logic in cppkafka. This is particularly useful for authentication mechanisms like AWS MSK IAM, Azure Event Hubs, or any custom OAuth implementation. + +## API + +### Setting the Callback + +```cpp +using OAuthBearerTokenRefreshCallback = std::function; + +Configuration& set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback); +``` + +### Getting the Callback + +```cpp +const OAuthBearerTokenRefreshCallback& get_oauthbearer_token_refresh_callback() const; +``` + +## Usage + +### Basic Setup + +1. **Create a callback function** that generates or fetches OAuth tokens: + +```cpp +void my_oauth_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) { + // Parse config if needed + // Generate token + std::string token = generate_my_token(); + int64_t expiry_ms = get_token_expiry(); + std::string principal = get_principal(); + + // Set the token + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( + handle.get_handle(), + token.c_str(), + expiry_ms, + principal.c_str(), + nullptr, 0, + errstr, sizeof(errstr) + ); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); + } +} +``` + +2. **Configure Kafka with OAUTHBEARER**: + +```cpp +Configuration config = { + {"metadata.broker.list", "broker:9092"}, + {"group.id", "my-consumer"}, + {"sasl.mechanism", "OAUTHBEARER"}, + {"security.protocol", "SASL_SSL"} +}; +``` + +3. **Set the callback**: + +```cpp +config.set_oauthbearer_token_refresh_callback(my_oauth_callback); +``` + +4. **Create consumer or producer**: + +```cpp +Consumer consumer(config); +// or +Producer producer(config); +``` + +## Callback Parameters + +### KafkaHandleBase& handle +The Kafka handle (consumer or producer) requesting token refresh. Use `handle.get_handle()` to get the underlying `rd_kafka_t*` pointer for calling librdkafka functions. + +### const std::string& oauthbearer_config +The value of the `sasl.oauthbearer.config` configuration property. You can use this to pass custom parameters to your callback. + +## Callback Responsibilities + +Your callback must either: + +1. **Successfully set a token** using `rd_kafka_oauthbearer_set_token()`, or +2. **Report failure** using `rd_kafka_oauthbearer_set_token_failure()` + +Failure to do either will result in authentication hanging. + +## Complete Example: AWS MSK IAM + +```cpp +#include +#include + +void aws_msk_token_callback(KafkaHandleBase& handle, const std::string& config) { + try { + // Get AWS credentials + auto provider = Aws::Auth::DefaultAWSCredentialsProviderChain(); + auto credentials = provider.GetAWSCredentials(); + + // Generate MSK IAM token (simplified) + std::string token = generate_msk_iam_token(credentials, "us-east-1"); + int64_t expiry_ms = current_time_ms() + 300000; // 5 minutes + + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( + handle.get_handle(), + token.c_str(), + expiry_ms, + credentials.GetAWSAccessKeyId().c_str(), + nullptr, 0, + errstr, sizeof(errstr) + ); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); + } + } catch (const std::exception& e) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), e.what()); + } +} + +int main() { + Configuration config = { + {"metadata.broker.list", "b-1.mycluster.kafka.us-east-1.amazonaws.com:9098"}, + {"security.protocol", "SASL_SSL"}, + {"sasl.mechanism", "OAUTHBEARER"}, + {"sasl.oauthbearer.config", "region=us-east-1"} + }; + + config.set_oauthbearer_token_refresh_callback(aws_msk_token_callback); + + Consumer consumer(config); + consumer.subscribe({"my-topic"}); + + // Process messages... +} +``` + +## When is the Callback Invoked? + +The callback is invoked: + +1. **On initial connection** - Before the first authentication attempt +2. **Before token expiry** - Automatically when the current token is about to expire +3. **On authentication failure** - If the broker rejects the current token + +## Thread Safety + +The callback may be invoked from librdkafka's internal threads. Ensure your callback is thread-safe if it accesses shared resources. + +## Error Handling + +Always handle errors in your callback: + +```cpp +void safe_oauth_callback(KafkaHandleBase& handle, const std::string& config) { + try { + // Token generation logic + std::string token = generate_token(); + + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( + handle.get_handle(), + token.c_str(), + expiry_ms, + principal.c_str(), + nullptr, 0, + errstr, sizeof(errstr) + ); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); + } + } catch (const std::exception& e) { + // Always report failures + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), e.what()); + } catch (...) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), + "Unknown error generating token"); + } +} +``` + +## Background Token Refresh + +For background token refresh (useful for long-lived consumers with low traffic): + +```cpp +// Enable SASL queue for background callbacks +rd_kafka_conf_enable_sasl_queue(config.get_handle(), 1); + +// Enable background SASL callbacks (if supported) +rd_kafka_sasl_background_callbacks_enable(consumer.get_handle()); +``` + +This ensures tokens are refreshed even when the consumer is idle. + +## See Also + +- [librdkafka OAuth documentation](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#authentication) +- [AWS MSK IAM authentication](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html) +- [OAuth 2.0 Bearer Token Usage](https://tools.ietf.org/html/rfc6750) \ No newline at end of file diff --git a/examples/oauth_example.cpp b/examples/oauth_example.cpp new file mode 100644 index 00000000..844a756b --- /dev/null +++ b/examples/oauth_example.cpp @@ -0,0 +1,79 @@ +/* + * OAuth Bearer Token Refresh Callback Example + * + * This example demonstrates how to use the OAuth bearer token refresh callback + * in cppkafka. This is useful for authentication mechanisms like AWS MSK IAM. + */ + +#include +#include + +using namespace cppkafka; + +// Example token refresh callback +void oauth_token_refresh_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) { + std::cout << "OAuth token refresh requested" << std::endl; + std::cout << "Config: " << oauthbearer_config << std::endl; + + // In a real implementation, you would: + // 1. Parse the oauthbearer_config to get any necessary parameters + // 2. Generate or fetch a new OAuth token + // 3. Call rd_kafka_oauthbearer_set_token() with the new token + // or rd_kafka_oauthbearer_set_token_failure() if token generation fails + + // Example (simplified): + std::string token = "your-generated-token"; + int64_t token_expiry_ms = 3600000; // 1 hour from now + std::string principal = "your-principal"; + + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( + handle.get_handle(), + token.c_str(), + token_expiry_ms, + principal.c_str(), + nullptr, 0, // no extensions + errstr, sizeof(errstr) + ); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + std::cerr << "Failed to set OAuth token: " << errstr << std::endl; + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); + } else { + std::cout << "OAuth token set successfully" << std::endl; + } +} + +int main() { + // Create configuration + Configuration config = { + {"metadata.broker.list", "localhost:9092"}, + {"group.id", "example-consumer"}, + {"sasl.mechanism", "OAUTHBEARER"}, + {"security.protocol", "SASL_SSL"} + }; + + // Set the OAuth bearer token refresh callback + config.set_oauthbearer_token_refresh_callback(oauth_token_refresh_callback); + + // Create consumer + Consumer consumer(config); + + // Subscribe to topics + consumer.subscribe({"test-topic"}); + + std::cout << "Consumer created with OAuth callback" << std::endl; + std::cout << "The callback will be invoked when token refresh is needed" << std::endl; + + // Poll for messages (the callback will be triggered as needed) + while (true) { + Message msg = consumer.poll(); + if (msg) { + if (!msg.get_error()) { + std::cout << "Received message: " << msg.get_payload() << std::endl; + } + } + } + + return 0; +} \ No newline at end of file diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index c97f5a83..6cdf4dbf 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -80,6 +80,8 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { using StatsCallback = std::function; using SocketCallback = std::function; using BackgroundEventCallback = std::function; + using OAuthBearerTokenRefreshCallback = std::function; using ConfigurationBase::set; using ConfigurationBase::get; @@ -144,6 +146,15 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { */ Configuration& set_socket_callback(SocketCallback callback); + /** + * Sets the OAuth bearer token refresh callback (invokes rd_kafka_conf_set_oauthbearer_token_refresh_cb) + * + * This callback is triggered when the SASL/OAUTHBEARER token needs to be refreshed. + * The callback should generate a new token and call rd_kafka_oauthbearer_set_token() + * or rd_kafka_oauthbearer_set_token_failure() on the rd_kafka_t handle. + */ + Configuration& set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback); + #if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION /** * Sets the background event callback (invokes rd_kafka_conf_set_background_event_cb) @@ -223,6 +234,11 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { */ const BackgroundEventCallback& get_background_event_callback() const; + /** + * Gets the OAuth bearer token refresh callback + */ + const OAuthBearerTokenRefreshCallback& get_oauthbearer_token_refresh_callback() const; + /** * Gets the default topic configuration */ @@ -249,6 +265,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { StatsCallback stats_callback_; SocketCallback socket_callback_; BackgroundEventCallback background_event_callback_; + OAuthBearerTokenRefreshCallback oauthbearer_token_refresh_callback_; }; } // cppkafka diff --git a/src/configuration.cpp b/src/configuration.cpp index 5a59c517..1565d2f6 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -109,6 +109,14 @@ void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, v (*handle, Event{event_ptr}); } +void oauthbearer_token_refresh_callback_proxy(rd_kafka_t*, const char* oauthbearer_config, void* opaque) { + KafkaHandleBase* handle = static_cast(opaque); + string config = oauthbearer_config ? oauthbearer_config : ""; + CallbackInvoker + ("oauthbearer_token_refresh", handle->get_configuration().get_oauthbearer_token_refresh_callback(), handle) + (*handle, config); +} + // Configuration Configuration::Configuration() @@ -184,6 +192,12 @@ Configuration& Configuration::set_socket_callback(SocketCallback callback) { return *this; } +Configuration& Configuration::set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback) { + oauthbearer_token_refresh_callback_ = move(callback); + rd_kafka_conf_set_oauthbearer_token_refresh_cb(handle_.get(), &oauthbearer_token_refresh_callback_proxy); + return *this; +} + #if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION Configuration& Configuration::set_background_event_callback(BackgroundEventCallback callback) { background_event_callback_ = move(callback); @@ -264,6 +278,11 @@ Configuration::get_background_event_callback() const { return background_event_callback_; } +const Configuration::OAuthBearerTokenRefreshCallback& +Configuration::get_oauthbearer_token_refresh_callback() const { + return oauthbearer_token_refresh_callback_; +} + const optional& Configuration::get_default_topic_configuration() const { return default_topic_config_; }