Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -95,6 +96,7 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -266,6 +268,7 @@ static class ConvertToDataFile extends DoFn<String, SerializableDataFile> {
private final @Nullable String prefix;
private final @Nullable List<String> partitionFields;
private final @Nullable Map<String, String> tableProps;
private transient @MonotonicNonNull Catalog catalog;
private transient @MonotonicNonNull ExecutorService executor;
private transient @MonotonicNonNull LinkedList<Future<ProcessResult>> activeTasks;
private transient volatile @MonotonicNonNull Table table;
Expand Down Expand Up @@ -319,11 +322,15 @@ private static class ProcessResult {

/**
 * Prepares the per-instance resources of this DoFn: a catalog obtained from {@code
 * catalogConfig.newCatalog()} and a fixed-size thread pool of {@code THREAD_POOL_SIZE} threads.
 */
@Setup
public void setup() {
  // The catalog is created first, so a failure here leaves no thread pool behind.
  catalog = catalogConfig.newCatalog();
  this.executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
}

@Teardown
public void teardown() {
public void teardown() throws IOException {
if (catalog instanceof Closeable) {
((Closeable) catalog).close();
}
if (executor != null) {
executor.shutdownNow();
}
Expand Down Expand Up @@ -513,21 +520,20 @@ static <W, T> T transformValue(Transform<W, T> transform, Type type, ByteBuffer
}

private Table getOrCreateTable(String filePath, FileFormat format) throws IOException {
Catalog cat = checkStateNotNull(catalog);
TableIdentifier tableId = TableIdentifier.parse(identifier);
try {
return catalogConfig.catalog().loadTable(tableId);
return cat.loadTable(tableId);
} catch (NoSuchTableException e) {
try {
org.apache.iceberg.Schema schema = getSchema(filePath, format);
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);

return tableProps == null
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
: catalogConfig
.catalog()
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
? cat.createTable(tableId, schema, spec)
: cat.createTable(tableId, schema, spec, tableProps);
} catch (AlreadyExistsException e2) { // if table already exists, just load it
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
return cat.loadTable(tableId);
}
}
}
Expand Down Expand Up @@ -646,13 +652,26 @@ static class CreateManifests
extends DoFn<KV<ShardedKey<Integer>, Iterable<SerializableDataFile>>, KV<String, byte[]>> {
private final IcebergCatalogConfig catalogConfig;
private final String identifier;
private transient @MonotonicNonNull Catalog catalog;
private transient @MonotonicNonNull Table table;

/**
 * Stores the catalog configuration and the table identifier string for later use by this DoFn.
 *
 * @param catalogConfig configuration from which a catalog is obtained
 * @param identifier string form of the target table identifier
 */
public CreateManifests(IcebergCatalogConfig catalogConfig, String identifier) {
  this.identifier = identifier;
  this.catalogConfig = catalogConfig;
}

/** Obtains a catalog from {@code catalogConfig.newCatalog()} and keeps it for this instance. */
@Setup
public void setup() {
  catalog = catalogConfig.newCatalog();
}

/**
 * Closes this instance's catalog when it implements {@link Closeable}; otherwise does nothing.
 *
 * @throws IOException if closing the catalog fails
 */
@Teardown
public void teardown() throws IOException {
  // Also covers the case where the catalog was never created: instanceof is false for null.
  if (!(catalog instanceof Closeable)) {
    return;
  }
  Closeable closeableCatalog = (Closeable) catalog;
  closeableCatalog.close();
}

@ProcessElement
public void process(
@Element KV<ShardedKey<Integer>, Iterable<SerializableDataFile>> batch,
Expand All @@ -662,7 +681,7 @@ public void process(
return;
}
if (table == null) {
table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
table = checkStateNotNull(catalog).loadTable(TableIdentifier.parse(identifier));
}

PartitionSpec spec = checkStateNotNull(table.specs().get(batch.getKey().getKey()));
Expand Down Expand Up @@ -714,7 +733,8 @@ public void process(
static class CommitManifestFilesDoFn extends DoFn<KV<String, Iterable<byte[]>>, Row> {
private final IcebergCatalogConfig catalogConfig;
private final String identifier;
private transient @MonotonicNonNull Table table = null;
private transient @MonotonicNonNull Catalog catalog;
private transient @MonotonicNonNull Table table;
private static final String COMMIT_ID_KEY = "beam.add-files-commit-id";

@StateId("lastCommitTimestamp")
Expand All @@ -726,6 +746,18 @@ public CommitManifestFilesDoFn(IcebergCatalogConfig catalogConfig, String identi
this.identifier = identifier;
}

/** Obtains a catalog from {@code catalogConfig.newCatalog()} and keeps it for this instance. */
@Setup
public void setup() {
  catalog = catalogConfig.newCatalog();
}

/**
 * Closes this instance's catalog when it implements {@link Closeable}; otherwise does nothing.
 *
 * @throws IOException if closing the catalog fails
 */
@Teardown
public void teardown() throws IOException {
  // Also covers the case where the catalog was never created: instanceof is false for null.
  if (!(catalog instanceof Closeable)) {
    return;
  }
  Closeable closeableCatalog = (Closeable) catalog;
  closeableCatalog.close();
}

@ProcessElement
public void process(
@Element KV<String, Iterable<byte[]>> batch,
Expand All @@ -738,7 +770,7 @@ public void process(
}
String commitId = commitHash(manifests);
if (table == null) {
table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
table = checkStateNotNull(catalog).loadTable(TableIdentifier.parse(identifier));
}
table.refresh();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -104,11 +107,9 @@ private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig, String manif
this.manifestFilePrefix = manifestFilePrefix;
}

private Catalog getCatalog() {
if (catalog == null) {
catalog = catalogConfig.catalog();
}
return catalog;
/** Obtains a catalog from {@code catalogConfig.newCatalog()} and keeps it for this instance. */
@Setup
public void setup() {
  catalog = catalogConfig.newCatalog();
}

private boolean containsMultiplePartitionSpecs(Iterable<FileWriteResult> fileWriteResults) {
Expand All @@ -128,7 +129,7 @@ public void processElement(
BoundedWindow window)
throws IOException {
String tableStringIdentifier = element.getKey();
Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
Table table = checkStateNotNull(catalog).loadTable(TableIdentifier.parse(element.getKey()));
Iterable<FileWriteResult> fileWriteResults = element.getValue();
if (shouldSkip(table, fileWriteResults)) {
return;
Expand Down Expand Up @@ -251,5 +252,12 @@ private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResul
}
return false;
}

/**
 * Closes this instance's catalog when it implements {@link Closeable}; otherwise does nothing.
 *
 * @throws IOException if closing the catalog fails
 */
@Teardown
public void teardown() throws IOException {
  // Also covers the case where the catalog was never created: instanceof is false for null.
  if (!(catalog instanceof Closeable)) {
    return;
  }
  Closeable closeableCatalog = (Closeable) catalog;
  closeableCatalog.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,41 @@ public static Builder builder() {

public abstract Builder toBuilder();

public org.apache.iceberg.catalog.Catalog catalog() {
/**
* Returns a cached Catalog instance for driver-side operations (pipeline construction, schema
* inference, etc.). Not intended for use within DoFns — use {@link #newCatalog()} instead.
*/
public Catalog catalog() {
if (cachedCatalog == null) {
String catalogName = getCatalogName();
if (catalogName == null) {
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
}
Map<String, String> catalogProps = getCatalogProperties();
if (catalogProps == null) {
catalogProps = Maps.newHashMap();
}
Map<String, String> confProps = getConfigProperties();
if (confProps == null) {
confProps = Maps.newHashMap();
}
Configuration config = new Configuration();
for (Map.Entry<String, String> prop : confProps.entrySet()) {
config.set(prop.getKey(), prop.getValue());
}
cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
cachedCatalog = newCatalog();
}
return cachedCatalog;
}

/**
 * Builds a brand-new {@link Catalog} from this config on every call; nothing is cached here.
 *
 * <p>Intended for DoFn {@code @Setup} methods, so that each DoFn owns its own catalog lifecycle
 * independently of other transforms.
 *
 * <p>When no catalog name is configured, a name of the form {@code apache-beam-<version>} is
 * used. Missing catalog properties or config properties are treated as empty.
 *
 * @return a newly built catalog
 */
public Catalog newCatalog() {
  String configuredName = getCatalogName();
  String catalogName =
      configuredName != null
          ? configuredName
          : "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();

  Map<String, String> configuredCatalogProps = getCatalogProperties();
  Map<String, String> catalogProps =
      configuredCatalogProps != null ? configuredCatalogProps : Maps.newHashMap();

  Map<String, String> configuredConfProps = getConfigProperties();
  Configuration config = new Configuration();
  // Absent config properties simply leave the Configuration as constructed.
  if (configuredConfProps != null) {
    for (Map.Entry<String, String> entry : configuredConfProps.entrySet()) {
      config.set(entry.getKey(), entry.getValue());
    }
  }

  return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
}

private void checkSupportsNamespaces() {
Preconditions.checkState(
catalog() instanceof SupportsNamespaces,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -60,7 +58,6 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.transforms.Transforms;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -294,25 +291,34 @@ void refreshIfStale() {
}
}

@VisibleForTesting
static final Cache<TableIdentifier, LastRefreshedTable> LAST_REFRESHED_TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
private final Cache<TableIdentifier, LastRefreshedTable> tableCache;

private boolean isClosed = false;

RecordWriterManager(Catalog catalog, String filePrefix, long maxFileSize, int maxNumWriters) {
/**
 * Builds a fresh, empty table cache whose entries expire 10 minutes after they were last
 * accessed.
 */
static Cache<TableIdentifier, LastRefreshedTable> createTableCache() {
  Cache<TableIdentifier, LastRefreshedTable> freshCache =
      CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
  return freshCache;
}

/**
 * Creates a manager that stores the given collaborators and limits as-is; no other work is done
 * here.
 *
 * @param catalog catalog kept for later table operations
 * @param filePrefix prefix kept for later use (presumably for naming written files; confirm
 *     against the writer code)
 * @param maxFileSize size limit kept for later use
 * @param maxNumWriters writer-count limit kept for later use
 * @param tableCache caller-supplied cache of tables keyed by identifier; see {@link
 *     #createTableCache()} for a default
 */
RecordWriterManager(
    Catalog catalog,
    String filePrefix,
    long maxFileSize,
    int maxNumWriters,
    Cache<TableIdentifier, LastRefreshedTable> tableCache) {
  // Collaborators first, then the numeric limits.
  this.catalog = catalog;
  this.tableCache = tableCache;
  this.filePrefix = filePrefix;
  this.maxFileSize = maxFileSize;
  this.maxNumWriters = maxNumWriters;
}

/**
* Returns an Iceberg {@link Table}.
*
* <p>First attempts to fetch the table from the {@link #LAST_REFRESHED_TABLE_CACHE}. If it's not
* there, we attempt to load it using the Iceberg API. If the table doesn't exist at all, we
* attempt to create it, inferring the table schema from the record schema.
* <p>First attempts to fetch the table from the {@link #tableCache}. If it's not there, we
* attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
* create it, inferring the table schema from the record schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link Catalog}
* implementation. Although it is expected, some implementations may not support creating a table
Expand All @@ -321,8 +327,7 @@ void refreshIfStale() {
@VisibleForTesting
Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
TableIdentifier identifier = destination.getTableIdentifier();
@Nullable
LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier);
@Nullable LastRefreshedTable lastRefreshedTable = tableCache.getIfPresent(identifier);
if (lastRefreshedTable != null && lastRefreshedTable.table != null) {
lastRefreshedTable.refreshIfStale();
return lastRefreshedTable.table;
Expand All @@ -338,7 +343,7 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
: Maps.newHashMap();

@Nullable Table table = null;
synchronized (LAST_REFRESHED_TABLE_CACHE) {
synchronized (tableCache) {
// Create namespace if it does not exist yet
if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
Expand Down Expand Up @@ -375,7 +380,7 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
}
}
lastRefreshedTable = new LastRefreshedTable(table, Instant.now());
LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable);
tableCache.put(identifier, lastRefreshedTable);
return table;
}

Expand Down Expand Up @@ -434,20 +439,6 @@ public void close() throws IOException {
state.dataFiles.clear();
}
} finally {
// Close unique FileIO instances now that all writers are done.
// table.io() may return a shared FileIO; we deduplicate by identity
// so we close each underlying connection pool exactly once.
Set<FileIO> closedIOs = new HashSet<>();
for (DestinationState state : destinations.values()) {
FileIO io = state.table.io();
if (io != null && closedIOs.add(io)) {
try {
io.close();
} catch (Exception e) {
LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
}
}
}
destinations.clear();
}
checkArgument(
Expand Down
Loading
Loading