Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdks/java/io/mongodb/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies {
implementation library.java.mongodb_driver_core
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre
provided library.java.everit_json_schema
permitUnusedDeclared library.java.everit_json_schema
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:common")
testImplementation project(path: ":sdks:java:testing:test-utils")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Configuration class for the MongoDB Read transform. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MongoDbReadSchemaTransformConfiguration implements Serializable {

@SchemaFieldDescription("The connection URI for the MongoDB server.")
public abstract String getUri();

@SchemaFieldDescription("The MongoDB database to read from.")
public abstract String getDatabase();

@SchemaFieldDescription("The MongoDB collection to read from.")
public abstract String getCollection();

@SchemaFieldDescription(
"The schema in which the data is encoded, defined with JSON-schema syntax (https://json-schema.org/).")
public abstract String getSchema();

@SchemaFieldDescription(
"An optional BSON filter to apply to the read. This should be a valid JSON string.")
@Nullable
public abstract String getFilter();

@SchemaFieldDescription(
"This option specifies whether and where to output rows that failed to be read.")
@Nullable
public abstract ErrorHandling getErrorHandling();

public void validate() {
checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
checkArgument(
getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
checkArgument(
getCollection() != null && !getCollection().isEmpty(),
"MongoDB collection must be specified.");
checkArgument(
getSchema() != null && !getSchema().isEmpty(), "MongoDB schema must be specified.");
}

public static Builder builder() {
return new AutoValue_MongoDbReadSchemaTransformConfiguration.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setUri(String uri);

public abstract Builder setDatabase(String database);

public abstract Builder setCollection(String collection);

public abstract Builder setSchema(String schema);

public abstract Builder setFilter(String filter);

public abstract Builder setErrorHandling(ErrorHandling errorHandling);

public abstract MongoDbReadSchemaTransformConfiguration build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;

import com.google.auto.service.AutoService;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.bson.Document;

/** An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. */
@AutoService(SchemaTransformProvider.class)
public class MongoDbReadSchemaTransformProvider
extends TypedSchemaTransformProvider<MongoDbReadSchemaTransformConfiguration> {

private static final String OUTPUT_TAG_NAME = "output";
public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};

private static final org.apache.beam.sdk.metrics.Counter errorCounter =
org.apache.beam.sdk.metrics.Metrics.counter(
MongoDbReadSchemaTransformProvider.class, "MongoDB-read-error-counter");

@Override
protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) {
return new MongoDbReadSchemaTransform(configuration);
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:mongodb_read:v1";
}

@Override
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_TAG_NAME);
}

/** The {@link SchemaTransform} that performs the read operation. */
private static class MongoDbReadSchemaTransform extends SchemaTransform {
private final MongoDbReadSchemaTransformConfiguration configuration;

MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
Schema schema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema());

MongoDbIO.Read read =
MongoDbIO.read()
.withUri(configuration.getUri())
.withDatabase(configuration.getDatabase())
.withCollection(configuration.getCollection());

final String filterStr = configuration.getFilter();
if (filterStr != null) {
read =
read.withQueryFn(
new SerializableFunction<MongoCollection<Document>, MongoCursor<Document>>() {
@Override
public MongoCursor<Document> apply(MongoCollection<Document> collection) {
return collection.find(Document.parse(filterStr)).iterator();
}
});
}

PCollection<Document> mongoDocs = input.getPipeline().apply("ReadFromMongoDb", read);

boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
Schema errorSchema = ErrorHandling.errorSchemaBytes();

PCollectionTuple outputTuple =
mongoDocs.apply(
"ConvertToBeamRows",
ParDo.of(new DocumentToRowFn(schema, handleErrors, errorSchema))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));

PCollection<Row> beamRows = outputTuple.get(OUTPUT_TAG).setRowSchema(schema);
PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);

PCollectionRowTuple output = PCollectionRowTuple.of(OUTPUT_TAG_NAME, beamRows);
ErrorHandling errorHandling = configuration.getErrorHandling();
if (handleErrors && errorHandling != null) {
output = output.and(errorHandling.getOutput(), errorOutput);
}
return output;
}
}

/** Converts a MongoDB BSON {@link Document} to a Beam {@link Row}. */
static class DocumentToRowFn extends DoFn<Document, Row> {
private final Schema schema;
private final boolean handleErrors;
private final Schema errorSchema;

DocumentToRowFn(Schema schema, boolean handleErrors, Schema errorSchema) {
this.schema = schema;
this.handleErrors = handleErrors;
this.errorSchema = errorSchema;
}

@ProcessElement
public void processElement(@Element Document doc, MultiOutputReceiver receiver) {
try {
receiver.get(OUTPUT_TAG).output(MongoDbUtils.toRow(doc, schema));
} catch (Exception e) {
if (!handleErrors) {
throw new RuntimeException(
"Failed to convert BSON Document to Beam Row: " + doc.toJson(), e);
}
errorCounter.inc();
byte[] docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8);
receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
package org.apache.beam.sdk.io.mongodb;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.bson.BsonNull;
import org.bson.Document;
import org.bson.types.Binary;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/** Utility methods for MongoDB IO. */
public class MongoDbUtils {
Expand Down Expand Up @@ -71,4 +77,104 @@ public static Document toDocument(Row row) {
}
return value;
}

/** Converts a BSON {@link Document} to a Beam {@link Row} matching the given {@link Schema}. */
public static Row toRow(Document doc, Schema schema) {
Row.Builder rowBuilder = Row.withSchema(schema);
for (Field field : schema.getFields()) {
Object value = doc.get(field.getName());
rowBuilder.addValue(convertFromBsonValue(value, field.getType()));
}
return rowBuilder.build();
}

@SuppressWarnings({"nullness", "JavaUtilDate"})
private static @Nullable Object convertFromBsonValue(
@Nullable Object value, FieldType fieldType) {
if (value == null || value instanceof BsonNull) {
return null;
}

switch (fieldType.getTypeName()) {
case BYTE:
return (value instanceof Number)
? ((Number) value).byteValue()
: Byte.parseByte(value.toString());
case INT16:
return (value instanceof Number)
? ((Number) value).shortValue()
: Short.parseShort(value.toString());
case INT32:
return (value instanceof Number)
? ((Number) value).intValue()
: Integer.parseInt(value.toString());
case INT64:
return (value instanceof Number)
? ((Number) value).longValue()
: Long.parseLong(value.toString());
case FLOAT:
return (value instanceof Number)
? ((Number) value).floatValue()
: Float.parseFloat(value.toString());
case DOUBLE:
return (value instanceof Number)
? ((Number) value).doubleValue()
: Double.parseDouble(value.toString());
case DECIMAL:
return (value instanceof Number)
? java.math.BigDecimal.valueOf(((Number) value).doubleValue())
: new java.math.BigDecimal(value.toString());
case STRING:
return value.toString();
case BOOLEAN:
return (value instanceof Boolean)
? (Boolean) value
: Boolean.parseBoolean(value.toString());
case DATETIME:
if (value instanceof java.util.Date) {
return new Instant(((java.util.Date) value).getTime());
} else if (value instanceof Number) {
return new Instant(((Number) value).longValue());
} else {
return Instant.parse(value.toString());
}
case BYTES:
if (value instanceof Binary) {
return ((Binary) value).getData();
} else if (value instanceof byte[]) {
return (byte[]) value;
} else {
return value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
}
case ARRAY:
case ITERABLE:
Iterable<?> iterable = (Iterable<?>) value;
List<Object> rowList = new ArrayList<>();
FieldType elementType = Objects.requireNonNull(fieldType.getCollectionElementType());
for (Object item : iterable) {
rowList.add(convertFromBsonValue(item, elementType));
}
return rowList;
case MAP:
Map<?, ?> map = (Map<?, ?>) value;
Map<String, Object> rowMap = new HashMap<>();
FieldType valueType = Objects.requireNonNull(fieldType.getMapValueType());
for (Map.Entry<?, ?> entry : map.entrySet()) {
rowMap.put(
String.valueOf(entry.getKey()), convertFromBsonValue(entry.getValue(), valueType));
}
return rowMap;
case ROW:
Schema rowSchema = Objects.requireNonNull(fieldType.getRowSchema());
if (value instanceof Document) {
return toRow((Document) value, rowSchema);
} else if (value instanceof Map) {
return toRow(new Document((Map<String, Object>) value), rowSchema);
} else {
throw new IllegalArgumentException("Cannot convert value to Row: " + value);
}
default:
throw new IllegalArgumentException("Unsupported field type: " + fieldType);
}
}
}
Loading
Loading