From e3df114014f6d492b26cf602455139347e0ddd27 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 29 Jan 2026 22:12:46 +0800 Subject: [PATCH 1/2] [opt](kafka) support auth for schema registry --- plugin/trino-kafka/2.diff | 37 +++++++++++++ ...chemaRegistryClientPropertiesProvider.java | 40 ++++++++++++++ .../schema/confluent/BasicAuthConfig.java | 54 +++++++++++++++++++ .../schema/confluent/ConfluentModule.java | 14 +++++ .../ConfluentSchemaRegistryBasicAuth.java | 44 +++++++++++++++ .../ConfluentSchemaRegistryConfig.java | 20 +++++++ .../ConfluentSchemaRegistryNoAuth.java | 29 ++++++++++ .../TestConfluentSchemaRegistryConfig.java | 5 ++ 8 files changed, 243 insertions(+) create mode 100644 plugin/trino-kafka/2.diff create mode 100644 plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java create mode 100644 plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java create mode 100644 plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java create mode 100644 plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java diff --git a/plugin/trino-kafka/2.diff b/plugin/trino-kafka/2.diff new file mode 100644 index 00000000000..42934324751 --- /dev/null +++ b/plugin/trino-kafka/2.diff @@ -0,0 +1,37 @@ +diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java +index d807877e915..f24bf2b1575 100644 +--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java ++++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java +@@ -24,6 +24,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDe + import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE; + import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.MARK; ++import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH; ++import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.NONE; + import static java.util.concurrent.TimeUnit.SECONDS; + + public class TestConfluentSchemaRegistryConfig +@@ -33,6 +35,7 @@ public class TestConfluentSchemaRegistryConfig + { + assertRecordedDefaults(recordDefaults(ConfluentSchemaRegistryConfig.class) + .setConfluentSchemaRegistryUrls(null) ++ .setConfluentSchemaRegistryAuthType(NONE) + .setConfluentSchemaRegistryClientCacheSize(1000) + .setEmptyFieldStrategy(IGNORE) + .setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS))); +@@ -43,6 +46,7 @@ public class TestConfluentSchemaRegistryConfig + { + Map properties = ImmutableMap.builder() + .put("kafka.confluent-schema-registry-url", "http://schema-registry-a:8081, http://schema-registry-b:8081") ++ .put("kafka.confluent-schema-registry-auth-type", "BASIC_AUTH") + .put("kafka.confluent-schema-registry-client-cache-size", "1500") + .put("kafka.empty-field-strategy", "MARK") + .put("kafka.confluent-subjects-cache-refresh-interval", "2s") +@@ -50,6 +54,7 @@ public class TestConfluentSchemaRegistryConfig + + ConfluentSchemaRegistryConfig expected = new ConfluentSchemaRegistryConfig() + .setConfluentSchemaRegistryUrls("http://schema-registry-a:8081, http://schema-registry-b:8081") ++ .setConfluentSchemaRegistryAuthType(BASIC_AUTH) + .setConfluentSchemaRegistryClientCacheSize(1500) + .setEmptyFieldStrategy(MARK) + .setConfluentSubjectsCacheRefreshInterval(new Duration(2, SECONDS)); diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java new file mode 100644 index 00000000000..93ed2b72717 --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import io.trino.plugin.kafka.schema.confluent.SchemaRegistryClientPropertiesProvider; + +import java.util.Map; + +public class KafkaSchemaRegistryClientPropertiesProvider + implements SchemaRegistryClientPropertiesProvider +{ + private final SchemaRegistryClientPropertiesProvider auth; + + @Inject + public KafkaSchemaRegistryClientPropertiesProvider(SchemaRegistryClientPropertiesProvider auth) + { + this.auth = auth; + } + + @Override + public Map getSchemaRegistryClientProperties() + { + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.putAll(auth.getSchemaRegistryClientProperties()); + return properties.buildOrThrow(); + } +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java new file mode 100644 index 00000000000..8f281c04a6c --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema.confluent; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.ConfigSecuritySensitive; +import jakarta.validation.constraints.NotNull; + +public class BasicAuthConfig +{ + private String username; + private String password; + + @NotNull + public String getConfluentSchemaRegistryUsername() + { + return username; + } + + @Config("kafka.confluent-schema-registry.basic-auth.username") + @ConfigDescription("The username for the Confluent Schema Registry") + @ConfigSecuritySensitive + public BasicAuthConfig setConfluentSchemaRegistryUsername(String username) + { + this.username = username; + return this; + } + + @NotNull + public String getConfluentSchemaRegistryPassword() + { + return password; + } + + @Config("kafka.confluent-schema-registry.basic-auth.password") + @ConfigSecuritySensitive + public BasicAuthConfig setConfluentSchemaRegistryPassword(String password) + { + this.password = password; + return this; + } +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java index a3003c135e9..5f47e2ee2e5 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java @@ -52,6 +52,7 @@ import io.trino.plugin.kafka.encoder.protobuf.ProtobufRowEncoder; import io.trino.plugin.kafka.encoder.protobuf.ProtobufSchemaParser; import io.trino.plugin.kafka.schema.ContentSchemaProvider; +import io.trino.plugin.kafka.schema.KafkaSchemaRegistryClientPropertiesProvider; import io.trino.plugin.kafka.schema.ProtobufAnySupportConfig; import io.trino.plugin.kafka.schema.TableDescriptionSupplier; import io.trino.spi.HostAddress; @@ -76,6 +77,7 @@ import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.kafka.encoder.EncoderModule.encoderFactory; +import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; @@ -98,8 +100,10 @@ protected void setup(Binder binder) install(new ConfluentDecoderModule()); install(new ConfluentEncoderModule()); binder.bind(ContentSchemaProvider.class).to(AvroConfluentContentSchemaProvider.class).in(Scopes.SINGLETON); + newSetBinder(binder, SchemaRegistryClientPropertiesProvider.class).addBinding().to(KafkaSchemaRegistryClientPropertiesProvider.class).in(Scopes.SINGLETON); newSetBinder(binder, SchemaRegistryClientPropertiesProvider.class); newSetBinder(binder, SchemaProvider.class).addBinding().to(AvroSchemaProvider.class).in(Scopes.SINGLETON); + // Each SchemaRegistry object should have a new instance of SchemaProvider newSetBinder(binder, SchemaProvider.class).addBinding().to(LazyLoadedProtobufSchemaProvider.class); binder.bind(DynamicMessageProvider.Factory.class).to(ConfluentSchemaRegistryDynamicMessageProvider.Factory.class).in(SINGLETON); @@ -107,6 +111,16 @@ protected void setup(Binder binder) binder.bind(TableDescriptionSupplier.class).toProvider(ConfluentSchemaRegistryTableDescriptionSupplier.Factory.class).in(Scopes.SINGLETON); newMapBinder(binder, String.class, SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON); newMapBinder(binder, String.class, SchemaParser.class).addBinding("PROTOBUF").to(LazyLoadedProtobufSchemaParser.class).in(Scopes.SINGLETON); + + // Bind the appropriate ConfluentSchemaRegistryAuth implementation based on configuration + ConfluentSchemaRegistryConfig schemaRegistryConfig = buildConfigObject(ConfluentSchemaRegistryConfig.class); + if (schemaRegistryConfig.getConfluentSchemaRegistryAuthType() == BASIC_AUTH) { + configBinder(binder).bindConfig(BasicAuthConfig.class); + binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryBasicAuth.class).in(Scopes.SINGLETON); + } + else { + binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryNoAuth.class).in(Scopes.SINGLETON); + } } @Provides diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java new file mode 100644 index 00000000000..204116d4e00 --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema.confluent; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class ConfluentSchemaRegistryBasicAuth + implements SchemaRegistryClientPropertiesProvider +{ + private final String user; + private final String password; + + @Inject + public ConfluentSchemaRegistryBasicAuth(BasicAuthConfig basicAuthConfig) + { + this.user = requireNonNull(basicAuthConfig.getConfluentSchemaRegistryUsername(), "user is null"); + this.password = requireNonNull(basicAuthConfig.getConfluentSchemaRegistryPassword(), "password is null"); + } + + @Override + public Map getSchemaRegistryClientProperties() + { + return ImmutableMap.builder() + .put("basic.auth.credentials.source", "USER_INFO") + .put("basic.auth.user.info", user + ":" + password) + .buildOrThrow(); + } +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java index a484d7fd8c5..67b0f757d2e 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java @@ -35,7 +35,14 @@ public class ConfluentSchemaRegistryConfig { + public enum ConfluentSchemaRegistryAuthType + { + NONE, + BASIC_AUTH, + } + private Set confluentSchemaRegistryUrls; + private ConfluentSchemaRegistryAuthType confluentSchemaRegistryAuthType = ConfluentSchemaRegistryAuthType.NONE; private int confluentSchemaRegistryClientCacheSize = 1000; private EmptyFieldStrategy emptyFieldStrategy = IGNORE; private Duration confluentSubjectsCacheRefreshInterval = new Duration(1, SECONDS); @@ -54,6 +61,19 @@ public ConfluentSchemaRegistryConfig setConfluentSchemaRegistryUrls(String confl return this; } + public ConfluentSchemaRegistryAuthType getConfluentSchemaRegistryAuthType() + { + return confluentSchemaRegistryAuthType; + } + + @Config("kafka.confluent-schema-registry-auth-type") + @ConfigDescription("Auth type for logging in Confluent Schema Registry") + public ConfluentSchemaRegistryConfig setConfluentSchemaRegistryAuthType(ConfluentSchemaRegistryAuthType confluentSchemaRegistryAuthType) + { + this.confluentSchemaRegistryAuthType = confluentSchemaRegistryAuthType; + return this; + } + @Min(1) @Max(2000) public int getConfluentSchemaRegistryClientCacheSize() diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java new file mode 100644 index 00000000000..869457227c2 --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema.confluent; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +/* Empty Schema Registry Auth for registries without any authentication */ +public class ConfluentSchemaRegistryNoAuth + implements SchemaRegistryClientPropertiesProvider +{ + @Override + public Map getSchemaRegistryClientProperties() + { + return ImmutableMap.of(); + } +} diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java index d807877e915..f24bf2b1575 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java @@ -24,6 +24,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE; import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.MARK; +import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH; +import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.NONE; import static java.util.concurrent.TimeUnit.SECONDS; public class TestConfluentSchemaRegistryConfig @@ -33,6 +35,7 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(ConfluentSchemaRegistryConfig.class) .setConfluentSchemaRegistryUrls(null) + .setConfluentSchemaRegistryAuthType(NONE) .setConfluentSchemaRegistryClientCacheSize(1000) .setEmptyFieldStrategy(IGNORE) .setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS))); @@ -43,6 +46,7 @@ public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() .put("kafka.confluent-schema-registry-url", "http://schema-registry-a:8081, http://schema-registry-b:8081") + .put("kafka.confluent-schema-registry-auth-type", "BASIC_AUTH") .put("kafka.confluent-schema-registry-client-cache-size", "1500") .put("kafka.empty-field-strategy", "MARK") .put("kafka.confluent-subjects-cache-refresh-interval", "2s") @@ -50,6 +54,7 @@ public void testExplicitPropertyMappings() ConfluentSchemaRegistryConfig expected = new ConfluentSchemaRegistryConfig() .setConfluentSchemaRegistryUrls("http://schema-registry-a:8081, http://schema-registry-b:8081") + .setConfluentSchemaRegistryAuthType(BASIC_AUTH) .setConfluentSchemaRegistryClientCacheSize(1500) .setEmptyFieldStrategy(MARK) .setConfluentSubjectsCacheRefreshInterval(new Duration(2, SECONDS)); From b91e95ced956a9cecb96252f128c152309082a34 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 29 Jan 2026 22:13:46 +0800 Subject: [PATCH 2/2] 2 --- plugin/trino-kafka/2.diff | 37 ------------------------------------- 1 file changed, 37 deletions(-) delete mode 100644 plugin/trino-kafka/2.diff diff --git a/plugin/trino-kafka/2.diff b/plugin/trino-kafka/2.diff deleted file mode 100644 index 42934324751..00000000000 --- a/plugin/trino-kafka/2.diff +++ /dev/null @@ -1,37 +0,0 @@ -diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java -index d807877e915..f24bf2b1575 100644 ---- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java -+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java -@@ -24,6 +24,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDe - import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; - import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE; - import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.MARK; -+import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH; -+import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.NONE; - import static java.util.concurrent.TimeUnit.SECONDS; - - public class TestConfluentSchemaRegistryConfig -@@ -33,6 +35,7 @@ public class TestConfluentSchemaRegistryConfig - { - assertRecordedDefaults(recordDefaults(ConfluentSchemaRegistryConfig.class) - .setConfluentSchemaRegistryUrls(null) -+ .setConfluentSchemaRegistryAuthType(NONE) - .setConfluentSchemaRegistryClientCacheSize(1000) - .setEmptyFieldStrategy(IGNORE) - .setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS))); -@@ -43,6 +46,7 @@ public class TestConfluentSchemaRegistryConfig - { - Map properties = ImmutableMap.builder() - .put("kafka.confluent-schema-registry-url", "http://schema-registry-a:8081, http://schema-registry-b:8081") -+ .put("kafka.confluent-schema-registry-auth-type", "BASIC_AUTH") - .put("kafka.confluent-schema-registry-client-cache-size", "1500") - .put("kafka.empty-field-strategy", "MARK") - .put("kafka.confluent-subjects-cache-refresh-interval", "2s") -@@ -50,6 +54,7 @@ public class TestConfluentSchemaRegistryConfig - - ConfluentSchemaRegistryConfig expected = new ConfluentSchemaRegistryConfig() - .setConfluentSchemaRegistryUrls("http://schema-registry-a:8081, http://schema-registry-b:8081") -+ .setConfluentSchemaRegistryAuthType(BASIC_AUTH) - .setConfluentSchemaRegistryClientCacheSize(1500) - .setEmptyFieldStrategy(MARK) - .setConfluentSubjectsCacheRefreshInterval(new Duration(2, SECONDS));