Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.kafka.schema.confluent;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
Expand All @@ -22,15 +23,20 @@
import io.airlift.units.MinDuration;
import io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.SchemaTableName;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Size;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

public class ConfluentSchemaRegistryConfig
Expand All @@ -46,6 +52,7 @@ public enum ConfluentSchemaRegistryAuthType
private int confluentSchemaRegistryClientCacheSize = 1000;
private EmptyFieldStrategy emptyFieldStrategy = IGNORE;
private Duration confluentSubjectsCacheRefreshInterval = new Duration(1, SECONDS);
private Map<SchemaTableName, String> confluentSchemaRegistrySubjectMapping = ImmutableMap.of();

@Size(min = 1)
public Set<HostAddress> getConfluentSchemaRegistryUrls()
Expand Down Expand Up @@ -117,6 +124,19 @@ public ConfluentSchemaRegistryConfig setConfluentSubjectsCacheRefreshInterval(Du
return this;
}

/**
 * Returns the configured schema.table → Kafka topic overrides.
 * Empty when {@code kafka.confluent-schema-registry-subject-mapping} is not set.
 */
public Map<SchemaTableName, String> getConfluentSchemaRegistrySubjectMapping()
{
    return confluentSchemaRegistrySubjectMapping;
}

@Config("kafka.confluent-schema-registry-subject-mapping")
@ConfigDescription("Comma-separated list of schema.table to actual topic name mappings. Format: schema1.table1:topic1,schema2.table2:topic2")
public ConfluentSchemaRegistryConfig setConfluentSchemaRegistrySubjectMapping(String mapping)
{
    // An absent property means "no overrides"; only parse when a value was supplied.
    if (mapping == null) {
        this.confluentSchemaRegistrySubjectMapping = ImmutableMap.of();
    }
    else {
        this.confluentSchemaRegistrySubjectMapping = parseSubjectMapping(mapping);
    }
    return this;
}

private static ImmutableSet<HostAddress> parseNodes(String nodes)
{
Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
Expand All @@ -129,4 +149,40 @@ private static HostAddress toHostAddress(String value)
{
return HostAddress.fromString(value);
}

/**
 * Parses a comma-separated list of {@code schema.table:topic} entries into a map
 * keyed by {@link SchemaTableName}.
 *
 * @throws IllegalArgumentException if an entry is malformed, a name component is
 *         empty, or the same schema.table appears twice
 */
private static ImmutableMap<SchemaTableName, String> parseSubjectMapping(String mapping)
{
    requireNonNull(mapping, "mapping is null");

    ImmutableMap.Builder<SchemaTableName, String> result = ImmutableMap.builder();

    // Entries look like "schema.table:topic" and are separated by commas.
    for (String pair : Splitter.on(',').omitEmptyStrings().trimResults().split(mapping)) {
        List<String> keyAndTopic = Splitter.on(':').trimResults().splitToList(pair);
        checkArgument(keyAndTopic.size() == 2,
                "Invalid mapping format '%s'. Expected format: schema.table:topic", pair);

        String qualifiedName = keyAndTopic.get(0);
        String topic = keyAndTopic.get(1);

        List<String> nameParts = Splitter.on('.').trimResults().splitToList(qualifiedName);
        checkArgument(nameParts.size() == 2,
                "Invalid schema.table format '%s'. Expected format: schema.table", qualifiedName);

        String schemaName = nameParts.get(0);
        String tableName = nameParts.get(1);

        // trimResults can leave empty components (e.g. ".table" or "schema."); reject them.
        checkArgument(!schemaName.isEmpty() && !tableName.isEmpty(),
                "Schema and table names cannot be empty in '%s'", qualifiedName);
        checkArgument(!topic.isEmpty(),
                "Topic name cannot be empty in mapping '%s'", pair);

        result.put(new SchemaTableName(schemaName, tableName), topic);
    }

    // buildOrThrow rejects duplicate schema.table keys with an IllegalArgumentException.
    return result.buildOrThrow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,19 @@ public class ConfluentSchemaRegistryTableDescriptionSupplier
private final String defaultSchema;
private final Supplier<SetMultimap<String, TopicAndSubjects>> topicAndSubjectsSupplier;
private final Supplier<SetMultimap<String, String>> subjectsSupplier;
private final Map<SchemaTableName, String> subjectMapping;

/**
 * @param subjectMapping explicit schema.table → topic overrides applied when
 *        resolving a table to its schema-registry subjects; may be empty
 */
public ConfluentSchemaRegistryTableDescriptionSupplier(
        SchemaRegistryClient schemaRegistryClient,
        Map<String, SchemaParser> schemaParsers,
        String defaultSchema,
        Duration subjectsCacheRefreshInterval,
        Map<SchemaTableName, String> subjectMapping)
{
    this.schemaRegistryClient = requireNonNull(schemaRegistryClient, "schemaRegistryClient is null");
    this.schemaParsers = ImmutableMap.copyOf(requireNonNull(schemaParsers, "schemaParsers is null"));
    this.defaultSchema = requireNonNull(defaultSchema, "defaultSchema is null");
    // Defensive copy so later mutation of the caller's map cannot change resolution.
    this.subjectMapping = ImmutableMap.copyOf(requireNonNull(subjectMapping, "subjectMapping is null"));
    // Subject listings are fetched lazily and re-fetched after the refresh interval.
    topicAndSubjectsSupplier = memoizeWithExpiration(this::getTopicAndSubjects, subjectsCacheRefreshInterval.toMillis(), MILLISECONDS);
    subjectsSupplier = memoizeWithExpiration(this::getAllSubjects, subjectsCacheRefreshInterval.toMillis(), MILLISECONDS);
}
Expand All @@ -97,6 +100,7 @@ public static class Factory
private final Map<String, SchemaParser> schemaParsers;
private final String defaultSchema;
private final Duration subjectsCacheRefreshInterval;
private final Map<SchemaTableName, String> subjectMapping;

@Inject
public Factory(
Expand All @@ -109,12 +113,18 @@ public Factory(
this.schemaParsers = ImmutableMap.copyOf(requireNonNull(schemaParsers, "schemaParsers is null"));
this.defaultSchema = kafkaConfig.getDefaultSchema();
this.subjectsCacheRefreshInterval = confluentConfig.getConfluentSubjectsCacheRefreshInterval();
this.subjectMapping = confluentConfig.getConfluentSchemaRegistrySubjectMapping();
}

@Override
public TableDescriptionSupplier get()
{
    // Builds the supplier from the injected config values captured in the Factory fields.
    return new ConfluentSchemaRegistryTableDescriptionSupplier(
            schemaRegistryClient,
            schemaParsers,
            defaultSchema,
            subjectsCacheRefreshInterval,
            subjectMapping);
}
}

Expand Down Expand Up @@ -195,6 +205,15 @@ public Optional<KafkaTopicDescription> getTopicDescription(ConnectorSession sess
topicAndSubjects.getValueSubject().or(topicAndSubjectsFromCache::getValueSubject));
}

// Apply subject mapping override if configured
if (subjectMapping.containsKey(schemaTableName)) {
String overrideTopic = subjectMapping.get(schemaTableName);
topicAndSubjects = new TopicAndSubjects(
overrideTopic,
topicAndSubjects.getKeySubject(),
topicAndSubjects.getValueSubject());
}

if (topicAndSubjects.getKeySubject().isEmpty() && topicAndSubjects.getValueSubject().isEmpty()) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ private TableDescriptionSupplier createTableDescriptionSupplier()
SCHEMA_REGISTRY_CLIENT,
ImmutableMap.of("AVRO", new AvroSchemaParser(new TestingTypeManager())),
DEFAULT_NAME,
new Duration(1, SECONDS));
new Duration(1, SECONDS),
ImmutableMap.of());
}

private static Schema getAvroSchema(String topicName, String columnNamePrefix)
Expand Down