From 8cc33a85d0db54861626b48cfa6f893f5856645b Mon Sep 17 00:00:00 2001 From: Phillip Cunliffe Date: Tue, 9 Jun 2026 10:48:19 -0700 Subject: [PATCH 1/3] Add scan pruning and sort-on-write (closes #20, #21, #22) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the Iceberg scan-performance set: prune non-matching data on read, and lay out data sorted so those bounds are tight. #20 — file pruning via manifest column bounds: - New src/write/serde.js: extract the Iceberg single-value codec + type comparator out of stats.js (no behavior change) and add deserializeValue, the read-side inverse of serializeValue. - New fileMightMatch in prune.js (sharing a refactored AND/OR/$nor walker with partitionMightMatch), AND'd into icebergDataSource scan planning. It decodes each data file's lower_bounds/upper_bounds and skips files whose bounds prove no row can match. Conservative: never range-prunes truncated string/binary bounds, keeps on any uncertainty. #21 — parquet row-group pruning (verify + test): - hyparquet's parquetPlan/canSkipRowGroup already skips row groups by per-row-group statistics + bloom filters when a filter is passed, which icebird already does. Added scanPruning.test.js asserting reduced bytes read for a selective predicate on a multi-row-group file, plus a missing-stats safe-fallback test. #22 — sort on append + compaction: - New src/write/sort.js buildSortComparator (direction, null-order, transforms, NaN-last, stable). prepareAppend now orders each written file by the table's default sort order and records the real sort_order_id; a sortOrderId override is threaded through icebergAppend/StageAppend/tx.append. Empty sort order is a no-op. - New src/write/rewrite.js + icebergRewrite (exported): reads every live row (deletes applied), sorts globally, regroups under the target spec, writes consolidated sorted files, and commits a replace snapshot. Not retried on conflict (would risk dropping concurrently-appended rows); v2-only for now (v3 row-lineage preservation is a follow-up). Docs: README Supported Features (Sorting, Scan Pruning) + a compaction snippet; data-source doc comment. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 15 +- src/index.js | 2 +- src/prune.js | 219 +++++++++++++++++++- src/sql/icebergDataSource.js | 29 +-- src/types.d.ts | 5 +- src/write/rewrite.js | 216 ++++++++++++++++++++ src/write/serde.js | 315 +++++++++++++++++++++++++++++ src/write/sort.js | 95 +++++++++ src/write/stage.js | 38 +++- src/write/stats.js | 229 +-------------------- src/write/write.js | 44 +++- test/prune.bounds.test.js | 175 ++++++++++++++++ test/serde.test.js | 121 +++++++++++ test/sql/icebergDataSource.test.js | 10 +- test/sql/scanPruning.test.js | 291 ++++++++++++++++++++++++++ test/write/rewrite.test.js | 241 ++++++++++++++++++++++ test/write/sort.test.js | 149 ++++++++++++++ 17 files changed, 1931 insertions(+), 263 deletions(-) create mode 100644 src/write/rewrite.js create mode 100644 src/write/serde.js create mode 100644 src/write/sort.js create mode 100644 test/prune.bounds.test.js create mode 100644 test/serde.test.js create mode 100644 test/sql/scanPruning.test.js create mode 100644 test/write/rewrite.test.js create mode 100644 test/write/sort.test.js diff --git a/README.md b/README.md index 8cbb3a4..f3a8203 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,7 @@ import { icebergCreateTable, icebergDelete, icebergExpireSnapshots, + icebergRewrite, icebergSetRef, } from 'icebird' @@ -172,6 +173,17 @@ await icebergSetRef({ catalog, tableUrl, ref: 'main', snapshotId }) await icebergExpireSnapshots({ catalog, tableUrl, snapshotIds: [oldSnapshotId] }) ``` +If the table is created with a `sortOrder`, `icebergAppend` orders the rows in each written file by that order (tightening per-file column bounds for scan pruning). `icebergRewrite` compacts the current snapshot — reading every live row (deletes applied), sorting globally, and rewriting into consolidated, non-overlapping files via a `replace` snapshot (v2 tables): + +```javascript +// compact small files into sorted, non-overlapping ones +await icebergRewrite({ catalog, tableUrl }) +// optionally split large partitions and/or re-partition under another spec +await icebergRewrite({ catalog, tableUrl, targetFileRows: 1_000_000, partitionSpecId: 1 }) +``` + +A rewrite is not retried on a concurrent commit (it would risk dropping rows another writer appended meanwhile); on conflict it throws and should be re-run against fresh metadata. + For a REST catalog, swap `fileCatalog(...)` for the connect context and pass `namespace`/`table` instead of `tableUrl`: ```javascript @@ -212,7 +224,8 @@ Icebird aims to support reading any Iceberg table, but currently only supports a | Geometry Types | ✅ | | | Geography Types | ✅ | | | Row Lineage | ✅ | v3 `_row_id` and `_last_updated_sequence_number` inheritance. | -| Sorting | ❌ | | +| Sorting | ✅ | Orders rows by the declared sort order on append; `icebergRewrite` compacts to sorted, non-overlapping files (v2). | +| Scan Pruning | ✅ | Skips data files via partition tuples and manifest column bounds, and parquet row groups via column statistics. | | Encryption | ❌ | | ## References diff --git a/src/index.js b/src/index.js index 69cab9b..46f8f5d 100644 --- a/src/index.js +++ b/src/index.js @@ -1,4 +1,4 @@ -export { IcebergTransactionConflictError, icebergAppend, icebergCreateTable, icebergDelete, icebergDropTable, icebergExpireSnapshots, icebergSetRef, icebergTransaction } from './write/write.js' +export { IcebergTransactionConflictError, icebergAppend, icebergCreateTable, icebergDelete, icebergDropTable, icebergExpireSnapshots, icebergRewrite, icebergSetRef, icebergTransaction } from './write/write.js' export { icebergCreate } from './create.js' export { fileCatalog } from './catalog/file.js' export { restCatalogConnect, restCatalogCreateNamespace, restCatalogDropNamespace, restCatalogListNamespaces, restCatalogListTables, restCatalogLoadCredentials, restCatalogLoadTable, restCatalogRegisterTable, restCatalogRenameTable } from './catalog/rest.js' diff --git a/src/prune.js b/src/prune.js index ac26604..12c7ac4 100644 --- a/src/prune.js +++ b/src/prune.js @@ -1,4 +1,6 @@ +import { typeName } from './schema.js' import { applyTransform } from './write/transform.js' +import { compare, deserializeValue } from './write/serde.js' /** * Partition-level scan pruning. Given a hyparquet query filter (keyed by @@ -26,7 +28,34 @@ export function partitionMightMatch(filter, dataEntry, schema, metadata) { const spec = metadata['partition-specs'].find(s => s['spec-id'] === dataEntry.partition_spec_id) // No spec or unpartitioned: nothing to prune on. if (!spec || spec.fields.length === 0) return true - return nodeMightMatch(filter, { spec, schema, partition: dataEntry.data_file.partition }) + /** @type {PruneContext} */ + const ctx = { spec, schema, partition: dataEntry.data_file.partition } + return nodeMightMatch(filter, (column, condition) => columnMightMatch(column, condition, ctx)) +} + +/** + * File-level scan pruning via per-column manifest bounds. Given a hyparquet + * query filter (keyed by iceberg field name) and a data manifest entry, decide + * whether the entry's `lower_bounds` / `upper_bounds` could contain a row + * matching the filter. + * + * Like `partitionMightMatch` this is an inclusive projection: a file is skipped + * only when its bounds prove that no row can match, and any uncertainty keeps + * the file. Bounds for string/binary/fixed/uuid are stored truncated (Iceberg + * `truncate(16)` metrics) so those types are never range-pruned. Pruning never + * changes query results, only which files are read. + * + * @param {ParquetQueryFilter} filter - Filter keyed by iceberg field name. + * @param {ManifestEntry} dataEntry + * @param {Schema} schema - Current schema (filter column names map to its fields). + * @returns {boolean} true if the file must be read, false if it can be skipped. + */ +export function fileMightMatch(filter, dataEntry, schema) { + const { lower_bounds, upper_bounds } = dataEntry.data_file + // No bounds at all: nothing to prune on. + if (lower_bounds === undefined && upper_bounds === undefined) return true + return nodeMightMatch(filter, (column, condition) => + boundsMightMatch(column, condition, dataEntry.data_file, schema)) } /** @@ -34,26 +63,30 @@ export function partitionMightMatch(filter, dataEntry, schema, metadata) { */ /** + * Generic AND/OR/$nor walk over a hyparquet filter. Top-level keys are + * AND-combined: the file is ruled out if any branch is ruled out. The leaf + * evaluator decides a single `{column: condition}` predicate and returns false + * only when it proves the file cannot match. + * * @param {ParquetQueryFilter} node - * @param {PruneContext} ctx + * @param {(column: string, condition: any) => boolean} leafFn * @returns {boolean} */ -function nodeMightMatch(node, ctx) { +function nodeMightMatch(node, leafFn) { if (!node || typeof node !== 'object') return true const anyNode = /** @type {Record} */ (node) - // Top-level keys are AND-combined: the file is ruled out if any is ruled out. for (const [key, val] of Object.entries(anyNode)) { if (key === '$and') { const subs = /** @type {ParquetQueryFilter[]} */ (val) - if (!subs.every(sub => nodeMightMatch(sub, ctx))) return false + if (!subs.every(sub => nodeMightMatch(sub, leafFn))) return false } else if (key === '$or') { const subs = /** @type {ParquetQueryFilter[]} */ (val) - if (!subs.some(sub => nodeMightMatch(sub, ctx))) return false + if (!subs.some(sub => nodeMightMatch(sub, leafFn))) return false } else if (key === '$nor') { // NOT(a OR b): can't safely prune, keep. continue } else { - if (!columnMightMatch(key, val, ctx)) return false + if (!leafFn(key, val)) return false } } return true @@ -331,3 +364,175 @@ function numericOf(x) { function sign(n) { return n < 0 ? -1 : n > 0 ? 1 : 0 } + +/** + * Whether the file could match `condition` on `column` given the column's + * decoded [lower, upper] bounds in the manifest entry. Returns true (keep) on + * any uncertainty, including columns without bounds and non-orderable types. + * + * @param {string} column - iceberg field name + * @param {any} condition - operator object like {$eq: x} or a bare value (eq) + * @param {DataFile} dataFile + * @param {Schema} schema + * @returns {boolean} + */ +function boundsMightMatch(column, condition, dataFile, schema) { + const field = schema.fields.find(f => f.name === column) + if (!field) return true + // Bounds and metric ordering are only defined for orderable scalar types. + // String/binary/fixed/uuid bounds are truncated prefixes, so we never use + // them for pruning (equality on a truncated prefix is undecidable). + if (!isOrderableForBounds(field.type)) return true + + const lowerBytes = boundForField(dataFile.lower_bounds, field.id) + const upperBytes = boundForField(dataFile.upper_bounds, field.id) + if (lowerBytes === undefined && upperBytes === undefined) return true + const lo = lowerBytes !== undefined ? deserializeValue(lowerBytes, field.type) : undefined + const hi = upperBytes !== undefined ? deserializeValue(upperBytes, field.type) : undefined + if (lo === undefined && hi === undefined) return true + + for (const { op, value } of normalizeCondition(condition)) { + if (!boundsOpMightMatch(op, value, lo, hi, field.type)) return false + } + return true +} + +/** + * Look up a column's bound bytes by field id. Read-decoded Iceberg maps arrive + * as an array of `{key, value}` records (Avro int-keyed maps); hand-built + * entries may instead be a plain `Record`. Handle both. + * + * @param {any} map - lower_bounds or upper_bounds from a manifest data_file + * @param {number} fieldId + * @returns {Uint8Array | undefined} + */ +function boundForField(map, fieldId) { + if (map === undefined || map === null) return undefined + if (Array.isArray(map)) { + const entry = map.find(e => e && Number(e.key) === fieldId) + return entry ? entry.value : undefined + } + const v = map[fieldId] + return v instanceof Uint8Array ? v : undefined +} + +/** + * Whether a type has totally-ordered, untruncated single-value bounds suitable + * for range/equality pruning. String/binary/fixed/uuid are excluded because + * their bounds are stored truncated. + * + * @param {IcebergType} type + * @returns {boolean} + */ +function isOrderableForBounds(type) { + const name = typeName(type) + if (name.startsWith('decimal(')) return true + switch (name) { + case 'boolean': + case 'int': + case 'long': + case 'float': + case 'double': + case 'date': + case 'time': + case 'timestamp': + case 'timestamptz': + case 'timestamp_ns': + case 'timestamptz_ns': + return true + default: + return false + } +} + +/** + * Whether a value range [lo, hi] (either side may be open/undefined) could + * satisfy `op value`. Mirrors hyparquet's `canSkipRowGroup` operator semantics + * but at file granularity. Returns true (keep) on any uncertainty; returns + * false only when the predicate is provably unsatisfiable for the whole range. + * + * @param {string} op - mongo-style operator + * @param {any} value - the predicate literal (array for $in/$nin) + * @param {any} lo - decoded lower bound, or undefined (open below) + * @param {any} hi - decoded upper bound, or undefined (open above) + * @param {IcebergType} type + * @returns {boolean} + */ +function boundsOpMightMatch(op, value, lo, hi, type) { + switch (op) { + case '$lt': { + // need some x < value; smallest is lo. Skip if lo >= value. + if (lo === undefined) return true + const c = safeCompare(lo, value, type) + return c === undefined ? true : c < 0 + } + case '$lte': { + if (lo === undefined) return true + const c = safeCompare(lo, value, type) + return c === undefined ? true : c <= 0 + } + case '$gt': { + // need some x > value; largest is hi. Skip if hi <= value. + if (hi === undefined) return true + const c = safeCompare(hi, value, type) + return c === undefined ? true : c > 0 + } + case '$gte': { + if (hi === undefined) return true + const c = safeCompare(hi, value, type) + return c === undefined ? true : c >= 0 + } + case '$eq': + return eqInRange(value, lo, hi, type) + case '$in': + if (!Array.isArray(value)) return true + // Keep if any listed value could fall in [lo, hi]. + return value.some(x => eqInRange(x, lo, hi, type)) + // $ne / $nin can only prune a single-valued file fully covered by the + // excluded value(s); too rare to bother — keep. + default: + return true + } +} + +/** + * Whether `value` could lie within [lo, hi] (open sides allowed). Keep on any + * undecidable comparison. + * + * @param {any} value + * @param {any} lo + * @param {any} hi + * @param {IcebergType} type + * @returns {boolean} + */ +function eqInRange(value, lo, hi, type) { + if (lo !== undefined) { + const c = safeCompare(value, lo, type) + if (c !== undefined && c < 0) return false + } + if (hi !== undefined) { + const c = safeCompare(value, hi, type) + if (c !== undefined && c > 0) return false + } + return true +} + +/** + * Type-aware comparison that returns undefined (rather than throwing or + * returning NaN) when the two values cannot be meaningfully ordered, so the + * caller keeps the file. + * + * @param {any} a + * @param {any} b + * @param {IcebergType} type + * @returns {number | undefined} + */ +function safeCompare(a, b, type) { + if (a === null || a === undefined || b === null || b === undefined) return undefined + try { + const c = compare(a, b, type) + return Number.isNaN(c) ? undefined : c + } catch { + return undefined + } +} diff --git a/src/sql/icebergDataSource.js b/src/sql/icebergDataSource.js index 8a5e341..0152e3d 100644 --- a/src/sql/icebergDataSource.js +++ b/src/sql/icebergDataSource.js @@ -3,7 +3,7 @@ import { fetchDeleteMaps, urlResolver } from '../fetch.js' import { icebergManifests, splitManifestEntries } from '../manifest.js' import { icebergMetadata } from '../metadata.js' import { readDataFile } from '../read.js' -import { partitionMightMatch } from '../prune.js' +import { fileMightMatch, partitionMightMatch } from '../prune.js' import { whereToParquetFilter } from './whereFilter.js' /** @@ -22,11 +22,13 @@ import { whereToParquetFilter } from './whereFilter.js' * - Column projection (`columns`) is pushed into the parquet read so only the * requested columns are decoded. Equality-delete predicate columns and row * lineage columns are read regardless when needed. - * - WHERE is pushed down to hyparquet (row-group pruning via statistics and - * bloom filters, plus per-row matching) when the expression can be fully - * converted to a parquet filter (comparisons, IN, AND/OR/NOT on identifier - * vs literal). Unsupported nodes (LIKE, functions, arithmetic, identifier - * vs identifier) leave WHERE for the engine to apply. + * - WHERE prunes whole data files before they are opened, using each manifest + * entry's partition tuple and per-column `lower_bounds`/`upper_bounds`, and + * is pushed down to hyparquet (row-group pruning via statistics and bloom + * filters, plus per-row matching) when the expression can be fully converted + * to a parquet filter (comparisons, IN, AND/OR/NOT on identifier vs literal). + * Unsupported nodes (LIKE, functions, arithmetic, identifier vs identifier) + * leave WHERE for the engine to apply. * - When WHERE is resolved at scan time (either absent or fully pushed) we * cap the scan at `offset + limit` rows so the source terminates early. * OFFSET is also pushed into the parquet seek when there are no deletes; @@ -89,13 +91,16 @@ export async function icebergDataSource({ tableUrl, metadataFileName, metadata, // the engine must re-apply it. const filter = whereToParquetFilter(where) const appliedWhere = where !== undefined && filter !== undefined - // Partition-level scan pruning: drop data files whose partition tuple - // proves no row can match the filter. Manifest entries are already in - // memory, so this is a cheap synchronous pre-filter that avoids opening - // the pruned files entirely. Pruning never drops a file with a matching - // row, so query results are unchanged. + // Scan pruning: drop data files whose partition tuple OR per-column + // manifest bounds prove no row can match the filter. Manifest entries are + // already in memory, so this is a cheap synchronous pre-filter that + // avoids opening the pruned files entirely. Both pruners are inclusive + // projections (they never drop a file with a matching row), so query + // results are unchanged. const scanEntries = filter - ? dataEntries.filter(entry => partitionMightMatch(filter, entry, schema, tableMetadata)) + ? dataEntries.filter(entry => + partitionMightMatch(filter, entry, schema, tableMetadata) && + fileMightMatch(filter, entry, schema)) : dataEntries const pruned = scanEntries.length < dataEntries.length // Treat a fully-pushed-down WHERE the same as "no WHERE" for the diff --git a/src/types.d.ts b/src/types.d.ts index 05b56e0..d457a6a 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -206,6 +206,9 @@ export interface Snapshot { // 'spark.app.id'?: string 'added-data-files'?: string 'added-records'?: string + 'deleted-data-files'?: string + 'deleted-records'?: string + 'removed-files-size'?: string 'added-delete-files'?: string 'removed-delete-files'?: string 'added-position-deletes'?: string @@ -298,7 +301,7 @@ export type TableUpdate = * accumulated updates ship in one commit when the callback resolves. */ export interface IcebergTransaction { - append(options: { records: Record[] }): Promise + append(options: { records: Record[], sortOrderId?: number }): Promise delete(options: { deletes: { file_path: string, pos: bigint | number }[] mode?: 'puffin' | 'parquet' diff --git a/src/write/rewrite.js b/src/write/rewrite.js new file mode 100644 index 0000000..6f8144b --- /dev/null +++ b/src/write/rewrite.js @@ -0,0 +1,216 @@ +import { uuid4 } from '../utils.js' +import { icebergRead } from '../read.js' +import { writeDataManifest } from './manifest.js' +import { writeParquet } from './parquet.js' +import { groupByPartition } from './partition.js' +import { buildSortComparator } from './sort.js' +import { + buildPartitionSummaries, + buildSnapshotUpdate, + currentSnapshot, + loadPriorManifests, +} from './snapshot.js' +import { computeColumnStats } from './stats.js' +import { checkWriteFormat, newSnapshotId, resolveParquetCodec } from './stage.js' + +/** + * @import {Manifest, Resolver, Snapshot, StagedUpdate, TableMetadata} from '../../src/types.js' + */ + +/** + * Stage a compaction / rewrite of a table's current snapshot. Reads every live + * row (applying all delete files), orders the rows by the declared sort order, + * regroups them under the target partition spec, and writes new sorted data + * files. The result is committed as a `replace` snapshot that supersedes all of + * the prior snapshot's data and delete manifests. + * + * Because globally-sorted rows are written in order, the per-file bounds of the + * sort key are tight; with `targetFileRows` set, consecutive output files have + * non-overlapping sort-key ranges (clean splits assume distinct keys at the + * boundary). Row contents and counts are preserved (modulo deleted rows and + * order); deletes are consumed, so the new snapshot has no delete files. + * + * v2 only for now: a v3 rewrite would have to preserve `_row_id` row lineage + * rather than let `assignFirstRowIds` renumber the rewritten rows. + * + * @param {object} options + * @param {string} options.tableUrl + * @param {TableMetadata} options.metadata - Current (freshest) table metadata. + * @param {Resolver} options.resolver - Resolver with a writer method. + * @param {number} [options.sortOrderId] - Sort order id to apply; defaults to the table default. + * @param {number} [options.partitionSpecId] - Target partition spec id; defaults to `default-spec-id`. + * @param {number} [options.targetFileRows] - Max rows per output file (split large partitions). + * @returns {Promise} + */ +export async function icebergStageRewrite({ + tableUrl, metadata, resolver, sortOrderId, partitionSpecId, targetFileRows, +}) { + if (!tableUrl) throw new Error('tableUrl is required') + if (!resolver?.writer) throw new Error('resolver.writer is required') + const writerFn = resolver.writer + const formatVersion = metadata['format-version'] + if (formatVersion !== 2) { + throw new Error(`icebergRewrite supports format-version 2 only (got ${formatVersion}); v3 row lineage is not yet handled`) + } + if (targetFileRows !== undefined && !(targetFileRows > 0)) { + throw new Error('targetFileRows must be a positive number') + } + + const snapshot = currentSnapshot(metadata) + if (!snapshot) throw new Error('no current snapshot to rewrite') + + const schema = metadata.schemas.find(s => s['schema-id'] === metadata['current-schema-id']) + if (!schema) throw new Error('current schema not found in metadata') + const specId = partitionSpecId ?? metadata['default-spec-id'] + const partitionSpec = metadata['partition-specs'].find(s => s['spec-id'] === specId) + if (!partitionSpec) throw new Error(`partition spec ${specId} not found in metadata`) + + // Resolve the sort order to apply (table default unless overridden). + const orderId = sortOrderId ?? metadata['default-sort-order-id'] ?? 0 + const sortOrder = (metadata['sort-orders'] ?? []).find(o => o['order-id'] === orderId) + if (sortOrderId !== undefined && !sortOrder) { + throw new Error(`sort order ${sortOrderId} not found in metadata`) + } + const comparator = buildSortComparator(sortOrder, schema) + const appliedSortOrderId = comparator ? orderId : 0 + + checkWriteFormat(metadata.properties?.['write.format.default']) + const codec = resolveParquetCodec(metadata.properties?.['write.parquet.compression-codec']) + + // Read every live row (deletes applied), then sort globally. + const liveRows = await icebergRead({ tableUrl, metadata, resolver }) + const sortedRows = comparator ? [...liveRows].sort(comparator) : liveRows + + // Regroup under the target partition spec (re-derives tuples from values, so + // files written under an older spec are rewritten under the new one). + const groups = partitionSpec.fields.length + ? groupByPartition(sortedRows, schema, partitionSpec) + : [{ partition: {}, records: sortedRows }] + + const snapshotId = newSnapshotId(metadata) + const manifestUuid = uuid4() + + /** @type {{ partition: Record, dataFile: any, path: string }[]} */ + const writtenDataFiles = [] + for (const group of groups) { + const chunks = targetFileRows ? chunkRecords(group.records, targetFileRows) : [group.records] + for (const chunk of chunks) { + if (chunk.length === 0) continue + const dataPath = `${tableUrl}/data/${uuid4()}.parquet` + const dataWriter = writerFn(dataPath) + await writeParquet({ writer: dataWriter, schema, records: chunk, codec }) + const stats = computeColumnStats(chunk, schema) + writtenDataFiles.push({ + partition: group.partition, + dataFile: { + content: /** @type {0} */ (0), + file_path: dataPath, + file_format: /** @type {'parquet'} */ ('parquet'), + partition: group.partition, + record_count: BigInt(chunk.length), + file_size_in_bytes: BigInt(dataWriter.offset), + value_counts: stats.value_counts, + null_value_counts: stats.null_value_counts, + nan_value_counts: stats.nan_value_counts, + lower_bounds: stats.lower_bounds, + upper_bounds: stats.upper_bounds, + sort_order_id: appliedSortOrderId, + }, + path: dataPath, + }) + } + } + + const manifestPath = `${tableUrl}/metadata/${manifestUuid}-m0.avro` + const manifestWriter = writerFn(manifestPath) + await writeDataManifest({ + writer: manifestWriter, + schema, + partitionSpec, + snapshotId, + dataFiles: writtenDataFiles.map(f => f.dataFile), + formatVersion, + }) + const manifestLength = BigInt(manifestWriter.offset) + + const sequenceNumber = BigInt(metadata['last-sequence-number'] ?? 0) + 1n + const timestampMs = Date.now() + const addedRowCount = writtenDataFiles.reduce((sum, f) => sum + f.dataFile.record_count, 0n) + const addedFilesSize = writtenDataFiles.reduce((sum, f) => sum + f.dataFile.file_size_in_bytes, 0n) + const partitions = buildPartitionSummaries( + writtenDataFiles.map(f => f.dataFile.partition), + schema, + partitionSpec + ) + + /** @type {Manifest} */ + const newManifest = { + manifest_path: manifestPath, + manifest_length: manifestLength, + partition_spec_id: partitionSpec['spec-id'], + content: 0, + sequence_number: sequenceNumber, + min_sequence_number: sequenceNumber, + added_snapshot_id: snapshotId, + added_files_count: writtenDataFiles.length, + existing_files_count: 0, + deleted_files_count: 0, + added_rows_count: addedRowCount, + existing_rows_count: 0n, + deleted_rows_count: 0n, + partitions, + } + + // Supersede every prior manifest (data + delete) for the rewritten snapshot. + const priorManifests = await loadPriorManifests(metadata, resolver) + const skipPriorManifestPaths = new Set(priorManifests.map(m => m.manifest_path)) + let deletedDataFiles = 0 + let deletedRecords = 0n + for (const m of priorManifests) { + if (m.content !== 0) continue + deletedDataFiles += (m.added_files_count ?? 0) + (m.existing_files_count ?? 0) + deletedRecords += BigInt(m.added_rows_count ?? 0) + BigInt(m.existing_rows_count ?? 0) + } + + /** @type {Snapshot['summary']} */ + const summary = { + operation: 'replace', + 'added-data-files': String(writtenDataFiles.length), + 'added-records': String(addedRowCount), + 'added-files-size': String(addedFilesSize), + 'deleted-data-files': String(deletedDataFiles), + 'deleted-records': String(deletedRecords), + 'total-records': String(addedRowCount), + 'total-files-size': String(addedFilesSize), + 'total-data-files': String(writtenDataFiles.length), + 'total-delete-files': '0', + 'total-position-deletes': '0', + 'total-equality-deletes': '0', + } + + const staged = await buildSnapshotUpdate({ + tableUrl, metadata, resolver, + snapshotId, sequenceNumber, manifestUuid, timestampMs, formatVersion, + newManifests: [newManifest], + summary, + writtenFiles: [...writtenDataFiles.map(f => f.path), manifestPath], + priorManifests, + skipPriorManifestPaths, + }) + return staged +} + +/** + * Split an array into chunks of at most `size` elements. + * + * @template T + * @param {T[]} arr + * @param {number} size + * @returns {T[][]} + */ +function chunkRecords(arr, size) { + /** @type {T[][]} */ + const out = [] + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)) + return out +} diff --git a/src/write/serde.js b/src/write/serde.js new file mode 100644 index 0000000..a4277fa --- /dev/null +++ b/src/write/serde.js @@ -0,0 +1,315 @@ +import { typeName } from '../schema.js' + +/** + * Iceberg single-value serialization (encode/decode) and the canonical + * per-type comparator. These are pure leaf utilities with no dependency on the + * stats/geospatial machinery, so the read path (`src/prune.js`) and the write + * path (`src/write/stats.js`, `src/write/sort.js`) can both share them without + * pulling geospatial code into the read/SQL bundle. + * + * @import {IcebergType} from '../../src/types.js' + */ + +/** + * Serialize a value per the Iceberg single-value serialization spec. + * Returns undefined for types we don't yet support so the bound is omitted. + * + * @param {any} value + * @param {IcebergType} type + * @returns {Uint8Array|undefined} + */ +export function serializeValue(value, type) { + const name = typeName(type) + if (name.startsWith('decimal(')) { + const m = /^decimal\((\d+),\s*(\d+)\)$/.exec(name) + if (!m) return undefined + const scale = parseInt(m[2], 10) + if (typeof value !== 'number' && typeof value !== 'bigint') return undefined + const factor = 10n ** BigInt(scale) + const unscaled = typeof value === 'bigint' + ? value * factor + : BigInt(Math.round(value * Number(factor))) + return twosComplementMinBigEndian(unscaled) + } + if (name.startsWith('fixed[')) { + return value instanceof Uint8Array ? value : undefined + } + switch (name) { + case 'boolean': { + return new Uint8Array([value ? 1 : 0]) + } + case 'int': { + const buf = new ArrayBuffer(4) + new DataView(buf).setInt32(0, value, true) + return new Uint8Array(buf) + } + case 'long': { + const buf = new ArrayBuffer(8) + new DataView(buf).setBigInt64(0, typeof value === 'bigint' ? value : BigInt(value), true) + return new Uint8Array(buf) + } + case 'float': { + const buf = new ArrayBuffer(4) + new DataView(buf).setFloat32(0, value, true) + return new Uint8Array(buf) + } + case 'double': { + const buf = new ArrayBuffer(8) + new DataView(buf).setFloat64(0, value, true) + return new Uint8Array(buf) + } + case 'date': { + // days since epoch, 4-byte little-endian int32 + const days = value instanceof Date + ? Math.floor(value.getTime() / 86400000) + : Number(value) + const buf = new ArrayBuffer(4) + new DataView(buf).setInt32(0, days, true) + return new Uint8Array(buf) + } + case 'time': { + // microseconds since midnight, 8-byte little-endian int64 + const buf = new ArrayBuffer(8) + new DataView(buf).setBigInt64(0, typeof value === 'bigint' ? value : BigInt(value), true) + return new Uint8Array(buf) + } + case 'timestamp': + case 'timestamptz': { + // micros since epoch, 8-byte little-endian + const buf = new ArrayBuffer(8) + new DataView(buf).setBigInt64(0, timestampToMicros(value), true) + return new Uint8Array(buf) + } + case 'timestamp_ns': + case 'timestamptz_ns': { + // nanos since epoch, 8-byte little-endian + const buf = new ArrayBuffer(8) + new DataView(buf).setBigInt64(0, timestampToNanos(value), true) + return new Uint8Array(buf) + } + case 'string': { + return new TextEncoder().encode(value) + } + case 'binary': { + return value instanceof Uint8Array ? value : undefined + } + case 'uuid': { + if (value instanceof Uint8Array && value.length === 16) return value + if (typeof value === 'string') return uuidStringToBytes(value) + return undefined + } + default: + return undefined + } +} + +/** + * Deserialize a single value from Iceberg single-value serialization, the + * inverse of `serializeValue`. Used to decode manifest `lower_bounds` / + * `upper_bounds` (and stat bounds generally) into JS values in the same domain + * the read-side comparators understand (number / bigint / string / boolean / + * Uint8Array). + * + * Returns `undefined` for unsupported types or malformed input so callers can + * treat the bound as absent (keep the file) rather than mis-prune. Note that + * string/binary/fixed bounds may be *truncated* prefixes (per Iceberg's + * default `truncate(16)` metrics): the decoded value is the stored prefix, not + * necessarily the true column min/max. + * + * @param {Uint8Array} bytes + * @param {IcebergType} type + * @returns {any} + */ +export function deserializeValue(bytes, type) { + if (!(bytes instanceof Uint8Array)) return undefined + const name = typeName(type) + try { + if (name.startsWith('decimal(')) { + const m = /^decimal\((\d+),\s*(\d+)\)$/.exec(name) + if (!m) return undefined + const scale = parseInt(m[2], 10) + const unscaled = twosComplementBigEndianToBigInt(bytes) + return Number(unscaled) / 10 ** scale + } + if (name.startsWith('fixed[')) { + return bytes + } + const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength) + switch (name) { + case 'boolean': return bytes[0] !== 0 + case 'int': return view.getInt32(0, true) + case 'long': return view.getBigInt64(0, true) + case 'float': return view.getFloat32(0, true) + case 'double': return view.getFloat64(0, true) + // days since epoch (number) + case 'date': return view.getInt32(0, true) + // micros since midnight (bigint) + case 'time': return view.getBigInt64(0, true) + // micros since epoch (bigint) + case 'timestamp': + case 'timestamptz': return view.getBigInt64(0, true) + // nanos since epoch (bigint) + case 'timestamp_ns': + case 'timestamptz_ns': return view.getBigInt64(0, true) + case 'string': return new TextDecoder().decode(bytes) + case 'binary': return bytes + case 'uuid': return bytes + default: return undefined + } + } catch { + return undefined + } +} + +/** + * Compare two non-null values of the given iceberg type. + * + * @param {any} a + * @param {any} b + * @param {IcebergType} type + * @returns {number} + */ +export function compare(a, b, type) { + switch (typeName(type)) { + case 'boolean': + return (a ? 1 : 0) - (b ? 1 : 0) + case 'int': + return a < b ? -1 : a > b ? 1 : 0 + case 'float': + case 'double': + return compareFloating(a, b) + case 'long': { + const ai = typeof a === 'bigint' ? a : BigInt(a) + const bi = typeof b === 'bigint' ? b : BigInt(b) + return ai < bi ? -1 : ai > bi ? 1 : 0 + } + case 'timestamp': + case 'timestamptz': + return compareBigInt(timestampToMicros(a), timestampToMicros(b)) + case 'timestamp_ns': + case 'timestamptz_ns': + return compareBigInt(timestampToNanos(a), timestampToNanos(b)) + case 'string': + return a < b ? -1 : a > b ? 1 : 0 + case 'binary': + case 'uuid': + return compareBytes(a, b) + default: + if (typeName(type).startsWith('fixed[')) return compareBytes(a, b) + return a < b ? -1 : a > b ? 1 : 0 + } +} + +/** + * Floating bounds must preserve -0.0 and +0.0 distinctly, with -0.0 ordered + * below +0.0. NaNs are counted separately and not compared for bounds. + * + * @param {number} a + * @param {number} b + * @returns {number} + */ +export function compareFloating(a, b) { + if (Object.is(a, b)) return 0 + if (a === 0 && b === 0) return Object.is(a, -0) ? -1 : 1 + return a < b ? -1 : a > b ? 1 : 0 +} + +/** + * @param {bigint} a + * @param {bigint} b + * @returns {number} + */ +export function compareBigInt(a, b) { + return a < b ? -1 : a > b ? 1 : 0 +} + +/** + * @param {any} value + * @returns {bigint} + */ +export function timestampToMicros(value) { + return typeof value === 'bigint' ? value + : value instanceof Date ? BigInt(value.getTime()) * 1000n + : BigInt(value) +} + +/** + * @param {any} value + * @returns {bigint} + */ +export function timestampToNanos(value) { + return typeof value === 'bigint' ? value + : value instanceof Date ? BigInt(value.getTime()) * 1000000n + : BigInt(value) +} + +/** + * Lexicographic unsigned-byte comparison. + * + * @param {Uint8Array} a + * @param {Uint8Array} b + * @returns {number} + */ +export function compareBytes(a, b) { + const len = Math.min(a.length, b.length) + for (let i = 0; i < len; i++) { + if (a[i] !== b[i]) return a[i] - b[i] + } + return a.length - b.length +} + +/** + * Encode a signed bigint as the minimum number of bytes in two's-complement + * big-endian form. Matches the Iceberg single-value serialization for + * decimals. + * + * @param {bigint} value + * @returns {Uint8Array} + */ +export function twosComplementMinBigEndian(value) { + const bytes = [] + let v = value + while (true) { + const byte = Number(v & 0xffn) + bytes.unshift(byte) + v >>= 8n + const sign = byte & 0x80 + if (!sign && v === 0n || sign && v === -1n) break + } + return new Uint8Array(bytes) +} + +/** + * Decode a two's-complement big-endian byte array into a signed bigint, the + * inverse of `twosComplementMinBigEndian`. + * + * @param {Uint8Array} bytes + * @returns {bigint} + */ +export function twosComplementBigEndianToBigInt(bytes) { + if (bytes.length === 0) return 0n + let v = 0n + for (const b of bytes) v = v << 8n | BigInt(b) + const bits = BigInt(bytes.length * 8) + // sign-extend if the high bit is set + if (v & 1n << bits - 1n) v -= 1n << bits + return v +} + +/** + * Parse a canonical UUID string to its 16 big-endian bytes. + * + * @param {string} s + * @returns {Uint8Array|undefined} + */ +export function uuidStringToBytes(s) { + const hex = s.replace(/-/g, '') + if (hex.length !== 32) return undefined + const out = new Uint8Array(16) + for (let i = 0; i < 16; i++) { + const byte = parseInt(hex.slice(i * 2, i * 2 + 2), 16) + if (Number.isNaN(byte)) return undefined + out[i] = byte + } + return out +} diff --git a/src/write/sort.js b/src/write/sort.js new file mode 100644 index 0000000..f6eb930 --- /dev/null +++ b/src/write/sort.js @@ -0,0 +1,95 @@ +import { compare } from './serde.js' +import { applyTransform, transformResultType } from './transform.js' + +/** + * @import {IcebergType, Schema, SortOrder} from '../../src/types.js' + */ + +/** + * Build a record comparator from a table sort order. Records are ordered by + * each sort field in turn, applying the field's transform to the source value + * and comparing in the transform's result type. Honors direction (asc/desc) + * and null ordering (nulls-first/last, independent of direction). Returns + * `undefined` for an empty sort order so callers can skip sorting entirely. + * + * The comparator returns 0 for records with equal sort keys, so a stable sort + * (`Array.prototype.sort`) preserves input order among ties. + * + * @param {SortOrder | undefined} sortOrder + * @param {Schema} schema + * @returns {((a: Record, b: Record) => number) | undefined} + */ +export function buildSortComparator(sortOrder, schema) { + if (!sortOrder?.fields?.length) return undefined + + const fields = sortOrder.fields.map(sf => { + const sourceId = sf['source-id'] ?? sf['source-ids']?.[0] + const sourceField = schema.fields.find(f => f.id === sourceId) + if (!sourceField) throw new Error(`sort source field id ${sourceId} not found in schema`) + return { + name: sourceField.name, + transform: sf.transform, + sourceType: sourceField.type, + resultType: transformResultType(sf.transform, sourceField.type), + desc: sf.direction === 'desc', + nullsFirst: sf['null-order'] === 'nulls-first', + } + }) + + return (a, b) => { + for (const f of fields) { + const ka = sortKey(a[f.name], f.transform, f.sourceType) + const kb = sortKey(b[f.name], f.transform, f.sourceType) + const c = compareKeys(ka, kb, f.resultType, f.desc, f.nullsFirst) + if (c !== 0) return c + } + return 0 + } +} + +/** + * Project a source value to its sort key under the field's transform. Null / + * undefined pass through as null. + * + * @param {any} value + * @param {string} transform + * @param {IcebergType} sourceType + * @returns {any} + */ +function sortKey(value, transform, sourceType) { + if (value === null || value === undefined) return null + if (transform === 'identity') return value + return applyTransform(/** @type {any} */ (transform), value, sourceType) +} + +/** + * Compare two sort keys honoring null ordering, NaN placement (greatest), and + * direction. Null ordering is independent of direction; direction reverses the + * comparison of present, non-NaN values. + * + * @param {any} ka + * @param {any} kb + * @param {IcebergType} resultType + * @param {boolean} desc + * @param {boolean} nullsFirst + * @returns {number} + */ +function compareKeys(ka, kb, resultType, desc, nullsFirst) { + const aNull = ka === null || ka === undefined + const bNull = kb === null || kb === undefined + if (aNull && bNull) return 0 + if (aNull) return nullsFirst ? -1 : 1 + if (bNull) return nullsFirst ? 1 : -1 + + // NaN is ordered greatest (Iceberg); direction still reverses. + const aNaN = typeof ka === 'number' && Number.isNaN(ka) + const bNaN = typeof kb === 'number' && Number.isNaN(kb) + if (aNaN || bNaN) { + if (aNaN && bNaN) return 0 + const c = aNaN ? 1 : -1 + return desc ? -c : c + } + + const c = compare(ka, kb, resultType) + return desc ? -c : c +} diff --git a/src/write/stage.js b/src/write/stage.js index 0601bea..35f6f64 100644 --- a/src/write/stage.js +++ b/src/write/stage.js @@ -3,6 +3,7 @@ import { uuid4 } from '../utils.js' import { writeParquet } from './parquet.js' import { writeDataManifest } from './manifest.js' import { groupByPartition } from './partition.js' +import { buildSortComparator } from './sort.js' import { buildPartitionSummaries, buildSnapshotUpdate, @@ -34,6 +35,11 @@ import { computeColumnStats } from './stats.js' * Only supports v2/v3 tables. Partitioning is supported with identity, void, * bucket[N], truncate[W], year, month, day, and hour transforms. * + * Records within each written data file are ordered by the table's + * `default-sort-order` (or `sortOrderId` override) when that order has fields; + * an empty sort order leaves input order unchanged. The resulting + * `sort_order_id` is recorded on each data file. + * * @param {object} options * @param {string} options.tableUrl - Base URL of the table. * @param {TableMetadata} options.metadata - Current table metadata. Used for @@ -42,9 +48,11 @@ import { computeColumnStats } from './stats.js' * per attempt in `stageSnapshotForAppend`. * @param {Record[]} options.records - Rows to append. * @param {Resolver} options.resolver - Resolver with a writer method. + * @param {number} [options.sortOrderId] - Sort order id to apply; defaults to + * the table's `default-sort-order-id`. * @returns {Promise} */ -export async function prepareAppend({ tableUrl, metadata, records, resolver }) { +export async function prepareAppend({ tableUrl, metadata, records, resolver, sortOrderId }) { if (!tableUrl) throw new Error('tableUrl is required') if (!resolver?.writer) throw new Error('resolver.writer is required') const writerFn = resolver.writer @@ -68,30 +76,43 @@ export async function prepareAppend({ tableUrl, metadata, records, resolver }) { checkWriteFormat(metadata.properties?.['write.format.default']) const codec = resolveParquetCodec(metadata.properties?.['write.parquet.compression-codec']) + // Resolve the sort order to apply (table default unless overridden). An + // empty order yields no comparator and leaves records in input order. + const orderId = sortOrderId ?? metadata['default-sort-order-id'] ?? 0 + const sortOrder = (metadata['sort-orders'] ?? []).find(o => o['order-id'] === orderId) + if (sortOrderId !== undefined && !sortOrder) { + throw new Error(`sort order ${sortOrderId} not found in metadata`) + } + const comparator = buildSortComparator(sortOrder, schema) + const appliedSortOrderId = comparator ? orderId : 0 + const groups = partitionSpec.fields.length ? groupByPartition(records, schema, partitionSpec) : [{ partition: {}, records }] const writtenDataFiles = await Promise.all(groups.map(async group => { + // Sort a copy so the caller's array is untouched; a stable sort keeps + // input order among equal sort keys. + const sortedRecords = comparator ? [...group.records].sort(comparator) : group.records const dataPath = `${tableUrl}/data/${uuid4()}.parquet` const dataWriter = writerFn(dataPath) - await writeParquet({ writer: dataWriter, schema, records: group.records, codec }) - const stats = computeColumnStats(group.records, schema) + await writeParquet({ writer: dataWriter, schema, records: sortedRecords, codec }) + const stats = computeColumnStats(sortedRecords, schema) return { partition: group.partition, - records: group.records, + records: sortedRecords, dataFile: { content: /** @type {0} */ (0), file_path: dataPath, file_format: /** @type {'parquet'} */ ('parquet'), partition: group.partition, - record_count: BigInt(group.records.length), + record_count: BigInt(sortedRecords.length), file_size_in_bytes: BigInt(dataWriter.offset), value_counts: stats.value_counts, null_value_counts: stats.null_value_counts, nan_value_counts: stats.nan_value_counts, lower_bounds: stats.lower_bounds, upper_bounds: stats.upper_bounds, - sort_order_id: 0, + sort_order_id: appliedSortOrderId, }, path: dataPath, } @@ -220,10 +241,11 @@ export async function stageSnapshotForAppend({ tableUrl, metadata, prepared, res * @param {TableMetadata} options.metadata * @param {Record[]} options.records * @param {Resolver} options.resolver + * @param {number} [options.sortOrderId] - Sort order id to apply; defaults to the table default. * @returns {Promise} */ -export async function icebergStageAppend({ tableUrl, metadata, records, resolver }) { - const prepared = await prepareAppend({ tableUrl, metadata, records, resolver }) +export async function icebergStageAppend({ tableUrl, metadata, records, resolver, sortOrderId }) { + const prepared = await prepareAppend({ tableUrl, metadata, records, resolver, sortOrderId }) const staged = await stageSnapshotForAppend({ tableUrl, metadata, prepared, resolver }) // Surface the prepare-phase writes alongside the manifest list so callers // can clean up everything on commit failure. diff --git a/src/write/stats.js b/src/write/stats.js index 53e8dd2..2d72257 100644 --- a/src/write/stats.js +++ b/src/write/stats.js @@ -1,5 +1,6 @@ import { typeName } from '../schema.js' import { computeGeoBounds, isGeoType } from './geospatial.js' +import { compare, serializeValue } from './serde.js' /** * @import {FieldSummary, IcebergType, Schema} from '../../src/types.js' @@ -102,196 +103,6 @@ export function computeColumnStats(records, schema) { return { value_counts, null_value_counts, nan_value_counts, lower_bounds, upper_bounds } } -/** - * Compare two non-null values of the given iceberg type. - * - * @param {any} a - * @param {any} b - * @param {IcebergType} type - * @returns {number} - */ -function compare(a, b, type) { - switch (typeName(type)) { - case 'boolean': - return (a ? 1 : 0) - (b ? 1 : 0) - case 'int': - return a < b ? -1 : a > b ? 1 : 0 - case 'float': - case 'double': - return compareFloating(a, b) - case 'long': { - const ai = typeof a === 'bigint' ? a : BigInt(a) - const bi = typeof b === 'bigint' ? b : BigInt(b) - return ai < bi ? -1 : ai > bi ? 1 : 0 - } - case 'timestamp': - case 'timestamptz': - return compareBigInt(timestampToMicros(a), timestampToMicros(b)) - case 'timestamp_ns': - case 'timestamptz_ns': - return compareBigInt(timestampToNanos(a), timestampToNanos(b)) - case 'string': - return a < b ? -1 : a > b ? 1 : 0 - case 'binary': - case 'uuid': - return compareBytes(a, b) - default: - if (typeName(type).startsWith('fixed[')) return compareBytes(a, b) - return a < b ? -1 : a > b ? 1 : 0 - } -} - -/** - * Floating bounds must preserve -0.0 and +0.0 distinctly, with -0.0 ordered - * below +0.0. NaNs are counted separately and not compared for bounds. - * - * @param {number} a - * @param {number} b - * @returns {number} - */ -function compareFloating(a, b) { - if (Object.is(a, b)) return 0 - if (a === 0 && b === 0) return Object.is(a, -0) ? -1 : 1 - return a < b ? -1 : a > b ? 1 : 0 -} - -/** - * @param {bigint} a - * @param {bigint} b - * @returns {number} - */ -function compareBigInt(a, b) { - return a < b ? -1 : a > b ? 1 : 0 -} - -/** - * @param {any} value - * @returns {bigint} - */ -function timestampToMicros(value) { - return typeof value === 'bigint' ? value - : value instanceof Date ? BigInt(value.getTime()) * 1000n - : BigInt(value) -} - -/** - * @param {any} value - * @returns {bigint} - */ -function timestampToNanos(value) { - return typeof value === 'bigint' ? value - : value instanceof Date ? BigInt(value.getTime()) * 1000000n - : BigInt(value) -} - -/** - * Lexicographic unsigned-byte comparison. - * - * @param {Uint8Array} a - * @param {Uint8Array} b - * @returns {number} - */ -function compareBytes(a, b) { - const len = Math.min(a.length, b.length) - for (let i = 0; i < len; i++) { - if (a[i] !== b[i]) return a[i] - b[i] - } - return a.length - b.length -} - -/** - * Serialize a value per the Iceberg single-value serialization spec. - * Returns undefined for types we don't yet support so the bound is omitted. - * - * @param {any} value - * @param {IcebergType} type - * @returns {Uint8Array|undefined} - */ -function serializeValue(value, type) { - const name = typeName(type) - if (name.startsWith('decimal(')) { - const m = /^decimal\((\d+),\s*(\d+)\)$/.exec(name) - if (!m) return undefined - const scale = parseInt(m[2], 10) - if (typeof value !== 'number' && typeof value !== 'bigint') return undefined - const factor = 10n ** BigInt(scale) - const unscaled = typeof value === 'bigint' - ? value * factor - : BigInt(Math.round(value * Number(factor))) - return twosComplementMinBigEndian(unscaled) - } - if (name.startsWith('fixed[')) { - return value instanceof Uint8Array ? value : undefined - } - switch (name) { - case 'boolean': { - return new Uint8Array([value ? 1 : 0]) - } - case 'int': { - const buf = new ArrayBuffer(4) - new DataView(buf).setInt32(0, value, true) - return new Uint8Array(buf) - } - case 'long': { - const buf = new ArrayBuffer(8) - new DataView(buf).setBigInt64(0, typeof value === 'bigint' ? value : BigInt(value), true) - return new Uint8Array(buf) - } - case 'float': { - const buf = new ArrayBuffer(4) - new DataView(buf).setFloat32(0, value, true) - return new Uint8Array(buf) - } - case 'double': { - const buf = new ArrayBuffer(8) - new DataView(buf).setFloat64(0, value, true) - return new Uint8Array(buf) - } - case 'date': { - // days since epoch, 4-byte little-endian int32 - const days = value instanceof Date - ? Math.floor(value.getTime() / 86400000) - : Number(value) - const buf = new ArrayBuffer(4) - new DataView(buf).setInt32(0, days, true) - return new Uint8Array(buf) - } - case 'time': { - // microseconds since midnight, 8-byte little-endian int64 - const buf = new ArrayBuffer(8) - new DataView(buf).setBigInt64(0, typeof value === 'bigint' ? value : BigInt(value), true) - return new Uint8Array(buf) - } - case 'timestamp': - case 'timestamptz': { - // micros since epoch, 8-byte little-endian - const buf = new ArrayBuffer(8) - new DataView(buf).setBigInt64(0, timestampToMicros(value), true) - return new Uint8Array(buf) - } - case 'timestamp_ns': - case 'timestamptz_ns': { - // nanos since epoch, 8-byte little-endian - const buf = new ArrayBuffer(8) - new DataView(buf).setBigInt64(0, timestampToNanos(value), true) - return new Uint8Array(buf) - } - case 'string': { - return new TextEncoder().encode(value) - } - case 'binary': { - return value instanceof Uint8Array ? value : undefined - } - case 'uuid': { - if (value instanceof Uint8Array && value.length === 16) return value - if (typeof value === 'string') return uuidStringToBytes(value) - return undefined - } - default: - return undefined - } -} - /** * Return whether Icebird can produce Iceberg lower/upper bounds for a type. * Geometry/geography are handled separately via `computeGeoBounds`. @@ -419,41 +230,3 @@ function truncateUpper(value, type) { return value } -/** - * Encode a signed bigint as the minimum number of bytes in two's-complement - * big-endian form. Matches the Iceberg single-value serialization for - * decimals. - * - * @param {bigint} value - * @returns {Uint8Array} - */ -function twosComplementMinBigEndian(value) { - const bytes = [] - let v = value - while (true) { - const byte = Number(v & 0xffn) - bytes.unshift(byte) - v >>= 8n - const sign = byte & 0x80 - if (!sign && v === 0n || sign && v === -1n) break - } - return new Uint8Array(bytes) -} - -/** - * Parse a canonical UUID string to its 16 big-endian bytes. - * - * @param {string} s - * @returns {Uint8Array|undefined} - */ -function uuidStringToBytes(s) { - const hex = s.replace(/-/g, '') - if (hex.length !== 32) return undefined - const out = new Uint8Array(16) - for (let i = 0; i < 16; i++) { - const byte = parseInt(hex.slice(i * 2, i * 2 + 2), 16) - if (Number.isNaN(byte)) return undefined - out[i] = byte - } - return out -} diff --git a/src/write/write.js b/src/write/write.js index 933b9d7..d12377f 100644 --- a/src/write/write.js +++ b/src/write/write.js @@ -6,6 +6,7 @@ import { applyUpdates, fileCatalogCommit } from './commit.js' import { icebergStageDeletionVector } from './stage-deletion-vector.js' import { icebergStagePositionDelete } from './stage-position-delete.js' import { icebergStageAppend, icebergStageExpireSnapshots, icebergStageSetRef, prepareAppend, stageSnapshotForAppend } from './stage.js' +import { icebergStageRewrite } from './rewrite.js' /** * @import {Catalog, IcebergTransaction, Lister, PartitionSpec, Resolver, Schema, Snapshot, SortOrder, StagedUpdate, TableMetadata, TableRequirement, TableUpdate} from '../../src/types.js' @@ -30,9 +31,10 @@ const DEFAULT_RETRY = Object.freeze({ * @param {string} [options.tableUrl] - File catalog only. * @param {Resolver} [options.resolver] * @param {Record[]} options.records + * @param {number} [options.sortOrderId] - Sort order id to apply; defaults to the table default. * @returns {Promise} */ -export async function icebergAppend({ catalog, namespace, table, tableUrl, resolver, records }) { +export async function icebergAppend({ catalog, namespace, table, tableUrl, resolver, records, sortOrderId }) { const ctx = await loadTable({ catalog, namespace, table, tableUrl, resolver }) // Spec v3 §"Manifest Inheritance": data and manifest files do NOT need to // be rewritten on optimistic-commit retry. Prepare them once outside the @@ -44,6 +46,7 @@ export async function icebergAppend({ catalog, namespace, table, tableUrl, resol metadata: ctx.metadata, records, resolver: requireResolver(ctx.resolver, 'icebergAppend'), + sortOrderId, }) return await commitWithRetry({ catalog, target: { namespace, table }, ctx, @@ -56,6 +59,42 @@ export async function icebergAppend({ catalog, namespace, table, tableUrl, resol }) } +/** + * Compact / rewrite a table in one call: load metadata, read every live row + * (deletes applied), rewrite the data sorted by the declared sort order under + * the target partition spec, and commit a `replace` snapshot. + * + * Unlike {@link icebergAppend}, a rewrite is NOT retried on a concurrent-commit + * conflict: it only rewrote the rows it read, so blindly retrying could drop + * rows another writer appended in the meantime. On conflict the underlying + * commit error surfaces and the caller should re-run the rewrite against fresh + * metadata. + * + * @param {object} options + * @param {Catalog} options.catalog + * @param {string | string[]} [options.namespace] - REST catalog only. + * @param {string} [options.table] - REST catalog only. + * @param {string} [options.tableUrl] - File catalog only. + * @param {Resolver} [options.resolver] + * @param {number} [options.sortOrderId] - Sort order id to apply; defaults to the table default. + * @param {number} [options.partitionSpecId] - Target partition spec id; defaults to `default-spec-id`. + * @param {number} [options.targetFileRows] - Max rows per output file. + * @returns {Promise} + */ +export async function icebergRewrite({ catalog, namespace, table, tableUrl, resolver, sortOrderId, partitionSpecId, targetFileRows }) { + const ctx = await loadTable({ catalog, namespace, table, tableUrl, resolver }) + const staged = await icebergStageRewrite({ + tableUrl: ctx.tableUrl, + metadata: ctx.metadata, + resolver: requireResolver(ctx.resolver, 'icebergRewrite'), + sortOrderId, + partitionSpecId, + targetFileRows, + }) + // Single commit attempt: see the doc comment on why a rewrite must not retry. + return await commitStaged(catalog, { namespace, table }, ctx, staged) +} + /** * Apply row-level position deletes in one call. Picks the v3 puffin deletion * vector path on format-version 3 and the v2 parquet position-delete path on @@ -232,13 +271,14 @@ export async function icebergTransaction({ catalog, namespace, table, tableUrl, /** @type {IcebergTransaction} */ const tx = { - async append({ records }) { + async append({ records, sortOrderId }) { const writer = requireResolver(ctx.resolver, 'icebergTransaction.append') const staged = await icebergStageAppend({ tableUrl: ctx.tableUrl, metadata: workingMetadata, records, resolver: writer, + sortOrderId, }) mergeStaged(staged) }, diff --git a/test/prune.bounds.test.js b/test/prune.bounds.test.js new file mode 100644 index 0000000..bf4a52c --- /dev/null +++ b/test/prune.bounds.test.js @@ -0,0 +1,175 @@ +import { describe, expect, it } from 'vitest' +import { fileMightMatch } from '../src/prune.js' +import { serializeValue } from '../src/write/serde.js' + +/** + * @import {IcebergType, ManifestEntry, Schema} from '../src/types.js' + */ + +/** @type {Schema} */ +const schema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { id: 2, name: 'name', required: false, type: 'string' }, + { id: 3, name: 'ts', required: false, type: 'timestamp' }, + { id: 4, name: 'price', required: false, type: 'double' }, + { id: 5, name: 'amount', required: false, type: 'decimal(10, 2)' }, + ], +} + +/** + * Build a manifest entry whose lower/upper bounds are the Iceberg-serialized + * [min, max] for the given fields. `shape` controls the decoded map layout: + * 'array' mimics the real read path (Avro int-keyed map -> array of {key,value}), + * 'object' mimics a hand-built Record. + * + * @param {Record} bounds + * @param {'array'|'object'} [shape] + * @returns {ManifestEntry} + */ +function entry(bounds, shape = 'array') { + const lowerEntries = [] + const upperEntries = [] + /** @type {Record} */ + const lowerObj = {} + /** @type {Record} */ + const upperObj = {} + for (const [id, { min, max, type }] of Object.entries(bounds)) { + const lo = serializeValue(min, type) + const hi = serializeValue(max, type) + if (lo) { lowerEntries.push({ key: Number(id), value: lo }); lowerObj[Number(id)] = lo } + if (hi) { upperEntries.push({ key: Number(id), value: hi }); upperObj[Number(id)] = hi } + } + return /** @type {ManifestEntry} */ ({ + status: 1, + data_file: shape === 'array' + ? { lower_bounds: lowerEntries, upper_bounds: upperEntries } + : { lower_bounds: lowerObj, upper_bounds: upperObj }, + }) +} + +describe('fileMightMatch — numeric range/equality', () => { + // id (long) in [1, 5] + const e = entry({ 1: { min: 1n, max: 5n, type: 'long' } }) + + it('range predicates skip vs keep at boundaries', () => { + expect(fileMightMatch({ id: { $gt: 5n } }, e, schema)).toBe(false) // max 5, nothing > 5 + expect(fileMightMatch({ id: { $gte: 5n } }, e, schema)).toBe(true) // 5 qualifies + expect(fileMightMatch({ id: { $lt: 1n } }, e, schema)).toBe(false) // min 1, nothing < 1 + expect(fileMightMatch({ id: { $lte: 1n } }, e, schema)).toBe(true) + expect(fileMightMatch({ id: { $gt: 4n } }, e, schema)).toBe(true) // 5 > 4 + expect(fileMightMatch({ id: { $lt: 2n } }, e, schema)).toBe(true) // 1 < 2 + }) + + it('equality skips out-of-range, keeps in-range', () => { + expect(fileMightMatch({ id: { $eq: 3n } }, e, schema)).toBe(true) + expect(fileMightMatch({ id: { $eq: 1n } }, e, schema)).toBe(true) + expect(fileMightMatch({ id: { $eq: 5n } }, e, schema)).toBe(true) + expect(fileMightMatch({ id: { $eq: 0n } }, e, schema)).toBe(false) + expect(fileMightMatch({ id: { $eq: 9n } }, e, schema)).toBe(false) + }) + + it('bare value is treated as equality', () => { + expect(fileMightMatch({ id: 9n }, e, schema)).toBe(false) + expect(fileMightMatch({ id: 3n }, e, schema)).toBe(true) + }) + + it('$in keeps if any value is in range, skips if all out', () => { + expect(fileMightMatch({ id: { $in: [9n, 10n] } }, e, schema)).toBe(false) + expect(fileMightMatch({ id: { $in: [3n, 9n] } }, e, schema)).toBe(true) + }) + + it('$ne / $nin never prune', () => { + expect(fileMightMatch({ id: { $ne: 3n } }, e, schema)).toBe(true) + expect(fileMightMatch({ id: { $nin: [3n] } }, e, schema)).toBe(true) + }) + + it('works with object-shaped bound maps too', () => { + const eo = entry({ 1: { min: 1n, max: 5n, type: 'long' } }, 'object') + expect(fileMightMatch({ id: { $gt: 5n } }, eo, schema)).toBe(false) + expect(fileMightMatch({ id: { $eq: 3n } }, eo, schema)).toBe(true) + }) +}) + +describe('fileMightMatch — double with mixed numeric literals', () => { + const e = entry({ 4: { min: 1.5, max: 9.99, type: 'double' } }) + + it('prunes price > 0 only when out of range', () => { + expect(fileMightMatch({ price: { $gt: 0n } }, e, schema)).toBe(true) // 9.99 > 0 + expect(fileMightMatch({ price: { $gt: 10n } }, e, schema)).toBe(false) // max 9.99 + expect(fileMightMatch({ price: { $lt: 1n } }, e, schema)).toBe(false) // min 1.5 + }) + + it('prunes the all-non-positive file', () => { + const neg = entry({ 4: { min: -5.0, max: 0.0, type: 'double' } }) + expect(fileMightMatch({ price: { $gt: 0n } }, neg, schema)).toBe(false) + }) +}) + +describe('fileMightMatch — timestamp with Date literal', () => { + // ts in micros for 2022-01-01 .. 2022-06-01 + const lo = BigInt(Date.parse('2022-01-01')) * 1000n + const hi = BigInt(Date.parse('2022-06-01')) * 1000n + const e = entry({ 3: { min: lo, max: hi, type: 'timestamp' } }) + + it('normalizes Date literal to the bound domain', () => { + expect(fileMightMatch({ ts: { $gt: new Date('2022-07-01') } }, e, schema)).toBe(false) + expect(fileMightMatch({ ts: { $gt: new Date('2022-03-01') } }, e, schema)).toBe(true) + expect(fileMightMatch({ ts: { $lt: new Date('2021-01-01') } }, e, schema)).toBe(false) + }) +}) + +describe('fileMightMatch — decimal', () => { + const e = entry({ 5: { min: 10.0, max: 20.0, type: 'decimal(10, 2)' } }) + + it('range prunes decimals', () => { + expect(fileMightMatch({ amount: { $gt: 20n } }, e, schema)).toBe(false) + expect(fileMightMatch({ amount: { $lt: 10n } }, e, schema)).toBe(false) + expect(fileMightMatch({ amount: { $eq: 15n } }, e, schema)).toBe(true) + }) +}) + +describe('fileMightMatch — conservative cases (always keep)', () => { + it('string range never prunes (truncated bounds)', () => { + const e = entry({ 2: { min: 'aaa', max: 'mmm', type: 'string' } }) + expect(fileMightMatch({ name: { $gt: 'zzz' } }, e, schema)).toBe(true) + expect(fileMightMatch({ name: { $eq: 'qqq' } }, e, schema)).toBe(true) + expect(fileMightMatch({ name: { $lt: 'aaa' } }, e, schema)).toBe(true) + }) + + it('missing bounds for the predicated column keeps the file', () => { + const e = entry({ 1: { min: 1n, max: 5n, type: 'long' } }) + // predicate on price, which has no bounds in this entry + expect(fileMightMatch({ price: { $gt: 1000n } }, e, schema)).toBe(true) + }) + + it('no bounds at all keeps the file', () => { + const e = /** @type {ManifestEntry} */ ({ status: 1, data_file: {} }) + expect(fileMightMatch({ id: { $gt: 1000n } }, e, schema)).toBe(true) + }) + + it('unknown column keeps the file', () => { + const e = entry({ 1: { min: 1n, max: 5n, type: 'long' } }) + expect(fileMightMatch({ nope: { $eq: 1n } }, e, schema)).toBe(true) + }) +}) + +describe('fileMightMatch — boolean combinators', () => { + const e = entry({ 1: { min: 1n, max: 5n, type: 'long' } }) + + it('$and rules out the file if any branch rules it out', () => { + expect(fileMightMatch({ $and: [{ id: { $gte: 1n } }, { id: { $gt: 5n } }] }, e, schema)).toBe(false) + expect(fileMightMatch({ $and: [{ id: { $gte: 1n } }, { id: { $lte: 5n } }] }, e, schema)).toBe(true) + }) + + it('$or keeps the file if any branch can match', () => { + expect(fileMightMatch({ $or: [{ id: { $gt: 5n } }, { id: { $eq: 3n } }] }, e, schema)).toBe(true) + expect(fileMightMatch({ $or: [{ id: { $gt: 5n } }, { id: { $lt: 1n } }] }, e, schema)).toBe(false) + }) + + it('$nor never prunes', () => { + expect(fileMightMatch({ $nor: [{ id: { $eq: 3n } }] }, e, schema)).toBe(true) + }) +}) diff --git a/test/serde.test.js b/test/serde.test.js new file mode 100644 index 0000000..4b1ed41 --- /dev/null +++ b/test/serde.test.js @@ -0,0 +1,121 @@ +import { describe, expect, it } from 'vitest' +import { deserializeValue, serializeValue } from '../src/write/serde.js' + +/** + * @import {IcebergType} from '../src/types.js' + */ + +/** + * @param {any} value + * @param {IcebergType} type + * @returns {any} + */ +function roundtrip(value, type) { + const bytes = serializeValue(value, type) + expect(bytes).toBeInstanceOf(Uint8Array) + return deserializeValue(/** @type {Uint8Array} */ (bytes), type) +} + +describe('serde round-trip', () => { + it('boolean', () => { + expect(roundtrip(true, 'boolean')).toBe(true) + expect(roundtrip(false, 'boolean')).toBe(false) + }) + + it('int', () => { + expect(roundtrip(0, 'int')).toBe(0) + expect(roundtrip(42, 'int')).toBe(42) + expect(roundtrip(-7, 'int')).toBe(-7) + expect(roundtrip(2147483647, 'int')).toBe(2147483647) + expect(roundtrip(-2147483648, 'int')).toBe(-2147483648) + }) + + it('long returns bigint', () => { + expect(roundtrip(0n, 'long')).toBe(0n) + expect(roundtrip(123n, 'long')).toBe(123n) + expect(roundtrip(-123n, 'long')).toBe(-123n) + expect(roundtrip(9223372036854775807n, 'long')).toBe(9223372036854775807n) + expect(roundtrip(-9223372036854775808n, 'long')).toBe(-9223372036854775808n) + }) + + it('float and double, with -0/+0', () => { + expect(roundtrip(1.5, 'float')).toBeCloseTo(1.5, 5) + expect(roundtrip(1.5, 'double')).toBe(1.5) + expect(roundtrip(-1.25, 'double')).toBe(-1.25) + // -0 preserves sign through IEEE bits + expect(Object.is(roundtrip(-0, 'double'), -0)).toBe(true) + expect(Object.is(roundtrip(0, 'double'), 0)).toBe(true) + }) + + it('date as days-since-epoch number', () => { + expect(roundtrip(0, 'date')).toBe(0) + expect(roundtrip(19000, 'date')).toBe(19000) + expect(roundtrip(-1, 'date')).toBe(-1) + }) + + it('timestamp / timestamptz as micros bigint', () => { + expect(roundtrip(1700000000000000n, 'timestamp')).toBe(1700000000000000n) + expect(roundtrip(1700000000000000n, 'timestamptz')).toBe(1700000000000000n) + }) + + it('timestamp_ns as nanos bigint', () => { + expect(roundtrip(1700000000000000000n, 'timestamp_ns')).toBe(1700000000000000000n) + }) + + it('time as micros bigint', () => { + expect(roundtrip(3600000000n, 'time')).toBe(3600000000n) + }) + + it('string', () => { + expect(roundtrip('', 'string')).toBe('') + expect(roundtrip('hello', 'string')).toBe('hello') + expect(roundtrip('💧 unicode', 'string')).toBe('💧 unicode') + }) + + it('binary returns the same bytes', () => { + const v = new Uint8Array([1, 2, 3, 255]) + expect(roundtrip(v, 'binary')).toEqual(v) + }) + + it('fixed returns the same bytes', () => { + const v = new Uint8Array([9, 8, 7, 6]) + expect(roundtrip(v, 'fixed[4]')).toEqual(v) + }) + + it('uuid round-trips string to bytes', () => { + const s = '12345678-1234-5678-1234-567812345678' + const bytes = serializeValue(s, 'uuid') + expect(bytes).toBeInstanceOf(Uint8Array) + expect(deserializeValue(/** @type {Uint8Array} */ (bytes), 'uuid')).toEqual(bytes) + }) + + it('decimal round-trips numerically', () => { + expect(roundtrip(12.34, 'decimal(10, 2)')).toBeCloseTo(12.34, 6) + expect(roundtrip(-5.5, 'decimal(10, 2)')).toBeCloseTo(-5.5, 6) + expect(roundtrip(0, 'decimal(10, 2)')).toBe(0) + expect(roundtrip(999.99, 'decimal(8, 2)')).toBeCloseTo(999.99, 6) + }) +}) + +describe('serde truncated bounds decode to stored prefix', () => { + it('string decodes to the (possibly truncated) stored bytes', () => { + // serializeValue does NOT truncate; truncation is applied upstream in + // stats.js. A pre-truncated 16-char prefix decodes back to that prefix. + const prefix = 'abcdefghijklmnop' // 16 chars + expect(roundtrip(prefix, 'string')).toBe(prefix) + }) +}) + +describe('serde decode failures return undefined', () => { + it('non-Uint8Array input', () => { + // @ts-expect-error intentional bad input + expect(deserializeValue('not bytes', 'int')).toBeUndefined() + }) + it('unsupported type', () => { + expect(deserializeValue(new Uint8Array([1]), 'variant')).toBeUndefined() + }) + it('truncated numeric buffer', () => { + // 2 bytes is too short for an int32 read -> DataView throws -> undefined + expect(deserializeValue(new Uint8Array([1, 2]), 'int')).toBeUndefined() + }) +}) diff --git a/test/sql/icebergDataSource.test.js b/test/sql/icebergDataSource.test.js index fd341ca..0b371a3 100644 --- a/test/sql/icebergDataSource.test.js +++ b/test/sql/icebergDataSource.test.js @@ -405,11 +405,13 @@ describe.concurrent('icebergDataSource partition pruning', () => { expect(resolver.dataFilesRead()).toBe(0) }) - it('does not prune on a non-partition predicate', async () => { + it('prunes a non-partition predicate via manifest column bounds', async () => { const resolver = countingResolver(localResolver('test/files')) const source = await icebergDataSource({ tableUrl, resolver, metadataFileName: 'v2.metadata.json' }) - // `price` is not a partition source, so every data file must be read. + // `price` is not a partition source, so partition pruning keeps every file. + // File-level bounds pruning (#20) still skips the data file whose price + // lower/upper bounds prove no row can be > 0, without changing the result. const where = /** @type {ExprNode} */ ({ type: 'binary', op: '>', left: { type: 'identifier', name: 'price' }, right: { type: 'literal', value: 0n }, }) @@ -417,6 +419,8 @@ describe.concurrent('icebergDataSource partition pruning', () => { for await (const row of source.scan({ where }).rows()) out.push(row.resolved) expect(out.map(r => r?.id).sort()).toEqual([1, 4]) - expect(resolver.dataFilesRead()).toBe(3) + // The id_bucket=3 file (id 3) has price <= 0, so its bounds prune it; the + // other two files are read. + expect(resolver.dataFilesRead()).toBe(2) }) }) diff --git a/test/sql/scanPruning.test.js b/test/sql/scanPruning.test.js new file mode 100644 index 0000000..850b1e5 --- /dev/null +++ b/test/sql/scanPruning.test.js @@ -0,0 +1,291 @@ +import { describe, expect, it, vi } from 'vitest' +import { ByteWriter, parquetWrite } from 'hyparquet-writer' +import { fileCatalogCommit } from '../../src/write/commit.js' +import { icebergCreate } from '../../src/create.js' +import { readDataFile } from '../../src/read.js' +import { icebergStageAppend } from '../../src/write/stage.js' +import { icebergDataSource } from '../../src/sql/icebergDataSource.js' +import { memResolver } from '../helpers.js' + +/** + * @import {AsyncBuffer} from 'hyparquet' + * @import {ExprNode} from 'squirreling' + * @import {ManifestEntry, Resolver, Schema, TableMetadata} from '../../src/types.js' + */ + +/** + * @param {string} url + * @returns {boolean} + */ +function isDataUrl(url) { + return /\/data\/.*\.parquet$/.test(url) +} + +/** + * Wrap a resolver, counting bytes sliced from data parquet files and the set + * of distinct data files opened. + * + * @param {Resolver} inner + * @returns {{ resolver: Resolver, dataBytes: () => number, dataFilesRead: () => number }} + */ +function countingResolver(inner) { + let dataBytes = 0 + /** @type {Set} */ + const dataFiles = new Set() + /** @type {Resolver} */ + const resolver = { + async reader(url, byteLength) { + const r = await inner.reader(url, byteLength) + if (!isDataUrl(url)) return r + dataFiles.add(url) + return { + byteLength: r.byteLength, + slice(s, e) { + dataBytes += (e ?? r.byteLength) - (s ?? 0) + return r.slice(s, e) + }, + } + }, + writer: inner.writer ? inner.writer.bind(inner) : undefined, + deleter: inner.deleter ? inner.deleter.bind(inner) : undefined, + } + return { resolver, dataBytes: () => dataBytes, dataFilesRead: () => dataFiles.size } +} + +/** + * @param {string} column + * @param {string} op + * @param {any} value + * @returns {ExprNode} + */ +function cmp(column, op, value) { + return /** @type {ExprNode} */ ({ + type: 'binary', op, + left: { type: 'identifier', name: column }, + right: { type: 'literal', value }, + }) +} + +describe('#20 file-level bounds pruning (icebird-written)', () => { + /** @type {Schema} */ + const schema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { id: 2, name: 'v', required: false, type: 'int' }, + ], + } + + /** + * Build a 3-file unpartitioned table with disjoint id ranges, returning a + * counting resolver bound to the written bytes. + * + * @returns {Promise<{ tableUrl: string, resolver: Resolver, dataFilesRead: () => number }>} + */ + async function build3FileTable() { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://prune20' + const { resolver: memR } = memResolver() + let metadata = await icebergCreate({ tableUrl, resolver: memR, schema }) + for (const base of [0, 100, 1000]) { + const records = [] + for (let i = 0; i < 10; i++) records.push({ id: BigInt(base + i), v: base + i }) + const staged = await icebergStageAppend({ tableUrl, metadata, records, resolver: memR }) + metadata = await fileCatalogCommit({ tableUrl, metadata, staged, resolver: memR }) + } + const { resolver, dataFilesRead } = countingResolver({ reader: memR.reader }) + return { tableUrl, resolver, dataFilesRead } + } + + it('opens only the data file whose id bounds can match a range predicate', async () => { + const { tableUrl, resolver, dataFilesRead } = await build3FileTable() + const source = await icebergDataSource({ tableUrl, resolver }) + const out = [] + for await (const row of source.scan({ where: cmp('id', '>=', 1000n) }).rows()) out.push(row.resolved) + + expect(out.map(r => r?.id).sort((a, b) => Number(a - b))) + .toEqual([1000n, 1001n, 1002n, 1003n, 1004n, 1005n, 1006n, 1007n, 1008n, 1009n]) + expect(dataFilesRead()).toBe(1) + }) + + it('opens only the matching file for an equality predicate', async () => { + const { tableUrl, resolver, dataFilesRead } = await build3FileTable() + const source = await icebergDataSource({ tableUrl, resolver }) + const out = [] + for await (const row of source.scan({ where: cmp('id', '=', 105n) }).rows()) out.push(row.resolved) + + expect(out).toEqual([{ id: 105n, v: 105 }]) + expect(dataFilesRead()).toBe(1) + }) + + it('reads all files when the predicate spans every file', async () => { + const { tableUrl, resolver, dataFilesRead } = await build3FileTable() + const source = await icebergDataSource({ tableUrl, resolver }) + const out = [] + for await (const row of source.scan({ where: cmp('id', '>=', 0n) }).rows()) out.push(row.resolved) + + expect(out.length).toBe(30) + expect(dataFilesRead()).toBe(3) + }) +}) + +describe('#21 row-group pruning (readDataFile)', () => { + /** @type {Schema} */ + const schemaV = { + type: 'struct', + 'schema-id': 0, + fields: [{ id: 1, name: 'v', required: false, type: 'int' }], + } + /** @type {Schema} */ + const schemaVP = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'v', required: false, type: 'int' }, + { id: 2, name: 'payload', required: false, type: 'string' }, + ], + } + + /** + * Write a single parquet file with `rowGroupSize` rows per group (so the + * file has many disjoint-range row groups), into a counting resolver. When + * `payloadBytes` > 0 a large string column is added so the file exceeds + * hyparquet's 512KB whole-file prefetch threshold and per-row-group byte + * reads become observable. + * + * @param {object} opts + * @param {number} opts.count + * @param {number} opts.rowGroupSize + * @param {boolean} opts.statistics + * @param {number} [opts.payloadBytes] + * @returns {{ resolver: Resolver, dataBytes: () => number, dataEntry: ManifestEntry, metadata: TableMetadata, schema: Schema, count: number }} + */ + function writeFile({ count, rowGroupSize, statistics, payloadBytes = 0 }) { + const schema = payloadBytes > 0 ? schemaVP : schemaV + const values = [] + const payloads = [] + for (let i = 0; i < count; i++) { + values.push(i) + if (payloadBytes > 0) payloads.push(String(i).padEnd(payloadBytes, 'x')) + } + const columnData = [{ name: 'v', data: values }] + const parquetSchema = [ + { name: 'root', num_children: payloadBytes > 0 ? 2 : 1 }, + { name: 'v', type: 'INT32', repetition_type: 'OPTIONAL', field_id: 1 }, + ] + if (payloadBytes > 0) { + columnData.push({ name: 'payload', data: payloads }) + parquetSchema.push({ name: 'payload', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'OPTIONAL', field_id: 2 }) + } + const writer = new ByteWriter() + parquetWrite({ + writer, + columnData, + schema: parquetSchema, + kvMetadata: [{ key: 'iceberg.schema', value: JSON.stringify(schema) }], + codec: 'UNCOMPRESSED', + statistics, + rowGroupSize, + }) + const bytes = writer.getBytes() + const filePath = 'mem://data/file.parquet' + /** @type {Resolver} */ + const inner = { + reader() { + return { + byteLength: bytes.byteLength, + slice(start, end = bytes.byteLength) { + const s = bytes.subarray(start, end) + return s.buffer.slice(s.byteOffset, s.byteOffset + s.byteLength) + }, + } + }, + } + const { resolver, dataBytes } = countingResolver(inner) + /** @type {ManifestEntry} */ + const dataEntry = { + status: 1, + sequence_number: 0n, + partition_spec_id: 0, + data_file: { + content: 0, + file_path: filePath, + file_format: 'parquet', + partition: {}, + record_count: BigInt(count), + file_size_in_bytes: BigInt(bytes.byteLength), + }, + } + /** @type {TableMetadata} */ + const metadata = { + 'format-version': 2, + 'table-uuid': 'test', + location: 'mem://table', + 'last-sequence-number': 0, + 'last-updated-ms': 0, + 'last-column-id': payloadBytes > 0 ? 2 : 1, + 'current-schema-id': 0, + schemas: [schema], + 'default-spec-id': 0, + 'partition-specs': [{ 'spec-id': 0, fields: [] }], + 'sort-orders': [{ 'order-id': 0, fields: [] }], + 'default-sort-order-id': 0, + } + return { resolver, dataBytes, dataEntry, metadata, schema, count } + } + + /** + * @param {{ resolver: Resolver, dataEntry: ManifestEntry, metadata: TableMetadata, schema: Schema, count: number }} f + * @param {any} filter + * @returns {Promise} + */ + async function read(f, filter) { + const rows = [] + for await (const batch of readDataFile({ + dataEntry: f.dataEntry, + fileRowStart: 0, + fileRowEnd: f.count, + schema: f.schema, + metadata: f.metadata, + resolver: f.resolver, + rowLineage: false, + positionDeletesMap: new Map(), + equalityDeleteGroups: [], + filter, + })) { + for (const row of batch) rows.push(row) + } + return rows + } + + it('reads fewer bytes for a selective predicate on a multi-row-group file', async () => { + // 8 single-row groups, each carrying a ~100KB payload so the file exceeds + // the 512KB whole-file prefetch threshold; v ranges are [0]..[7]. + const full = writeFile({ count: 8, rowGroupSize: 1, statistics: true, payloadBytes: 100_000 }) + const fullRows = await read(full, undefined) + + const selective = writeFile({ count: 8, rowGroupSize: 1, statistics: true, payloadBytes: 100_000 }) + // v >= 7 matches only the last row group; the other 7 are pruned by stats, + // so their ~100KB payload chunks are never fetched. + const selRows = await read(selective, { v: { $gte: 7 } }) + + expect(fullRows.length).toBe(8) + expect(selRows.length).toBe(1) + expect(selRows[0].v).toBe(7) + // Reads roughly one row group instead of eight. + expect(selective.dataBytes()).toBeLessThan(full.dataBytes() / 2) + }) + + it('falls back to a full decode (correctly) when row-group stats are absent', async () => { + // Many single-row groups, NO statistics: hyparquet cannot stat-skip, so it + // must decode every group and match per-row. Result must still be correct. + const f = writeFile({ count: 50, rowGroupSize: 1, statistics: false }) + const all = await read(f, undefined) + const filtered = await read(f, { v: { $gte: 25 } }) + + expect(all.length).toBe(50) + expect(filtered).toEqual(all.filter(r => r.v >= 25)) + expect(filtered.length).toBe(25) + }) +}) diff --git a/test/write/rewrite.test.js b/test/write/rewrite.test.js new file mode 100644 index 0000000..bb1aae4 --- /dev/null +++ b/test/write/rewrite.test.js @@ -0,0 +1,241 @@ +import { describe, expect, it, vi } from 'vitest' +import { fileCatalog } from '../../src/catalog/file.js' +import { fileCatalogCommit } from '../../src/write/commit.js' +import { icebergCreate } from '../../src/create.js' +import { icebergManifests, splitManifestEntries } from '../../src/manifest.js' +import { icebergRead } from '../../src/read.js' +import { icebergAppend, icebergRewrite } from '../../src/write/write.js' +import { icebergStageAppend } from '../../src/write/stage.js' +import { icebergStagePositionDelete } from '../../src/write/stage-position-delete.js' +import { icebergStageRewrite } from '../../src/write/rewrite.js' +import { deserializeValue } from '../../src/write/serde.js' +import { memResolver } from '../helpers.js' + +/** + * @import {ManifestEntry, Schema, SortOrder, TableMetadata} from '../../src/types.js' + */ + +/** @type {Schema} */ +const schema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { id: 2, name: 'name', required: false, type: 'string' }, + ], +} + +/** @type {SortOrder} */ +const sortById = { + 'order-id': 1, + fields: [{ transform: 'identity', 'source-id': 1, direction: 'asc', 'null-order': 'nulls-last' }], +} + +/** + * @param {any[]} a + * @returns {any[]} + */ +function byId(a) { + return [...a].sort((x, y) => Number(x.id - y.id)) +} + +/** + * Decode a long-typed bound from a (read-decoded) manifest entry. + * @param {ManifestEntry} entry + * @param {number} fieldId + * @param {'lower'|'upper'} side + * @returns {any} + */ +function longBound(entry, fieldId, side) { + const map = side === 'lower' ? entry.data_file.lower_bounds : entry.data_file.upper_bounds + if (!map) return undefined + const e = Array.isArray(map) ? map.find(x => Number(x.key) === fieldId) : undefined + const bytes = e ? e.value : /** @type {any} */ (map)[fieldId] + return bytes ? deserializeValue(bytes, 'long') : undefined +} + +/** + * Create a sorted, multi-file table and return its committed metadata. + * @param {object} [opts] + * @param {SortOrder} [opts.sortOrder] + * @returns {Promise<{ tableUrl: string, resolver: import('../../src/types.js').Resolver, metadata: TableMetadata }>} + */ +async function makeMultiFileTable({ sortOrder } = {}) { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://rewrite' + const { resolver } = memResolver() + let metadata = await icebergCreate({ tableUrl, resolver, schema, sortOrder }) + const batches = [ + [{ id: 5n, name: 'e' }, { id: 2n, name: 'b' }], + [{ id: 1n, name: 'a' }, { id: 6n, name: 'f' }], + [{ id: 4n, name: 'd' }, { id: 3n, name: 'c' }], + ] + for (const records of batches) { + const staged = await icebergStageAppend({ tableUrl, metadata, records, resolver }) + metadata = await fileCatalogCommit({ tableUrl, metadata, staged, resolver }) + } + return { tableUrl, resolver, metadata } +} + +describe('icebergRewrite — round-trip and file consolidation', () => { + it('merges files, preserves rows (modulo order), and globally sorts output', async () => { + const { tableUrl, resolver, metadata } = await makeMultiFileTable({ sortOrder: sortById }) + + const before = await icebergRead({ tableUrl, metadata, resolver }) + const beforeManifests = await icebergManifests({ metadata, resolver }) + expect(splitManifestEntries(beforeManifests).dataEntries.length).toBe(3) + + const staged = await icebergStageRewrite({ tableUrl, metadata, resolver }) + const after = await fileCatalogCommit({ tableUrl, metadata, staged, resolver }) + + const rows = await icebergRead({ tableUrl, metadata: after, resolver }) + // Same multiset. + expect(byId(rows)).toEqual(byId(before)) + // Output is globally sorted by id and consolidated into a single file. + expect(rows.map(r => r.id)).toEqual([1n, 2n, 3n, 4n, 5n, 6n]) + const afterEntries = splitManifestEntries(await icebergManifests({ metadata: after, resolver })).dataEntries + expect(afterEntries.length).toBe(1) + expect(afterEntries[0].data_file.sort_order_id).toBe(1) + }) + + it('produces non-overlapping sort-key bounds across split output files', async () => { + const { tableUrl, resolver, metadata } = await makeMultiFileTable({ sortOrder: sortById }) + + const staged = await icebergStageRewrite({ tableUrl, metadata, resolver, targetFileRows: 2 }) + const after = await fileCatalogCommit({ tableUrl, metadata, staged, resolver }) + + const entries = splitManifestEntries(await icebergManifests({ metadata: after, resolver })).dataEntries + expect(entries.length).toBe(3) + const ranges = entries + .map(e => ({ lo: longBound(e, 1, 'lower'), hi: longBound(e, 1, 'upper') })) + .sort((a, b) => Number(a.lo - b.lo)) + expect(ranges).toEqual([ + { lo: 1n, hi: 2n }, { lo: 3n, hi: 4n }, { lo: 5n, hi: 6n }, + ]) + // Strictly non-overlapping: each file's max < the next file's min. + for (let i = 0; i + 1 < ranges.length; i++) { + expect(ranges[i].hi < ranges[i + 1].lo).toBe(true) + } + }) +}) + +describe('icebergRewrite — deletes consumed', () => { + it('applies deletes during rewrite and leaves no delete files', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://rewrite-del' + const { resolver } = memResolver() + const created = await icebergCreate({ tableUrl, resolver, schema }) + const records = [{ id: 1n, name: 'a' }, { id: 2n, name: 'b' }, { id: 3n, name: 'c' }] + const appended = await icebergStageAppend({ tableUrl, metadata: created, records, resolver }) + const afterAppend = await fileCatalogCommit({ tableUrl, metadata: created, staged: appended, resolver }) + const dataPath = appended.writtenFiles[0] + + const delStaged = await icebergStagePositionDelete({ + tableUrl, metadata: afterAppend, deletes: [{ file_path: dataPath, pos: 1n }], resolver, + }) + const afterDelete = await fileCatalogCommit({ tableUrl, metadata: afterAppend, staged: delStaged, resolver }) + // Sanity: there is a delete file before rewrite. + expect(splitManifestEntries(await icebergManifests({ metadata: afterDelete, resolver })).deleteEntries.length).toBe(1) + + const staged = await icebergStageRewrite({ tableUrl, metadata: afterDelete, resolver }) + const afterRewrite = await fileCatalogCommit({ tableUrl, metadata: afterDelete, staged, resolver }) + + const rows = await icebergRead({ tableUrl, metadata: afterRewrite, resolver }) + expect(rows).toEqual([{ id: 1n, name: 'a' }, { id: 3n, name: 'c' }]) + const { dataEntries, deleteEntries } = splitManifestEntries(await icebergManifests({ metadata: afterRewrite, resolver })) + expect(deleteEntries.length).toBe(0) + expect(dataEntries.length).toBe(1) + }) +}) + +describe('icebergRewrite — partition spec evolution', () => { + it('rewrites unpartitioned data under a new identity spec', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://rewrite-evolve' + const { resolver } = memResolver() + const created = await icebergCreate({ tableUrl, resolver, schema }) + const records = [ + { id: 1n, name: 'x' }, { id: 2n, name: 'y' }, { id: 3n, name: 'x' }, { id: 4n, name: 'y' }, + ] + const appended = await icebergStageAppend({ tableUrl, metadata: created, records, resolver }) + const afterAppend = await fileCatalogCommit({ tableUrl, metadata: created, staged: appended, resolver }) + expect(splitManifestEntries(await icebergManifests({ metadata: afterAppend, resolver })).dataEntries.length).toBe(1) + + // Add a second partition spec: identity(name). + /** @type {TableMetadata} */ + const evolved = { + ...afterAppend, + 'partition-specs': [ + ...afterAppend['partition-specs'], + { 'spec-id': 1, fields: [{ 'source-id': 2, 'field-id': 1000, name: 'name', transform: 'identity' }] }, + ], + 'last-partition-id': 1000, + } + + const staged = await icebergStageRewrite({ tableUrl, metadata: evolved, resolver, partitionSpecId: 1 }) + const after = await fileCatalogCommit({ tableUrl, metadata: evolved, staged, resolver }) + + const entries = splitManifestEntries(await icebergManifests({ metadata: after, resolver })).dataEntries + // One file per distinct name value. + expect(entries.length).toBe(2) + expect(entries.every(e => e.partition_spec_id === 1)).toBe(true) + expect(entries.map(e => e.data_file.partition.name).sort()).toEqual(['x', 'y']) + const rows = await icebergRead({ tableUrl, metadata: after, resolver }) + expect([...rows].sort((a, b) => Number(a.id - b.id))).toEqual(records) + }) +}) + +describe('icebergRewrite — safety', () => { + it('throws on a concurrent commit rather than dropping rows', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://rewrite-conflict' + const { resolver } = memResolver() + const created = await icebergCreate({ tableUrl, resolver, schema }) + const a1 = await icebergStageAppend({ tableUrl, metadata: created, records: [{ id: 1n, name: 'a' }], resolver }) + const m1 = await fileCatalogCommit({ tableUrl, metadata: created, staged: a1, resolver }) + + // Stage a rewrite against m1. + const rewriteStaged = await icebergStageRewrite({ tableUrl, metadata: m1, resolver }) + + // A concurrent append commits first, advancing main. + const a2 = await icebergStageAppend({ tableUrl, metadata: m1, records: [{ id: 2n, name: 'b' }], resolver }) + const m2 = await fileCatalogCommit({ tableUrl, metadata: m1, staged: a2, resolver }) + + // Committing the stale rewrite must fail (CAS on main), not clobber m2. + await expect(fileCatalogCommit({ tableUrl, metadata: m2, staged: rewriteStaged, resolver })) + .rejects.toThrow(/ref main expected snapshot/) + // The concurrently-appended row is still present. + const rows = await icebergRead({ tableUrl, metadata: m2, resolver }) + expect(rows.map(r => r.id).sort()).toEqual([1n, 2n]) + }) + + it('rejects format-version 3 (row lineage not yet handled)', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://rewrite-v3' + const { resolver } = memResolver() + const created = await icebergCreate({ tableUrl, resolver, schema, formatVersion: 3 }) + const appended = await icebergStageAppend({ tableUrl, metadata: created, records: [{ id: 1n, name: 'a' }], resolver }) + const afterAppend = await fileCatalogCommit({ tableUrl, metadata: created, staged: appended, resolver }) + + await expect(() => icebergStageRewrite({ tableUrl, metadata: afterAppend, resolver })) + .rejects.toThrow(/format-version 2 only/) + }) +}) + +describe('icebergRewrite — one-call API', () => { + it('loads, rewrites, and commits through a file catalog', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://rewrite-onecall' + const { resolver } = memResolver() + const catalog = fileCatalog({ resolver }) + await icebergCreate({ tableUrl, resolver, schema, sortOrder: sortById }) + await icebergAppend({ catalog, tableUrl, records: [{ id: 3n, name: 'c' }, { id: 1n, name: 'a' }] }) + await icebergAppend({ catalog, tableUrl, records: [{ id: 2n, name: 'b' }] }) + + const committed = await icebergRewrite({ catalog, tableUrl }) + const rows = await icebergRead({ tableUrl, metadata: committed, resolver }) + expect(rows.map(r => r.id)).toEqual([1n, 2n, 3n]) + const entries = splitManifestEntries(await icebergManifests({ metadata: committed, resolver })).dataEntries + expect(entries.length).toBe(1) + }) +}) diff --git a/test/write/sort.test.js b/test/write/sort.test.js new file mode 100644 index 0000000..ef30c14 --- /dev/null +++ b/test/write/sort.test.js @@ -0,0 +1,149 @@ +import { describe, expect, it, vi } from 'vitest' +import { buildSortComparator } from '../../src/write/sort.js' +import { fileCatalogCommit } from '../../src/write/commit.js' +import { icebergCreate } from '../../src/create.js' +import { icebergManifests, splitManifestEntries } from '../../src/manifest.js' +import { icebergRead } from '../../src/read.js' +import { icebergStageAppend } from '../../src/write/stage.js' +import { memResolver } from '../helpers.js' + +/** + * @import {Schema, SortOrder, TableMetadata} from '../../src/types.js' + */ + +/** @type {Schema} */ +const schema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { id: 2, name: 'v', required: false, type: 'int' }, + { id: 3, name: 'name', required: false, type: 'string' }, + ], +} + +describe('buildSortComparator', () => { + it('returns undefined for an empty sort order', () => { + expect(buildSortComparator({ 'order-id': 0, fields: [] }, schema)).toBeUndefined() + expect(buildSortComparator(undefined, schema)).toBeUndefined() + }) + + it('sorts ascending by a single field', () => { + const cmp = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'identity', 'source-id': 2, direction: 'asc', 'null-order': 'nulls-last' }] }, + schema + ) + const rows = [{ v: 3 }, { v: 1 }, { v: 2 }] + expect([...rows].sort(cmp).map(r => r.v)).toEqual([1, 2, 3]) + }) + + it('sorts descending', () => { + const cmp = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'identity', 'source-id': 2, direction: 'desc', 'null-order': 'nulls-last' }] }, + schema + ) + const rows = [{ v: 3 }, { v: 1 }, { v: 2 }] + expect([...rows].sort(cmp).map(r => r.v)).toEqual([3, 2, 1]) + }) + + it('honors null ordering independent of direction', () => { + const rows = [{ v: 2 }, { v: null }, { v: 1 }] + const ascFirst = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'identity', 'source-id': 2, direction: 'asc', 'null-order': 'nulls-first' }] }, + schema + ) + expect([...rows].sort(ascFirst).map(r => r.v)).toEqual([null, 1, 2]) + + const descFirst = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'identity', 'source-id': 2, direction: 'desc', 'null-order': 'nulls-first' }] }, + schema + ) + expect([...rows].sort(descFirst).map(r => r.v)).toEqual([null, 2, 1]) + + const ascLast = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'identity', 'source-id': 2, direction: 'asc', 'null-order': 'nulls-last' }] }, + schema + ) + expect([...rows].sort(ascLast).map(r => r.v)).toEqual([1, 2, null]) + }) + + it('breaks ties on a second field and is stable', () => { + const cmp = buildSortComparator( + { + 'order-id': 1, + fields: [ + { transform: 'identity', 'source-id': 2, direction: 'asc', 'null-order': 'nulls-last' }, + { transform: 'identity', 'source-id': 1, direction: 'desc', 'null-order': 'nulls-last' }, + ], + }, + schema + ) + const rows = [ + { v: 1, id: 10n }, { v: 1, id: 20n }, { v: 0, id: 5n }, + ] + expect([...rows].sort(cmp)).toEqual([ + { v: 0, id: 5n }, { v: 1, id: 20n }, { v: 1, id: 10n }, + ]) + }) + + it('sorts on a transform key (truncate)', () => { + const cmp = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'truncate[2]', 'source-id': 3, direction: 'asc', 'null-order': 'nulls-last' }] }, + schema + ) + // truncate[2] keys: zo, ap, an, ap. Sorted asc: an < ap < zo; within the + // 'ap' tie, stable input order (apple before apricot). + const rows = [{ name: 'zoo' }, { name: 'apple' }, { name: 'ant' }, { name: 'apricot' }] + expect([...rows].sort(cmp).map(r => r.name)).toEqual(['ant', 'apple', 'apricot', 'zoo']) + }) +}) + +describe('sort-on-append integration', () => { + /** + * @param {SortOrder} sortOrder + * @param {number} [sortOrderId] + * @returns {Promise<{ rows: any[], sortOrderIds: number[] }>} + */ + async function appendAndRead(sortOrder, sortOrderId) { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://sorted' + const { resolver } = memResolver() + const created = await icebergCreate({ tableUrl, resolver, schema, sortOrder }) + const records = [ + { id: 3n, v: 30, name: 'c' }, + { id: 1n, v: 10, name: 'a' }, + { id: 2n, v: 20, name: 'b' }, + ] + const staged = await icebergStageAppend({ tableUrl, metadata: created, records, resolver, sortOrderId }) + const committed = await fileCatalogCommit({ tableUrl, metadata: created, staged, resolver }) + const rows = await icebergRead({ tableUrl, metadata: committed, resolver }) + const manifests = await icebergManifests({ metadata: committed, resolver }) + const { dataEntries } = splitManifestEntries(manifests) + return { rows, sortOrderIds: dataEntries.map(e => e.data_file.sort_order_id) } + } + + it('writes records ordered by the declared sort order and records sort_order_id', async () => { + const { rows, sortOrderIds } = await appendAndRead({ + 'order-id': 1, + fields: [{ transform: 'identity', 'source-id': 1, direction: 'desc', 'null-order': 'nulls-last' }], + }) + expect(rows.map(r => r.id)).toEqual([3n, 2n, 1n]) + expect(sortOrderIds).toEqual([1]) + }) + + it('leaves input order and sort_order_id 0 for an empty sort order', async () => { + const { rows, sortOrderIds } = await appendAndRead({ 'order-id': 0, fields: [] }) + expect(rows.map(r => r.id)).toEqual([3n, 1n, 2n]) + expect(sortOrderIds).toEqual([0]) + }) + + it('throws for an unknown sortOrderId override', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'mem://sorted-bad' + const { resolver } = memResolver() + const created = await icebergCreate({ tableUrl, resolver, schema }) + await expect(() => icebergStageAppend({ + tableUrl, metadata: created, records: [{ id: 1n, v: 1, name: 'a' }], resolver, sortOrderId: 99, + })).rejects.toThrow('sort order 99 not found') + }) +}) From 906e3a8ea46656a188fa08656e87875354e50626 Mon Sep 17 00:00:00 2001 From: Phillip Cunliffe Date: Tue, 9 Jun 2026 11:39:06 -0700 Subject: [PATCH 2/3] Fix date bound pruning and cover NaN sort branch (PR #23 review) Address the two major findings from the dual-agent review: - date mis-prune: compare() had no date case, so a Date query literal (coerced to milliseconds) was compared against bounds decoded as days-since-epoch, skipping matching files. Add a date case backed by a dateToDays() helper that normalizes Date/bigint/number/ISO-string to one domain and returns NaN (undecidable -> keep file) otherwise. Fixed in the canonical comparator so read-prune, write-stats, and sort all benefit; also fixes a latent bug for date columns mixing Date and number values. - Add fileMightMatch date tests (Date, numeric-days, ISO-string, unparseable). - Add buildSortComparator tests over a double field with NaN: NaN orders greatest under asc, reverses to first under desc, and NaN-vs-NaN is a stable tie. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/write/serde.js | 31 +++++++++++++++++++++++++++++++ test/prune.bounds.test.js | 26 ++++++++++++++++++++++++++ test/write/sort.test.js | 27 +++++++++++++++++++++++++++ 3 files changed, 84 insertions(+) diff --git a/src/write/serde.js b/src/write/serde.js index a4277fa..480885b 100644 --- a/src/write/serde.js +++ b/src/write/serde.js @@ -183,6 +183,15 @@ export function compare(a, b, type) { const bi = typeof b === 'bigint' ? b : BigInt(b) return ai < bi ? -1 : ai > bi ? 1 : 0 } + case 'date': { + // Bounds decode to days-since-epoch, but query literals can be `Date` + // objects (or ISO strings). Normalize both sides to days so they compare in + // one domain; NaN keeps the comparison undecided so callers don't mis-prune. + const ad = dateToDays(a) + const bd = dateToDays(b) + if (Number.isNaN(ad) || Number.isNaN(bd)) return NaN + return ad < bd ? -1 : ad > bd ? 1 : 0 + } case 'timestamp': case 'timestamptz': return compareBigInt(timestampToMicros(a), timestampToMicros(b)) @@ -223,6 +232,28 @@ export function compareBigInt(a, b) { return a < b ? -1 : a > b ? 1 : 0 } +/** + * Normalize a `date` value to days since the Unix epoch so a manifest bound + * (decoded as a day count) and a query literal compare in the same domain. + * Accepts a `Date` (its UTC day), a bigint or number already in days, or an ISO + * date string. Returns NaN for anything that can't be read as a date, so the + * comparator stays undecided and the caller keeps the file rather than + * mis-pruning. Matches `serializeValue`'s date encoding for `Date` inputs. + * + * @param {any} value + * @returns {number} + */ +export function dateToDays(value) { + if (value instanceof Date) return Math.floor(value.getTime() / 86400000) + if (typeof value === 'bigint') return Number(value) + if (typeof value === 'number') return value + if (typeof value === 'string') { + const ms = Date.parse(value) + return Number.isNaN(ms) ? NaN : Math.floor(ms / 86400000) + } + return NaN +} + /** * @param {any} value * @returns {bigint} diff --git a/test/prune.bounds.test.js b/test/prune.bounds.test.js index bf4a52c..3d43c19 100644 --- a/test/prune.bounds.test.js +++ b/test/prune.bounds.test.js @@ -16,6 +16,7 @@ const schema = { { id: 3, name: 'ts', required: false, type: 'timestamp' }, { id: 4, name: 'price', required: false, type: 'double' }, { id: 5, name: 'amount', required: false, type: 'decimal(10, 2)' }, + { id: 6, name: 'd', required: false, type: 'date' }, ], } @@ -121,6 +122,31 @@ describe('fileMightMatch — timestamp with Date literal', () => { }) }) +describe('fileMightMatch — date with Date literal', () => { + // d in days-since-epoch for 2022-01-01 .. 2022-06-01 + const lo = Math.floor(Date.parse('2022-01-01') / 86400000) + const hi = Math.floor(Date.parse('2022-06-01') / 86400000) + const e = entry({ 6: { min: lo, max: hi, type: 'date' } }) + + it('normalizes a Date literal (ms) against day-domain bounds', () => { + expect(fileMightMatch({ d: { $gt: new Date('2022-07-01') } }, e, schema)).toBe(false) + expect(fileMightMatch({ d: { $gt: new Date('2022-03-01') } }, e, schema)).toBe(true) + expect(fileMightMatch({ d: { $lt: new Date('2021-01-01') } }, e, schema)).toBe(false) + expect(fileMightMatch({ d: { $eq: new Date('2022-03-01') } }, e, schema)).toBe(true) + expect(fileMightMatch({ d: { $eq: new Date('2023-01-01') } }, e, schema)).toBe(false) + }) + + it('also prunes with numeric (days) and ISO-string literals', () => { + expect(fileMightMatch({ d: { $gt: hi } }, e, schema)).toBe(false) // nothing > max + expect(fileMightMatch({ d: { $gt: '2022-07-01' } }, e, schema)).toBe(false) + expect(fileMightMatch({ d: { $gt: '2022-03-01' } }, e, schema)).toBe(true) + }) + + it('keeps the file for an unparseable date literal (no mis-prune)', () => { + expect(fileMightMatch({ d: { $gt: 'not-a-date' } }, e, schema)).toBe(true) + }) +}) + describe('fileMightMatch — decimal', () => { const e = entry({ 5: { min: 10.0, max: 20.0, type: 'decimal(10, 2)' } }) diff --git a/test/write/sort.test.js b/test/write/sort.test.js index ef30c14..eeea800 100644 --- a/test/write/sort.test.js +++ b/test/write/sort.test.js @@ -19,6 +19,7 @@ const schema = { { id: 1, name: 'id', required: true, type: 'long' }, { id: 2, name: 'v', required: false, type: 'int' }, { id: 3, name: 'name', required: false, type: 'string' }, + { id: 4, name: 'd', required: false, type: 'double' }, ], } @@ -86,6 +87,32 @@ describe('buildSortComparator', () => { ]) }) + it('orders NaN greatest under asc and reverses it under desc', () => { + const rows = [{ d: 2.0 }, { d: NaN }, { d: -3.0 }, { d: 1.0 }] + const asc = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'identity', 'source-id': 4, direction: 'asc', 'null-order': 'nulls-last' }] }, + schema + ) + // NaN is ordered greatest (Iceberg), so it lands after the largest finite value. + expect([...rows].sort(asc).map(r => r.d)).toEqual([-3.0, 1.0, 2.0, NaN]) + + const desc = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'identity', 'source-id': 4, direction: 'desc', 'null-order': 'nulls-last' }] }, + schema + ) + // desc reverses the order of present values, so the greatest (NaN) comes first. + expect([...rows].sort(desc).map(r => r.d)).toEqual([NaN, 2.0, 1.0, -3.0]) + }) + + it('treats NaN vs NaN as a tie, preserving input order', () => { + const cmp = buildSortComparator( + { 'order-id': 1, fields: [{ transform: 'identity', 'source-id': 4, direction: 'asc', 'null-order': 'nulls-last' }] }, + schema + ) + const rows = [{ d: NaN, id: 1n }, { d: NaN, id: 2n }, { d: NaN, id: 3n }] + expect([...rows].sort(cmp).map(r => r.id)).toEqual([1n, 2n, 3n]) + }) + it('sorts on a transform key (truncate)', () => { const cmp = buildSortComparator( { 'order-id': 1, fields: [{ transform: 'truncate[2]', 'source-id': 3, direction: 'asc', 'null-order': 'nulls-last' }] }, From 62a7b289cd19ba736dee181f867d41a4fafbdcb5 Mon Sep 17 00:00:00 2001 From: Phillip Cunliffe Date: Tue, 9 Jun 2026 12:00:40 -0700 Subject: [PATCH 3/3] Fix typecheck errors in scan-pruning tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI runs `npx tsc` over src + test with strict/checkJs, but the new test files had several type errors: - prune.bounds: bare-value filters ({ id: 9n }) aren't modelled by hyparquet's ParquetQueryFilter — cast past the type. - scanPruning: type columnData/parquetSchema as ColumnSource[]/ SchemaElement[]; add required 'last-partition-id'; coerce ids with Number() before subtracting in the sort comparator. - sort: sort_order_id is optional, so widen the return to (number | undefined)[]. Co-Authored-By: Claude Opus 4.8 (1M context) --- test/prune.bounds.test.js | 6 ++++-- test/sql/scanPruning.test.js | 8 ++++++-- test/write/sort.test.js | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/test/prune.bounds.test.js b/test/prune.bounds.test.js index 3d43c19..e3d0ba7 100644 --- a/test/prune.bounds.test.js +++ b/test/prune.bounds.test.js @@ -73,8 +73,10 @@ describe('fileMightMatch — numeric range/equality', () => { }) it('bare value is treated as equality', () => { - expect(fileMightMatch({ id: 9n }, e, schema)).toBe(false) - expect(fileMightMatch({ id: 3n }, e, schema)).toBe(true) + // Bare values (no operator object) are a runtime convenience not modelled by + // hyparquet's ParquetQueryFilter type, so cast past it. + expect(fileMightMatch(/** @type {any} */ ({ id: 9n }), e, schema)).toBe(false) + expect(fileMightMatch(/** @type {any} */ ({ id: 3n }), e, schema)).toBe(true) }) it('$in keeps if any value is in range, skips if all out', () => { diff --git a/test/sql/scanPruning.test.js b/test/sql/scanPruning.test.js index 850b1e5..9fc6ee3 100644 --- a/test/sql/scanPruning.test.js +++ b/test/sql/scanPruning.test.js @@ -8,7 +8,8 @@ import { icebergDataSource } from '../../src/sql/icebergDataSource.js' import { memResolver } from '../helpers.js' /** - * @import {AsyncBuffer} from 'hyparquet' + * @import {AsyncBuffer, SchemaElement} from 'hyparquet' + * @import {ColumnSource} from 'hyparquet-writer' * @import {ExprNode} from 'squirreling' * @import {ManifestEntry, Resolver, Schema, TableMetadata} from '../../src/types.js' */ @@ -104,7 +105,7 @@ describe('#20 file-level bounds pruning (icebird-written)', () => { const out = [] for await (const row of source.scan({ where: cmp('id', '>=', 1000n) }).rows()) out.push(row.resolved) - expect(out.map(r => r?.id).sort((a, b) => Number(a - b))) + expect(out.map(r => r?.id).sort((a, b) => Number(a) - Number(b))) .toEqual([1000n, 1001n, 1002n, 1003n, 1004n, 1005n, 1006n, 1007n, 1008n, 1009n]) expect(dataFilesRead()).toBe(1) }) @@ -169,7 +170,9 @@ describe('#21 row-group pruning (readDataFile)', () => { values.push(i) if (payloadBytes > 0) payloads.push(String(i).padEnd(payloadBytes, 'x')) } + /** @type {ColumnSource[]} */ const columnData = [{ name: 'v', data: values }] + /** @type {SchemaElement[]} */ const parquetSchema = [ { name: 'root', num_children: payloadBytes > 0 ? 2 : 1 }, { name: 'v', type: 'INT32', repetition_type: 'OPTIONAL', field_id: 1 }, @@ -229,6 +232,7 @@ describe('#21 row-group pruning (readDataFile)', () => { schemas: [schema], 'default-spec-id': 0, 'partition-specs': [{ 'spec-id': 0, fields: [] }], + 'last-partition-id': payloadBytes > 0 ? 2 : 1, 'sort-orders': [{ 'order-id': 0, fields: [] }], 'default-sort-order-id': 0, } diff --git a/test/write/sort.test.js b/test/write/sort.test.js index eeea800..a977a04 100644 --- a/test/write/sort.test.js +++ b/test/write/sort.test.js @@ -129,7 +129,7 @@ describe('sort-on-append integration', () => { /** * @param {SortOrder} sortOrder * @param {number} [sortOrderId] - * @returns {Promise<{ rows: any[], sortOrderIds: number[] }>} + * @returns {Promise<{ rows: any[], sortOrderIds: (number | undefined)[] }>} */ async function appendAndRead(sortOrder, sortOrderId) { vi.spyOn(Date, 'now').mockReturnValue(1700000000000)