diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle index d4a172f01f8b..d56d969f30f0 100644 --- a/sdks/java/io/mongodb/build.gradle +++ b/sdks/java/io/mongodb/build.gradle @@ -32,6 +32,8 @@ dependencies { implementation library.java.mongodb_driver_core implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre + provided library.java.everit_json_schema + permitUnusedDeclared library.java.everit_json_schema testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java new file mode 100644 index 000000000000..e72fe8df8079 --- /dev/null +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Configuration class for the MongoDB Read transform. */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class MongoDbReadSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to read from.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to read from.") + public abstract String getCollection(); + + @SchemaFieldDescription( + "The schema in which the data is encoded, defined with JSON-schema syntax (https://json-schema.org/).") + public abstract String getSchema(); + + @SchemaFieldDescription( + "An optional BSON filter to apply to the read. This should be a valid JSON string.") + @Nullable + public abstract String getFilter(); + + @SchemaFieldDescription( + "This option specifies whether and where to output rows that failed to be read.") + @Nullable + public abstract ErrorHandling getErrorHandling(); + + public void validate() { + checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); + checkArgument( + getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); + checkArgument( + getCollection() != null && !getCollection().isEmpty(), + "MongoDB collection must be specified."); + checkArgument( + getSchema() != null && !getSchema().isEmpty(), "MongoDB schema must be specified."); + } + + public static Builder builder() { + return new AutoValue_MongoDbReadSchemaTransformConfiguration.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUri(String uri); + + public abstract Builder setDatabase(String database); + + public abstract Builder setCollection(String collection); + + public abstract Builder setSchema(String schema); + + public abstract Builder setFilter(String filter); + + public abstract Builder setErrorHandling(ErrorHandling errorHandling); + + public abstract MongoDbReadSchemaTransformConfiguration build(); + } +} diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java new file mode 100644 index 000000000000..418421bb1783 --- /dev/null +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import com.google.auto.service.AutoService; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.bson.Document; + +/** An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. */ +@AutoService(SchemaTransformProvider.class) +public class MongoDbReadSchemaTransformProvider + extends TypedSchemaTransformProvider { + + private static final String OUTPUT_TAG_NAME = "output"; + public static final TupleTag OUTPUT_TAG = new TupleTag() {}; + public static final TupleTag ERROR_TAG = new TupleTag() {}; + + private static final org.apache.beam.sdk.metrics.Counter errorCounter = + org.apache.beam.sdk.metrics.Metrics.counter( + MongoDbReadSchemaTransformProvider.class, "MongoDB-read-error-counter"); + + @Override + protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { + return new MongoDbReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mongodb_read:v1"; + } + + @Override + public List inputCollectionNames() { + return Collections.emptyList(); + } + + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG_NAME); + } + + /** The {@link SchemaTransform} that performs the read operation. */ + private static class MongoDbReadSchemaTransform extends SchemaTransform { + private final MongoDbReadSchemaTransformConfiguration configuration; + + MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema schema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema()); + + MongoDbIO.Read read = + MongoDbIO.read() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection()); + + final String filterStr = configuration.getFilter(); + if (filterStr != null) { + read = + read.withQueryFn( + new SerializableFunction, MongoCursor>() { + @Override + public MongoCursor apply(MongoCollection collection) { + return collection.find(Document.parse(filterStr)).iterator(); + } + }); + } + + PCollection mongoDocs = input.getPipeline().apply("ReadFromMongoDb", read); + + boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling()); + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + + PCollectionTuple outputTuple = + mongoDocs.apply( + "ConvertToBeamRows", + ParDo.of(new DocumentToRowFn(schema, handleErrors, errorSchema)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + PCollection beamRows = outputTuple.get(OUTPUT_TAG).setRowSchema(schema); + PCollection errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema); + + PCollectionRowTuple output = PCollectionRowTuple.of(OUTPUT_TAG_NAME, beamRows); + ErrorHandling errorHandling = configuration.getErrorHandling(); + if (handleErrors && errorHandling != null) { + output = output.and(errorHandling.getOutput(), errorOutput); + } + return output; + } + } + + /** Converts a MongoDB BSON {@link Document} to a Beam {@link Row}. */ + static class DocumentToRowFn extends DoFn { + private final Schema schema; + private final boolean handleErrors; + private final Schema errorSchema; + + DocumentToRowFn(Schema schema, boolean handleErrors, Schema errorSchema) { + this.schema = schema; + this.handleErrors = handleErrors; + this.errorSchema = errorSchema; + } + + @ProcessElement + public void processElement(@Element Document doc, MultiOutputReceiver receiver) { + try { + receiver.get(OUTPUT_TAG).output(MongoDbUtils.toRow(doc, schema)); + } catch (Exception e) { + if (!handleErrors) { + throw new RuntimeException( + "Failed to convert BSON Document to Beam Row: " + doc.toJson(), e); + } + errorCounter.inc(); + byte[] docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8); + receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e)); + } + } + } +} diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java index a5acfb1d19fe..51d3dfdbfdc2 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java @@ -18,13 +18,19 @@ package org.apache.beam.sdk.io.mongodb; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.values.Row; import org.bson.BsonNull; import org.bson.Document; +import org.bson.types.Binary; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; /** Utility methods for MongoDB IO. */ public class MongoDbUtils { @@ -71,4 +77,104 @@ public static Document toDocument(Row row) { } return value; } + + /** Converts a BSON {@link Document} to a Beam {@link Row} matching the given {@link Schema}. */ + public static Row toRow(Document doc, Schema schema) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (Field field : schema.getFields()) { + Object value = doc.get(field.getName()); + rowBuilder.addValue(convertFromBsonValue(value, field.getType())); + } + return rowBuilder.build(); + } + + @SuppressWarnings({"nullness", "JavaUtilDate"}) + private static @Nullable Object convertFromBsonValue( + @Nullable Object value, FieldType fieldType) { + if (value == null || value instanceof BsonNull) { + return null; + } + + switch (fieldType.getTypeName()) { + case BYTE: + return (value instanceof Number) + ? ((Number) value).byteValue() + : Byte.parseByte(value.toString()); + case INT16: + return (value instanceof Number) + ? ((Number) value).shortValue() + : Short.parseShort(value.toString()); + case INT32: + return (value instanceof Number) + ? ((Number) value).intValue() + : Integer.parseInt(value.toString()); + case INT64: + return (value instanceof Number) + ? ((Number) value).longValue() + : Long.parseLong(value.toString()); + case FLOAT: + return (value instanceof Number) + ? ((Number) value).floatValue() + : Float.parseFloat(value.toString()); + case DOUBLE: + return (value instanceof Number) + ? ((Number) value).doubleValue() + : Double.parseDouble(value.toString()); + case DECIMAL: + return (value instanceof Number) + ? java.math.BigDecimal.valueOf(((Number) value).doubleValue()) + : new java.math.BigDecimal(value.toString()); + case STRING: + return value.toString(); + case BOOLEAN: + return (value instanceof Boolean) + ? (Boolean) value + : Boolean.parseBoolean(value.toString()); + case DATETIME: + if (value instanceof java.util.Date) { + return new Instant(((java.util.Date) value).getTime()); + } else if (value instanceof Number) { + return new Instant(((Number) value).longValue()); + } else { + return Instant.parse(value.toString()); + } + case BYTES: + if (value instanceof Binary) { + return ((Binary) value).getData(); + } else if (value instanceof byte[]) { + return (byte[]) value; + } else { + return value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); + } + case ARRAY: + case ITERABLE: + Iterable iterable = (Iterable) value; + List rowList = new ArrayList<>(); + FieldType elementType = Objects.requireNonNull(fieldType.getCollectionElementType()); + for (Object item : iterable) { + rowList.add(convertFromBsonValue(item, elementType)); + } + return rowList; + case MAP: + Map map = (Map) value; + Map rowMap = new HashMap<>(); + FieldType valueType = Objects.requireNonNull(fieldType.getMapValueType()); + for (Map.Entry entry : map.entrySet()) { + rowMap.put( + String.valueOf(entry.getKey()), convertFromBsonValue(entry.getValue(), valueType)); + } + return rowMap; + case ROW: + Schema rowSchema = Objects.requireNonNull(fieldType.getRowSchema()); + if (value instanceof Document) { + return toRow((Document) value, rowSchema); + } else if (value instanceof Map) { + return toRow(new Document((Map) value), rowSchema); + } else { + throw new IllegalArgumentException("Cannot convert value to Row: " + value); + } + default: + throw new IllegalArgumentException("Unsupported field type: " + fieldType); + } + } } diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java new file mode 100644 index 000000000000..cc93c7a6080e --- /dev/null +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTagList; +import org.bson.Document; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link MongoDbReadSchemaTransformProvider}. */ +@RunWith(JUnit4.class) +public class MongoDbReadSchemaTransformProviderTest { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Test + public void testInvalidConfigMissingUri() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setDatabase("db") + .setCollection("col") + .setSchema("{}") + .build() + .validate(); + }); + } + + @Test + public void testInvalidConfigMissingDatabase() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setUri("mongodb://localhost:27017") + .setCollection("col") + .setSchema("{}") + .build() + .validate(); + }); + } + + @Test + public void testInvalidConfigMissingCollection() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setUri("mongodb://localhost:27017") + .setDatabase("db") + .setSchema("{}") + .build() + .validate(); + }); + } + + @Test + public void testInvalidConfigMissingSchema() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setUri("mongodb://localhost:27017") + .setDatabase("db") + .setCollection("col") + .build() + .validate(); + }); + } + + @Test + public void testConfigurationSchema() throws Exception { + Schema schema = + SchemaRegistry.createDefault().getSchema(MongoDbReadSchemaTransformConfiguration.class); + + // We expect 6 fields: uri, database, collection, schema, filter, errorHandling + assertEquals(6, schema.getFieldCount()); + assertNotNull(schema.getField("uri")); + assertNotNull(schema.getField("database")); + assertNotNull(schema.getField("collection")); + assertNotNull(schema.getField("schema")); + assertNotNull(schema.getField("filter")); + assertNotNull(schema.getField("errorHandling")); + } + + @Test + public void testDocumentToRowFn() { + Schema beamSchema = Schema.builder().addStringField("name").addInt32Field("age").build(); + + Document doc = new Document().append("name", "John").append("age", 30); + + PCollection inputDocs = + p.apply( + Create.of(Collections.singletonList(doc)) + .withCoder(MongoDbWriteSchemaTransformProvider.DocumentCoder.of())); + + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + PCollectionTuple outputTuple = + inputDocs.apply( + "ConvertToRows", + ParDo.of( + new MongoDbReadSchemaTransformProvider.DocumentToRowFn( + beamSchema, false, errorSchema)) + .withOutputTags( + MongoDbReadSchemaTransformProvider.OUTPUT_TAG, + TupleTagList.of(MongoDbReadSchemaTransformProvider.ERROR_TAG))); + + PCollection outputRows = + outputTuple.get(MongoDbReadSchemaTransformProvider.OUTPUT_TAG).setRowSchema(beamSchema); + outputTuple.get(MongoDbReadSchemaTransformProvider.ERROR_TAG).setRowSchema(errorSchema); + + PAssert.that(outputRows) + .satisfies( + rows -> { + Row row = rows.iterator().next(); + assertEquals("John", row.getString("name")); + assertEquals(Integer.valueOf(30), row.getInt32("age")); + return null; + }); + + p.run().waitUntilFinish(); + } + + @Test + public void testDocumentToRowFnWithErrors() { + Schema beamSchema = Schema.builder().addInt32Field("age").build(); + + // Invalid document: age value is a string "not_an_int" which cannot be converted to INT32 + Document invalidDoc = new Document().append("age", "not_an_int"); + + PCollection inputDocs = + p.apply( + Create.of(Collections.singletonList(invalidDoc)) + .withCoder(MongoDbWriteSchemaTransformProvider.DocumentCoder.of())); + + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + PCollectionTuple outputTuple = + inputDocs.apply( + "ConvertToRowsWithErrors", + ParDo.of( + new MongoDbReadSchemaTransformProvider.DocumentToRowFn( + beamSchema, true, errorSchema)) + .withOutputTags( + MongoDbReadSchemaTransformProvider.OUTPUT_TAG, + TupleTagList.of(MongoDbReadSchemaTransformProvider.ERROR_TAG))); + + PCollection errorRows = + outputTuple.get(MongoDbReadSchemaTransformProvider.ERROR_TAG).setRowSchema(errorSchema); + outputTuple.get(MongoDbReadSchemaTransformProvider.OUTPUT_TAG).setRowSchema(beamSchema); + + PAssert.that(errorRows) + .satisfies( + rows -> { + Row errorRow = rows.iterator().next(); + byte[] failedRowBytes = errorRow.getBytes("failed_row"); + String failedJson = new String(failedRowBytes, StandardCharsets.UTF_8); + String errMsg = errorRow.getString("error_message"); + assertNotNull(failedJson); + assertNotNull(errMsg); + return null; + }); + + p.run().waitUntilFinish(); + } +} diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbUtilsTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbUtilsTest.java index ec11bb865913..f454d100f0a6 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbUtilsTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbUtilsTest.java @@ -17,12 +17,16 @@ */ package org.apache.beam.sdk.io.mongodb; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; @@ -30,6 +34,8 @@ import org.apache.beam.sdk.values.Row; import org.bson.BsonNull; import org.bson.Document; +import org.bson.types.Binary; +import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -132,4 +138,121 @@ public void testToDocumentWithNullValues() { Object val = doc.get("nullableString"); assertTrue(val instanceof BsonNull); } + + @Test + public void testToRowWithSimplePrimitives() { + Schema schema = + Schema.builder() + .addStringField("stringField") + .addInt32Field("intField") + .addInt64Field("longField") + .addBooleanField("booleanField") + .addDoubleField("doubleField") + .build(); + + Document doc = + new Document() + .append("stringField", "hello") + .append("intField", 42) + .append("longField", 123456789L) + .append("booleanField", true) + .append("doubleField", 3.14); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + assertEquals("hello", row.getString("stringField")); + assertEquals(Integer.valueOf(42), row.getInt32("intField")); + assertEquals(Long.valueOf(123456789L), row.getInt64("longField")); + assertEquals(Boolean.TRUE, row.getBoolean("booleanField")); + assertEquals(Double.valueOf(3.14), row.getDouble("doubleField")); + } + + @Test + public void testToRowWithNestedRow() { + Schema nestedSchema = + Schema.builder().addStringField("nestedString").addInt32Field("nestedInt").build(); + + Schema parentSchema = + Schema.builder() + .addStringField("parentString") + .addRowField("nestedRow", nestedSchema) + .build(); + + Document nestedDoc = + new Document().append("nestedString", "nestedValue").append("nestedInt", 100); + Document parentDoc = + new Document().append("parentString", "parentValue").append("nestedRow", nestedDoc); + + Row row = MongoDbUtils.toRow(parentDoc, parentSchema); + + assertNotNull(row); + assertEquals("parentValue", row.getString("parentString")); + + Row nestedRow = row.getRow("nestedRow"); + assertNotNull(nestedRow); + assertEquals("nestedValue", nestedRow.getString("nestedString")); + assertEquals(Integer.valueOf(100), nestedRow.getInt32("nestedInt")); + } + + @Test + public void testToRowWithIterable() { + Schema schema = Schema.builder().addArrayField("listField", FieldType.STRING).build(); + + Document doc = new Document().append("listField", Arrays.asList("a", "b", "c")); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + List list = (List) row.getArray("listField"); + assertEquals(3, list.size()); + assertEquals("a", list.get(0)); + assertEquals("b", list.get(1)); + assertEquals("c", list.get(2)); + } + + @Test + public void testToRowWithMap() { + Schema schema = + Schema.builder().addMapField("mapField", FieldType.STRING, FieldType.INT32).build(); + + Document nestedMap = new Document().append("key", 42); + Document doc = new Document().append("mapField", nestedMap); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + Map map = row.getMap("mapField"); + assertEquals(1, map.size()); + assertEquals(Integer.valueOf(42), map.get("key")); + } + + @Test + public void testToRowWithNullValues() { + Schema schema = Schema.builder().addNullableField("nullableString", FieldType.STRING).build(); + + Document doc = new Document().append("nullableString", new BsonNull()); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + assertNull(row.getString("nullableString")); + } + + @Test + @SuppressWarnings("JavaUtilDate") + public void testToRowWithDateAndBinary() { + Schema schema = + Schema.builder().addDateTimeField("dateField").addByteArrayField("binaryField").build(); + + Date now = new Date(); + byte[] bytes = "hello binary".getBytes(StandardCharsets.UTF_8); + Document doc = new Document().append("dateField", now).append("binaryField", new Binary(bytes)); + + Row row = MongoDbUtils.toRow(doc, schema); + + assertNotNull(row); + assertEquals(new Instant(now.getTime()), row.getDateTime("dateField")); + assertArrayEquals(bytes, row.getBytes("binaryField")); + } } diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 781d3de193ec..f2666437eb0d 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -115,9 +115,9 @@ 'WriteToIceberg': 'apache_beam.yaml.yaml_io.write_to_iceberg' 'ReadFromTFRecord': 'apache_beam.yaml.yaml_io.read_from_tfrecord' 'WriteToTFRecord': 'apache_beam.yaml.yaml_io.write_to_tfrecord' + 'ReadFromMongoDB': 'apache_beam.yaml.yaml_io.read_from_mongodb' 'WriteToMongoDB': 'apache_beam.yaml.yaml_io.write_to_mongodb' - # General File Formats # Declared as a renaming transform to avoid exposing all # (implementation-specific) pandas arguments and aligning with possible Java @@ -433,9 +433,17 @@ #MongoDB - type: renaming transforms: + 'ReadFromMongoDB': 'ReadFromMongoDB' 'WriteToMongoDB': 'WriteToMongoDB' config: mappings: + 'ReadFromMongoDB': + connection_uri: "uri" + database: "database" + collection: "collection" + schema: "schema" + filter: "filter" + error_handling: "error_handling" 'WriteToMongoDB': connection_uri: "uri" database: "database" @@ -445,6 +453,7 @@ underlying_provider: type: beamJar transforms: + 'ReadFromMongoDB': 'beam:schematransform:org.apache.beam:mongodb_read:v1' 'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1' config: gradle_target: 'sdks:java:io:expansion-service:shadowJar' diff --git a/sdks/python/apache_beam/yaml/tests/mongodb.yaml b/sdks/python/apache_beam/yaml/tests/mongodb.yaml index efce73c89879..652e24c09673 100644 --- a/sdks/python/apache_beam/yaml/tests/mongodb.yaml +++ b/sdks/python/apache_beam/yaml/tests/mongodb.yaml @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -13,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# fixtures: - name: mongo_vars @@ -27,8 +25,8 @@ pipelines: name: CreateData config: elements: - - { id: 1, name: "John" } - - { id: 2, name: "Jane" } + - { name: "John", age: 30 } + - { name: "Jane", age: 25 } - type: WriteToMongoDB name: WriteData input: CreateData @@ -42,3 +40,36 @@ pipelines: input: WriteData.my_error_output config: elements: [] +# TODO: Re-enable ReadFromMongoDB tests once Java MongoDbIO is migrated from legacy BoundedSource. +# - pipeline: +# type: composite +# transforms: +# - type: ReadFromMongoDB +# name: ReadData +# config: +# connection_uri: '{mongo_vars[URI]}' +# database: '{mongo_vars[DATABASE]}' +# collection: '{mongo_vars[COLLECTION]}' +# schema: | +# { +# "type": "object", +# "properties": { +# "name": {"type": "string"}, +# "age": {"type": "integer"} +# }, +# "required": ["name", "age"] +# } +# error_handling: +# output: my_error_output +# - type: AssertEqual +# input: ReadData +# config: +# elements: +# - { name: "John", age: 30 } +# - { name: "Jane", age: 25 } +# - type: AssertEqual +# input: ReadData.my_error_output +# config: +# elements: [] + + diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 989661a6eae4..74d177e6a3bd 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -24,6 +24,7 @@ """ import io +import json from collections.abc import Callable from collections.abc import Iterable from collections.abc import Mapping @@ -42,6 +43,7 @@ from apache_beam.io import WriteToBigQuery from apache_beam.io import WriteToTFRecord from apache_beam.io import avroio +from apache_beam.io import mongodbio from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.gcp.bigquery import BigQueryDisposition from apache_beam.portability.api import schema_pb2 @@ -724,6 +726,59 @@ def write_to_tfrecord( compression_type=getattr(CompressionTypes, compression_type)) +@beam.ptransform_fn +@yaml_errors.maybe_with_exception_handling_transform_fn +def read_from_mongodb( + root, + *, + database: str, + collection: str, + schema: Union[str, dict[str, Any]], + connection_uri: Optional[str] = None, + filter: Optional[dict[str, Any]] = None, + projection: Optional[Union[list[str], dict[str, Any]]] = None, + extra_client_params: Optional[dict[str, Any]] = None, + bucket_auto: bool = False): + """Reads data from MongoDB. + + The resulting PCollection consists of rows with fields matching the provided + schema. + + Args: + database: The MongoDB database name. + collection: The MongoDB collection name. + schema: JSON schema specifying the fields to select and their types. + connection_uri: The MongoDB connection string. e.g. "mongodb://localhost:27017" + filter: A JSON/bson mapping specifying elements which must be present. + projection: A list of field names that should be returned or a dict + specifying the fields to include/exclude. + extra_client_params: Optional MongoClient parameters. + bucket_auto: If True, use MongoDB $bucketAuto aggregation to split + collection into bundles instead of splitVector command. + """ + if isinstance(schema, str): + schema = json.loads(schema) + + beam_schema = json_utils.json_schema_to_beam_schema(schema) + beam_type = schema_pb2.FieldType( + row_type=schema_pb2.RowType(schema=beam_schema)) + to_row_fn = json_utils.json_to_row(beam_type) + + output = ( + root + | mongodbio.ReadFromMongoDB( + uri=connection_uri, + db=database, + coll=collection, + filter=filter, + projection=projection, + extra_client_params=extra_client_params, + bucket_auto=bucket_auto) + | beam.Map(to_row_fn)) + output.element_type = schemas.named_tuple_from_schema(beam_schema) + return output + + @beam.ptransform_fn @yaml_errors.maybe_with_exception_handling_transform_fn def write_to_mongodb( @@ -744,8 +799,6 @@ def write_to_mongodb( batch_size: Number of documents per bulk_write to MongoDB. extra_client_params: Optional MongoClient parameters. """ - from apache_beam.io import mongodbio - def row_to_dict(value): if value is None: return None diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index 057c4e3f47d1..121057690263 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -19,7 +19,7 @@ # configuration in /sdks/standard_expansion_services.yaml. # Refer to gen_xlang_wrappers.py for more info. # -# Last updated on: 2026-05-06 +# Last updated on: 2026-06-02 - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 @@ -50,6 +50,40 @@ type: int64 identifier: beam:schematransform:org.apache.beam:generate_sequence:v1 name: GenerateSequence +- default_service: sdks:java:io:expansion-service:shadowJar + description: '' + destinations: + python: apache_beam/io + fields: + - description: The MongoDB collection to read from. + name: collection + nullable: false + type: str + - description: The MongoDB database to read from. + name: database + nullable: false + type: str + - description: This option specifies whether and where to output rows that failed + to be read. + name: error_handling + nullable: true + type: Row(output=) + - description: An optional BSON filter to apply to the read. This should be a valid + JSON string. + name: filter + nullable: true + type: str + - description: The schema in which the data is encoded, defined with JSON-schema + syntax (https://json-schema.org/). + name: schema + nullable: false + type: str + - description: The connection URI for the MongoDB server. + name: uri + nullable: false + type: str + identifier: beam:schematransform:org.apache.beam:mongodb_read:v1 + name: MongodbRead - default_service: sdks:java:io:expansion-service:shadowJar description: '' destinations: