diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java new file mode 100644 index 00000000000..93ed2b72717 --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import io.trino.plugin.kafka.schema.confluent.SchemaRegistryClientPropertiesProvider; + +import java.util.Map; + +public class KafkaSchemaRegistryClientPropertiesProvider + implements SchemaRegistryClientPropertiesProvider +{ + private final SchemaRegistryClientPropertiesProvider auth; + + @Inject + public KafkaSchemaRegistryClientPropertiesProvider(SchemaRegistryClientPropertiesProvider auth) + { + this.auth = auth; + } + + @Override + public Map getSchemaRegistryClientProperties() + { + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.putAll(auth.getSchemaRegistryClientProperties()); + return properties.buildOrThrow(); + } +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java new file mode 100644 index 00000000000..8f281c04a6c --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema.confluent; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.ConfigSecuritySensitive; +import jakarta.validation.constraints.NotNull; + +public class BasicAuthConfig +{ + private String username; + private String password; + + @NotNull + public String getConfluentSchemaRegistryUsername() + { + return username; + } + + @Config("kafka.confluent-schema-registry.basic-auth.username") + @ConfigDescription("The username for the Confluent Schema Registry") + @ConfigSecuritySensitive + public BasicAuthConfig setConfluentSchemaRegistryUsername(String username) + { + this.username = username; + return this; + } + + @NotNull + public String getConfluentSchemaRegistryPassword() + { + return password; + } + + @Config("kafka.confluent-schema-registry.basic-auth.password") + @ConfigSecuritySensitive + public BasicAuthConfig setConfluentSchemaRegistryPassword(String password) + { + this.password = password; + return this; + } +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java index a3003c135e9..5f47e2ee2e5 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java @@ -52,6 +52,7 @@ import io.trino.plugin.kafka.encoder.protobuf.ProtobufRowEncoder; import io.trino.plugin.kafka.encoder.protobuf.ProtobufSchemaParser; import io.trino.plugin.kafka.schema.ContentSchemaProvider; +import io.trino.plugin.kafka.schema.KafkaSchemaRegistryClientPropertiesProvider; import io.trino.plugin.kafka.schema.ProtobufAnySupportConfig; import io.trino.plugin.kafka.schema.TableDescriptionSupplier; import io.trino.spi.HostAddress; @@ -76,6 +77,7 @@ import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.kafka.encoder.EncoderModule.encoderFactory; +import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; @@ -98,8 +100,10 @@ protected void setup(Binder binder) install(new ConfluentDecoderModule()); install(new ConfluentEncoderModule()); binder.bind(ContentSchemaProvider.class).to(AvroConfluentContentSchemaProvider.class).in(Scopes.SINGLETON); + newSetBinder(binder, SchemaRegistryClientPropertiesProvider.class).addBinding().to(KafkaSchemaRegistryClientPropertiesProvider.class).in(Scopes.SINGLETON); newSetBinder(binder, SchemaRegistryClientPropertiesProvider.class); newSetBinder(binder, SchemaProvider.class).addBinding().to(AvroSchemaProvider.class).in(Scopes.SINGLETON); + // Each SchemaRegistry object should have a new instance of SchemaProvider newSetBinder(binder, SchemaProvider.class).addBinding().to(LazyLoadedProtobufSchemaProvider.class); binder.bind(DynamicMessageProvider.Factory.class).to(ConfluentSchemaRegistryDynamicMessageProvider.Factory.class).in(SINGLETON); @@ -107,6 +111,16 @@ protected void setup(Binder binder) binder.bind(TableDescriptionSupplier.class).toProvider(ConfluentSchemaRegistryTableDescriptionSupplier.Factory.class).in(Scopes.SINGLETON); newMapBinder(binder, String.class, SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON); newMapBinder(binder, String.class, SchemaParser.class).addBinding("PROTOBUF").to(LazyLoadedProtobufSchemaParser.class).in(Scopes.SINGLETON); + + // Bind the appropriate ConfluentSchemaRegistryAuth implementation based on configuration + ConfluentSchemaRegistryConfig schemaRegistryConfig = buildConfigObject(ConfluentSchemaRegistryConfig.class); + if (schemaRegistryConfig.getConfluentSchemaRegistryAuthType() == BASIC_AUTH) { + configBinder(binder).bindConfig(BasicAuthConfig.class); + binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryBasicAuth.class).in(Scopes.SINGLETON); + } + else { + binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryNoAuth.class).in(Scopes.SINGLETON); + } } @Provides diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java new file mode 100644 index 00000000000..204116d4e00 --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema.confluent; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class ConfluentSchemaRegistryBasicAuth + implements SchemaRegistryClientPropertiesProvider +{ + private final String user; + private final String password; + + @Inject + public ConfluentSchemaRegistryBasicAuth(BasicAuthConfig basicAuthConfig) + { + this.user = requireNonNull(basicAuthConfig.getConfluentSchemaRegistryUsername(), "user is null"); + this.password = requireNonNull(basicAuthConfig.getConfluentSchemaRegistryPassword(), "password is null"); + } + + @Override + public Map getSchemaRegistryClientProperties() + { + return ImmutableMap.builder() + .put("basic.auth.credentials.source", "USER_INFO") + .put("basic.auth.user.info", user + ":" + password) + .buildOrThrow(); + } +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java index a484d7fd8c5..67b0f757d2e 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java @@ -35,7 +35,14 @@ public class ConfluentSchemaRegistryConfig { + public enum ConfluentSchemaRegistryAuthType + { + NONE, + BASIC_AUTH, + } + private Set confluentSchemaRegistryUrls; + private ConfluentSchemaRegistryAuthType confluentSchemaRegistryAuthType = ConfluentSchemaRegistryAuthType.NONE; private int confluentSchemaRegistryClientCacheSize = 1000; private EmptyFieldStrategy emptyFieldStrategy = IGNORE; private Duration confluentSubjectsCacheRefreshInterval = new Duration(1, SECONDS); @@ -54,6 +61,19 @@ public ConfluentSchemaRegistryConfig setConfluentSchemaRegistryUrls(String confl return this; } + public ConfluentSchemaRegistryAuthType getConfluentSchemaRegistryAuthType() + { + return confluentSchemaRegistryAuthType; + } + + @Config("kafka.confluent-schema-registry-auth-type") + @ConfigDescription("Auth type for logging in Confluent Schema Registry") + public ConfluentSchemaRegistryConfig setConfluentSchemaRegistryAuthType(ConfluentSchemaRegistryAuthType confluentSchemaRegistryAuthType) + { + this.confluentSchemaRegistryAuthType = confluentSchemaRegistryAuthType; + return this; + } + @Min(1) @Max(2000) public int getConfluentSchemaRegistryClientCacheSize() diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java new file mode 100644 index 00000000000..869457227c2 --- /dev/null +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka.schema.confluent; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +/* Empty Schema Registry Auth for registries without any authentication */ +public class ConfluentSchemaRegistryNoAuth + implements SchemaRegistryClientPropertiesProvider +{ + @Override + public Map getSchemaRegistryClientProperties() + { + return ImmutableMap.of(); + } +} diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java index d807877e915..f24bf2b1575 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java @@ -24,6 +24,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE; import static io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.MARK; +import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH; +import static io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.NONE; import static java.util.concurrent.TimeUnit.SECONDS; public class TestConfluentSchemaRegistryConfig @@ -33,6 +35,7 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(ConfluentSchemaRegistryConfig.class) .setConfluentSchemaRegistryUrls(null) + .setConfluentSchemaRegistryAuthType(NONE) .setConfluentSchemaRegistryClientCacheSize(1000) .setEmptyFieldStrategy(IGNORE) .setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS))); @@ -43,6 +46,7 @@ public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() .put("kafka.confluent-schema-registry-url", "http://schema-registry-a:8081, http://schema-registry-b:8081") + .put("kafka.confluent-schema-registry-auth-type", "BASIC_AUTH") .put("kafka.confluent-schema-registry-client-cache-size", "1500") .put("kafka.empty-field-strategy", "MARK") .put("kafka.confluent-subjects-cache-refresh-interval", "2s") @@ -50,6 +54,7 @@ public void testExplicitPropertyMappings() ConfluentSchemaRegistryConfig expected = new ConfluentSchemaRegistryConfig() .setConfluentSchemaRegistryUrls("http://schema-registry-a:8081, http://schema-registry-b:8081") + .setConfluentSchemaRegistryAuthType(BASIC_AUTH) .setConfluentSchemaRegistryClientCacheSize(1500) .setEmptyFieldStrategy(MARK) .setConfluentSubjectsCacheRefreshInterval(new Duration(2, SECONDS));