forked from mfontanini/cppkafka
-
Notifications
You must be signed in to change notification settings - Fork 6
feat: add support for OAUTHBEARER token refresh callback #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
kalavt
wants to merge
1
commit into
ClickHouse:ClickHouse/release-0.4.1
Choose a base branch
from
kalavt:ClickHouse/release-0.4.1
base: ClickHouse/release-0.4.1
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+326
−0
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,211 @@ | ||
| # OAuth Bearer Token Refresh Callback | ||
|
|
||
| ## Overview | ||
|
|
||
| The OAuth bearer token refresh callback allows you to implement custom OAuth bearer token generation and refresh logic in cppkafka. This is particularly useful for authentication mechanisms like AWS MSK IAM, Azure Event Hubs, or any custom OAuth implementation. | ||
|
|
||
| ## API | ||
|
|
||
| ### Setting the Callback | ||
|
|
||
| ```cpp | ||
| using OAuthBearerTokenRefreshCallback = std::function<void(KafkaHandleBase& handle, | ||
| const std::string& oauthbearer_config)>; | ||
|
|
||
| Configuration& set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback); | ||
| ``` | ||
|
|
||
| ### Getting the Callback | ||
|
|
||
| ```cpp | ||
| const OAuthBearerTokenRefreshCallback& get_oauthbearer_token_refresh_callback() const; | ||
| ``` | ||
|
|
||
| ## Usage | ||
|
|
||
| ### Basic Setup | ||
|
|
||
| 1. **Create a callback function** that generates or fetches OAuth tokens: | ||
|
|
||
| ```cpp | ||
| void my_oauth_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) { | ||
| // Parse config if needed | ||
| // Generate token | ||
| std::string token = generate_my_token(); | ||
| int64_t expiry_ms = get_token_expiry(); | ||
| std::string principal = get_principal(); | ||
|
|
||
| // Set the token | ||
| char errstr[512]; | ||
| rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( | ||
| handle.get_handle(), | ||
| token.c_str(), | ||
| expiry_ms, | ||
| principal.c_str(), | ||
| nullptr, 0, | ||
| errstr, sizeof(errstr) | ||
| ); | ||
|
|
||
| if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { | ||
| rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| 2. **Configure Kafka with OAUTHBEARER**: | ||
|
|
||
| ```cpp | ||
| Configuration config = { | ||
| {"metadata.broker.list", "broker:9092"}, | ||
| {"group.id", "my-consumer"}, | ||
| {"sasl.mechanism", "OAUTHBEARER"}, | ||
| {"security.protocol", "SASL_SSL"} | ||
| }; | ||
| ``` | ||
|
|
||
| 3. **Set the callback**: | ||
|
|
||
| ```cpp | ||
| config.set_oauthbearer_token_refresh_callback(my_oauth_callback); | ||
| ``` | ||
|
|
||
| 4. **Create consumer or producer**: | ||
|
|
||
| ```cpp | ||
| Consumer consumer(config); | ||
| // or | ||
| Producer producer(config); | ||
| ``` | ||
|
|
||
| ## Callback Parameters | ||
|
|
||
| ### KafkaHandleBase& handle | ||
| The Kafka handle (consumer or producer) requesting token refresh. Use `handle.get_handle()` to get the underlying `rd_kafka_t*` pointer for calling librdkafka functions. | ||
|
|
||
| ### const std::string& oauthbearer_config | ||
| The value of the `sasl.oauthbearer.config` configuration property. You can use this to pass custom parameters to your callback. | ||
|
|
||
| ## Callback Responsibilities | ||
|
|
||
| Your callback must either: | ||
|
|
||
| 1. **Successfully set a token** using `rd_kafka_oauthbearer_set_token()`, or | ||
| 2. **Report failure** using `rd_kafka_oauthbearer_set_token_failure()` | ||
|
|
||
| Failure to do either will result in authentication hanging. | ||
|
|
||
| ## Complete Example: AWS MSK IAM | ||
|
|
||
| ```cpp | ||
| #include <cppkafka/cppkafka.h> | ||
| #include <aws/core/auth/AWSCredentialsProvider.h> | ||
|
|
||
| void aws_msk_token_callback(KafkaHandleBase& handle, const std::string& config) { | ||
| try { | ||
| // Get AWS credentials | ||
| auto provider = Aws::Auth::DefaultAWSCredentialsProviderChain(); | ||
| auto credentials = provider.GetAWSCredentials(); | ||
|
|
||
| // Generate MSK IAM token (simplified) | ||
| std::string token = generate_msk_iam_token(credentials, "us-east-1"); | ||
| int64_t expiry_ms = current_time_ms() + 300000; // 5 minutes | ||
|
|
||
| char errstr[512]; | ||
| rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( | ||
| handle.get_handle(), | ||
| token.c_str(), | ||
| expiry_ms, | ||
| credentials.GetAWSAccessKeyId().c_str(), | ||
| nullptr, 0, | ||
| errstr, sizeof(errstr) | ||
| ); | ||
|
|
||
| if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { | ||
| rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); | ||
| } | ||
| } catch (const std::exception& e) { | ||
| rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), e.what()); | ||
| } | ||
| } | ||
|
|
||
| int main() { | ||
| Configuration config = { | ||
| {"metadata.broker.list", "b-1.mycluster.kafka.us-east-1.amazonaws.com:9098"}, | ||
| {"security.protocol", "SASL_SSL"}, | ||
| {"sasl.mechanism", "OAUTHBEARER"}, | ||
| {"sasl.oauthbearer.config", "region=us-east-1"} | ||
| }; | ||
|
|
||
| config.set_oauthbearer_token_refresh_callback(aws_msk_token_callback); | ||
|
|
||
| Consumer consumer(config); | ||
| consumer.subscribe({"my-topic"}); | ||
|
|
||
| // Process messages... | ||
| } | ||
| ``` | ||
|
|
||
| ## When is the Callback Invoked? | ||
|
|
||
| The callback is invoked: | ||
|
|
||
| 1. **On initial connection** - Before the first authentication attempt | ||
| 2. **Before token expiry** - Automatically when the current token is about to expire | ||
| 3. **On authentication failure** - If the broker rejects the current token | ||
|
|
||
| ## Thread Safety | ||
|
|
||
| The callback may be invoked from librdkafka's internal threads. Ensure your callback is thread-safe if it accesses shared resources. | ||
|
|
||
| ## Error Handling | ||
|
|
||
| Always handle errors in your callback: | ||
|
|
||
| ```cpp | ||
| void safe_oauth_callback(KafkaHandleBase& handle, const std::string& config) { | ||
| try { | ||
| // Token generation logic | ||
| std::string token = generate_token(); | ||
|
|
||
| char errstr[512]; | ||
| rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( | ||
| handle.get_handle(), | ||
| token.c_str(), | ||
| expiry_ms, | ||
| principal.c_str(), | ||
| nullptr, 0, | ||
| errstr, sizeof(errstr) | ||
| ); | ||
|
|
||
| if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { | ||
| rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); | ||
| } | ||
| } catch (const std::exception& e) { | ||
| // Always report failures | ||
| rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), e.what()); | ||
| } catch (...) { | ||
| rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), | ||
| "Unknown error generating token"); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ## Background Token Refresh | ||
|
|
||
| For background token refresh (useful for long-lived consumers with low traffic): | ||
|
|
||
| ```cpp | ||
| // Enable SASL queue for background callbacks | ||
| rd_kafka_conf_enable_sasl_queue(config.get_handle(), 1); | ||
|
|
||
| // Enable background SASL callbacks (if supported) | ||
| rd_kafka_sasl_background_callbacks_enable(consumer.get_handle()); | ||
| ``` | ||
|
|
||
| This ensures tokens are refreshed even when the consumer is idle. | ||
|
|
||
| ## See Also | ||
|
|
||
| - [librdkafka OAuth documentation](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#authentication) | ||
| - [AWS MSK IAM authentication](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html) | ||
| - [OAuth 2.0 Bearer Token Usage](https://tools.ietf.org/html/rfc6750) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| /* | ||
| * OAuth Bearer Token Refresh Callback Example | ||
| * | ||
| * This example demonstrates how to use the OAuth bearer token refresh callback | ||
| * in cppkafka. This is useful for authentication mechanisms like AWS MSK IAM. | ||
| */ | ||
|
|
||
| #include <iostream> | ||
| #include <cppkafka/cppkafka.h> | ||
|
|
||
| using namespace cppkafka; | ||
|
|
||
| // Example token refresh callback | ||
| void oauth_token_refresh_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) { | ||
| std::cout << "OAuth token refresh requested" << std::endl; | ||
| std::cout << "Config: " << oauthbearer_config << std::endl; | ||
|
|
||
| // In a real implementation, you would: | ||
| // 1. Parse the oauthbearer_config to get any necessary parameters | ||
| // 2. Generate or fetch a new OAuth token | ||
| // 3. Call rd_kafka_oauthbearer_set_token() with the new token | ||
| // or rd_kafka_oauthbearer_set_token_failure() if token generation fails | ||
|
|
||
| // Example (simplified): | ||
| std::string token = "your-generated-token"; | ||
| int64_t token_expiry_ms = 3600000; // 1 hour from now | ||
| std::string principal = "your-principal"; | ||
|
|
||
| char errstr[512]; | ||
| rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( | ||
| handle.get_handle(), | ||
| token.c_str(), | ||
| token_expiry_ms, | ||
| principal.c_str(), | ||
| nullptr, 0, // no extensions | ||
| errstr, sizeof(errstr) | ||
| ); | ||
|
|
||
| if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { | ||
| std::cerr << "Failed to set OAuth token: " << errstr << std::endl; | ||
| rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); | ||
| } else { | ||
| std::cout << "OAuth token set successfully" << std::endl; | ||
| } | ||
| } | ||
|
|
||
| int main() { | ||
| // Create configuration | ||
| Configuration config = { | ||
| {"metadata.broker.list", "localhost:9092"}, | ||
| {"group.id", "example-consumer"}, | ||
| {"sasl.mechanism", "OAUTHBEARER"}, | ||
| {"security.protocol", "SASL_SSL"} | ||
| }; | ||
|
|
||
| // Set the OAuth bearer token refresh callback | ||
| config.set_oauthbearer_token_refresh_callback(oauth_token_refresh_callback); | ||
|
|
||
| // Create consumer | ||
| Consumer consumer(config); | ||
|
|
||
| // Subscribe to topics | ||
| consumer.subscribe({"test-topic"}); | ||
|
|
||
| std::cout << "Consumer created with OAuth callback" << std::endl; | ||
| std::cout << "The callback will be invoked when token refresh is needed" << std::endl; | ||
|
|
||
| // Poll for messages (the callback will be triggered as needed) | ||
| while (true) { | ||
| Message msg = consumer.poll(); | ||
| if (msg) { | ||
| if (!msg.get_error()) { | ||
| std::cout << "Received message: " << msg.get_payload() << std::endl; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return 0; | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep the semantic: if
oauthbearer_configis not set, we should call the callback withnullptr.