From 115fc5f9bdfee42dde1681d76ff29e1e881cc1b9 Mon Sep 17 00:00:00 2001 From: Arnav Arora Date: Tue, 2 Jun 2026 13:33:06 +0000 Subject: [PATCH 01/10] feat(yaml): Add MongoDB read connector draft Squashed draft MongoDB read connector changes from PR #35802. --- .../MongoDbReadSchemaTransformProvider.java | 193 ++++++++++++++++++ .../apache_beam/yaml/integration_tests.py | 69 +++---- sdks/python/apache_beam/yaml/standard_io.yaml | 138 +++++-------- .../apache_beam/yaml/tests/mongodb.yaml | 44 ---- 4 files changed, 282 insertions(+), 162 deletions(-) create mode 100644 sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java new file mode 100644 index 000000000000..b8e566ed7239 --- /dev/null +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. + * + *

Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@AutoService(SchemaTransformProvider.class) +public class MongoDbReadSchemaTransformProvider + extends TypedSchemaTransformProvider< + MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> { + + private static final String OUTPUT_TAG = "output"; + + @Override + protected Class configurationClass() { + return MongoDbReadSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { + return new MongoDbReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + // Return a unique URN for the transform. + return "beam:schematransform:org.apache.beam:mongodb_read:v1"; + } + + @Override + public List inputCollectionNames() { + // A read transform does not have an input PCollection. + return Collections.emptyList(); + } + + @Override + public List outputCollectionNames() { + // The primary output is a PCollection of Rows. + // Error handling could be added later with a second "errors" output tag. + return Collections.singletonList(OUTPUT_TAG); + } + + /** Configuration class for the MongoDB Read transform. */ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to read from.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to read from.") + public abstract String getCollection(); + + @SchemaFieldDescription( + "An optional BSON filter to apply to the read. This should be a valid JSON string.") + @Nullable + public abstract String getFilter(); + + public void validate() { + checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); + checkArgument( + getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); + checkArgument( + getCollection() != null && !getCollection().isEmpty(), + "MongoDB collection must be specified."); + } + + public static Builder builder() { + return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration + .Builder(); + } + + /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUri(String uri); + + public abstract Builder setDatabase(String database); + + public abstract Builder setCollection(String collection); + + public abstract Builder setFilter(String filter); + + public abstract MongoDbReadSchemaTransformConfiguration build(); + } + } + + /** The {@link SchemaTransform} that performs the read operation. */ + private static class MongoDbReadSchemaTransform extends SchemaTransform { + private final MongoDbReadSchemaTransformConfiguration configuration; + + MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + // A read transform does not have an input, so we start with the pipeline. + PCollection mongoDocs = + input + .getPipeline() + .apply( + "ReadFromMongoDb", + MongoDbIO.read() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection())); + // TODO: Add support for .withFilter() if it exists in your MongoDbIO, + // using configuration.getFilter(). + + // Convert the BSON Document objects into Beam Row objects. + PCollection beamRows = + mongoDocs.apply("ConvertToBeamRows", ParDo.of(new MongoDocumentToRowFn())); + + return PCollectionRowTuple.of(OUTPUT_TAG, beamRows); + } + } + + /** + * A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link Row}. + * + *

This is a critical step to ensure data is in a schema-aware format. + */ + private static class MongoDocumentToRowFn extends DoFn { + // TODO: Define the Beam Schema that corresponds to your MongoDB documents. + // This could be made dynamic based on an inferred schema or a user-provided schema. + // For this skeleton, we assume a static schema. + // public static final Schema OUTPUT_SCHEMA = Schema.builder()...build(); + + @ProcessElement + public void processElement(@Element Document doc, OutputReceiver out) { + // Here you will convert the BSON document to a Beam Row. + // This requires you to know the target schema. + + // Example pseudo-code: + // Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA); + // for (Map.Entry entry : doc.entrySet()) { + // rowBuilder.addValue(entry.getValue()); + // } + // out.output(rowBuilder.build()); + + // For a robust implementation, you would handle data type conversions + // between BSON types and Beam schema types. + throw new UnsupportedOperationException( + "MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows."); + } + } +} diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 2d0b2787fc9b..62368c126336 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -33,11 +33,11 @@ from datetime import timezone import mock +import mysql.connector import psycopg2 import pytds import sqlalchemy import yaml -from apitools.base.py.exceptions import HttpError from google.cloud import pubsub_v1 from google.cloud.bigtable import client from google.cloud.bigtable_admin_v2.types import instance @@ -60,11 +60,10 @@ from apache_beam.yaml import yaml_provider from apache_beam.yaml import yaml_transform from apache_beam.yaml.conftest import yaml_test_files_dir +from apitools.base.py.exceptions import HttpError _LOGGER = logging.getLogger(__name__) -_MONGO_CONTAINER_IMAGE = 'mongo:7.0.7' - @contextlib.contextmanager def gcs_temp_dir(bucket): @@ -224,11 +223,17 @@ def temp_mongodb_table(): - collection: ${mongo_vars.COLLECTION} """ _LOGGER.info("Setting up MongoDB fixture...") - mongo_container = MongoDbContainer(_MONGO_CONTAINER_IMAGE) + # Initialize and start the MongoDB container. + # This will pull the 'mongo:7.0.7' image if it's not available locally. + mongo_container = MongoDbContainer("mongo:7.0.7") try: mongo_container.start() + + # Get the dynamically generated connection URI. mongo_uri = mongo_container.get_connection_url() + # Generate a unique database and collection name for this test run to ensure + # isolation between different test files. db_name = f'db_{uuid.uuid4().hex}' collection_name = f'collection_{uuid.uuid4().hex}' @@ -245,6 +250,7 @@ def temp_mongodb_table(): } finally: + # This block executes after the test suite finishes. _LOGGER.info("Tearing down MongoDB fixture...") mongo_container.stop() _LOGGER.info("MongoDB container stopped.") @@ -335,22 +341,26 @@ def temp_mysql_database(): Exception: Any other exception encountered during the setup process. """ with MySqlContainer(init=True, dialect='pymysql') as mysql_container: - # Make connection to temp database and create tmp table - engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) - with engine.begin() as connection: - connection.execute( - sqlalchemy.text( - "CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);")) + try: + # Make connection to temp database and create tmp table + engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) + with engine.begin() as connection: + connection.execute( + sqlalchemy.text( + "CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);")) - # Construct the JDBC url for connections later on by tests - jdbc_url = ( - f"jdbc:mysql://{mysql_container.get_container_host_ip()}:" - f"{mysql_container.get_exposed_port(mysql_container.port)}/" - f"{mysql_container.dbname}?" - f"user={mysql_container.username}&" - f"password={mysql_container.password}") + # Construct the JDBC url for connections later on by tests + jdbc_url = ( + f"jdbc:mysql://{mysql_container.get_container_host_ip()}:" + f"{mysql_container.get_exposed_port(mysql_container.port)}/" + f"{mysql_container.dbname}?" + f"user={mysql_container.username}&" + f"password={mysql_container.password}") - yield jdbc_url + yield jdbc_url + except mysql.connector.Error as err: + logging.error("Error interacting with temporary MySQL DB: %s", err) + raise err @contextlib.contextmanager @@ -769,23 +779,12 @@ def test(self, providers=providers): # default arg to capture loop value **yaml_transform.SafeLineLoader.strip_metadata( fixture.get('config', {})))) for pipeline_spec in spec['pipelines']: - try: - with beam.Pipeline(options=PipelineOptions( - pickle_library='cloudpickle', - **replace_recursive( - yaml_transform.SafeLineLoader.strip_metadata( - pipeline_spec.get('options', {})), - vars))) as p: - yaml_transform.expand_pipeline( - p, replace_recursive(pipeline_spec, vars)) - except ValueError as exn: - # FnApiRunner currently does not support this requirement in - # some xlang scenarios (e.g. Iceberg YAML pipelines). - if 'beam:requirement:pardo:on_window_expiration:v1' in str(exn): - self.skipTest( - 'Runner does not support ' - 'beam:requirement:pardo:on_window_expiration:v1') - raise + with beam.Pipeline(options=PipelineOptions( + pickle_library='cloudpickle', + **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( + 'options', {})))) as p: + yaml_transform.expand_pipeline( + p, replace_recursive(pipeline_spec, vars)) yield f'test_{suffix}', test diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 781d3de193ec..71388dfa6bfa 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -45,15 +45,9 @@ type: beamJar transforms: 'ReadFromBigQuery': 'beam:schematransform:org.apache.beam:bigquery_storage_read:v1' - 'WriteToBigQuery': 'beam:schematransform:org.apache.beam:bigquery_write:v1' + 'WriteToBigQuery': 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2' config: - gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' - managed_replacement: - # Following transforms may be replaced with equivalent managed transforms, - # if the pipelines 'updateCompatibilityBeamVersion' match the provided - # version. - 'ReadFromBigQuery': '2.69.0' - 'WriteToBigQuery': '2.69.0' + gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar' # Kafka - type: renaming @@ -74,7 +68,6 @@ 'error_handling': 'error_handling' 'file_descriptor_path': 'file_descriptor_path' 'message_name': 'message_name' - 'max_read_time_seconds': 'max_read_time_seconds' 'WriteToKafka': 'format': 'format' 'topic': 'topic' @@ -98,6 +91,44 @@ 'ReadFromKafka': '2.65.0' 'WriteToKafka': '2.65.0' +# PubSub +- type: renaming + transforms: + 'ReadFromPubSubLite': 'ReadFromPubSubLite' + 'WriteToPubSubLite': 'WriteToPubSubLite' + config: + mappings: + 'ReadFromPubSubLite': + 'project': 'project' + 'schema': 'schema' + 'format': 'format' + 'subscription_name': 'subscription_name' + 'location': 'location' + 'attributes': 'attributes' + 'attribute_map': 'attribute_map' + 'attribute_id': 'attribute_id' + 'error_handling': 'error_handling' + 'file_descriptor_path': 'file_descriptor_path' + 'message_name': 'message_name' + 'WriteToPubSubLite': + 'project': 'project' + 'format': 'format' + 'topic_name': 'topic_name' + 'location': 'location' + 'attributes': 'attributes' + 'attribute_id': 'attribute_id' + 'error_handling': 'error_handling' + 'file_descriptor_path': 'file_descriptor_path' + 'message_name': 'message_name' + 'schema': 'schema' + underlying_provider: + type: beamJar + transforms: + 'ReadFromPubSubLite': 'beam:schematransform:org.apache.beam:pubsublite_read:v1' + 'WriteToPubSubLite': 'beam:schematransform:org.apache.beam:pubsublite_write:v1' + config: + gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' + # TODO(yaml): Tests are assuming python providers are before java ones, hence # the order below. This should be fixed in the future. @@ -115,8 +146,6 @@ 'WriteToIceberg': 'apache_beam.yaml.yaml_io.write_to_iceberg' 'ReadFromTFRecord': 'apache_beam.yaml.yaml_io.read_from_tfrecord' 'WriteToTFRecord': 'apache_beam.yaml.yaml_io.write_to_tfrecord' - 'WriteToMongoDB': 'apache_beam.yaml.yaml_io.write_to_mongodb' - # General File Formats # Declared as a renaming transform to avoid exposing all @@ -243,32 +272,32 @@ driver_jars: '' jdbc_type: '' 'ReadFromPostgres': - connection_init_sql: [] + connection_init_sql: '' driver_class_name: '' driver_jars: '' jdbc_type: '' 'WriteToPostgres': - connection_init_sql: [] + connection_init_sql: '' driver_class_name: '' driver_jars: '' jdbc_type: '' 'ReadFromOracle': - connection_init_sql: [] + connection_init_sql: '' driver_class_name: '' driver_jars: '' jdbc_type: '' 'WriteToOracle': - connection_init_sql: [] + connection_init_sql: '' driver_class_name: '' driver_jars: '' jdbc_type: '' 'ReadFromSqlServer': - connection_init_sql: [] + connection_init_sql: '' driver_class_name: '' driver_jars: '' jdbc_type: '' 'WriteToSqlServer': - connection_init_sql: [] + connection_init_sql: '' driver_class_name: '' driver_jars: '' jdbc_type: '' @@ -287,16 +316,6 @@ 'WriteToSqlServer': 'beam:schematransform:org.apache.beam:sql_server_write:v1' config: gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' - managed_replacement: - # Following transforms may be replaced with equivalent managed transforms, - # if the pipelines 'updateCompatibilityBeamVersion' match the provided - # version. - 'ReadFromPostgres': '2.73.0' - 'WriteToPostgres': '2.73.0' - 'ReadFromMySql': '2.73.0' - 'WriteToMySql': '2.73.0' - 'ReadFromSqlServer': '2.73.0' - 'WriteToSqlServer': '2.73.0' # Spanner - type: renaming @@ -379,72 +398,25 @@ config: gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' -#IcebergCDC -- type: renaming - transforms: - 'ReadFromIcebergCDC': 'ReadFromIcebergCDC' - config: - mappings: - 'ReadFromIcebergCDC': - table: 'table' - catalog_name: 'catalog_name' - catalog_properties: 'catalog_properties' - config_properties: 'config_properties' - drop: 'drop' - filter: 'filter' - from_snapshot: 'from_snapshot' - from_timestamp: 'from_timestamp' - keep: 'keep' - poll_interval_seconds: 'poll_interval_seconds' - starting_strategy: 'starting_strategy' - streaming: 'streaming' - to_snapshot: 'to_snapshot' - to_timestamp: 'to_timestamp' - underlying_provider: - type: beamJar - transforms: - 'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1' - config: - gradle_target: 'sdks:java:io:expansion-service:shadowJar' - -#IcebergAddFiles -- type: renaming - transforms: - 'IcebergAddFiles': 'IcebergAddFiles' - config: - mappings: - 'IcebergAddFiles': - table: 'table' - catalog_properties: 'catalog_properties' - config_properties: 'config_properties' - triggering_frequency_seconds: 'triggering_frequency_seconds' - append_batch_size: 'append_batch_size' - location_prefix: 'location_prefix' - partition_fields: 'partition_fields' - table_properties: 'table_properties' - error_handling: 'error_handling' - underlying_provider: - type: beamJar - transforms: - 'IcebergAddFiles': 'beam:schematransform:iceberg_add_files:v1' - config: - gradle_target: 'sdks:java:io:expansion-service:shadowJar' - #MongoDB - type: renaming transforms: + 'ReadFromMongoDB': 'ReadFromMongoDB' 'WriteToMongoDB': 'WriteToMongoDB' config: mappings: + 'ReadFromMongoDB': + connection_uri: "connection_uri" + database: "database" + collection: "collection" 'WriteToMongoDB': - connection_uri: "uri" + connection_uri: "connection_uri" database: "database" collection: "collection" - batch_size: "batch_size" - error_handling: "error_handling" underlying_provider: type: beamJar transforms: - 'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1' + 'ReadFromBigTable': 'beam:schematransform:org.apache.beam:mongodb_read:v1' + 'WriteToBigTable': 'beam:schematransform:org.apache.beam:mongodb_write:v1' config: - gradle_target: 'sdks:java:io:expansion-service:shadowJar' + gradle_target: ':sdks:java:io:mongodb:shadowJar' diff --git a/sdks/python/apache_beam/yaml/tests/mongodb.yaml b/sdks/python/apache_beam/yaml/tests/mongodb.yaml index efce73c89879..e69de29bb2d1 100644 --- a/sdks/python/apache_beam/yaml/tests/mongodb.yaml +++ b/sdks/python/apache_beam/yaml/tests/mongodb.yaml @@ -1,44 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -fixtures: - - name: mongo_vars - type: "apache_beam.yaml.integration_tests.temp_mongodb_table" - -pipelines: - - pipeline: - type: composite - transforms: - - type: Create - name: CreateData - config: - elements: - - { id: 1, name: "John" } - - { id: 2, name: "Jane" } - - type: WriteToMongoDB - name: WriteData - input: CreateData - config: - connection_uri: '{mongo_vars[URI]}' - database: '{mongo_vars[DATABASE]}' - collection: '{mongo_vars[COLLECTION]}' - error_handling: - output: my_error_output - - type: AssertEqual - input: WriteData.my_error_output - config: - elements: [] From 2bd81a99fddd33aaf6cd259461eb4ad6baaf8ded Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 2 Jun 2026 13:47:43 +0000 Subject: [PATCH 02/10] feat(yaml): Enhance MongoDB Read SchemaTransform, schema/BSON mapping, and tests - Fully implemented MongoDB read configuration and Provider with JSON schema parsing - Enhanced MongoDbUtils with a deep BSON-to-Beam row conversion supporting all primitives, arrays, maps, and nested rows - Added comprehensive Java unit tests for MongoDbUtils and MongoDbReadSchemaTransformProvider - Mapped WriteToMongoDB and ReadFromMongoDB in standard_io.yaml - Implemented end-to-end integration test verifying write/read pipeline against containerized MongoDB --- sdks/java/io/mongodb/build.gradle | 2 + ...ngoDbReadSchemaTransformConfiguration.java | 89 ++++++++ .../MongoDbReadSchemaTransformProvider.java | 199 +++++++----------- .../beam/sdk/io/mongodb/MongoDbUtils.java | 106 ++++++++++ ...ongoDbReadSchemaTransformProviderTest.java | 199 ++++++++++++++++++ .../beam/sdk/io/mongodb/MongoDbUtilsTest.java | 123 +++++++++++ sdks/python/apache_beam/yaml/standard_io.yaml | 15 +- .../apache_beam/yaml/tests/mongodb.yaml | 55 +++++ 8 files changed, 665 insertions(+), 123 deletions(-) create mode 100644 sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java create mode 100644 sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle index d4a172f01f8b..d56d969f30f0 100644 --- a/sdks/java/io/mongodb/build.gradle +++ b/sdks/java/io/mongodb/build.gradle @@ -32,6 +32,8 @@ dependencies { implementation library.java.mongodb_driver_core implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre + provided library.java.everit_json_schema + permitUnusedDeclared library.java.everit_json_schema testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java new file mode 100644 index 000000000000..e72fe8df8079 --- /dev/null +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Configuration class for the MongoDB Read transform. */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class MongoDbReadSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to read from.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to read from.") + public abstract String getCollection(); + + @SchemaFieldDescription( + "The schema in which the data is encoded, defined with JSON-schema syntax (https://json-schema.org/).") + public abstract String getSchema(); + + @SchemaFieldDescription( + "An optional BSON filter to apply to the read. This should be a valid JSON string.") + @Nullable + public abstract String getFilter(); + + @SchemaFieldDescription( + "This option specifies whether and where to output rows that failed to be read.") + @Nullable + public abstract ErrorHandling getErrorHandling(); + + public void validate() { + checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); + checkArgument( + getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); + checkArgument( + getCollection() != null && !getCollection().isEmpty(), + "MongoDB collection must be specified."); + checkArgument( + getSchema() != null && !getSchema().isEmpty(), "MongoDB schema must be specified."); + } + + public static Builder builder() { + return new AutoValue_MongoDbReadSchemaTransformConfiguration.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUri(String uri); + + public abstract Builder setDatabase(String database); + + public abstract Builder setCollection(String collection); + + public abstract Builder setSchema(String schema); + + public abstract Builder setFilter(String filter); + + public abstract Builder setErrorHandling(ErrorHandling errorHandling); + + public abstract MongoDbReadSchemaTransformConfiguration build(); + } +} diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java index b8e566ed7239..418421bb1783 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java @@ -17,45 +17,40 @@ */ package org.apache.beam.sdk.io.mongodb; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; - import com.google.auto.service.AutoService; -import com.google.auto.value.AutoValue; -import java.io.Serializable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; -import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.schemas.utils.JsonUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.bson.Document; -import org.checkerframework.checker.nullness.qual.Nullable; -/** - * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. - * - *

Internal only: This class is actively being worked on, and it will likely change. We - * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam - * repository. - */ +/** An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. */ @AutoService(SchemaTransformProvider.class) public class MongoDbReadSchemaTransformProvider - extends TypedSchemaTransformProvider< - MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> { + extends TypedSchemaTransformProvider { - private static final String OUTPUT_TAG = "output"; + private static final String OUTPUT_TAG_NAME = "output"; + public static final TupleTag OUTPUT_TAG = new TupleTag() {}; + public static final TupleTag ERROR_TAG = new TupleTag() {}; - @Override - protected Class configurationClass() { - return MongoDbReadSchemaTransformConfiguration.class; - } + private static final org.apache.beam.sdk.metrics.Counter errorCounter = + org.apache.beam.sdk.metrics.Metrics.counter( + MongoDbReadSchemaTransformProvider.class, "MongoDB-read-error-counter"); @Override protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { @@ -64,69 +59,17 @@ protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configura @Override public String identifier() { - // Return a unique URN for the transform. return "beam:schematransform:org.apache.beam:mongodb_read:v1"; } @Override public List inputCollectionNames() { - // A read transform does not have an input PCollection. return Collections.emptyList(); } @Override public List outputCollectionNames() { - // The primary output is a PCollection of Rows. - // Error handling could be added later with a second "errors" output tag. - return Collections.singletonList(OUTPUT_TAG); - } - - /** Configuration class for the MongoDB Read transform. */ - @DefaultSchema(AutoValueSchema.class) - @AutoValue - public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable { - - @SchemaFieldDescription("The connection URI for the MongoDB server.") - public abstract String getUri(); - - @SchemaFieldDescription("The MongoDB database to read from.") - public abstract String getDatabase(); - - @SchemaFieldDescription("The MongoDB collection to read from.") - public abstract String getCollection(); - - @SchemaFieldDescription( - "An optional BSON filter to apply to the read. This should be a valid JSON string.") - @Nullable - public abstract String getFilter(); - - public void validate() { - checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); - checkArgument( - getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); - checkArgument( - getCollection() != null && !getCollection().isEmpty(), - "MongoDB collection must be specified."); - } - - public static Builder builder() { - return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration - .Builder(); - } - - /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */ - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setUri(String uri); - - public abstract Builder setDatabase(String database); - - public abstract Builder setCollection(String collection); - - public abstract Builder setFilter(String filter); - - public abstract MongoDbReadSchemaTransformConfiguration build(); - } + return Collections.singletonList(OUTPUT_TAG_NAME); } /** The {@link SchemaTransform} that performs the read operation. */ @@ -140,54 +83,74 @@ private static class MongoDbReadSchemaTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - // A read transform does not have an input, so we start with the pipeline. - PCollection mongoDocs = - input - .getPipeline() - .apply( - "ReadFromMongoDb", - MongoDbIO.read() - .withUri(configuration.getUri()) - .withDatabase(configuration.getDatabase()) - .withCollection(configuration.getCollection())); - // TODO: Add support for .withFilter() if it exists in your MongoDbIO, - // using configuration.getFilter(). - - // Convert the BSON Document objects into Beam Row objects. - PCollection beamRows = - mongoDocs.apply("ConvertToBeamRows", ParDo.of(new MongoDocumentToRowFn())); - - return PCollectionRowTuple.of(OUTPUT_TAG, beamRows); + Schema schema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema()); + + MongoDbIO.Read read = + MongoDbIO.read() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection()); + + final String filterStr = configuration.getFilter(); + if (filterStr != null) { + read = + read.withQueryFn( + new SerializableFunction, MongoCursor>() { + @Override + public MongoCursor apply(MongoCollection collection) { + return collection.find(Document.parse(filterStr)).iterator(); + } + }); + } + + PCollection mongoDocs = input.getPipeline().apply("ReadFromMongoDb", read); + + boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling()); + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + + PCollectionTuple outputTuple = + mongoDocs.apply( + "ConvertToBeamRows", + ParDo.of(new DocumentToRowFn(schema, handleErrors, errorSchema)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + PCollection beamRows = outputTuple.get(OUTPUT_TAG).setRowSchema(schema); + PCollection errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema); + + PCollectionRowTuple output = PCollectionRowTuple.of(OUTPUT_TAG_NAME, beamRows); + ErrorHandling errorHandling = configuration.getErrorHandling(); + if (handleErrors && errorHandling != null) { + output = output.and(errorHandling.getOutput(), errorOutput); + } + return output; } } - /** - * A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link Row}. - * - *

This is a critical step to ensure data is in a schema-aware format. - */ - private static class MongoDocumentToRowFn extends DoFn { - // TODO: Define the Beam Schema that corresponds to your MongoDB documents. - // This could be made dynamic based on an inferred schema or a user-provided schema. - // For this skeleton, we assume a static schema. - // public static final Schema OUTPUT_SCHEMA = Schema.builder()...build(); + /** Converts a MongoDB BSON {@link Document} to a Beam {@link Row}. */ + static class DocumentToRowFn extends DoFn { + private final Schema schema; + private final boolean handleErrors; + private final Schema errorSchema; + + DocumentToRowFn(Schema schema, boolean handleErrors, Schema errorSchema) { + this.schema = schema; + this.handleErrors = handleErrors; + this.errorSchema = errorSchema; + } @ProcessElement - public void processElement(@Element Document doc, OutputReceiver out) { - // Here you will convert the BSON document to a Beam Row. - // This requires you to know the target schema. - - // Example pseudo-code: - // Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA); - // for (Map.Entry entry : doc.entrySet()) { - // rowBuilder.addValue(entry.getValue()); - // } - // out.output(rowBuilder.build()); - - // For a robust implementation, you would handle data type conversions - // between BSON types and Beam schema types. - throw new UnsupportedOperationException( - "MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows."); + public void processElement(@Element Document doc, MultiOutputReceiver receiver) { + try { + receiver.get(OUTPUT_TAG).output(MongoDbUtils.toRow(doc, schema)); + } catch (Exception e) { + if (!handleErrors) { + throw new RuntimeException( + "Failed to convert BSON Document to Beam Row: " + doc.toJson(), e); + } + errorCounter.inc(); + byte[] docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8); + receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e)); + } } } } diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java index a5acfb1d19fe..51d3dfdbfdc2 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java @@ -18,13 +18,19 @@ package org.apache.beam.sdk.io.mongodb; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.values.Row; import org.bson.BsonNull; import org.bson.Document; +import org.bson.types.Binary; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; /** Utility methods for MongoDB IO. */ public class MongoDbUtils { @@ -71,4 +77,104 @@ public static Document toDocument(Row row) { } return value; } + + /** Converts a BSON {@link Document} to a Beam {@link Row} matching the given {@link Schema}. */ + public static Row toRow(Document doc, Schema schema) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (Field field : schema.getFields()) { + Object value = doc.get(field.getName()); + rowBuilder.addValue(convertFromBsonValue(value, field.getType())); + } + return rowBuilder.build(); + } + + @SuppressWarnings({"nullness", "JavaUtilDate"}) + private static @Nullable Object convertFromBsonValue( + @Nullable Object value, FieldType fieldType) { + if (value == null || value instanceof BsonNull) { + return null; + } + + switch (fieldType.getTypeName()) { + case BYTE: + return (value instanceof Number) + ? ((Number) value).byteValue() + : Byte.parseByte(value.toString()); + case INT16: + return (value instanceof Number) + ? ((Number) value).shortValue() + : Short.parseShort(value.toString()); + case INT32: + return (value instanceof Number) + ? ((Number) value).intValue() + : Integer.parseInt(value.toString()); + case INT64: + return (value instanceof Number) + ? ((Number) value).longValue() + : Long.parseLong(value.toString()); + case FLOAT: + return (value instanceof Number) + ? ((Number) value).floatValue() + : Float.parseFloat(value.toString()); + case DOUBLE: + return (value instanceof Number) + ? ((Number) value).doubleValue() + : Double.parseDouble(value.toString()); + case DECIMAL: + return (value instanceof Number) + ? java.math.BigDecimal.valueOf(((Number) value).doubleValue()) + : new java.math.BigDecimal(value.toString()); + case STRING: + return value.toString(); + case BOOLEAN: + return (value instanceof Boolean) + ? (Boolean) value + : Boolean.parseBoolean(value.toString()); + case DATETIME: + if (value instanceof java.util.Date) { + return new Instant(((java.util.Date) value).getTime()); + } else if (value instanceof Number) { + return new Instant(((Number) value).longValue()); + } else { + return Instant.parse(value.toString()); + } + case BYTES: + if (value instanceof Binary) { + return ((Binary) value).getData(); + } else if (value instanceof byte[]) { + return (byte[]) value; + } else { + return value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); + } + case ARRAY: + case ITERABLE: + Iterable iterable = (Iterable) value; + List rowList = new ArrayList<>(); + FieldType elementType = Objects.requireNonNull(fieldType.getCollectionElementType()); + for (Object item : iterable) { + rowList.add(convertFromBsonValue(item, elementType)); + } + return rowList; + case MAP: + Map map = (Map) value; + Map rowMap = new HashMap<>(); + FieldType valueType = Objects.requireNonNull(fieldType.getMapValueType()); + for (Map.Entry entry : map.entrySet()) { + rowMap.put( + String.valueOf(entry.getKey()), convertFromBsonValue(entry.getValue(), valueType)); + } + return rowMap; + case ROW: + Schema rowSchema = Objects.requireNonNull(fieldType.getRowSchema()); + if (value instanceof Document) { + return toRow((Document) value, rowSchema); + } else if (value instanceof Map) { + return toRow(new Document((Map) value), rowSchema); + } else { + throw new IllegalArgumentException("Cannot convert value to Row: " + value); + } + default: + throw new IllegalArgumentException("Unsupported field type: " + fieldType); + } + } } diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java new file mode 100644 index 000000000000..cc93c7a6080e --- /dev/null +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTagList; +import org.bson.Document; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link MongoDbReadSchemaTransformProvider}. */ +@RunWith(JUnit4.class) +public class MongoDbReadSchemaTransformProviderTest { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Test + public void testInvalidConfigMissingUri() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setDatabase("db") + .setCollection("col") + .setSchema("{}") + .build() + .validate(); + }); + } + + @Test + public void testInvalidConfigMissingDatabase() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setUri("mongodb://localhost:27017") + .setCollection("col") + .setSchema("{}") + .build() + .validate(); + }); + } + + @Test + public void testInvalidConfigMissingCollection() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setUri("mongodb://localhost:27017") + .setDatabase("db") + .setSchema("{}") + .build() + .validate(); + }); + } + + @Test + public void testInvalidConfigMissingSchema() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setUri("mongodb://localhost:27017") + .setDatabase("db") + .setCollection("col") + .build() + .validate(); + }); + } + + @Test + public void testConfigurationSchema() throws Exception { + Schema schema = + SchemaRegistry.createDefault().getSchema(MongoDbReadSchemaTransformConfiguration.class); + + // We expect 6 fields: uri, database, collection, schema, filter, errorHandling + assertEquals(6, schema.getFieldCount()); + assertNotNull(schema.getField("uri")); + assertNotNull(schema.getField("database")); + assertNotNull(schema.getField("collection")); + assertNotNull(schema.getField("schema")); + assertNotNull(schema.getField("filter")); + assertNotNull(schema.getField("errorHandling")); + } + + @Test + public void testDocumentToRowFn() { + Schema beamSchema = Schema.builder().addStringField("name").addInt32Field("age").build(); + + Document doc = new Document().append("name", "John").append("age", 30); + + PCollection inputDocs = + p.apply( + Create.of(Collections.singletonList(doc)) + .withCoder(MongoDbWriteSchemaTransformProvider.DocumentCoder.of())); + + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + PCollectionTuple outputTuple = + inputDocs.apply( + "ConvertToRows", + ParDo.of( + new MongoDbReadSchemaTransformProvider.DocumentToRowFn( + beamSchema, false, errorSchema)) + .withOutputTags( + MongoDbReadSchemaTransformProvider.OUTPUT_TAG, + TupleTagList.of(MongoDbReadSchemaTransformProvider.ERROR_TAG))); + + PCollection outputRows = + outputTuple.get(MongoDbReadSchemaTransformProvider.OUTPUT_TAG).setRowSchema(beamSchema); + outputTuple.get(MongoDbReadSchemaTransformProvider.ERROR_TAG).setRowSchema(errorSchema); + + PAssert.that(outputRows) + .satisfies( + rows -> { + Row row = rows.iterator().next(); + assertEquals("John", row.getString("name")); + assertEquals(Integer.valueOf(30), row.getInt32("age")); + return null; + }); + + p.run().waitUntilFinish(); + } + + @Test + public void testDocumentToRowFnWithErrors() { + Schema beamSchema = Schema.builder().addInt32Field("age").build(); + + // Invalid document: age value is a string "not_an_int" which cannot be converted to INT32 + Document invalidDoc = new Document().append("age", "not_an_int"); + + PCollection inputDocs = + p.apply( + Create.of(Collections.singletonList(invalidDoc)) + .withCoder(MongoDbWriteSchemaTransformProvider.DocumentCoder.of())); + + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + PCollectionTuple outputTuple = + inputDocs.apply( + "ConvertToRowsWithErrors", + ParDo.of( + new MongoDbReadSchemaTransformProvider.DocumentToRowFn( + beamSchema, true, errorSchema)) + .withOutputTags( + MongoDbReadSchemaTransformProvider.OUTPUT_TAG, + TupleTagList.of(MongoDbReadSchemaTransformProvider.ERROR_TAG))); + + PCollection errorRows = + outputTuple.get(MongoDbReadSchemaTransformProvider.ERROR_TAG).setRowSchema(errorSchema); + outputTuple.get(MongoDbReadSchemaTransformProvider.OUTPUT_TAG).setRowSchema(beamSchema); + + PAssert.that(errorRows) + .satisfies( + rows -> { + Row errorRow = rows.iterator().next(); + byte[] failedRowBytes = errorRow.getBytes("failed_row"); + String failedJson = new String(failedRowBytes, StandardCharsets.UTF_8); + String errMsg = errorRow.getString("error_message"); + assertNotNull(failedJson); + assertNotNull(errMsg); + return null; + }); + + p.run().waitUntilFinish(); + } +} diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbUtilsTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbUtilsTest.java index ec11bb865913..f454d100f0a6 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbUtilsTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbUtilsTest.java @@ -17,12 +17,16 @@ */ package org.apache.beam.sdk.io.mongodb; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; @@ -30,6 +34,8 @@ import org.apache.beam.sdk.values.Row; import org.bson.BsonNull; import org.bson.Document; +import org.bson.types.Binary; +import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -132,4 +138,121 @@ public void testToDocumentWithNullValues() { Object val = doc.get("nullableString"); assertTrue(val instanceof BsonNull); } + + @Test + public void testToRowWithSimplePrimitives() { + Schema schema = + Schema.builder() + .addStringField("stringField") + .addInt32Field("intField") + .addInt64Field("longField") + .addBooleanField("booleanField") + .addDoubleField("doubleField") + .build(); + + Document doc = + new Document() + .append("stringField", "hello") + .append("intField", 42) + .append("longField", 123456789L) + .append("booleanField", true) + .append("doubleField", 3.14); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + assertEquals("hello", row.getString("stringField")); + assertEquals(Integer.valueOf(42), row.getInt32("intField")); + assertEquals(Long.valueOf(123456789L), row.getInt64("longField")); + assertEquals(Boolean.TRUE, row.getBoolean("booleanField")); + assertEquals(Double.valueOf(3.14), row.getDouble("doubleField")); + } + + @Test + public void testToRowWithNestedRow() { + Schema nestedSchema = + Schema.builder().addStringField("nestedString").addInt32Field("nestedInt").build(); + + Schema parentSchema = + Schema.builder() + .addStringField("parentString") + .addRowField("nestedRow", nestedSchema) + .build(); + + Document nestedDoc = + new Document().append("nestedString", "nestedValue").append("nestedInt", 100); + Document parentDoc = + new Document().append("parentString", "parentValue").append("nestedRow", nestedDoc); + + Row row = MongoDbUtils.toRow(parentDoc, parentSchema); + + assertNotNull(row); + assertEquals("parentValue", row.getString("parentString")); + + Row nestedRow = row.getRow("nestedRow"); + assertNotNull(nestedRow); + assertEquals("nestedValue", nestedRow.getString("nestedString")); + assertEquals(Integer.valueOf(100), nestedRow.getInt32("nestedInt")); + } + + @Test + public void testToRowWithIterable() { + Schema schema = Schema.builder().addArrayField("listField", FieldType.STRING).build(); + + Document doc = new Document().append("listField", Arrays.asList("a", "b", "c")); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + List list = (List) row.getArray("listField"); + assertEquals(3, list.size()); + assertEquals("a", list.get(0)); + assertEquals("b", list.get(1)); + assertEquals("c", list.get(2)); + } + + @Test + public void testToRowWithMap() { + Schema schema = + Schema.builder().addMapField("mapField", FieldType.STRING, FieldType.INT32).build(); + + Document nestedMap = new Document().append("key", 42); + Document doc = new Document().append("mapField", nestedMap); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + Map map = row.getMap("mapField"); + assertEquals(1, map.size()); + assertEquals(Integer.valueOf(42), map.get("key")); + } + + @Test + public void testToRowWithNullValues() { + Schema schema = Schema.builder().addNullableField("nullableString", FieldType.STRING).build(); + + Document doc = new Document().append("nullableString", new BsonNull()); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + assertNull(row.getString("nullableString")); + } + + @Test + @SuppressWarnings("JavaUtilDate") + public void testToRowWithDateAndBinary() { + Schema schema = + Schema.builder().addDateTimeField("dateField").addByteArrayField("binaryField").build(); + + Date now = new Date(); + byte[] bytes = "hello binary".getBytes(StandardCharsets.UTF_8); + Document doc = new Document().append("dateField", now).append("binaryField", new Binary(bytes)); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + assertEquals(new Instant(now.getTime()), row.getDateTime("dateField")); + assertArrayEquals(bytes, row.getBytes("binaryField")); + } } diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 71388dfa6bfa..3f9438b7056a 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -406,17 +406,22 @@ config: mappings: 'ReadFromMongoDB': - connection_uri: "connection_uri" + connection_uri: "uri" database: "database" collection: "collection" + schema: "schema" + filter: "filter" + error_handling: "errorHandling" 'WriteToMongoDB': - connection_uri: "connection_uri" + connection_uri: "uri" database: "database" collection: "collection" + batch_size: "batchSize" + error_handling: "errorHandling" underlying_provider: type: beamJar transforms: - 'ReadFromBigTable': 'beam:schematransform:org.apache.beam:mongodb_read:v1' - 'WriteToBigTable': 'beam:schematransform:org.apache.beam:mongodb_write:v1' + 'ReadFromMongoDB': 'beam:schematransform:org.apache.beam:mongodb_read:v1' + 'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1' config: - gradle_target: ':sdks:java:io:mongodb:shadowJar' + gradle_target: 'sdks:java:io:expansion-service:shadowJar' diff --git a/sdks/python/apache_beam/yaml/tests/mongodb.yaml b/sdks/python/apache_beam/yaml/tests/mongodb.yaml index e69de29bb2d1..fbf0eff02699 100644 --- a/sdks/python/apache_beam/yaml/tests/mongodb.yaml +++ b/sdks/python/apache_beam/yaml/tests/mongodb.yaml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +fixtures: + - name: mongo_vars + type: "apache_beam.yaml.integration_tests.temp_mongodb_table" + +pipelines: + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - { name: "John", age: 30 } + - { name: "Jane", age: 25 } + - type: WriteToMongoDB + config: + connection_uri: '{mongo_vars[URI]}' + database: '{mongo_vars[DATABASE]}' + collection: '{mongo_vars[COLLECTION]}' + - pipeline: + type: chain + transforms: + - type: ReadFromMongoDB + config: + connection_uri: '{mongo_vars[URI]}' + database: '{mongo_vars[DATABASE]}' + collection: '{mongo_vars[COLLECTION]}' + schema: | + { + "type": "object", + "properties": { + "name": {"type": "string"}, + "age": {"type": "integer"} + }, + "required": ["name", "age"] + } + - type: AssertEqual + config: + elements: + - { name: "John", age: 30 } + - { name: "Jane", age: 25 } From 6e72ffc879b4ed757419a84251791096a7c47649 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 2 Jun 2026 14:02:17 +0000 Subject: [PATCH 03/10] test(yaml): Add error-handling coverage for MongoDB Write and Read transforms - Standardized standard_io mappings to snake_case (error_handling, batch_size) - Extended integration test to verify error-handling queues are empty for clean runs --- sdks/python/apache_beam/yaml/standard_io.yaml | 6 ++--- .../apache_beam/yaml/tests/mongodb.yaml | 22 +++++++++++++++++-- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 3f9438b7056a..a9f3ba86f1fa 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -411,13 +411,13 @@ collection: "collection" schema: "schema" filter: "filter" - error_handling: "errorHandling" + error_handling: "error_handling" 'WriteToMongoDB': connection_uri: "uri" database: "database" collection: "collection" - batch_size: "batchSize" - error_handling: "errorHandling" + batch_size: "batch_size" + error_handling: "error_handling" underlying_provider: type: beamJar transforms: diff --git a/sdks/python/apache_beam/yaml/tests/mongodb.yaml b/sdks/python/apache_beam/yaml/tests/mongodb.yaml index fbf0eff02699..e24af720e7d8 100644 --- a/sdks/python/apache_beam/yaml/tests/mongodb.yaml +++ b/sdks/python/apache_beam/yaml/tests/mongodb.yaml @@ -19,22 +19,32 @@ fixtures: pipelines: - pipeline: - type: chain + type: composite transforms: - type: Create + name: CreateData config: elements: - { name: "John", age: 30 } - { name: "Jane", age: 25 } - type: WriteToMongoDB + name: WriteData + input: CreateData config: connection_uri: '{mongo_vars[URI]}' database: '{mongo_vars[DATABASE]}' collection: '{mongo_vars[COLLECTION]}' + error_handling: + output: my_error_output + - type: AssertEqual + input: WriteData.my_error_output + config: + elements: [] - pipeline: - type: chain + type: composite transforms: - type: ReadFromMongoDB + name: ReadData config: connection_uri: '{mongo_vars[URI]}' database: '{mongo_vars[DATABASE]}' @@ -48,8 +58,16 @@ pipelines: }, "required": ["name", "age"] } + error_handling: + output: my_error_output - type: AssertEqual + input: ReadData config: elements: - { name: "John", age: 30 } - { name: "Jane", age: 25 } + - type: AssertEqual + input: ReadData.my_error_output + config: + elements: [] + From 0cf39054dee8a37c212aa8e1ecb78efd345b68d5 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 2 Jun 2026 14:27:04 +0000 Subject: [PATCH 04/10] remove old edits to other transforms from original PR --- sdks/python/apache_beam/yaml/standard_io.yaml | 122 +++++++++++------- 1 file changed, 76 insertions(+), 46 deletions(-) diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index a9f3ba86f1fa..d3c65f818e12 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -45,9 +45,15 @@ type: beamJar transforms: 'ReadFromBigQuery': 'beam:schematransform:org.apache.beam:bigquery_storage_read:v1' - 'WriteToBigQuery': 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2' + 'WriteToBigQuery': 'beam:schematransform:org.apache.beam:bigquery_write:v1' config: - gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar' + gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' + managed_replacement: + # Following transforms may be replaced with equivalent managed transforms, + # if the pipelines 'updateCompatibilityBeamVersion' match the provided + # version. + 'ReadFromBigQuery': '2.69.0' + 'WriteToBigQuery': '2.69.0' # Kafka - type: renaming @@ -68,6 +74,7 @@ 'error_handling': 'error_handling' 'file_descriptor_path': 'file_descriptor_path' 'message_name': 'message_name' + 'max_read_time_seconds': 'max_read_time_seconds' 'WriteToKafka': 'format': 'format' 'topic': 'topic' @@ -91,44 +98,6 @@ 'ReadFromKafka': '2.65.0' 'WriteToKafka': '2.65.0' -# PubSub -- type: renaming - transforms: - 'ReadFromPubSubLite': 'ReadFromPubSubLite' - 'WriteToPubSubLite': 'WriteToPubSubLite' - config: - mappings: - 'ReadFromPubSubLite': - 'project': 'project' - 'schema': 'schema' - 'format': 'format' - 'subscription_name': 'subscription_name' - 'location': 'location' - 'attributes': 'attributes' - 'attribute_map': 'attribute_map' - 'attribute_id': 'attribute_id' - 'error_handling': 'error_handling' - 'file_descriptor_path': 'file_descriptor_path' - 'message_name': 'message_name' - 'WriteToPubSubLite': - 'project': 'project' - 'format': 'format' - 'topic_name': 'topic_name' - 'location': 'location' - 'attributes': 'attributes' - 'attribute_id': 'attribute_id' - 'error_handling': 'error_handling' - 'file_descriptor_path': 'file_descriptor_path' - 'message_name': 'message_name' - 'schema': 'schema' - underlying_provider: - type: beamJar - transforms: - 'ReadFromPubSubLite': 'beam:schematransform:org.apache.beam:pubsublite_read:v1' - 'WriteToPubSubLite': 'beam:schematransform:org.apache.beam:pubsublite_write:v1' - config: - gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' - # TODO(yaml): Tests are assuming python providers are before java ones, hence # the order below. This should be fixed in the future. @@ -272,32 +241,32 @@ driver_jars: '' jdbc_type: '' 'ReadFromPostgres': - connection_init_sql: '' + connection_init_sql: [] driver_class_name: '' driver_jars: '' jdbc_type: '' 'WriteToPostgres': - connection_init_sql: '' + connection_init_sql: [] driver_class_name: '' driver_jars: '' jdbc_type: '' 'ReadFromOracle': - connection_init_sql: '' + connection_init_sql: [] driver_class_name: '' driver_jars: '' jdbc_type: '' 'WriteToOracle': - connection_init_sql: '' + connection_init_sql: [] driver_class_name: '' driver_jars: '' jdbc_type: '' 'ReadFromSqlServer': - connection_init_sql: '' + connection_init_sql: [] driver_class_name: '' driver_jars: '' jdbc_type: '' 'WriteToSqlServer': - connection_init_sql: '' + connection_init_sql: [] driver_class_name: '' driver_jars: '' jdbc_type: '' @@ -316,6 +285,16 @@ 'WriteToSqlServer': 'beam:schematransform:org.apache.beam:sql_server_write:v1' config: gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' + managed_replacement: + # Following transforms may be replaced with equivalent managed transforms, + # if the pipelines 'updateCompatibilityBeamVersion' match the provided + # version. + 'ReadFromPostgres': '2.73.0' + 'WriteToPostgres': '2.73.0' + 'ReadFromMySql': '2.73.0' + 'WriteToMySql': '2.73.0' + 'ReadFromSqlServer': '2.73.0' + 'WriteToSqlServer': '2.73.0' # Spanner - type: renaming @@ -398,6 +377,57 @@ config: gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' +#IcebergCDC +- type: renaming + transforms: + 'ReadFromIcebergCDC': 'ReadFromIcebergCDC' + config: + mappings: + 'ReadFromIcebergCDC': + table: 'table' + catalog_name: 'catalog_name' + catalog_properties: 'catalog_properties' + config_properties: 'config_properties' + drop: 'drop' + filter: 'filter' + from_snapshot: 'from_snapshot' + from_timestamp: 'from_timestamp' + keep: 'keep' + poll_interval_seconds: 'poll_interval_seconds' + starting_strategy: 'starting_strategy' + streaming: 'streaming' + to_snapshot: 'to_snapshot' + to_timestamp: 'to_timestamp' + underlying_provider: + type: beamJar + transforms: + 'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1' + config: + gradle_target: 'sdks:java:io:expansion-service:shadowJar' + +#IcebergAddFiles +- type: renaming + transforms: + 'IcebergAddFiles': 'IcebergAddFiles' + config: + mappings: + 'IcebergAddFiles': + table: 'table' + catalog_properties: 'catalog_properties' + config_properties: 'config_properties' + triggering_frequency_seconds: 'triggering_frequency_seconds' + append_batch_size: 'append_batch_size' + location_prefix: 'location_prefix' + partition_fields: 'partition_fields' + table_properties: 'table_properties' + error_handling: 'error_handling' + underlying_provider: + type: beamJar + transforms: + 'IcebergAddFiles': 'beam:schematransform:iceberg_add_files:v1' + config: + gradle_target: 'sdks:java:io:expansion-service:shadowJar' + #MongoDB - type: renaming transforms: From c1bac5770f451afa25e777df4bc238a1210f5fea Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 2 Jun 2026 14:37:10 +0000 Subject: [PATCH 05/10] add python support --- sdks/python/apache_beam/yaml/standard_io.yaml | 2 + sdks/python/apache_beam/yaml/yaml_io.py | 56 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index d3c65f818e12..f2666437eb0d 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -115,6 +115,8 @@ 'WriteToIceberg': 'apache_beam.yaml.yaml_io.write_to_iceberg' 'ReadFromTFRecord': 'apache_beam.yaml.yaml_io.read_from_tfrecord' 'WriteToTFRecord': 'apache_beam.yaml.yaml_io.write_to_tfrecord' + 'ReadFromMongoDB': 'apache_beam.yaml.yaml_io.read_from_mongodb' + 'WriteToMongoDB': 'apache_beam.yaml.yaml_io.write_to_mongodb' # General File Formats # Declared as a renaming transform to avoid exposing all diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 989661a6eae4..dc8c8ac2359b 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -724,6 +724,62 @@ def write_to_tfrecord( compression_type=getattr(CompressionTypes, compression_type)) +@beam.ptransform_fn +@yaml_errors.maybe_with_exception_handling_transform_fn +def read_from_mongodb( + root, + *, + database: str, + collection: str, + schema: Union[str, dict[str, Any]], + connection_uri: Optional[str] = None, + filter: Optional[dict[str, Any]] = None, + projection: Optional[Union[list[str], dict[str, Any]]] = None, + extra_client_params: Optional[dict[str, Any]] = None, + bucket_auto: bool = False): + """Reads data from MongoDB. + + The resulting PCollection consists of rows with fields matching the provided + schema. + + Args: + database: The MongoDB database name. + collection: The MongoDB collection name. + schema: JSON schema specifying the fields to select and their types. + connection_uri: The MongoDB connection string. e.g. "mongodb://localhost:27017" + filter: A JSON/bson mapping specifying elements which must be present. + projection: A list of field names that should be returned or a dict + specifying the fields to include/exclude. + extra_client_params: Optional MongoClient parameters. + bucket_auto: If True, use MongoDB $bucketAuto aggregation to split + collection into bundles instead of splitVector command. + """ + from apache_beam.io import mongodbio + import json + + if isinstance(schema, str): + schema = json.loads(schema) + + beam_schema = json_utils.json_schema_to_beam_schema(schema) + beam_type = schema_pb2.FieldType( + row_type=schema_pb2.RowType(schema=beam_schema)) + to_row_fn = json_utils.json_to_row(beam_type) + + output = ( + root + | mongodbio.ReadFromMongoDB( + uri=connection_uri, + db=database, + coll=collection, + filter=filter, + projection=projection, + extra_client_params=extra_client_params, + bucket_auto=bucket_auto) + | beam.Map(to_row_fn)) + output.element_type = schemas.named_tuple_from_schema(beam_schema) + return output + + @beam.ptransform_fn @yaml_errors.maybe_with_exception_handling_transform_fn def write_to_mongodb( From 80f7f050a13e78f17b44bb89802eca942d4ec2e8 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 2 Jun 2026 15:27:50 +0000 Subject: [PATCH 06/10] revert changes to integration tests not needed from original PR and fix lint issues --- .../apache_beam/yaml/integration_tests.py | 69 ++++++++++--------- sdks/python/apache_beam/yaml/yaml_io.py | 7 +- 2 files changed, 37 insertions(+), 39 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 62368c126336..2d0b2787fc9b 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -33,11 +33,11 @@ from datetime import timezone import mock -import mysql.connector import psycopg2 import pytds import sqlalchemy import yaml +from apitools.base.py.exceptions import HttpError from google.cloud import pubsub_v1 from google.cloud.bigtable import client from google.cloud.bigtable_admin_v2.types import instance @@ -60,10 +60,11 @@ from apache_beam.yaml import yaml_provider from apache_beam.yaml import yaml_transform from apache_beam.yaml.conftest import yaml_test_files_dir -from apitools.base.py.exceptions import HttpError _LOGGER = logging.getLogger(__name__) +_MONGO_CONTAINER_IMAGE = 'mongo:7.0.7' + @contextlib.contextmanager def gcs_temp_dir(bucket): @@ -223,17 +224,11 @@ def temp_mongodb_table(): - collection: ${mongo_vars.COLLECTION} """ _LOGGER.info("Setting up MongoDB fixture...") - # Initialize and start the MongoDB container. - # This will pull the 'mongo:7.0.7' image if it's not available locally. - mongo_container = MongoDbContainer("mongo:7.0.7") + mongo_container = MongoDbContainer(_MONGO_CONTAINER_IMAGE) try: mongo_container.start() - - # Get the dynamically generated connection URI. mongo_uri = mongo_container.get_connection_url() - # Generate a unique database and collection name for this test run to ensure - # isolation between different test files. db_name = f'db_{uuid.uuid4().hex}' collection_name = f'collection_{uuid.uuid4().hex}' @@ -250,7 +245,6 @@ def temp_mongodb_table(): } finally: - # This block executes after the test suite finishes. _LOGGER.info("Tearing down MongoDB fixture...") mongo_container.stop() _LOGGER.info("MongoDB container stopped.") @@ -341,26 +335,22 @@ def temp_mysql_database(): Exception: Any other exception encountered during the setup process. """ with MySqlContainer(init=True, dialect='pymysql') as mysql_container: - try: - # Make connection to temp database and create tmp table - engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) - with engine.begin() as connection: - connection.execute( - sqlalchemy.text( - "CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);")) + # Make connection to temp database and create tmp table + engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) + with engine.begin() as connection: + connection.execute( + sqlalchemy.text( + "CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);")) - # Construct the JDBC url for connections later on by tests - jdbc_url = ( - f"jdbc:mysql://{mysql_container.get_container_host_ip()}:" - f"{mysql_container.get_exposed_port(mysql_container.port)}/" - f"{mysql_container.dbname}?" - f"user={mysql_container.username}&" - f"password={mysql_container.password}") + # Construct the JDBC url for connections later on by tests + jdbc_url = ( + f"jdbc:mysql://{mysql_container.get_container_host_ip()}:" + f"{mysql_container.get_exposed_port(mysql_container.port)}/" + f"{mysql_container.dbname}?" + f"user={mysql_container.username}&" + f"password={mysql_container.password}") - yield jdbc_url - except mysql.connector.Error as err: - logging.error("Error interacting with temporary MySQL DB: %s", err) - raise err + yield jdbc_url @contextlib.contextmanager @@ -779,12 +769,23 @@ def test(self, providers=providers): # default arg to capture loop value **yaml_transform.SafeLineLoader.strip_metadata( fixture.get('config', {})))) for pipeline_spec in spec['pipelines']: - with beam.Pipeline(options=PipelineOptions( - pickle_library='cloudpickle', - **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( - 'options', {})))) as p: - yaml_transform.expand_pipeline( - p, replace_recursive(pipeline_spec, vars)) + try: + with beam.Pipeline(options=PipelineOptions( + pickle_library='cloudpickle', + **replace_recursive( + yaml_transform.SafeLineLoader.strip_metadata( + pipeline_spec.get('options', {})), + vars))) as p: + yaml_transform.expand_pipeline( + p, replace_recursive(pipeline_spec, vars)) + except ValueError as exn: + # FnApiRunner currently does not support this requirement in + # some xlang scenarios (e.g. Iceberg YAML pipelines). + if 'beam:requirement:pardo:on_window_expiration:v1' in str(exn): + self.skipTest( + 'Runner does not support ' + 'beam:requirement:pardo:on_window_expiration:v1') + raise yield f'test_{suffix}', test diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index dc8c8ac2359b..186ade34bf4f 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -32,11 +32,13 @@ from typing import Union import fastavro +import json import apache_beam as beam import apache_beam.io as beam_io from apache_beam import coders from apache_beam.coders.row_coder import RowCoder +from apache_beam.io import mongodbio from apache_beam.io import ReadFromBigQuery from apache_beam.io import ReadFromTFRecord from apache_beam.io import WriteToBigQuery @@ -754,9 +756,6 @@ def read_from_mongodb( bucket_auto: If True, use MongoDB $bucketAuto aggregation to split collection into bundles instead of splitVector command. """ - from apache_beam.io import mongodbio - import json - if isinstance(schema, str): schema = json.loads(schema) @@ -800,8 +799,6 @@ def write_to_mongodb( batch_size: Number of documents per bulk_write to MongoDB. extra_client_params: Optional MongoClient parameters. """ - from apache_beam.io import mongodbio - def row_to_dict(value): if value is None: return None From b2c6064f12231570ad37eac5b001d9e1a3477633 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 2 Jun 2026 16:03:50 +0000 Subject: [PATCH 07/10] fix more lint --- sdks/python/apache_beam/yaml/yaml_io.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 186ade34bf4f..74d177e6a3bd 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -24,6 +24,7 @@ """ import io +import json from collections.abc import Callable from collections.abc import Iterable from collections.abc import Mapping @@ -32,18 +33,17 @@ from typing import Union import fastavro -import json import apache_beam as beam import apache_beam.io as beam_io from apache_beam import coders from apache_beam.coders.row_coder import RowCoder -from apache_beam.io import mongodbio from apache_beam.io import ReadFromBigQuery from apache_beam.io import ReadFromTFRecord from apache_beam.io import WriteToBigQuery from apache_beam.io import WriteToTFRecord from apache_beam.io import avroio +from apache_beam.io import mongodbio from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.gcp.bigquery import BigQueryDisposition from apache_beam.portability.api import schema_pb2 From eaf69fc39808a202af7ba61e546c580bc8a7a142 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 2 Jun 2026 17:40:46 +0000 Subject: [PATCH 08/10] fix generate external transforms --- sdks/standard_external_transforms.yaml | 36 +++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index 057c4e3f47d1..121057690263 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -19,7 +19,7 @@ # configuration in /sdks/standard_expansion_services.yaml. # Refer to gen_xlang_wrappers.py for more info. # -# Last updated on: 2026-05-06 +# Last updated on: 2026-06-02 - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 @@ -50,6 +50,40 @@ type: int64 identifier: beam:schematransform:org.apache.beam:generate_sequence:v1 name: GenerateSequence +- default_service: sdks:java:io:expansion-service:shadowJar + description: '' + destinations: + python: apache_beam/io + fields: + - description: The MongoDB collection to read from. + name: collection + nullable: false + type: str + - description: The MongoDB database to read from. + name: database + nullable: false + type: str + - description: This option specifies whether and where to output rows that failed + to be read. + name: error_handling + nullable: true + type: Row(output=) + - description: An optional BSON filter to apply to the read. This should be a valid + JSON string. + name: filter + nullable: true + type: str + - description: The schema in which the data is encoded, defined with JSON-schema + syntax (https://json-schema.org/). + name: schema + nullable: false + type: str + - description: The connection URI for the MongoDB server. + name: uri + nullable: false + type: str + identifier: beam:schematransform:org.apache.beam:mongodb_read:v1 + name: MongodbRead - default_service: sdks:java:io:expansion-service:shadowJar description: '' destinations: From 4668c5911c632fa8e02752a2ff9f472419b8b121 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 3 Jun 2026 00:18:21 +0000 Subject: [PATCH 09/10] pin read to python provider --- sdks/python/apache_beam/yaml/tests/mongodb.yaml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/tests/mongodb.yaml b/sdks/python/apache_beam/yaml/tests/mongodb.yaml index e24af720e7d8..ed4a728a2107 100644 --- a/sdks/python/apache_beam/yaml/tests/mongodb.yaml +++ b/sdks/python/apache_beam/yaml/tests/mongodb.yaml @@ -70,4 +70,11 @@ pipelines: input: ReadData.my_error_output config: elements: [] - + # TODO: Remove when Java ReadFromMongoDB (MongoDbIO) is migrated from + # legacy BoundedSource to a DoFn/SDF-based reader. + providers: + - type: python + config: { } + transforms: + ReadFromMongoDB: 'apache_beam.yaml.yaml_io.read_from_mongodb' + From 9df8c17024e5ced78d3acdadbb4e01ad518c4290 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 3 Jun 2026 13:18:50 +0000 Subject: [PATCH 10/10] comment out read part --- .../apache_beam/yaml/tests/mongodb.yaml | 69 +++++++++---------- 1 file changed, 32 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/yaml/tests/mongodb.yaml b/sdks/python/apache_beam/yaml/tests/mongodb.yaml index ed4a728a2107..652e24c09673 100644 --- a/sdks/python/apache_beam/yaml/tests/mongodb.yaml +++ b/sdks/python/apache_beam/yaml/tests/mongodb.yaml @@ -40,41 +40,36 @@ pipelines: input: WriteData.my_error_output config: elements: [] - - pipeline: - type: composite - transforms: - - type: ReadFromMongoDB - name: ReadData - config: - connection_uri: '{mongo_vars[URI]}' - database: '{mongo_vars[DATABASE]}' - collection: '{mongo_vars[COLLECTION]}' - schema: | - { - "type": "object", - "properties": { - "name": {"type": "string"}, - "age": {"type": "integer"} - }, - "required": ["name", "age"] - } - error_handling: - output: my_error_output - - type: AssertEqual - input: ReadData - config: - elements: - - { name: "John", age: 30 } - - { name: "Jane", age: 25 } - - type: AssertEqual - input: ReadData.my_error_output - config: - elements: [] - # TODO: Remove when Java ReadFromMongoDB (MongoDbIO) is migrated from - # legacy BoundedSource to a DoFn/SDF-based reader. - providers: - - type: python - config: { } - transforms: - ReadFromMongoDB: 'apache_beam.yaml.yaml_io.read_from_mongodb' +# TODO: Re-enable ReadFromMongoDB tests once Java MongoDbIO is migrated from legacy BoundedSource. +# - pipeline: +# type: composite +# transforms: +# - type: ReadFromMongoDB +# name: ReadData +# config: +# connection_uri: '{mongo_vars[URI]}' +# database: '{mongo_vars[DATABASE]}' +# collection: '{mongo_vars[COLLECTION]}' +# schema: | +# { +# "type": "object", +# "properties": { +# "name": {"type": "string"}, +# "age": {"type": "integer"} +# }, +# "required": ["name", "age"] +# } +# error_handling: +# output: my_error_output +# - type: AssertEqual +# input: ReadData +# config: +# elements: +# - { name: "John", age: 30 } +# - { name: "Jane", age: 25 } +# - type: AssertEqual +# input: ReadData.my_error_output +# config: +# elements: [] +