From 56bf28c06f0da7bb17939a5af870842e4f43f444 Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 7 Feb 2026 01:01:52 +0800 Subject: [PATCH] subject mapping --- .../ConfluentSchemaRegistryConfig.java | 56 +++++++++++++++++++ ...chemaRegistryTableDescriptionSupplier.java | 23 +++++++- ...chemaRegistryTableDescriptionSupplier.java | 3 +- 3 files changed, 79 insertions(+), 3 deletions(-) diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java index 67b0f757d2e..c2810d9f422 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java @@ -14,6 +14,7 @@ package io.trino.plugin.kafka.schema.confluent; import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; @@ -22,15 +23,20 @@ import io.airlift.units.MinDuration; import io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy; import io.trino.spi.HostAddress; +import io.trino.spi.connector.SchemaTableName; import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.Size; +import java.util.List; +import java.util.Map; import java.util.Set; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Streams.stream; import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE; +import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.SECONDS; public 
class ConfluentSchemaRegistryConfig @@ -46,6 +52,7 @@ public enum ConfluentSchemaRegistryAuthType private int confluentSchemaRegistryClientCacheSize = 1000; private EmptyFieldStrategy emptyFieldStrategy = IGNORE; private Duration confluentSubjectsCacheRefreshInterval = new Duration(1, SECONDS); + private Map<SchemaTableName, String> confluentSchemaRegistrySubjectMapping = ImmutableMap.of(); @Size(min = 1) public Set<HostAddress> getConfluentSchemaRegistryUrls() @@ -117,6 +124,19 @@ public ConfluentSchemaRegistryConfig setConfluentSubjectsCacheRefreshInterval(Du return this; } + public Map<SchemaTableName, String> getConfluentSchemaRegistrySubjectMapping() + { + return confluentSchemaRegistrySubjectMapping; + } + + @Config("kafka.confluent-schema-registry-subject-mapping") + @ConfigDescription("Comma-separated list of schema.table to actual topic name mappings. Format: schema1.table1:topic1,schema2.table2:topic2") + public ConfluentSchemaRegistryConfig setConfluentSchemaRegistrySubjectMapping(String mapping) + { + this.confluentSchemaRegistrySubjectMapping = (mapping == null) ? ImmutableMap.of() : parseSubjectMapping(mapping); + return this; + } + private static ImmutableSet<HostAddress> parseNodes(String nodes) { Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults(); @@ -129,4 +149,40 @@ private static HostAddress toHostAddress(String value) { return HostAddress.fromString(value); } + + private static ImmutableMap<SchemaTableName, String> parseSubjectMapping(String mapping) + { + requireNonNull(mapping, "mapping is null"); + + Splitter entrySplitter = Splitter.on(',').omitEmptyStrings().trimResults(); + Splitter keyValueSplitter = Splitter.on(':').trimResults(); + + ImmutableMap.Builder<SchemaTableName, String> builder = ImmutableMap.builder(); + + for (String entry : entrySplitter.split(mapping)) { + List<String> parts = keyValueSplitter.splitToList(entry); + checkArgument(parts.size() == 2, + "Invalid mapping format '%s'. 
Expected format: schema.table:topic", entry); + + String schemaTable = parts.get(0); + String topicName = parts.get(1); + + List<String> schemaTableParts = Splitter.on('.').trimResults().splitToList(schemaTable); + checkArgument(schemaTableParts.size() == 2, + "Invalid schema.table format '%s'. Expected format: schema.table", schemaTable); + + String schema = schemaTableParts.get(0); + String table = schemaTableParts.get(1); + + checkArgument(!schema.isEmpty() && !table.isEmpty(), + "Schema and table names cannot be empty in '%s'", schemaTable); + checkArgument(!topicName.isEmpty(), + "Topic name cannot be empty in mapping '%s'", entry); + + SchemaTableName schemaTableName = new SchemaTableName(schema, table); + builder.put(schemaTableName, topicName); + } + + return builder.buildOrThrow(); + } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java index b18236eb065..40f851f454c 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java @@ -76,16 +76,19 @@ public class ConfluentSchemaRegistryTableDescriptionSupplier private final String defaultSchema; private final Supplier<Map<String, TopicAndSubjects>> topicAndSubjectsSupplier; private final Supplier<Set<String>> subjectsSupplier; + private final Map<SchemaTableName, String> subjectMapping; public ConfluentSchemaRegistryTableDescriptionSupplier( SchemaRegistryClient schemaRegistryClient, Map<String, SchemaParser> schemaParsers, String defaultSchema, - Duration subjectsCacheRefreshInterval) + Duration subjectsCacheRefreshInterval, + Map<SchemaTableName, String> subjectMapping) { this.schemaRegistryClient = requireNonNull(schemaRegistryClient, "schemaRegistryClient is null"); this.schemaParsers = 
ImmutableMap.copyOf(requireNonNull(schemaParsers, "schemaParsers is null")); this.defaultSchema = requireNonNull(defaultSchema, "defaultSchema is null"); + this.subjectMapping = ImmutableMap.copyOf(requireNonNull(subjectMapping, "subjectMapping is null")); topicAndSubjectsSupplier = memoizeWithExpiration(this::getTopicAndSubjects, subjectsCacheRefreshInterval.toMillis(), MILLISECONDS); subjectsSupplier = memoizeWithExpiration(this::getAllSubjects, subjectsCacheRefreshInterval.toMillis(), MILLISECONDS); } @@ -97,6 +100,7 @@ public static class Factory private final Map<String, SchemaParser> schemaParsers; private final String defaultSchema; private final Duration subjectsCacheRefreshInterval; + private final Map<SchemaTableName, String> subjectMapping; @Inject public Factory( @@ -109,12 +113,18 @@ public Factory( this.schemaParsers = ImmutableMap.copyOf(requireNonNull(schemaParsers, "schemaParsers is null")); this.defaultSchema = kafkaConfig.getDefaultSchema(); this.subjectsCacheRefreshInterval = confluentConfig.getConfluentSubjectsCacheRefreshInterval(); + this.subjectMapping = confluentConfig.getConfluentSchemaRegistrySubjectMapping(); } @Override public TableDescriptionSupplier get() { - return new ConfluentSchemaRegistryTableDescriptionSupplier(schemaRegistryClient, schemaParsers, defaultSchema, subjectsCacheRefreshInterval); + return new ConfluentSchemaRegistryTableDescriptionSupplier( + schemaRegistryClient, + schemaParsers, + defaultSchema, + subjectsCacheRefreshInterval, + subjectMapping); } } @@ -195,6 +205,15 @@ public Optional<KafkaTopicDescription> getTopicDescription(ConnectorSession sess topicAndSubjects.getValueSubject().or(topicAndSubjectsFromCache::getValueSubject)); } + // Apply subject mapping override if configured + if (subjectMapping.containsKey(schemaTableName)) { + String overrideTopic = subjectMapping.get(schemaTableName); + topicAndSubjects = new TopicAndSubjects( + overrideTopic, + topicAndSubjects.getKeySubject(), + topicAndSubjects.getValueSubject()); + } + + if (topicAndSubjects.getKeySubject().isEmpty() && 
topicAndSubjects.getValueSubject().isEmpty()) { return Optional.empty(); } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java index b9b7ca6e637..b43cf869c45 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java @@ -195,7 +195,8 @@ private TableDescriptionSupplier createTableDescriptionSupplier() SCHEMA_REGISTRY_CLIENT, ImmutableMap.of("AVRO", new AvroSchemaParser(new TestingTypeManager())), DEFAULT_NAME, - new Duration(1, SECONDS)); + new Duration(1, SECONDS), + ImmutableMap.of()); } private static Schema getAvroSchema(String topicName, String columnNamePrefix)