Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
// Parquet artifacts share $parquet_version; each module is declared exactly once.
implementation "org.apache.parquet:parquet-column:$parquet_version"
implementation "org.apache.parquet:parquet-common:$parquet_version"
implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
implementation project(":sdks:java:io:parquet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,19 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.streaming(configuration.getStreaming())
.keeping(configuration.getKeep())
.dropping(configuration.getDrop())
.withFilter(configuration.getFilter());
.withFilter(configuration.getFilter())
.withWatermarkColumn(configuration.getWatermarkColumn())
.withWatermarkColumnTimeUnit(configuration.getWatermarkColumnTimeUnit())
.withMetadataColumns(configuration.getIncludeMetadataColumns());

@Nullable Integer pollIntervalSeconds = configuration.getPollIntervalSeconds();
if (pollIntervalSeconds != null) {
readRows = readRows.withPollInterval(Duration.standardSeconds(pollIntervalSeconds));
}
@Nullable Long maxDelay = configuration.getMaxSnapshotDiscoveryDelay();
if (maxDelay != null) {
readRows = readRows.withMaxSnapshotDiscoveryDelay(Duration.standardSeconds(maxDelay));
}

PCollection<Row> output = input.getPipeline().apply(readRows);

Expand Down Expand Up @@ -194,6 +201,32 @@ static Builder builder() {
"A subset of column names to exclude from reading. If null or empty, all columns will be read.")
abstract @Nullable List<String> getDrop();

// Optional watermark column name; expand() forwards it to the read transform via
// withWatermarkColumn(...).
@SchemaFieldDescription(
"Column used to derive the source's output watermark. "
+ "Must be an existing, required, top-level column of type 'long' or 'timestamp'. "
+ "If not set, the watermark advances according to snapshot commit timestamp.")
abstract @Nullable String getWatermarkColumn();

// Optional time unit, carried as a String; expand() forwards it unchanged via
// withWatermarkColumnTimeUnit(...).
@SchemaFieldDescription(
"Time unit used to interpret watermark column of type LONG. One of NANOSECONDS, MICROSECONDS, "
+ "MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS. Defaults to MICROSECONDS.")
abstract @Nullable String getWatermarkColumnTimeUnit();

// Optional delay expressed in seconds; expand() converts a non-null value with
// Duration.standardSeconds(...) before passing it to withMaxSnapshotDiscoveryDelay(...).
@SchemaFieldDescription(
"Maximum expected snapshot discovery delay in seconds. While idle, the source may advance "
+ "the watermark to now() minus this delay; snapshots discovered later with older commit "
+ "timestamps may be treated as late by downstream windowing. Default: 600 seconds.")
abstract @Nullable Long getMaxSnapshotDiscoveryDelay();

// Optional list of metadata column names; expand() forwards it (possibly null) via
// withMetadataColumns(...).
@SchemaFieldDescription(
"List of top-level metadata columns to include with CDC output rows. Supported columns: \n"
+ "- `_change_type`\n"
+ "- `_row_id`\n"
+ "- `_last_updated_sequence_number`\n"
+ "- `_commit_snapshot_id`\n"
+ "- `_commit_snapshot_sequence_number`\n")
abstract @Nullable List<String> getIncludeMetadataColumns();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setTable(String table);
Expand Down Expand Up @@ -224,6 +257,14 @@ abstract static class Builder {

abstract Builder setFilter(String filter);

// Sets the value returned by getWatermarkColumn().
abstract Builder setWatermarkColumn(String watermarkColumn);

// Sets the value returned by getWatermarkColumnTimeUnit().
abstract Builder setWatermarkColumnTimeUnit(String timeUnit);

// Sets the value returned by getMaxSnapshotDiscoveryDelay(); the value is a number of seconds.
abstract Builder setMaxSnapshotDiscoveryDelay(Long seconds);

// Sets the value returned by getIncludeMetadataColumns().
abstract Builder setIncludeMetadataColumns(List<String> metadataColumns);

abstract Configuration build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.iceberg.cdc.IncrementalChangelogSource;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
Expand Down Expand Up @@ -533,6 +535,7 @@ public static ReadRows readRows(IcebergCatalogConfig catalogConfig) {
return new AutoValue_IcebergIO_ReadRows.Builder()
.setCatalogConfig(catalogConfig)
.setUseCdc(false)
.setMetadataColumns(ImmutableList.of())
.build();
}

Expand Down Expand Up @@ -569,6 +572,14 @@ public enum StartingStrategy {

abstract @Nullable String getFilter();

// Watermark column name handed to the scan config in expand(); null when not configured.
abstract @Nullable String getWatermarkColumn();

// Time unit for the watermark column, kept as a String and handed to the scan config as-is;
// null when not configured.
abstract @Nullable String getWatermarkColumnTimeUnit();

// Maximum snapshot discovery delay handed to the scan config; null when not configured.
abstract @Nullable Duration getMaxSnapshotDiscoveryDelay();

// Metadata column names handed to the scan config. Not nullable: readRows() initialises it to an
// empty list.
abstract List<String> getMetadataColumns();

abstract Builder toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -599,6 +610,14 @@ abstract static class Builder {

abstract Builder setFilter(@Nullable String filter);

// Sets the value returned by getWatermarkColumn(); null is accepted.
abstract Builder setWatermarkColumn(@Nullable String watermarkColumn);

// Sets the value returned by getWatermarkColumnTimeUnit(); null is accepted.
abstract Builder setWatermarkColumnTimeUnit(@Nullable String timeUnit);

// Sets the value returned by getMaxSnapshotDiscoveryDelay(); null is accepted.
abstract Builder setMaxSnapshotDiscoveryDelay(@Nullable Duration delay);

// Sets the value returned by getMetadataColumns(); the parameter is not marked @Nullable.
abstract Builder setMetadataColumns(List<String> metadataColumns);

abstract ReadRows build();
}

Expand Down Expand Up @@ -650,12 +669,48 @@ public ReadRows withFilter(@Nullable String filter) {
return toBuilder().setFilter(filter).build();
}

/**
 * Returns a {@code ReadRows} built from this one with the watermark column set to the given
 * value.
 *
 * @param watermarkColumn name of the watermark column, or {@code null} to leave it unset
 * @return the rebuilt {@code ReadRows}
 */
public ReadRows withWatermarkColumn(@Nullable String watermarkColumn) {
  return toBuilder()
      .setWatermarkColumn(watermarkColumn)
      .build();
}

/**
 * Returns a {@code ReadRows} built from this one with the watermark column time unit set to the
 * given value.
 *
 * @param timeUnit time unit name as a string, or {@code null} to leave it unset
 * @return the rebuilt {@code ReadRows}
 */
public ReadRows withWatermarkColumnTimeUnit(@Nullable String timeUnit) {
  return toBuilder()
      .setWatermarkColumnTimeUnit(timeUnit)
      .build();
}

/**
 * Returns a {@code ReadRows} built from this one with the maximum snapshot discovery delay set to
 * the given value.
 *
 * @param delay the maximum snapshot discovery delay, or {@code null} to leave it unset
 * @return the rebuilt {@code ReadRows}
 */
public ReadRows withMaxSnapshotDiscoveryDelay(@Nullable Duration delay) {
  return toBuilder()
      .setMaxSnapshotDiscoveryDelay(delay)
      .build();
}

/**
 * Appends top-level metadata columns to CDC output rows.
 *
 * <p>Supported values are {@code _change_type}, {@code _commit_snapshot_id}, {@code
 * _commit_snapshot_sequence_number}, {@code _row_id}, and {@code
 * _last_updated_sequence_number}. The row metadata columns are read from Iceberg data files and
 * require a row-lineage table. The changelog metadata columns come from the emitted change kind
 * and snapshot context and are appended when final Beam rows are emitted.
 *
 * <p>This option is only valid with {@link #withCdc()}.
 *
 * @param metadataColumns metadata column names to include; {@code null} is treated as an empty
 *     list. The list is copied, so later changes to the caller's list have no effect.
 * @return a {@code ReadRows} built from this one with the given metadata columns
 * @throws NullPointerException if the list contains a {@code null} element
 */
public ReadRows withMetadataColumns(@Nullable List<String> metadataColumns) {
  // Snapshot the caller's list so that mutating it afterwards cannot alter this transform.
  List<String> columns =
      metadataColumns == null ? ImmutableList.of() : ImmutableList.copyOf(metadataColumns);
  return toBuilder().setMetadataColumns(columns).build();
}

@Override
public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");

Table table = getCatalogConfig().catalog().loadTable(tableId);
Table table;
try {
table = getCatalogConfig().catalog().loadTable(tableId);
} catch (Exception e) {
throw new RuntimeException(
"Could not fetch table at expansion time. Doing so is needed to "
+ "determine the output Row schema.",
e);
}

IcebergScanConfig scanConfig =
IcebergScanConfig.builder()
Expand All @@ -674,12 +729,17 @@ public PCollection<Row> expand(PBegin input) {
.setKeepFields(getKeep())
.setDropFields(getDrop())
.setFilterString(getFilter())
.setWatermarkColumn(getWatermarkColumn())
.setWatermarkColumnTimeUnit(getWatermarkColumnTimeUnit())
.setMaxSnapshotDiscoveryDelay(getMaxSnapshotDiscoveryDelay())
.setMetadataColumns(getMetadataColumns())
.build();
scanConfig.validate(table);

PTransform<PBegin, PCollection<Row>> source =
getUseCdc()
? new IncrementalScanSource(scanConfig)
? new IncrementalChangelogSource(scanConfig)
// ? new IncrementalScanSource(scanConfig)
: Read.from(new ScanSource(scanConfig));

return input.apply(source);
Expand Down
Loading
Loading