From 6c6d0479364a6a98d89f8e7499dca0fdf2db5a9f Mon Sep 17 00:00:00 2001 From: Jonas Keeling Date: Thu, 3 Jul 2025 15:58:45 +0200 Subject: [PATCH] Add examples for iceberg connector together with sts assume role --- solutions/iceberg-sink-connector/README.md | 164 +++++ solutions/iceberg-sink-connector/main.tf | 385 +++++++++++ .../scripts/requirements.txt | 3 + .../scripts/test_iceberg_connector.py | 618 ++++++++++++++++++ .../terraform.tfvars.example | 98 +++ 5 files changed, 1268 insertions(+) create mode 100644 solutions/iceberg-sink-connector/README.md create mode 100644 solutions/iceberg-sink-connector/main.tf create mode 100644 solutions/iceberg-sink-connector/scripts/requirements.txt create mode 100755 solutions/iceberg-sink-connector/scripts/test_iceberg_connector.py create mode 100644 solutions/iceberg-sink-connector/terraform.tfvars.example diff --git a/solutions/iceberg-sink-connector/README.md b/solutions/iceberg-sink-connector/README.md new file mode 100644 index 0000000..2f5e597 --- /dev/null +++ b/solutions/iceberg-sink-connector/README.md @@ -0,0 +1,164 @@ +# Iceberg Sink Connector with Aiven 🦀 + +This example demonstrates how to set up an Apache Iceberg sink connector using Aiven services (Kafka, Kafka Connect, PostgreSQL) to write data to Amazon S3. + +## Architecture + +- **Kafka**: Message streaming platform +- **Kafka Connect**: Connector framework +- **PostgreSQL**: Iceberg catalog metadata storage +- **S3**: Iceberg data warehouse storage +- **DuckDB**: Query engine for testing + +## Prerequisites + +- Aiven account and API token +- AWS account with an existing S3 bucket +- Python 3.8+ with pip +- Terraform, plus the `psql` client on the machine that runs it (`main.tf` calls `psql` locally to grant database permissions) + +## Quick Start + +### 1. Configure Environment + +Copy the example configuration: +```bash +cp terraform.tfvars.example terraform.tfvars +``` + +### 2. 
Set Up AWS Access + +Choose one of these methods: + +#### Option A: IAM Assume Role (Recommended) +This is the most secure option because no long-lived AWS credentials are stored in the connector configuration: + +- **No AWS credentials exposed** in configuration files +- **Aiven's dedicated IAM user** assumes your role using temporary credentials +- **Automatic credential rotation** - credentials expire and are refreshed automatically +- **Audit trail** - role sessions are recorded in AWS CloudTrail with session names (object-level S3 access is logged only if S3 data events are enabled) + +**How to set it up:** + +1. **Contact Aiven Support**: Email `support@aiven.io` to request your unique IAM user and External ID +2. **Create Cross-Account Role**: In your AWS Console, go to IAM → Roles → Create Role +3. **Select Trusted Entity**: Choose "Another AWS account" and enter the AWS account ID of the IAM user provided by Aiven +4. **Set External ID**: Paste the External ID provided by Aiven support +5. **Add S3 Permissions**: Attach a policy that allows `s3:GetObject`, `s3:PutObject`, `s3:DeleteObject`, `s3:ListBucket`, `s3:AbortMultipartUpload`, `s3:ListMultipartUploadParts`, `s3:ListBucketMultipartUploads` + +**Example IAM Policy:** +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:PutObject", + "s3:DeleteObject", + "s3:ListBucket", + "s3:AbortMultipartUpload", + "s3:ListMultipartUploadParts", + "s3:ListBucketMultipartUploads" + ], + "Resource": [ + "arn:aws:s3:::your-bucket-name/*", + "arn:aws:s3:::your-bucket-name" + ] + } + ] +} +``` +6. **Note the Role ARN**: Copy the role ARN (e.g., `arn:aws:iam::123456789012:role/iceberg-s3-role`); you will set it as `aws_role_arn` in `terraform.tfvars` + +For more information, see the [Aiven documentation](https://aiven.io/docs/products/kafka/kafka-connect/howto/s3-iam-assume-role). 
+ +#### Option B: Direct AWS Credentials +For development/testing, you can use direct AWS credentials: + +```bash +export AWS_ACCESS_KEY_ID="your-access-key" +export AWS_SECRET_ACCESS_KEY="your-secret-key" +``` + +**Security Note**: This method stores credentials in your connector configuration. For production, use IAM assume role or secret providers. See the [Aiven documentation](https://aiven.io/docs/products/kafka/kafka-connect/howto/configure-secret-providers) for setup instructions. + + +### 3. Configure Aiven Access + +Get your Aiven API token from the [Aiven Console](https://console.aiven.io/account/tokens) and set it: + +```bash +export AIVEN_API_TOKEN="your-aiven-token" +``` + +### 4. Update Configuration + +Edit `terraform.tfvars` with your specific values: + +```hcl +# Required: Your Aiven project name +project_name = "your-aiven-project" + +# Required: Your S3 bucket name (must exist) +s3_bucket_name = "your-s3-bucket" + +# Required: AWS region (must match your S3 bucket region) +aws_region = "eu-west-1" + +# Optional: IAM role ARN (if using assume role) +# Copy the role ARN from step 6 above and paste it here: +aws_role_arn = "arn:aws:iam::123456789012:role/iceberg-s3-role" + +# Optional: Customize service names and settings +service_name_prefix = "iceberg-sink" +topic_name = "test_iceberg_topic" +table_name = "systest-table" +``` + +### 5. Deploy Infrastructure + +Initialize and deploy the infrastructure: + +```bash +# Initialize Terraform and download providers +terraform init + +# Review what will be created +terraform plan + +# Apply the configuration (this will create Aiven services) +terraform apply +``` + +**Important**: This creates multiple Aiven services that will incur costs. Review the plan carefully before applying. + +### 6. Install Python Dependencies + +Install the required Python packages for testing: + +```bash +pip install -r ./scripts/requirements.txt +``` + +### 7. 
Run Test + +Execute the test script to validate the setup: + +```bash +python ./scripts/test_iceberg_connector.py +``` + +The test will: +- Send test messages to Kafka +- Wait for the Iceberg sink connector to process them +- Query the data using DuckDB (S3 access uses `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` from your environment, even when the connector itself uses an assumed role) +- Display results and timing information + + +## Cleanup + +```bash +terraform destroy +``` diff --git a/solutions/iceberg-sink-connector/main.tf b/solutions/iceberg-sink-connector/main.tf new file mode 100644 index 0000000..7a1846d --- /dev/null +++ b/solutions/iceberg-sink-connector/main.tf @@ -0,0 +1,385 @@ +terraform { + required_version = ">= 1.0" + + required_providers { + aiven = { + source = "aiven/aiven" + version = "~> 4.0" + } + env = { + source = "tchupp/env" + version = "0.0.2" + } + null = { + source = "hashicorp/null" + version = "~> 3.0" + } + } +} + +# Configure the Aiven Provider +provider "aiven" { + api_token = local.aiven_api_token +} + +# Variables +variable "aiven_api_token" { + description = "Aiven API token. Automatically populated from AIVEN_API_TOKEN environment variable by tchupp/env provider, or can be set directly in terraform.tfvars" + type = string + default = "" + sensitive = true +} + +variable "project_name" { + description = "Aiven project name where services will be created" + type = string +} + +variable "cloud_name" { + description = "Cloud name for the services (e.g., google-europe-west1, aws-us-east-1)" + type = string + default = "google-europe-west1" +} + +variable "plan_name" { + description = "Plan name for the services (e.g., startup-4, business-8)" + type = string + default = "startup-4" +} + + + +variable "service_name_prefix" { + description = "Prefix for service names to ensure uniqueness" + type = string + default = "iceberg-sink" +} + +variable "aws_region" { + description = "AWS region for S3 operations (must match your S3 bucket region)" + type = string + default = "eu-west-1" +} + +variable "s3_bucket_name" { + description = "S3 bucket name for Iceberg 
warehouse storage" + type = string + default = "aiventest-systest" +} + +variable "aws_access_key_id" { + description = "AWS access key ID. Automatically populated from AWS_ACCESS_KEY_ID environment variable by tchupp/env provider, or can be set directly in terraform.tfvars" + type = string + default = "" + sensitive = true +} + +variable "aws_secret_access_key" { + description = "AWS secret access key. Automatically populated from AWS_SECRET_ACCESS_KEY environment variable by tchupp/env provider, or can be set directly in terraform.tfvars" + type = string + default = "" + sensitive = true +} + +variable "aws_role_arn" { + description = "AWS role ARN to assume for S3 access (optional)" + type = string + default = "" +} + +variable "topic_name" { + description = "Kafka topic name for the Iceberg sink connector" + type = string + default = "test_iceberg_topic" +} + +variable "table_name" { + description = "Iceberg table name that will be created automatically" + type = string + default = "systest-table" +} + +variable "s3_prefix" { + description = "S3 prefix for Iceberg warehouse organization" + type = string + default = "connect-iceberg-sink" +} + +# Data sources +data "aiven_project" "project" { + project = var.project_name +} + +# Data sources for reading environment variables using tchupp/env provider +data "env_variable" "aiven_api_token" { + name = "AIVEN_API_TOKEN" +} + +data "env_variable" "aws_access_key_id" { + name = "AWS_ACCESS_KEY_ID" +} + +data "env_variable" "aws_secret_access_key" { + name = "AWS_SECRET_ACCESS_KEY" +} + +# Local values for credentials with environment variable fallbacks +locals { + aiven_api_token = var.aiven_api_token != "" ? var.aiven_api_token : (data.env_variable.aiven_api_token.value != "" ? data.env_variable.aiven_api_token.value : null) + aws_access_key_id = var.aws_access_key_id != "" ? var.aws_access_key_id : (data.env_variable.aws_access_key_id.value != "" ? 
data.env_variable.aws_access_key_id.value : null) + aws_secret_access_key = var.aws_secret_access_key != "" ? var.aws_secret_access_key : (data.env_variable.aws_secret_access_key.value != "" ? data.env_variable.aws_secret_access_key.value : null) +} + +# Kafka service (without Kafka Connect) +resource "aiven_kafka" "kafka" { + project = var.project_name + cloud_name = var.cloud_name + plan = var.plan_name + service_name = "${var.service_name_prefix}-kafka" + + kafka_user_config { + kafka_rest = true + kafka_version = "3.9" + kafka { + auto_create_topics_enable = true + } + } +} + +# Dedicated Kafka Connect service +resource "aiven_kafka_connect" "kafka_connect" { + project = var.project_name + cloud_name = var.cloud_name + plan = var.plan_name + service_name = "${var.service_name_prefix}-kafka-connect" +} + +# Service integration to connect Kafka Connect to Kafka +resource "aiven_service_integration" "kafka_connect_kafka" { + project = var.project_name + integration_type = "kafka_connect" + source_service_name = aiven_kafka.kafka.service_name + destination_service_name = aiven_kafka_connect.kafka_connect.service_name +} + +# PostgreSQL service for Iceberg JDBC catalog +resource "aiven_pg" "postgres" { + project = var.project_name + cloud_name = var.cloud_name + plan = var.plan_name + service_name = "${var.service_name_prefix}-postgres" + + pg_user_config { + pg_version = "15" + } +} + + + +# Kafka topic +resource "aiven_kafka_topic" "iceberg_topic" { + project = var.project_name + service_name = aiven_kafka.kafka.service_name + topic_name = var.topic_name + partitions = 3 + replication = 2 +} + +# Database user for PostgreSQL +resource "aiven_pg_user" "iceberg_user" { + project = var.project_name + service_name = aiven_pg.postgres.service_name + username = "iceberg_user" +} + +# PostgreSQL database for Iceberg catalog +resource "aiven_pg_database" "iceberg_db" { + project = var.project_name + service_name = aiven_pg.postgres.service_name + database_name = 
"iceberg_catalog" + + depends_on = [ + aiven_pg_user.iceberg_user + ] +} + +# Grant permissions to the Iceberg user +resource "null_resource" "iceberg_user_permissions" { + triggers = { + user_id = aiven_pg_user.iceberg_user.id + database_id = aiven_pg_database.iceberg_db.id + } + + provisioner "local-exec" { + command = <<-EOT + PGPASSWORD='${aiven_pg.postgres.service_password}' psql -h ${aiven_pg.postgres.service_host} -p ${aiven_pg.postgres.service_port} -U avnadmin -d ${aiven_pg_database.iceberg_db.database_name} -c " + GRANT ALL PRIVILEGES ON DATABASE ${aiven_pg_database.iceberg_db.database_name} TO ${aiven_pg_user.iceberg_user.username}; + GRANT ALL PRIVILEGES ON SCHEMA public TO ${aiven_pg_user.iceberg_user.username}; + GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ${aiven_pg_user.iceberg_user.username}; + GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ${aiven_pg_user.iceberg_user.username}; + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ${aiven_pg_user.iceberg_user.username}; + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO ${aiven_pg_user.iceberg_user.username}; + " + EOT + } + + depends_on = [ + aiven_pg_database.iceberg_db, + aiven_pg_user.iceberg_user + ] +} + +# Kafka Connect connector for Iceberg sink +resource "aiven_kafka_connector" "iceberg_sink" { + project = var.project_name + service_name = aiven_kafka_connect.kafka_connect.service_name + connector_name = "iceberg-sink-connector" + + config = merge({ + "name" = "iceberg-sink-connector" + "connector.class" = "org.apache.iceberg.connect.IcebergSinkConnector" + "topics" = var.topic_name + "iceberg.tables" = var.table_name + "iceberg.tables.auto-create-enabled" = "true" + "iceberg.tables.evolve-schema-enabled" = "true" + "iceberg.control.commit.interval-ms" = "15000" + "flush.size" : "10" + "flush.interval.ms" = "1000" + "iceberg.control.commit.timeout-ms" = "5000" + "tasks.max" = "1" + "key.converter" = 
"org.apache.kafka.connect.json.JsonConverter" + "value.converter" = "org.apache.kafka.connect.json.JsonConverter" + "consumer.override.auto.offset.reset" = "earliest" + "iceberg.catalog.client.region" = var.aws_region + "iceberg.catalog.io-impl" = "org.apache.iceberg.aws.s3.S3FileIO" + "iceberg.catalog.jdbc.useSSL" = "true" + "iceberg.catalog.jdbc.verifyServerCertificate" = "true" + "iceberg.catalog.s3.path-style-access" = "true" + "iceberg.catalog.type" = "jdbc" + "iceberg.catalog.uri" = "jdbc:postgresql://${aiven_pg.postgres.service_host}:${aiven_pg.postgres.service_port}/${aiven_pg_database.iceberg_db.database_name}?user=${aiven_pg_user.iceberg_user.username}&password=${aiven_pg_user.iceberg_user.password}&ssl=require" + "iceberg.catalog.warehouse" = "s3://${var.s3_bucket_name}/${var.s3_prefix}/" + "iceberg.kafka.auto.offset.reset" = "earliest" + "iceberg.kafka.bootstrap.servers" = aiven_kafka.kafka.service_uri + "iceberg.kafka.security.protocol" = "SSL" + "iceberg.kafka.ssl.key.password" = "password" + "iceberg.kafka.ssl.keystore.location" = "/run/aiven/keys/public.keystore.p12" + "iceberg.kafka.ssl.keystore.password" = "password" + "iceberg.kafka.ssl.keystore.type" = "PKCS12" + "iceberg.kafka.ssl.truststore.location" = "/run/aiven/keys/public.truststore.jks" + "iceberg.kafka.ssl.truststore.password" = "password" + "key.converter.schemas.enable" = "false" + "value.converter.schemas.enable" = "false" + }, var.aws_role_arn != "" ? { + "iceberg.catalog.client.assume-role.arn" = var.aws_role_arn + "iceberg.catalog.client.assume-role.region" = var.aws_region + "iceberg.catalog.client.factory" = "org.apache.iceberg.aws.AssumeRoleAwsClientFactory" + } : local.aws_access_key_id != null ? 
{ + "iceberg.catalog.s3.access-key-id" = local.aws_access_key_id + "iceberg.catalog.s3.secret-access-key" = local.aws_secret_access_key + } : {}) + + depends_on = [ + aiven_kafka_topic.iceberg_topic, + aiven_pg_user.iceberg_user, + aiven_pg_database.iceberg_db, + null_resource.iceberg_user_permissions, + aiven_service_integration.kafka_connect_kafka + ] +} + +# Outputs +output "kafka_service_name" { + description = "Name of the Kafka service" + value = aiven_kafka.kafka.service_name +} + +output "kafka_service_uri" { + description = "URI of the Kafka service" + value = aiven_kafka.kafka.service_uri + sensitive = true +} + +output "kafka_service_password" { + description = "Password of the Kafka service" + value = aiven_kafka.kafka.service_password + sensitive = true +} + +output "kafka_rest_uri" { + description = "REST API URI of the Kafka service" + value = aiven_kafka.kafka.kafka[0].rest_uri + sensitive = true +} + +output "kafka_connect_service_name" { + description = "Name of the Kafka Connect service" + value = aiven_kafka_connect.kafka_connect.service_name +} + +output "kafka_connect_uri" { + description = "URI of the Kafka Connect service" + value = aiven_kafka_connect.kafka_connect.service_uri + sensitive = true +} + +output "postgres_service_name" { + description = "Name of the PostgreSQL service" + value = aiven_pg.postgres.service_name +} + +output "postgres_service_uri" { + description = "URI of the PostgreSQL service" + value = aiven_pg.postgres.service_uri + sensitive = true +} + +output "postgres_host" { + description = "Host of the PostgreSQL service" + value = aiven_pg.postgres.service_host +} + +output "postgres_port" { + description = "Port of the PostgreSQL service" + value = aiven_pg.postgres.service_port +} + +output "topic_name" { + description = "Name of the Kafka topic" + value = aiven_kafka_topic.iceberg_topic.topic_name +} + +output "connector_name" { + description = "Name of the Kafka Connect connector" + value = 
aiven_kafka_connector.iceberg_sink.connector_name +} + +output "iceberg_user_username" { + description = "Username of the PostgreSQL user for Iceberg" + value = aiven_pg_user.iceberg_user.username +} + +output "iceberg_user_password" { + description = "Password of the PostgreSQL user for Iceberg" + value = aiven_pg_user.iceberg_user.password + sensitive = true +} + +output "project_id" { + description = "Aiven project ID" + value = data.aiven_project.project.id +} + +output "s3_warehouse_path" { + description = "S3 warehouse path for Iceberg" + value = "s3://${var.s3_bucket_name}/${var.s3_prefix}/" +} + +output "table_name" { + description = "Iceberg table name" + value = var.table_name +} + +output "aws_region" { + description = "AWS region for S3 operations" + value = var.aws_region +} diff --git a/solutions/iceberg-sink-connector/scripts/requirements.txt b/solutions/iceberg-sink-connector/scripts/requirements.txt new file mode 100644 index 0000000..f384c26 --- /dev/null +++ b/solutions/iceberg-sink-connector/scripts/requirements.txt @@ -0,0 +1,3 @@ +duckdb>=0.9.2 +requests>=2.25.0 +urllib3>=1.26.0 diff --git a/solutions/iceberg-sink-connector/scripts/test_iceberg_connector.py b/solutions/iceberg-sink-connector/scripts/test_iceberg_connector.py new file mode 100755 index 0000000..4445b00 --- /dev/null +++ b/solutions/iceberg-sink-connector/scripts/test_iceberg_connector.py @@ -0,0 +1,618 @@ +#!/usr/bin/env python3 +""" +Iceberg Sink Connector Test Script (Terraform Integration) + +This script tests the Iceberg sink connector by: +1. Reading configuration from Terraform outputs +2. Producing test messages to Kafka +3. Waiting for the Iceberg connector to process them +4. 
Querying the data from DuckDB + +Usage: + python test_iceberg_terraform.py + python test_iceberg_terraform.py --verbose +""" + +import argparse +import json +import os +from pathlib import Path +import subprocess +import sys +import time +import requests +import urllib3 +import duckdb +import uuid +from datetime import datetime, timezone +from typing import Dict, Optional, Any + +# Suppress SSL warnings for self-signed certificates +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + +class Colors: + """ANSI color codes for terminal output.""" + + RED = "\033[0;31m" + GREEN = "\033[0;32m" + YELLOW = "\033[1;33m" + BLUE = "\033[0;34m" + NC = "\033[0m" # No Color + + +class TerraformIcebergTest: + """Main class for testing Iceberg sink connector with Terraform integration.""" + + def __init__(self, verbose: bool = False): + self.verbose = verbose + self.config = {} + self.run_id = str(uuid.uuid4()) + self.session_headers = { + "Content-Type": "application/json", + "User-Agent": "IcebergTest/1.0", + } + + def print_status(self, message: str) -> None: + """Print a status message.""" + print(f"{Colors.BLUE}[INFO]{Colors.NC} {message}") + + def print_success(self, message: str) -> None: + """Print a success message.""" + print(f"{Colors.GREEN}[SUCCESS]{Colors.NC} {message}") + + def print_warning(self, message: str) -> None: + """Print a warning message.""" + print(f"{Colors.YELLOW}[WARNING]{Colors.NC} {message}") + + def print_error(self, message: str) -> None: + """Print an error message.""" + print(f"{Colors.RED}[ERROR]{Colors.NC} {message}") + + def run_terraform_output_all(self) -> Dict[str, Any]: + """Run terraform output command and return all outputs as a dictionary with sensitive info.""" + try: + # Get the parent directory where Terraform files are located + terraform_dir = Path(os.path.dirname(os.path.abspath(__file__))).parent + + result = subprocess.run( + ["terraform", "output", "-json"], + capture_output=True, + text=True, + check=True, 
+ cwd=terraform_dir, + ) + outputs = json.loads(result.stdout) + + # Return the full output structure to preserve sensitive information + return outputs + except subprocess.CalledProcessError as e: + self.print_error(f"Failed to get Terraform outputs: {e}") + if self.verbose: + print(f"STDOUT: {e.stdout}") + print(f"STDERR: {e.stderr}") + raise + except json.JSONDecodeError as e: + self.print_error(f"Failed to parse Terraform outputs JSON: {e}") + raise + + def load_config_from_terraform(self) -> Dict[str, str]: + """Load configuration from Terraform outputs.""" + self.print_status("Loading configuration from Terraform outputs...") + + try: + # Get all outputs in one command + terraform_outputs = self.run_terraform_output_all() + + # Get AWS credentials from environment + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + + if not aws_access_key_id or not aws_secret_access_key: + self.print_warning("AWS credentials not found in environment variables") + self.print_warning( + "Please set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY" + ) + + config = { + "KAFKA_URI": terraform_outputs.get("kafka_rest_uri", {}).get( + "value", "" + ), + "KAFKA_CONNECT_URI": terraform_outputs.get("kafka_connect_uri", {}).get( + "value", "" + ), + "S3_PATH": terraform_outputs.get("s3_warehouse_path", {}).get( + "value", "" + ), + "TOPIC_NAME": terraform_outputs.get("topic_name", {}).get("value", ""), + "CONNECTOR_NAME": terraform_outputs.get("connector_name", {}).get( + "value", "" + ), + "TABLE_NAME": terraform_outputs.get("table_name", {}).get("value", ""), + "AWS_REGION": terraform_outputs.get("aws_region", {}).get("value", ""), + "AWS_ACCESS_KEY_ID": aws_access_key_id or "", + "AWS_SECRET_ACCESS_KEY": aws_secret_access_key or "", + "NUM_MESSAGES": "10", + } + + # Validate that we have the required outputs + missing_outputs = [ + key + for key, value in config.items() + if key + not in ["AWS_ACCESS_KEY_ID", 
"AWS_SECRET_ACCESS_KEY", "NUM_MESSAGES"] + and not value + ] + + if missing_outputs: + raise ValueError( + f"Missing required Terraform outputs: {', '.join(missing_outputs)}" + ) + + self.print_success("Configuration loaded successfully") + if self.verbose: + print("Terraform outputs:") + for key, output_data in terraform_outputs.items(): + value = output_data.get("value", "") + is_sensitive = output_data.get("sensitive", False) + print( + f" {key}: {self._redact_sensitive_value(value, is_sensitive)}" + ) + print("\nProcessed configuration:") + for key, value in config.items(): + is_sensitive = "SECRET" in key or "PASSWORD" in key + print( + f" {key}: {self._redact_sensitive_value(value, is_sensitive)}" + ) + + return config + + except Exception as e: + self.print_error(f"Failed to load configuration: {e}") + raise + + def _redact_sensitive_value( + self, value: str, is_sensitive_key: bool = False + ) -> str: + """Redact sensitive parts of a value while preserving the structure.""" + if not isinstance(value, str): + return value + + # If the key itself is sensitive, replace entire value with asterisks + if is_sensitive_key: + return "*" * len(value) + + # Otherwise, replace any Aiven password occurrences within the value + import re + + # Pattern to match Aiven passwords (AVNS_ followed by alphanumeric characters and underscores) + aiven_password_pattern = r"AVNS_[a-zA-Z0-9_]+" + + # Replace all Aiven password occurrences with asterisks + redacted_value = re.sub( + aiven_password_pattern, lambda m: "*" * len(m.group()), value + ) + + return redacted_value + + def make_request( + self, + url: str, + method: str = "GET", + data: Optional[Dict] = None, + headers: Optional[Dict] = None, + ) -> Dict: + """Make HTTP request using requests.""" + request_headers = self.session_headers.copy() + if headers: + request_headers.update(headers) + + if self.verbose: + redacted_url = self._redact_sensitive_value(url, False) + self.print_status(f"Making {method} request to: 
{redacted_url}") + + try: + # Disable SSL verification for self-signed certificates + if method.upper() == "GET": + response = requests.get( + url, headers=request_headers, timeout=30, verify=False + ) + elif method.upper() == "POST": + response = requests.post( + url, headers=request_headers, json=data, timeout=30, verify=False + ) + else: + raise ValueError(f"Unsupported HTTP method: {method}") + + response.raise_for_status() + + if response.text: + return response.json() + return {} + + except requests.exceptions.HTTPError as e: + if self.verbose: + self.print_error( + f"HTTP Error {e.response.status_code}: {e.response.text}" + ) + raise + except requests.exceptions.RequestException as e: + self.print_error(f"Request Error: {e}") + raise + except json.JSONDecodeError as e: + self.print_error(f"JSON Decode Error: {e}") + raise + + def wait_for_service(self, service_uri: str, max_attempts: int = 30) -> bool: + """Wait for a service to be ready.""" + redacted_uri = self._redact_sensitive_value(service_uri, False) + self.print_status(f"Waiting for service to be ready: {redacted_uri}") + + for attempt in range(1, max_attempts + 1): + try: + # For other services, try to parse JSON response + self.make_request(service_uri) + self.print_success("Service is ready!") + return True + except ( + requests.exceptions.HTTPError, + requests.exceptions.RequestException, + json.JSONDecodeError, + ): + pass + + self.print_status( + f"Attempt {attempt}/{max_attempts} - Service not ready yet, waiting..." 
+ ) + time.sleep(10) + + self.print_error( + f"Service failed to become ready after {max_attempts} attempts" + ) + return False + + def check_connector_status(self, connect_uri: str, connector_name: str) -> bool: + """Check if the connector is running.""" + self.print_status(f"Checking connector status: {connector_name}") + + try: + url = f"{connect_uri}/connectors/{connector_name}/status" + data = self.make_request(url) + + state = data.get("connector", {}).get("state") + + if state == "RUNNING": + self.print_success(f"Connector {connector_name} is running") + return True + else: + self.print_error( + f"Connector {connector_name} is not running (status: {state})" + ) + return False + + except Exception as e: + self.print_error(f"Failed to check connector status: {e}") + return False + + def produce_test_messages( + self, kafka_uri: str, topic_name: str, num_messages: int = 10 + ) -> bool: + """Produce test messages to Kafka using REST API.""" + self.print_status( + f"Producing {num_messages} test messages to topic: {topic_name}" + ) + + # Generate test messages + messages = [] + for i in range(1, num_messages + 1): + message = { + "key": f"key-{i}", + "value": { + "test_field": i, + "timestamp": datetime.now(timezone.utc).isoformat(), + "message": f"Test message {i}", + "user_id": f"user_{i % 5}", + "category": f"category_{i % 3}", + "run_id": self.run_id, + }, + } + messages.append(message) + + try: + url = f"{kafka_uri}/topics/{topic_name}" + headers = { + "Content-Type": "application/vnd.kafka.json.v2+json", + "Accept": "application/vnd.kafka.v2+json", + } + + # Send messages in batches + batch_size = 5 + for i in range(0, len(messages), batch_size): + batch = messages[i : i + batch_size] + payload = {"records": batch} + + self.make_request(url, method="POST", data=payload, headers=headers) + self.print_status( + f"Sent batch {i // batch_size + 1}/{(len(messages) + batch_size - 1) // batch_size}" + ) + + self.print_success(f"Produced {num_messages} test 
messages") + return True + + except Exception as e: + self.print_error(f"Failed to produce messages via REST API: {e}") + self.print_warning("You may need to use kafka-console-producer directly") + return False + + def wait_for_data_in_duckdb( + self, + s3_path: str, + aws_access_key: str, + aws_secret_key: str, + aws_region: str, + expected_count: int = 10, + max_attempts: int = 30, + ) -> bool: + """Wait for data to appear in DuckDB by querying the Iceberg table.""" + self.print_status( + f"Waiting for data to appear in DuckDB (expecting {expected_count} records)... This can take a couple of minutes..." + ) + + # Get table name from config + table_name = self.config.get("TABLE_NAME", "test-table") + + # Configure DuckDB with AWS credentials + con = duckdb.connect(":memory:") + + # Load the Iceberg extension + con.execute("INSTALL iceberg") + con.execute("LOAD iceberg") + + # Enable version guessing for Iceberg tables + con.execute("SET unsafe_enable_version_guessing = true") + + # Set up AWS credentials and region for S3 access + if aws_access_key and aws_secret_key: + con.execute(f"SET s3_access_key_id='{aws_access_key}'") + con.execute(f"SET s3_secret_access_key='{aws_secret_key}'") + if aws_region: + con.execute(f"SET s3_region='{aws_region}'") + + # Extract bucket and prefix from S3 path + s3_path_clean = s3_path.replace("s3://", "").rstrip("/") + if "/" in s3_path_clean: + bucket, prefix = s3_path_clean.split("/", 1) + else: + bucket = s3_path_clean + prefix = "" + + for attempt in range(1, max_attempts + 1): + try: + # Query the Iceberg table using DuckDB's Iceberg support + query = f""" + SELECT COUNT(*) AS count + FROM iceberg_scan('s3://{bucket}/{prefix}/{table_name}', allow_moved_paths = true) + WHERE run_id = '{self.run_id}' + """ + result = con.execute(query).fetchone() + + if result and result[0] >= expected_count: + self.print_success( + f"Found {result[0]} records in DuckDB (expected: {expected_count})" + ) + con.close() + return True + elif 
result: + self.print_status( + f"Attempt {attempt}/{max_attempts} - Found {result[0]} records, waiting for more..." + ) + else: + self.print_status( + f"Attempt {attempt}/{max_attempts} - No data found yet..." + ) + time.sleep(30) + except Exception as e: + self.print_warning( + f"Attempt {attempt}/{max_attempts} - Error querying DuckDB: {e}" + ) + time.sleep(30) + + con.close() + self.print_error( + f"Failed to find expected data in DuckDB after {max_attempts} attempts" + ) + return False + + def query_duckdb_data( + self, + s3_path: str, + aws_access_key: str, + aws_secret_key: str, + aws_region: str, + limit: int = 10, + ) -> bool: + """Query and display data from DuckDB by reading the Iceberg table.""" + self.print_status("Querying data from DuckDB...") + + # Get table name from config + table_name = self.config.get("TABLE_NAME", "test-table") + + # Configure DuckDB with AWS credentials + con = duckdb.connect(":memory:") + + # Load the Iceberg extension + con.execute("INSTALL iceberg") + con.execute("LOAD iceberg") + + # Enable version guessing for Iceberg tables + con.execute("SET unsafe_enable_version_guessing = true") + + # Set up AWS credentials and region for S3 access + if aws_access_key and aws_secret_key: + con.execute(f"SET s3_access_key_id='{aws_access_key}'") + con.execute(f"SET s3_secret_access_key='{aws_secret_key}'") + if aws_region: + con.execute(f"SET s3_region='{aws_region}'") + + # Extract bucket and prefix from S3 path + s3_path_clean = s3_path.replace("s3://", "").rstrip("/") + if "/" in s3_path_clean: + bucket, prefix = s3_path_clean.split("/", 1) + else: + bucket = s3_path_clean + prefix = "" + + try: + # Query the Iceberg table using DuckDB's Iceberg support + query = f""" + SELECT * + FROM iceberg_scan('s3://{bucket}/{prefix}/{table_name}', allow_moved_paths = true) + WHERE run_id = '{self.run_id}' + LIMIT {limit} + """ + result = con.execute(query).fetchall() + + self.print_success(f"Data from Iceberg table '{table_name}':") + if 
result: + # Get column names + columns = [desc[0] for desc in con.description] + print(f" Columns: {columns}") + for row in result: + print(f" {row}") + else: + print(" No data found in the table") + + con.close() + return True + except Exception as e: + self.print_error(f"Failed to query DuckDB: {e}") + con.close() + return False + + def show_connector_metrics(self, connect_uri: str, connector_name: str) -> None: + """Display connector metrics.""" + self.print_status(f"Connector metrics for: {connector_name}") + + try: + # Get connector status + status_url = f"{connect_uri}/connectors/{connector_name}/status" + status_data = self.make_request(status_url) + + print("Connector Status:") + print(json.dumps(status_data, indent=2)) + + # Get connector config + config_url = f"{connect_uri}/connectors/{connector_name}/config" + config_data = self.make_request(config_url) + + print("\nConnector Configuration:") + # Hide sensitive information + safe_config = {} + for key, value in config_data.items(): + is_sensitive = any( + sensitive in key.lower() + for sensitive in ["password", "secret", "key"] + ) + safe_config[key] = self._redact_sensitive_value(value, is_sensitive) + print(json.dumps(safe_config, indent=2)) + + except Exception as e: + self.print_error(f"Failed to get connector metrics: {e}") + + def run_workflow(self) -> bool: + """Run the complete test workflow.""" + self.print_status( + f"Starting Iceberg Sink Connector Test Workflow (Run ID: {self.run_id})" + ) + + # Load configuration from Terraform + self.config = self.load_config_from_terraform() + + # Wait for services to be ready + if not self.wait_for_service(self.config["KAFKA_CONNECT_URI"]): + return False + + # Check connector status + if not self.check_connector_status( + self.config["KAFKA_CONNECT_URI"], self.config["CONNECTOR_NAME"] + ): + return False + + # Show initial connector metrics + self.show_connector_metrics( + self.config["KAFKA_CONNECT_URI"], self.config["CONNECTOR_NAME"] + ) + + # Produce 
test messages + num_messages = int(self.config.get("NUM_MESSAGES", "10")) + if not self.produce_test_messages( + self.config["KAFKA_URI"], self.config["TOPIC_NAME"], num_messages + ): + return False + + # Wait for data to appear in DuckDB + expected_count = int(self.config.get("NUM_MESSAGES", "10")) + if not self.wait_for_data_in_duckdb( + self.config["S3_PATH"], + self.config["AWS_ACCESS_KEY_ID"], + self.config["AWS_SECRET_ACCESS_KEY"], + self.config["AWS_REGION"], + expected_count, + ): + return False + + # Query and display the data + if not self.query_duckdb_data( + self.config["S3_PATH"], + self.config["AWS_ACCESS_KEY_ID"], + self.config["AWS_SECRET_ACCESS_KEY"], + self.config["AWS_REGION"], + ): + return False + + # Show final connector metrics + self.show_connector_metrics( + self.config["KAFKA_CONNECT_URI"], self.config["CONNECTOR_NAME"] + ) + + self.print_success( + "Iceberg Sink Connector Test Workflow completed successfully!" + ) + return True + + +def main(): + """Main function.""" + parser = argparse.ArgumentParser( + description="Test Iceberg Sink Connector with Terraform Integration", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + python test_iceberg_terraform.py + python test_iceberg_terraform.py --verbose + """, + ) + + parser.add_argument( + "-v", "--verbose", action="store_true", help="Enable verbose output" + ) + + args = parser.parse_args() + + try: + # Run the workflow + workflow = TerraformIcebergTest(verbose=args.verbose) + success = workflow.run_workflow() + + sys.exit(0 if success else 1) + + except KeyboardInterrupt: + print(f"\n{Colors.YELLOW}[WARNING]{Colors.NC} Test interrupted by user") + sys.exit(1) + except Exception as e: + print(f"{Colors.RED}[ERROR]{Colors.NC} {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/solutions/iceberg-sink-connector/terraform.tfvars.example b/solutions/iceberg-sink-connector/terraform.tfvars.example new file mode 100644 index 0000000..538a60c --- 
/dev/null +++ b/solutions/iceberg-sink-connector/terraform.tfvars.example @@ -0,0 +1,98 @@ +# Iceberg Sink Connector Configuration +# Copy this file to terraform.tfvars and fill in your actual values + +# ============================================================================= +# REQUIRED: Aiven Configuration +# ============================================================================= + +# Aiven API token (get from https://console.aiven.io/account/tokens) +# Can be provided in two ways: + +# METHOD 1: Environment Variable +# Set this environment variable before running terraform: +# export AIVEN_API_TOKEN="your-aiven-api-token-here" +# The tchupp/env provider will automatically read this via data sources: +aiven_api_token = "" + +# METHOD 2: Direct Configuration +# Uncomment and fill in the token directly: +# aiven_api_token = "your-aiven-api-token-here" + +# Aiven project name where services will be created +project_name = "my-aiven-project" + +# ============================================================================= +# REQUIRED: AWS S3 Configuration +# ============================================================================= + +# AWS credentials can be provided in two ways: + +# METHOD 1: Environment Variables +# Set these environment variables before running terraform: +# export AWS_ACCESS_KEY_ID="your-aws-access-key-id" +# export AWS_SECRET_ACCESS_KEY="your-aws-secret-access-key" +# The tchupp/env provider will automatically read these via data sources: +aws_access_key_id = "" +aws_secret_access_key = "" + +# METHOD 2: Direct Configuration +# Uncomment and fill in the credentials directly: +# aws_access_key_id = "your-aws-access-key-id" +# aws_secret_access_key = "your-aws-secret-access-key" + +# S3 bucket name for Iceberg warehouse storage +s3_bucket_name = "my-iceberg-warehouse" + +# AWS role ARN to assume for S3 access (RECOMMENDED - more secure than direct credentials) +# +# SECURITY BENEFIT: Using IAM assume role means you don't expose 
your AWS credentials. +# Instead, Aiven uses their own IAM user to assume your role and get temporary credentials. +# This provides: +# - No long-term AWS access keys in your config +# - Automatic credential rotation +# - Full audit trail in AWS CloudTrail +# +# SETUP REQUIRED: Contact Aiven support to get your unique IAM user and External ID, +# then create a cross-account role in your AWS account that trusts Aiven's IAM user. +# See: https://aiven.io/docs/products/kafka/kafka-connect/howto/s3-iam-assume-role +# +# Leave empty to use direct AWS credentials (less secure fallback) + +aws_role_arn = "" + +# As an alternative, consider using Aiven's secret providers to store +# sensitive configuration values in external secret managers: +# - AWS Secrets Manager +# - HashiCorp Vault +# +# This allows you to reference secrets in connector configs without storing +# them in plain text. See: https://aiven.io/docs/products/kafka/kafka-connect/howto/configure-secret-providers +# +# Example: Instead of aws_access_key_id = "AKIA...", you would use: +# aws_access_key_id = "${secret:aws-secrets-manager:my-secret-name:access-key-id}" + + +# AWS region for S3 operations (must match your S3 bucket region) +# aws_region = "eu-west-1" + +# ============================================================================= +# OPTIONAL: Customize the setup (uncomment and modify as needed) +# ============================================================================= + +# Cloud name for the services (e.g., google-europe-west1, aws-us-east-1) +# cloud_name = "google-europe-west1" + +# Plan name for the services (e.g., startup-4, business-8) +# plan_name = "startup-4" + +# Prefix for service names to ensure uniqueness +# service_name_prefix = "iceberg-sink" + +# S3 prefix for Iceberg warehouse organization +# s3_prefix = "connect-iceberg-sink" + +# Kafka topic name for the Iceberg sink connector +# topic_name = "test_iceberg_topic" + +# Iceberg table name that will be created automatically +#
table_name = "test-table"