Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions docs/oauth_bearer_callback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
# OAuth Bearer Token Refresh Callback

## Overview

The OAuth bearer token refresh callback allows you to implement custom OAuth bearer token generation and refresh logic in cppkafka. This is particularly useful for authentication mechanisms like AWS MSK IAM, Azure Event Hubs, or any custom OAuth implementation.

## API

### Setting the Callback

```cpp
using OAuthBearerTokenRefreshCallback = std::function<void(KafkaHandleBase& handle,
const std::string& oauthbearer_config)>;

Configuration& set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback);
```

### Getting the Callback

```cpp
const OAuthBearerTokenRefreshCallback& get_oauthbearer_token_refresh_callback() const;
```

## Usage

### Basic Setup

1. **Create a callback function** that generates or fetches OAuth tokens:

```cpp
void my_oauth_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) {
// Parse config if needed
// Generate token
std::string token = generate_my_token();
int64_t expiry_ms = get_token_expiry();
std::string principal = get_principal();

// Set the token
char errstr[512];
rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token(
handle.get_handle(),
token.c_str(),
expiry_ms,
principal.c_str(),
nullptr, 0,
errstr, sizeof(errstr)
);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr);
}
}
```

2. **Configure Kafka with OAUTHBEARER**:

```cpp
Configuration config = {
{"metadata.broker.list", "broker:9092"},
{"group.id", "my-consumer"},
{"sasl.mechanism", "OAUTHBEARER"},
{"security.protocol", "SASL_SSL"}
};
```

3. **Set the callback**:

```cpp
config.set_oauthbearer_token_refresh_callback(my_oauth_callback);
```

4. **Create consumer or producer**:

```cpp
Consumer consumer(config);
// or
Producer producer(config);
```

## Callback Parameters

### KafkaHandleBase& handle
The Kafka handle (consumer or producer) requesting token refresh. Use `handle.get_handle()` to get the underlying `rd_kafka_t*` pointer for calling librdkafka functions.

### const std::string& oauthbearer_config
The value of the `sasl.oauthbearer.config` configuration property. You can use this to pass custom parameters to your callback.

## Callback Responsibilities

Your callback must either:

1. **Successfully set a token** using `rd_kafka_oauthbearer_set_token()`, or
2. **Report failure** using `rd_kafka_oauthbearer_set_token_failure()`

Failure to do either will result in authentication hanging.

## Complete Example: AWS MSK IAM

```cpp
#include <cppkafka/cppkafka.h>
#include <aws/core/auth/AWSCredentialsProvider.h>

void aws_msk_token_callback(KafkaHandleBase& handle, const std::string& config) {
try {
// Get AWS credentials
auto provider = Aws::Auth::DefaultAWSCredentialsProviderChain();
auto credentials = provider.GetAWSCredentials();

// Generate MSK IAM token (simplified)
std::string token = generate_msk_iam_token(credentials, "us-east-1");
int64_t expiry_ms = current_time_ms() + 300000; // 5 minutes

char errstr[512];
rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token(
handle.get_handle(),
token.c_str(),
expiry_ms,
credentials.GetAWSAccessKeyId().c_str(),
nullptr, 0,
errstr, sizeof(errstr)
);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr);
}
} catch (const std::exception& e) {
rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), e.what());
}
}

int main() {
Configuration config = {
{"metadata.broker.list", "b-1.mycluster.kafka.us-east-1.amazonaws.com:9098"},
{"security.protocol", "SASL_SSL"},
{"sasl.mechanism", "OAUTHBEARER"},
{"sasl.oauthbearer.config", "region=us-east-1"}
};

config.set_oauthbearer_token_refresh_callback(aws_msk_token_callback);

Consumer consumer(config);
consumer.subscribe({"my-topic"});

// Process messages...
}
```

## When is the Callback Invoked?

The callback is invoked:

1. **On initial connection** - Before the first authentication attempt
2. **Before token expiry** - Automatically when the current token is about to expire
3. **On authentication failure** - If the broker rejects the current token

## Thread Safety

The callback may be invoked from librdkafka's internal threads. Ensure your callback is thread-safe if it accesses shared resources.

## Error Handling

Always handle errors in your callback:

```cpp
void safe_oauth_callback(KafkaHandleBase& handle, const std::string& config) {
try {
// Token generation logic
std::string token = generate_token();

char errstr[512];
rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token(
handle.get_handle(),
token.c_str(),
expiry_ms,
principal.c_str(),
nullptr, 0,
errstr, sizeof(errstr)
);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr);
}
} catch (const std::exception& e) {
// Always report failures
rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), e.what());
} catch (...) {
rd_kafka_oauthbearer_set_token_failure(handle.get_handle(),
"Unknown error generating token");
}
}
```

## Background Token Refresh

For background token refresh (useful for long-lived consumers with low traffic):

```cpp
// Enable SASL queue for background callbacks
rd_kafka_conf_enable_sasl_queue(config.get_handle(), 1);

// Enable background SASL callbacks (if supported)
rd_kafka_sasl_background_callbacks_enable(consumer.get_handle());
```

This ensures tokens are refreshed even when the consumer is idle.

## See Also

- [librdkafka OAuth documentation](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#authentication)
- [AWS MSK IAM authentication](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)
- [OAuth 2.0 Bearer Token Usage](https://tools.ietf.org/html/rfc6750)
79 changes: 79 additions & 0 deletions examples/oauth_example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* OAuth Bearer Token Refresh Callback Example
*
* This example demonstrates how to use the OAuth bearer token refresh callback
* in cppkafka. This is useful for authentication mechanisms like AWS MSK IAM.
*/

#include <iostream>
#include <cppkafka/cppkafka.h>

using namespace cppkafka;

// Example token refresh callback
void oauth_token_refresh_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) {
std::cout << "OAuth token refresh requested" << std::endl;
std::cout << "Config: " << oauthbearer_config << std::endl;

// In a real implementation, you would:
// 1. Parse the oauthbearer_config to get any necessary parameters
// 2. Generate or fetch a new OAuth token
// 3. Call rd_kafka_oauthbearer_set_token() with the new token
// or rd_kafka_oauthbearer_set_token_failure() if token generation fails

// Example (simplified):
std::string token = "your-generated-token";
int64_t token_expiry_ms = 3600000; // 1 hour from now
std::string principal = "your-principal";

char errstr[512];
rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token(
handle.get_handle(),
token.c_str(),
token_expiry_ms,
principal.c_str(),
nullptr, 0, // no extensions
errstr, sizeof(errstr)
);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
std::cerr << "Failed to set OAuth token: " << errstr << std::endl;
rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr);
} else {
std::cout << "OAuth token set successfully" << std::endl;
}
}

int main() {
// Create configuration
Configuration config = {
{"metadata.broker.list", "localhost:9092"},
{"group.id", "example-consumer"},
{"sasl.mechanism", "OAUTHBEARER"},
{"security.protocol", "SASL_SSL"}
};

// Set the OAuth bearer token refresh callback
config.set_oauthbearer_token_refresh_callback(oauth_token_refresh_callback);

// Create consumer
Consumer consumer(config);

// Subscribe to topics
consumer.subscribe({"test-topic"});

std::cout << "Consumer created with OAuth callback" << std::endl;
std::cout << "The callback will be invoked when token refresh is needed" << std::endl;

// Poll for messages (the callback will be triggered as needed)
while (true) {
Message msg = consumer.poll();
if (msg) {
if (!msg.get_error()) {
std::cout << "Received message: " << msg.get_payload() << std::endl;
}
}
}

return 0;
}
17 changes: 17 additions & 0 deletions include/cppkafka/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>;
using SocketCallback = std::function<int(int domain, int type, int protocol)>;
using BackgroundEventCallback = std::function<void(KafkaHandleBase& handle, Event)>;
using OAuthBearerTokenRefreshCallback = std::function<void(KafkaHandleBase& handle,
const std::string& oauthbearer_config)>;
Comment on lines +83 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
using OAuthBearerTokenRefreshCallback = std::function<void(KafkaHandleBase& handle,
const std::string& oauthbearer_config)>;
using OAuthBearerTokenRefreshCallback = std::function<void(KafkaHandleBase& handle,
const std::string* oauthbearer_config)>;

Let's keep the semantic: if oauthbearer_config is not set, we should call the callback with nullptr.


using ConfigurationBase<Configuration>::set;
using ConfigurationBase<Configuration>::get;
Expand Down Expand Up @@ -144,6 +146,15 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
*/
Configuration& set_socket_callback(SocketCallback callback);

/**
* Sets the OAuth bearer token refresh callback (invokes rd_kafka_conf_set_oauthbearer_token_refresh_cb)
*
* This callback is triggered when the SASL/OAUTHBEARER token needs to be refreshed.
* The callback should generate a new token and call rd_kafka_oauthbearer_set_token()
* or rd_kafka_oauthbearer_set_token_failure() on the rd_kafka_t handle.
*/
Configuration& set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback);

#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
/**
* Sets the background event callback (invokes rd_kafka_conf_set_background_event_cb)
Expand Down Expand Up @@ -223,6 +234,11 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
*/
const BackgroundEventCallback& get_background_event_callback() const;

/**
* Gets the OAuth bearer token refresh callback
*/
const OAuthBearerTokenRefreshCallback& get_oauthbearer_token_refresh_callback() const;

/**
* Gets the default topic configuration
*/
Expand All @@ -249,6 +265,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
StatsCallback stats_callback_;
SocketCallback socket_callback_;
BackgroundEventCallback background_event_callback_;
OAuthBearerTokenRefreshCallback oauthbearer_token_refresh_callback_;
};

} // cppkafka
Expand Down
19 changes: 19 additions & 0 deletions src/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, v
(*handle, Event{event_ptr});
}

void oauthbearer_token_refresh_callback_proxy(rd_kafka_t*, const char* oauthbearer_config, void* opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
string config = oauthbearer_config ? oauthbearer_config : "";
CallbackInvoker<Configuration::OAuthBearerTokenRefreshCallback>
("oauthbearer_token_refresh", handle->get_configuration().get_oauthbearer_token_refresh_callback(), handle)
(*handle, config);
}

// Configuration

Configuration::Configuration()
Expand Down Expand Up @@ -184,6 +192,12 @@ Configuration& Configuration::set_socket_callback(SocketCallback callback) {
return *this;
}

Configuration& Configuration::set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback) {
oauthbearer_token_refresh_callback_ = move(callback);
rd_kafka_conf_set_oauthbearer_token_refresh_cb(handle_.get(), &oauthbearer_token_refresh_callback_proxy);
return *this;
}

#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
Configuration& Configuration::set_background_event_callback(BackgroundEventCallback callback) {
background_event_callback_ = move(callback);
Expand Down Expand Up @@ -264,6 +278,11 @@ Configuration::get_background_event_callback() const {
return background_event_callback_;
}

const Configuration::OAuthBearerTokenRefreshCallback&
Configuration::get_oauthbearer_token_refresh_callback() const {
return oauthbearer_token_refresh_callback_;
}

const optional<TopicConfiguration>& Configuration::get_default_topic_configuration() const {
return default_topic_config_;
}
Expand Down