Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToParquet.Options;
import com.google.cloud.teleport.io.parquet.ParquetIO;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
Expand All @@ -36,7 +37,6 @@
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down Expand Up @@ -168,6 +168,22 @@ public interface Options extends PipelineOptions {

// Setter paired with getMinRowCountForPageSizeCheck().
// NOTE(review): presumably invoked by the pipeline options framework rather than by template
// code directly, which would explain the "unused" suppression — confirm.
@SuppressWarnings("unused")
void setMinRowCountForPageSizeCheck(ValueProvider<Integer> minRowCountForPageSizeCheck);

/**
 * Optional template parameter: the maximum number of rows to buffer before a page size check is
 * forced when writing Parquet output. The user-facing guidance (when to lower it, and its
 * pairing with minRowCountForPageSizeCheck) lives in the help text below.
 *
 * @return the configured maximum row count for the page size check, as a deferred value
 */
@TemplateParameter.Integer(
order = 9,
optional = true,
description = "Maximum row count for page size check",
helpText =
"The maximum number of rows to buffer before a page size check is forced. By default "
+ "Parquet estimates when to check from the average row size and can defer it (up to "
+ "10000 rows); with many small rows followed by large rows, that estimate can let the "
+ "in-memory page buffer overflow before a flush. Set a low value (for example, 1), "
+ "together with minRowCountForPageSizeCheck, to bound memory for tables whose row "
+ "sizes vary widely. The default is 10000.")
ValueProvider<Integer> getMaxRowCountForPageSizeCheck();

// Setter paired with getMaxRowCountForPageSizeCheck().
// NOTE(review): presumably invoked by the pipeline options framework rather than by template
// code directly, which would explain the "unused" suppression — confirm.
@SuppressWarnings("unused")
void setMaxRowCountForPageSizeCheck(ValueProvider<Integer> maxRowCountForPageSizeCheck);
}

/**
Expand Down Expand Up @@ -209,9 +225,12 @@ public static PipelineResult run(Options options) {
* Steps: 1) Read records from Bigtable. 2) Convert a Bigtable Row to a GenericRecord. 3) Write
* GenericRecord(s) to GCS in Parquet format.
*/
// Uses a forked copy of Beam's ParquetIO (com.google.cloud.teleport.io.parquet) so that the
// maxRowCountForPageSizeCheck knob is available without waiting for a Beam release.
ParquetIO.Sink parquetSink =
ParquetIO.sink(BigtableRow.getClassSchema())
.withMinRowCountForPageSizeCheck(options.getMinRowCountForPageSizeCheck());
.withMinRowCountForPageSizeCheck(options.getMinRowCountForPageSizeCheck())
.withMaxRowCountForPageSizeCheck(options.getMaxRowCountForPageSizeCheck());
FileIO.Write<Void, GenericRecord> write =
FileIO.<GenericRecord>write()
.via(parquetSink)
Expand Down
Loading
Loading