101 changes: 67 additions & 34 deletions pipeline/inputs/kafka.md
@@ -140,73 +140,106 @@ The example can be executed locally with `make start` in the `examples/kafka_fil

## AWS MSK IAM authentication

Fluent Bit v4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM. This lets you securely connect to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
In Fluent Bit version 4.0.4 and later, you can use AWS IAM authentication for Amazon MSK clusters. This lets you use your AWS credentials and IAM policies to control access to Kafka topics.

### Build requirements
### Prerequisites

If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
To use AWS MSK IAM authentication, you must meet these requirements:

- The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment.

### Runtime requirements

- **Network Access:** Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
- **AWS Credentials:** Provide AWS credentials using any supported AWS method. These credentials are discovered by default when the `aws_msk_iam` flag is enabled.
  - IAM roles (recommended for EC2, ECS, or EKS)
  - Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
  - AWS credentials file (`~/.aws/credentials`)
  - Instance metadata service (IMDS)
- **IAM Permissions:** The credentials must allow access to the target MSK cluster, as shown in the following example policy.
- You must have access to an AWS MSK cluster with IAM authentication enabled.
- You must have valid AWS credentials (IAM role, access keys, or instance profile).
- You must have network connectivity to your MSK brokers.

### Configuration parameters [#config-aws]

| Property | Description | Required |
| -------- | ----------- | -------- |
| `aws_msk_iam` | If `true`, enables AWS MSK IAM authentication. Possible values: `true`, `false`. | `false` |
| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction. This value is required if `aws_msk_iam` is `true`. | _none_ |
| Property | Description | Default |
| -------- | ----------- | ------- |
| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication. | _none_ |
| `aws_region` | The name of your AWS region. This value is optional. If you don't set a value, but MSK IAM authentication is enabled, Fluent Bit detects your AWS region from the broker hostname for standard MSK endpoints. | _none_ |

### Configuration example
### Basic configuration

For most use cases, the only necessary configuration step is to set `rdkafka.sasl.mechanism` to `aws_msk_iam`:

```yaml
pipeline:
  inputs:
    - name: kafka
      brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098
      brokers: boot-abc123.c1.kafka-serverless.us-east-1.amazonaws.com:9098
      topics: my-topic
      aws_msk_iam: true
      aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3
      rdkafka.sasl.mechanism: aws_msk_iam
```

outputs:
- name: stdout
match: '*'
The AWS region is automatically detected from the broker hostname for standard MSK endpoints.
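
To decide whether a broker list needs an explicit `aws_region`, a quick check like the following might help. This is a minimal sketch, not Fluent Bit's actual detection logic: the hostname pattern (`.kafka.<region>.amazonaws.com` or `.kafka-serverless.<region>.amazonaws.com`) is an assumption inferred from the example brokers in this section, and the function names are hypothetical.

```python
import re
from typing import Optional

# Assumed shape of a standard MSK endpoint; not taken from Fluent Bit's source.
_MSK_HOST = re.compile(r"\.kafka(?:-serverless)?\.([a-z0-9-]+)\.amazonaws\.com$")


def region_from_broker(broker: str) -> Optional[str]:
    """Return the region embedded in an MSK broker address, or None."""
    host = broker.rsplit(":", 1)[0].lower()  # drop the port, if any
    match = _MSK_HOST.search(host)
    return match.group(1) if match else None


def needs_explicit_region(brokers: str) -> bool:
    """True if any broker in a comma-separated list has no detectable region."""
    return any(region_from_broker(b.strip()) is None for b in brokers.split(","))
```

If `needs_explicit_region` returns `True` for your broker list, as it would for a custom DNS name or PrivateLink alias, setting `aws_region` is the safer choice.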

{% hint style="info" %}

When `rdkafka.sasl.mechanism` is set to `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually.

{% endhint %}

### Using custom DNS or PrivateLink

If you're using custom DNS names or PrivateLink aliases, specify the `aws_region` parameter:

```yaml
pipeline:
  inputs:
    - name: kafka
      brokers: my-kafka-endpoint.example.com:9098
      topics: my-topic
      rdkafka.sasl.mechanism: aws_msk_iam
      aws_region: us-east-1
```

### Example AWS IAM policy
### AWS credentials

Fluent Bit uses the standard AWS credentials chain to authenticate:

1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
1. AWS credentials file (`~/.aws/credentials`)
1. IAM instance profile (recommended for EC2)
1. IAM task role (recommended for ECS)
1. IAM service account (recommended for EKS)
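
When authentication fails, it can help to see which of these sources are visible to the process at all. The following is a rough diagnostic sketch, not Fluent Bit's credential resolution logic. The function name is hypothetical, and the ECS and EKS checks assume the environment variables conventionally injected by those platforms (`AWS_CONTAINER_CREDENTIALS_RELATIVE_URI`, `AWS_CONTAINER_CREDENTIALS_FULL_URI`, `AWS_ROLE_ARN`, `AWS_WEB_IDENTITY_TOKEN_FILE`), which aren't named in this document. EC2 instance profiles are omitted because detecting them requires a call to the instance metadata service.

```python
import os
from pathlib import Path
from typing import Dict, List, Optional


def visible_credential_sources(env: Optional[Dict[str, str]] = None,
                               home: Optional[Path] = None) -> List[str]:
    """List credential sources that appear to be configured locally."""
    env = dict(os.environ) if env is None else env
    home = Path.home() if home is None else home
    found = []
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        found.append("environment variables")
    if (home / ".aws" / "credentials").is_file():
        found.append("credentials file")
    # Assumption: ECS injects one of these variables for task roles.
    if (env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI")
            or env.get("AWS_CONTAINER_CREDENTIALS_FULL_URI")):
        found.append("ECS task role")
    # Assumption: EKS service accounts (IRSA) inject both of these variables.
    if env.get("AWS_ROLE_ARN") and env.get("AWS_WEB_IDENTITY_TOKEN_FILE"):
        found.append("EKS service account")
    return found
```

An empty result on EC2 doesn't necessarily mean that credentials are missing, because the instance profile isn't checked here.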

### Required IAM permissions

{% hint style="info" %}

IAM policies and permissions can be complex and might vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html).

{% endhint %}

The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy:
Your AWS credentials need the following permissions to consume from MSK topics:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:ReadData",
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:Connect"
        "kafka-cluster:ReadData",
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:AlterGroup"
      ],
      "Resource": "*"
      "Resource": [
        "arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER_NAME/CLUSTER_UUID",
        "arn:aws:kafka:REGION:ACCOUNT:topic/CLUSTER_NAME/CLUSTER_UUID/my-topic",
        "arn:aws:kafka:REGION:ACCOUNT:group/CLUSTER_NAME/CLUSTER_UUID/fluent-bit"
      ]
    }
  ]
}
```

Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic and group names with your actual values.

{% hint style="info" %}

The `CLUSTER_UUID` segment is required in all topic and group ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI.

{% endhint %}
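
Filling in the placeholders by hand is error-prone, so one option is to derive the topic and group ARNs from the cluster ARN. The following is a minimal sketch under stated assumptions: the function names are hypothetical, the action list is copied from the consumer policy in this section, and the ARN layout follows the `cluster/CLUSTER_NAME/CLUSTER_UUID` pattern shown there.

```python
import json
from typing import Dict, List


def msk_resource_arns(cluster_arn: str, topic: str, group: str) -> List[str]:
    """Derive topic and group ARNs that share the cluster's name and UUID."""
    prefix, _, cluster_path = cluster_arn.partition(":cluster/")
    if not cluster_path or cluster_path.count("/") != 1:
        raise ValueError("expected arn:aws:kafka:REGION:ACCOUNT:cluster/NAME/UUID")
    return [
        cluster_arn,
        f"{prefix}:topic/{cluster_path}/{topic}",
        f"{prefix}:group/{cluster_path}/{group}",
    ]


def consumer_policy(cluster_arn: str, topic: str, group: str) -> Dict:
    """Build a consumer policy with the actions listed in this section."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:AlterGroup",
            ],
            "Resource": msk_resource_arns(cluster_arn, topic, group),
        }],
    }


if __name__ == "__main__":
    # Placeholder ARN for illustration only.
    arn = "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234"
    print(json.dumps(consumer_policy(arn, "my-topic", "fluent-bit"), indent=2))
```

Deriving all three ARNs from one input keeps the `CLUSTER_UUID` segment consistent across the cluster, topic, and group resources.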

153 changes: 116 additions & 37 deletions pipeline/outputs/kafka.md
@@ -2,7 +2,7 @@

The _Kafka Producer_ output plugin lets you ingest your records into an [Apache Kafka](https://kafka.apache.org/) service. This plugin uses the official [librdkafka C library](https://github.com/confluentinc/librdkafka).

In Fluent Bit 4.0.4 and later, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.
In Fluent Bit 4.0.4 and later, the Kafka output plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.

## Configuration parameters

@@ -248,82 +248,161 @@ pipeline:

## AWS MSK IAM authentication

Fluent Bit 4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This lets you securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
In Fluent Bit version 4.0.4 and later, you can use AWS IAM authentication for Amazon MSK clusters. This lets you use your AWS credentials and IAM policies to control access to Kafka topics.

### Prerequisites

If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
To use AWS MSK IAM authentication, you must meet these requirements:

- Build Requirements
- You must have access to an AWS MSK cluster with IAM authentication enabled.
- You must have valid AWS credentials (IAM role, access keys, or instance profile).
- You must have network connectivity to your MSK brokers.

### Configuration parameters

| Platform | Requirements |
|----------|-------------|
| **Linux/macOS** | The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment. |
| **Windows** | No additional SASL libraries required. Windows uses the built-in Security Support Provider Interface (SSPI) for SASL authentication, which only requires OpenSSL/TLS to be enabled. |


- Runtime Requirements:
| Property | Description | Default |
| -------- | ----------- | ------- |
| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication. | _none_ |
| `aws_region` | The name of your AWS region. This value is optional. If you don't set a value, but MSK IAM authentication is enabled, Fluent Bit detects your AWS region from the broker hostname for standard MSK endpoints. | _none_ |

### Basic configuration

For most use cases, the only necessary configuration step is to set `rdkafka.sasl.mechanism` to `aws_msk_iam`:

{% tabs %}
{% tab title="fluent-bit.yaml" %}

```yaml
pipeline:
  inputs:
    - name: cpu

  outputs:
    - name: kafka
      match: '*'
      brokers: b-1.mycluster.kafka.us-east-1.amazonaws.com:9098
      topics: my-topic
      rdkafka.sasl.mechanism: aws_msk_iam
```

- Network Access: Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
- AWS Credentials: Provide credentials using any supported AWS method:
  - IAM roles (recommended for EC2, ECS, or EKS)
  - Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
  - AWS credentials file (`~/.aws/credentials`)
  - Instance metadata service (IMDS)
{% endtab %}
{% tab title="fluent-bit.conf" %}

These credentials are discovered by default when the `aws_msk_iam` flag is enabled.
```text
[INPUT]
  Name cpu

- IAM Permissions: The credentials must allow access to the target MSK cluster.
[OUTPUT]
  Name kafka
  Match *
  Brokers b-1.mycluster.kafka.us-east-1.amazonaws.com:9098
  Topics my-topic
  rdkafka.sasl.mechanism aws_msk_iam
```

### AWS MSK IAM configuration parameters
{% endtab %}
{% endtabs %}

See `rdkafka.sasl.mechanism` and `aws_region` in [Configuration parameters](#configuration-parameters).

### Configuration example
The AWS region is automatically detected from the broker hostname for standard MSK endpoints.

{% hint style="info" %}

When `rdkafka.sasl.mechanism` is set to `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually.

Comment on lines 311 to 318
Contributor

@coderabbitai coderabbitai Bot May 15, 2026

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Parameter references are inconsistent with the rewritten configuration model.

Line 311 points to aws_msk_iam/aws_msk_iam_cluster_arn, but this section’s parameter table and examples are centered on rdkafka.sasl.mechanism and aws_region. Update this sentence (and the note wording) to use the same parameter model consistently.

Proposed fix
-See `aws_msk_iam` and `aws_msk_iam_cluster_arn` in the [configuration parameters](`#configuration-parameters`) table.
+See `rdkafka.sasl.mechanism` and `aws_region` in [Configuration parameters](`#configuration-parameters`).

 ...

-When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually.
+When `rdkafka.sasl.mechanism` is set to `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pipeline/outputs/kafka.md` around lines 311 - 318, The paragraph and
informational note still reference the old parameters aws_msk_iam and
aws_msk_iam_cluster_arn; change both to the rewritten model by referring to
rdkafka.sasl.mechanism and aws_region (and update the anchor/mention
accordingly), and reword the hint so it says that when rdkafka.sasl.mechanism is
set to the MSK/IAM value the system automatically sets rdkafka.security.protocol
to SASL_SSL (so users don’t need to set it manually). Ensure the wording and any
example references match the rest of the section’s parameter names
(rdkafka.sasl.mechanism, aws_region) for consistency.

Collaborator

@kalavt this would need to be addressed to pass the review please?

{% endhint %}

### Using custom DNS or PrivateLink

If you're using custom DNS names or PrivateLink aliases, specify the `aws_region` parameter:

{% tabs %}
{% tab title="fluent-bit.yaml" %}

```yaml
pipeline:
  inputs:
    - name: random
    - name: cpu

  outputs:
    - name: kafka
      match: '*'
      brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098
      brokers: my-kafka-endpoint.example.com:9098
      topics: my-topic
      aws_msk_iam: true
      aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3
      rdkafka.sasl.mechanism: aws_msk_iam
      aws_region: us-east-1
```

{% endtab %}
{% tab title="fluent-bit.conf" %}

```text
[INPUT]
  Name cpu

[OUTPUT]
  Name kafka
  Match *
  Brokers my-kafka-endpoint.example.com:9098
  Topics my-topic
  rdkafka.sasl.mechanism aws_msk_iam
  aws_region us-east-1
```

{% endtab %}
{% endtabs %}

### AWS IAM policy
### AWS credentials

Fluent Bit uses the standard AWS credentials chain to authenticate:

1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
1. AWS credentials file (`~/.aws/credentials`)
1. IAM instance profile (recommended for EC2)
1. IAM task role (recommended for ECS)
1. IAM service account (recommended for EKS)

IAM policies and permissions can be complex and can vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
### Required IAM permissions

The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy:
{% hint style="info" %}

For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html).

{% endhint %}

Your AWS credentials need the following permissions to produce to MSK topics:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:ReadData",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:Connect"
      ],
      "Resource": "*"
    }
  ]
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteData"
      ],
      "Resource": [
        "arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER_NAME/CLUSTER_UUID",
        "arn:aws:kafka:REGION:ACCOUNT:topic/CLUSTER_NAME/CLUSTER_UUID/my-topic"
      ]
    }
  ]
}
```

Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic name with your actual values.

{% hint style="info" %}

The `CLUSTER_UUID` segment is required in all topic and group ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI.

{% endhint %}
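
A topic or group ARN that omits the `CLUSTER_UUID` segment is easy to miss in review. The following is a minimal sketch of a sanity check, with a hypothetical function name and an assumed ARN pattern based on the examples in this section. It only counts path segments, so it can't confirm that the UUID is the correct one, and it rejects wildcard resources such as `topic/my-cluster/*`.

```python
import re

# Assumed layout: arn:<partition>:kafka:<region>:<account>:<kind>/<path>
_MSK_ARN = re.compile(
    r"^arn:aws[a-z-]*:kafka:[a-z0-9-]+:\d{12}:(cluster|topic|group)/(.+)$"
)


def has_cluster_uuid(arn: str) -> bool:
    """Check that an MSK ARN has enough path segments to include a UUID."""
    match = _MSK_ARN.match(arn)
    if match is None:
        return False
    kind, path = match.groups()
    segments = path.split("/")
    # cluster/NAME/UUID needs two segments; topic and group need a third.
    required = 2 if kind == "cluster" else 3
    return len(segments) >= required and all(segments[:required])
```

For example, `has_cluster_uuid("arn:aws:kafka:us-east-1:123456789012:topic/my-cluster/my-topic")` returns `False` because the path has only two segments.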