Skip to content

Conversation

@kalavt
Copy link

@kalavt kalavt commented Feb 5, 2026

This PR extends cppkafka::Configuration to support the sasl.oauthbearer.token.refresh.cb callback provided by librdkafka.

Motivation Mechanisms like AWS_MSK_IAM or generic OAUTHBEARER require the client to dynamically generate and refresh tokens when they expire. librdkafka handles this via a callback that requests a new token. Previously, cppkafka did not expose this callback, making it impossible to implement custom OAuth logic (like AWS SigV4 signing) purely in C++ without bypassing the wrapper.

Changes

  • API: Added Configuration::set_oauthbearer_token_refresh_callback which accepts a std::function.
  • Implementation: Implemented a proxy function oauthbearer_token_refresh_callback_proxy that correctly bridges the C-style librdkafka callback to the C++ function, properly casting the opaque handle.
  • Documentation: Added docs/oauth_bearer_callback.md explaining how to use the new feature.
  • Example: Added examples/oauth_example.cpp demonstrating a basic implementation.

Usage Example

config.set_oauthbearer_token_refresh_callback([](KafkaHandleBase& handle, const std::string& config) {
    // Generate token...
    rd_kafka_oauthbearer_set_token(handle.get_handle(), token_str.c_str(), expiry, ...);
});

This change is required for integrating AWS MSK IAM Authentication into ClickHouse.

@kalavt
Copy link
Author

kalavt commented Feb 5, 2026

@antaljanosbenjamin for your review to address the comments
ClickHouse/ClickHouse#91118 (comment)

Copy link
Member

@antaljanosbenjamin antaljanosbenjamin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from this small change, the PR looks good. One question remains: do you mind also submitting the same PR to the upstream project or should I do that once this is merged?

The reason is we like to contribute back and it also help us keep our fork closer to the upstream one.

Comment on lines +83 to +84
using OAuthBearerTokenRefreshCallback = std::function<void(KafkaHandleBase& handle,
const std::string& oauthbearer_config)>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
using OAuthBearerTokenRefreshCallback = std::function<void(KafkaHandleBase& handle,
const std::string& oauthbearer_config)>;
using OAuthBearerTokenRefreshCallback = std::function<void(KafkaHandleBase& handle,
const std::string* oauthbearer_config)>;

Let's keep the semantic: if oauthbearer_config is not set, we should call the callback with nullptr.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants