Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,26 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets.newHashSet;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
import org.apache.beam.sdk.io.iceberg.cdc.IcebergCdcMetadataColumns;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
Expand Down Expand Up @@ -143,13 +148,13 @@ public org.apache.iceberg.Schema getRequiredSchema() {

/**
 * Returns an {@link Evaluator} built from this config's filter expression, or {@code null} when
 * {@link #getFilter()} returns {@code null}.
 *
 * <p>The evaluator is created the first time it is needed and kept in {@code cachedEvaluator};
 * every later call returns that same instance.
 */
@Pure
@Nullable
public Evaluator getEvaluator() {
public Evaluator getEvaluator(org.apache.iceberg.Schema requiredSchema) {
@Nullable Expression filter = getFilter();
if (filter == null) {
// No filter configured: there is nothing to evaluate rows against.
return null;
}
// NOTE(review): once cachedEvaluator is populated it is returned as-is, so the struct type used
// to build it is whichever one was supplied on the first call. A later call passing a different
// requiredSchema has no effect -- confirm callers always pass the same schema.
if (cachedEvaluator == null) {
cachedEvaluator = new Evaluator(getRequiredSchema().asStruct(), filter);
cachedEvaluator = new Evaluator(requiredSchema.asStruct(), filter);
}
return cachedEvaluator;
}
Expand Down Expand Up @@ -226,6 +231,9 @@ public Expression getFilter() {
@Pure
public abstract @Nullable List<String> getDropFields();

/**
 * Names of the metadata columns configured for this scan.
 *
 * <p>{@code builder()} initializes this to an empty list, so the value is never {@code null}
 * unless a caller sets it so. {@code validate(Table)} reports a non-empty list as
 * {@code metadata_columns} among the options that are only available when reading with
 * Managed.ICEBERG_CDC.
 */
@Pure
public abstract List<String> getMetadataColumns();

@Pure
public static Builder builder() {
return new AutoValue_IcebergScanConfig.Builder()
Expand All @@ -248,7 +256,8 @@ public static Builder builder() {
.setPollInterval(null)
.setStartingStrategy(null)
.setTag(null)
.setBranch(null);
.setBranch(null)
.setMetadataColumns(ImmutableList.of());
}

@AutoValue.Builder
Expand Down Expand Up @@ -311,6 +320,8 @@ public Builder setTableIdentifier(String... names) {

public abstract Builder setDropFields(@Nullable List<String> fields);

/** Sets the list of metadata column names that {@code getMetadataColumns()} will return. */
public abstract Builder setMetadataColumns(List<String> metadataColumns);

public abstract IcebergScanConfig build();
}

Expand Down Expand Up @@ -364,13 +375,29 @@ void validate(Table table) {
if (getStartingStrategy() != null) {
invalidOptions.add("starting_strategy");
}
if (!getMetadataColumns().isEmpty()) {
invalidOptions.add("metadata_columns");
}
if (!invalidOptions.isEmpty()) {
throw new IllegalArgumentException(
error(
"the following options are currently only available when "
+ "reading with Managed.ICEBERG_CDC: "
+ invalidOptions));
}
} else {
Set<Integer> primaryKeyIds = new HashSet<>(table.schema().identifierFieldIds());
checkState(
!primaryKeyIds.isEmpty(),
"Cannot read CDC records as the table schema does not specified any primary key fields.");
Comment on lines +390 to +392
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a grammatical typo in the error message: 'does not specified' should be 'does not specify'.

Suggested change
checkState(
!primaryKeyIds.isEmpty(),
"Cannot read CDC records as the table schema does not specified any primary key fields.");
checkState(
!primaryKeyIds.isEmpty(),
"Cannot read CDC records as the table schema does not specify any primary key fields.");

Set<Integer> projectedFieldIds = TypeUtil.getProjectedIds(getProjectedSchema());
primaryKeyIds.removeAll(projectedFieldIds);
checkArgument(
primaryKeyIds.isEmpty(),
"When reading CDC records, the projected schema must not drop primary key fields. "
+ "The specified configuration drops the following PK fields: %s",
primaryKeyIds);
validateMetadataColumns(table);
}

if (getStartingStrategy() != null) {
Expand All @@ -393,6 +420,47 @@ void validate(Table table) {
}
}

/**
 * Validates the configured {@code metadata_columns} option.
 *
 * <p>Returns immediately when no metadata columns are configured. Otherwise the following checks
 * run in order, each failing with an {@link IllegalArgumentException}:
 *
 * <ol>
 *   <li>the list contains no duplicate names;
 *   <li>every name is accepted by {@code IcebergCdcMetadataColumns.isSupportedColumn};
 *   <li>no name matches a field of the projected schema;
 *   <li>if any name is a row lineage column (per {@code
 *       IcebergCdcMetadataColumns.isRowMetadataColumn}), the table's format version is at least 3.
 * </ol>
 *
 * @param table the table being read; only consulted for its format version, and only when a row
 *     lineage column was requested
 * @throws IllegalArgumentException if any of the checks above fails
 */
private void validateMetadataColumns(Table table) {
  List<String> metadataColumns = getMetadataColumns();
  if (metadataColumns.isEmpty()) {
    return;
  }

  // A set drops repeated names, so a size mismatch means the list has duplicates.
  Set<String> uniqueMetadataColumns = new LinkedHashSet<>(metadataColumns);
  checkArgument(
      uniqueMetadataColumns.size() == metadataColumns.size(),
      error("metadata_columns contains duplicate entries: %s"),
      metadataColumns);

  // Collect every unsupported name first so the error reports all of them at once.
  List<String> unsupportedMetadataColumns = new ArrayList<>();
  for (String metadataColumn : metadataColumns) {
    if (!IcebergCdcMetadataColumns.isSupportedColumn(metadataColumn)) {
      unsupportedMetadataColumns.add(metadataColumn);
    }
  }
  checkArgument(
      unsupportedMetadataColumns.isEmpty(),
      error("unsupported metadata_columns: %s. Supported values are: %s"),
      unsupportedMetadataColumns,
      IcebergCdcMetadataColumns.SUPPORTED_COLUMNS);

  for (String metadataColumn : metadataColumns) {
    checkArgument(
        getProjectedSchema().findField(metadataColumn) == null,
        error("metadata column '%s' conflicts with a projected data column"),
        metadataColumn);
  }

  // Report only the row lineage columns in the error below, not the whole metadata column list:
  // the other requested columns are not subject to the format version requirement.
  List<String> rowLineageColumns = new ArrayList<>();
  for (String metadataColumn : metadataColumns) {
    if (IcebergCdcMetadataColumns.isRowMetadataColumn(metadataColumn)) {
      rowLineageColumns.add(metadataColumn);
    }
  }
  if (!rowLineageColumns.isEmpty()) {
    checkArgument(
        TableUtil.formatVersion(table) >= 3,
        error("row lineage metadata columns %s are only available for Iceberg format v3+ tables"),
        rowLineageColumns);
  }
}

/**
 * Prefixes {@code message} with the common "Invalid source configuration" header used by this
 * class's validation errors.
 *
 * @param message the specific problem description (may itself be a format template)
 * @return the prefixed message
 */
private String error(String message) {
  final String prefix = "Invalid source configuration: ";
  return prefix + message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,28 @@
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.iceberg.data.IdentityPartitionConverters.convertConstant;

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.ChangelogScanTask;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;

class PartitionUtils {
Expand All @@ -48,23 +58,28 @@ class PartitionUtils {
Pattern, BiFunction<PartitionSpec.Builder, Matcher, PartitionSpec.Builder>>
TRANSFORMATIONS =
ImmutableMap.of(
HOUR, (builder, matcher) -> builder.hour(checkStateNotNull(matcher.group(1))),
DAY, (builder, matcher) -> builder.day(checkStateNotNull(matcher.group(1))),
MONTH, (builder, matcher) -> builder.month(checkStateNotNull(matcher.group(1))),
YEAR, (builder, matcher) -> builder.year(checkStateNotNull(matcher.group(1))),
HOUR,
(builder, matcher) -> builder.hour(checkStateNotNull(matcher.group(1))),
DAY,
(builder, matcher) -> builder.day(checkStateNotNull(matcher.group(1))),
MONTH,
(builder, matcher) -> builder.month(checkStateNotNull(matcher.group(1))),
YEAR,
(builder, matcher) -> builder.year(checkStateNotNull(matcher.group(1))),
TRUNCATE,
(builder, matcher) ->
builder.truncate(
checkStateNotNull(matcher.group(1)),
Integer.parseInt(checkStateNotNull(matcher.group(2)))),
(builder, matcher) ->
builder.truncate(
checkStateNotNull(matcher.group(1)),
Integer.parseInt(checkStateNotNull(matcher.group(2)))),
BUCKET,
(builder, matcher) ->
builder.bucket(
checkStateNotNull(matcher.group(1)),
Integer.parseInt(checkStateNotNull(matcher.group(2)))),
VOID, (builder, matcher) -> builder.alwaysNull(checkStateNotNull(matcher.group(1))),
(builder, matcher) ->
builder.bucket(
checkStateNotNull(matcher.group(1)),
Integer.parseInt(checkStateNotNull(matcher.group(2)))),
VOID,
(builder, matcher) -> builder.alwaysNull(checkStateNotNull(matcher.group(1))),
IDENTITY,
(builder, matcher) -> builder.identity(checkStateNotNull(matcher.group(1))));
(builder, matcher) -> builder.identity(checkStateNotNull(matcher.group(1))));

static PartitionSpec toPartitionSpec(
@Nullable List<String> fields, org.apache.beam.sdk.schemas.Schema beamSchema) {
Expand Down Expand Up @@ -130,4 +145,61 @@ static Term toIcebergTerm(String field) {

throw new IllegalArgumentException("Could not find a partition term for '" + field + "'.");
}

/**
 * Builds a map from field ID to the constant value that field takes for every row of {@code
 * file}.
 *
 * <p>Copied over from Apache Iceberg's <a
 * href="https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java">PartitionUtil</a>.
 * Needed to accommodate CDC reads, where scans produce {@link ChangelogScanTask}s instead of
 * {@link ContentScanTask}s.
 *
 * <p>The returned map holds entries for the row ID (only when the file reports a first row ID),
 * the last-updated sequence number, the file path, the spec ID, and one entry per identity
 * partition field keyed by that field's source ID.
 *
 * @param spec the partition spec; its ID must equal the file's spec ID
 * @param file the content file whose constants are collected
 * @param fileSequenceNumber sequence number to record; when {@code null}, the file's own {@code
 *     fileSequenceNumber()} is used instead
 * @return the field-ID-to-constant map; values may be {@code null}
 * @throws IllegalStateException if the spec ID and the file's spec ID differ
 */
public static Map<Integer, ?> constantsMap(
    PartitionSpec spec, ContentFile<?> file, @Nullable Long fileSequenceNumber) {
  Preconditions.checkState(
      spec.specId() == file.specId(),
      "File spec ID (%s) does not match PartitionSpec ID (%s)",
      file.specId(),
      spec.specId());

  // use java.util.HashMap because partition data may contain null values
  Map<Integer, Object> constantsById = Maps.newHashMap();

  // _row_id is taken from the file's first_row_id, when the file carries one
  Long firstRowId = file.firstRowId();
  if (firstRowId != null) {
    constantsById.put(
        MetadataColumns.ROW_ID.fieldId(), convertConstant(Types.LongType.get(), firstRowId));
  }

  // When reconstructing a DataFile, we lose the ability to attach its fileSequenceNumber,
  // so callers pipe it along the util methods; fall back to the file's own value otherwise.
  Long sequenceNumber = fileSequenceNumber;
  if (sequenceNumber == null) {
    sequenceNumber = file.fileSequenceNumber();
  }
  constantsById.put(
      MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
      convertConstant(Types.LongType.get(), sequenceNumber));

  // _file
  constantsById.put(
      MetadataColumns.FILE_PATH.fieldId(),
      convertConstant(Types.StringType.get(), file.location()));

  // _spec_id
  constantsById.put(
      MetadataColumns.SPEC_ID.fieldId(), convertConstant(Types.IntegerType.get(), file.specId()));

  // Identity-transformed partition fields carry the source column's value unchanged, so the
  // partition value is recorded under the source field's ID.
  StructLike partitionData = file.partition();
  List<PartitionField> specFields = spec.fields();
  List<Types.NestedField> partitionTypeFields = spec.partitionType().fields();
  for (int i = 0; i < specFields.size(); i++) {
    PartitionField partitionField = specFields.get(i);
    if (!partitionField.transform().isIdentity()) {
      continue;
    }
    Object rawValue = partitionData.get(i, Object.class);
    constantsById.put(
        partitionField.sourceId(), convertConstant(partitionTypeFields.get(i).type(), rawValue));
  }

  return constantsById;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ public void process(
}
FileScanTask task = fileScanTasks.get((int) l);
Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
try (CloseableIterable<Record> fullIterable =
ReadUtils.createReader(task, table, scanConfig.getRequiredSchema())) {
CloseableIterable<Record> reader = ReadUtils.maybeApplyFilter(fullIterable, scanConfig);
try (CloseableIterable<Record> reader = ReadUtils.createReader(task, table, scanConfig)) {

for (Record record : reader) {
Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
Expand Down
Loading
Loading