Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ import {
icebergCreateTable,
icebergDelete,
icebergExpireSnapshots,
icebergRewrite,
icebergSetRef,
} from 'icebird'

Expand Down Expand Up @@ -172,6 +173,17 @@ await icebergSetRef({ catalog, tableUrl, ref: 'main', snapshotId })
await icebergExpireSnapshots({ catalog, tableUrl, snapshotIds: [oldSnapshotId] })
```

If the table is created with a `sortOrder`, `icebergAppend` orders the rows in each written file by that order (tightening per-file column bounds for scan pruning). `icebergRewrite` compacts the current snapshot — reading every live row (deletes applied), sorting globally, and rewriting into consolidated, non-overlapping files via a `replace` snapshot (v2 tables):

```javascript
// compact small files into sorted, non-overlapping ones
await icebergRewrite({ catalog, tableUrl })
// optionally split large partitions and/or re-partition under another spec
await icebergRewrite({ catalog, tableUrl, targetFileRows: 1_000_000, partitionSpecId: 1 })
```

A rewrite is not retried on a concurrent commit (it would risk dropping rows another writer appended meanwhile); on conflict it throws and should be re-run against fresh metadata.

For a REST catalog, swap `fileCatalog(...)` for the connect context and pass `namespace`/`table` instead of `tableUrl`:

```javascript
Expand Down Expand Up @@ -212,7 +224,8 @@ Icebird aims to support reading any Iceberg table, but currently only supports a
| Geometry Types | ✅ | |
| Geography Types | ✅ | |
| Row Lineage | ✅ | v3 `_row_id` and `_last_updated_sequence_number` inheritance. |
| Sorting | ❌ | |
| Sorting | ✅ | Orders rows by the declared sort order on append; `icebergRewrite` compacts to sorted, non-overlapping files (v2). |
| Scan Pruning | ✅ | Skips data files via partition tuples and manifest column bounds, and parquet row groups via column statistics. |
| Encryption | ❌ | |

## References
Expand Down
2 changes: 1 addition & 1 deletion src/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { IcebergTransactionConflictError, icebergAppend, icebergCreateTable, icebergDelete, icebergDropTable, icebergExpireSnapshots, icebergSetRef, icebergTransaction } from './write/write.js'
export { IcebergTransactionConflictError, icebergAppend, icebergCreateTable, icebergDelete, icebergDropTable, icebergExpireSnapshots, icebergRewrite, icebergSetRef, icebergTransaction } from './write/write.js'
export { icebergCreate } from './create.js'
export { fileCatalog } from './catalog/file.js'
export { restCatalogConnect, restCatalogCreateNamespace, restCatalogDropNamespace, restCatalogListNamespaces, restCatalogListTables, restCatalogLoadCredentials, restCatalogLoadTable, restCatalogRegisterTable, restCatalogRenameTable } from './catalog/rest.js'
Expand Down
219 changes: 212 additions & 7 deletions src/prune.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { typeName } from './schema.js'
import { applyTransform } from './write/transform.js'
import { compare, deserializeValue } from './write/serde.js'

/**
* Partition-level scan pruning. Given a hyparquet query filter (keyed by
Expand Down Expand Up @@ -26,34 +28,65 @@ export function partitionMightMatch(filter, dataEntry, schema, metadata) {
const spec = metadata['partition-specs'].find(s => s['spec-id'] === dataEntry.partition_spec_id)
// No spec or unpartitioned: nothing to prune on.
if (!spec || spec.fields.length === 0) return true
return nodeMightMatch(filter, { spec, schema, partition: dataEntry.data_file.partition })
/** @type {PruneContext} */
const ctx = { spec, schema, partition: dataEntry.data_file.partition }
return nodeMightMatch(filter, (column, condition) => columnMightMatch(column, condition, ctx))
}

/**
* File-level scan pruning via per-column manifest bounds. Given a hyparquet
* query filter (keyed by iceberg field name) and a data manifest entry, decide
* whether the entry's `lower_bounds` / `upper_bounds` could contain a row
* matching the filter.
*
* Like `partitionMightMatch` this is an inclusive projection: a file is skipped
* only when its bounds prove that no row can match, and any uncertainty keeps
* the file. Bounds for string/binary/fixed/uuid are stored truncated (Iceberg
* `truncate(16)` metrics) so those types are never range-pruned. Pruning never
* changes query results, only which files are read.
*
* @param {ParquetQueryFilter} filter - Filter keyed by iceberg field name.
* @param {ManifestEntry} dataEntry
* @param {Schema} schema - Current schema (filter column names map to its fields).
* @returns {boolean} true if the file must be read, false if it can be skipped.
*/
export function fileMightMatch(filter, dataEntry, schema) {
const { lower_bounds, upper_bounds } = dataEntry.data_file
// No bounds at all: nothing to prune on.
if (lower_bounds === undefined && upper_bounds === undefined) return true
return nodeMightMatch(filter, (column, condition) =>
boundsMightMatch(column, condition, dataEntry.data_file, schema))
}

/**
* @typedef {{ spec: PartitionSpec, schema: Schema, partition: DataFile['partition'] }} PruneContext
*/

/**
* Generic AND/OR/$nor walk over a hyparquet filter. Top-level keys are
* AND-combined: the file is ruled out if any branch is ruled out. The leaf
* evaluator decides a single `{column: condition}` predicate and returns false
* only when it proves the file cannot match.
*
* @param {ParquetQueryFilter} node
* @param {PruneContext} ctx
* @param {(column: string, condition: any) => boolean} leafFn
* @returns {boolean}
*/
function nodeMightMatch(node, ctx) {
function nodeMightMatch(node, leafFn) {
if (!node || typeof node !== 'object') return true
const anyNode = /** @type {Record<string, any>} */ (node)
// Top-level keys are AND-combined: the file is ruled out if any is ruled out.
for (const [key, val] of Object.entries(anyNode)) {
if (key === '$and') {
const subs = /** @type {ParquetQueryFilter[]} */ (val)
if (!subs.every(sub => nodeMightMatch(sub, ctx))) return false
if (!subs.every(sub => nodeMightMatch(sub, leafFn))) return false
} else if (key === '$or') {
const subs = /** @type {ParquetQueryFilter[]} */ (val)
if (!subs.some(sub => nodeMightMatch(sub, ctx))) return false
if (!subs.some(sub => nodeMightMatch(sub, leafFn))) return false
} else if (key === '$nor') {
// NOT(a OR b): can't safely prune, keep.
continue
} else {
if (!columnMightMatch(key, val, ctx)) return false
if (!leafFn(key, val)) return false
}
}
return true
Expand Down Expand Up @@ -331,3 +364,175 @@ function numericOf(x) {
function sign(n) {
return n < 0 ? -1 : n > 0 ? 1 : 0
}

/**
* Whether the file could match `condition` on `column` given the column's
* decoded [lower, upper] bounds in the manifest entry. Returns true (keep) on
* any uncertainty, including columns without bounds and non-orderable types.
*
* @param {string} column - iceberg field name
* @param {any} condition - operator object like {$eq: x} or a bare value (eq)
* @param {DataFile} dataFile
* @param {Schema} schema
* @returns {boolean}
*/
function boundsMightMatch(column, condition, dataFile, schema) {
const field = schema.fields.find(f => f.name === column)
if (!field) return true
// Bounds and metric ordering are only defined for orderable scalar types.
// String/binary/fixed/uuid bounds are truncated prefixes, so we never use
// them for pruning (equality on a truncated prefix is undecidable).
if (!isOrderableForBounds(field.type)) return true

const lowerBytes = boundForField(dataFile.lower_bounds, field.id)
const upperBytes = boundForField(dataFile.upper_bounds, field.id)
if (lowerBytes === undefined && upperBytes === undefined) return true
const lo = lowerBytes !== undefined ? deserializeValue(lowerBytes, field.type) : undefined
const hi = upperBytes !== undefined ? deserializeValue(upperBytes, field.type) : undefined
if (lo === undefined && hi === undefined) return true

for (const { op, value } of normalizeCondition(condition)) {
if (!boundsOpMightMatch(op, value, lo, hi, field.type)) return false
}
return true
}

/**
* Look up a column's bound bytes by field id. Read-decoded Iceberg maps arrive
* as an array of `{key, value}` records (Avro int-keyed maps); hand-built
* entries may instead be a plain `Record<fieldId, bytes>`. Handle both.
*
* @param {any} map - lower_bounds or upper_bounds from a manifest data_file
* @param {number} fieldId
* @returns {Uint8Array | undefined}
*/
function boundForField(map, fieldId) {
if (map === undefined || map === null) return undefined
if (Array.isArray(map)) {
const entry = map.find(e => e && Number(e.key) === fieldId)
return entry ? entry.value : undefined
}
const v = map[fieldId]
return v instanceof Uint8Array ? v : undefined
}

/**
* Whether a type has totally-ordered, untruncated single-value bounds suitable
* for range/equality pruning. String/binary/fixed/uuid are excluded because
* their bounds are stored truncated.
*
* @param {IcebergType} type
* @returns {boolean}
*/
function isOrderableForBounds(type) {
const name = typeName(type)
if (name.startsWith('decimal(')) return true
switch (name) {
case 'boolean':
case 'int':
case 'long':
case 'float':
case 'double':
case 'date':
case 'time':
case 'timestamp':
case 'timestamptz':
case 'timestamp_ns':
case 'timestamptz_ns':
return true
default:
return false
}
}

/**
* Whether a value range [lo, hi] (either side may be open/undefined) could
* satisfy `op value`. Mirrors hyparquet's `canSkipRowGroup` operator semantics
* but at file granularity. Returns true (keep) on any uncertainty; returns
* false only when the predicate is provably unsatisfiable for the whole range.
*
* @param {string} op - mongo-style operator
* @param {any} value - the predicate literal (array for $in/$nin)
* @param {any} lo - decoded lower bound, or undefined (open below)
* @param {any} hi - decoded upper bound, or undefined (open above)
* @param {IcebergType} type
* @returns {boolean}
*/
function boundsOpMightMatch(op, value, lo, hi, type) {
switch (op) {
case '$lt': {
// need some x < value; smallest is lo. Skip if lo >= value.
if (lo === undefined) return true
const c = safeCompare(lo, value, type)
return c === undefined ? true : c < 0
}
case '$lte': {
if (lo === undefined) return true
const c = safeCompare(lo, value, type)
return c === undefined ? true : c <= 0
}
case '$gt': {
// need some x > value; largest is hi. Skip if hi <= value.
if (hi === undefined) return true
const c = safeCompare(hi, value, type)
return c === undefined ? true : c > 0
}
case '$gte': {
if (hi === undefined) return true
const c = safeCompare(hi, value, type)
return c === undefined ? true : c >= 0
}
case '$eq':
return eqInRange(value, lo, hi, type)
case '$in':
if (!Array.isArray(value)) return true
// Keep if any listed value could fall in [lo, hi].
return value.some(x => eqInRange(x, lo, hi, type))
// $ne / $nin can only prune a single-valued file fully covered by the
// excluded value(s); too rare to bother — keep.
default:
return true
}
}

/**
* Whether `value` could lie within [lo, hi] (open sides allowed). Keep on any
* undecidable comparison.
*
* @param {any} value
* @param {any} lo
* @param {any} hi
* @param {IcebergType} type
* @returns {boolean}
*/
function eqInRange(value, lo, hi, type) {
if (lo !== undefined) {
const c = safeCompare(value, lo, type)
if (c !== undefined && c < 0) return false
}
if (hi !== undefined) {
const c = safeCompare(value, hi, type)
if (c !== undefined && c > 0) return false
}
return true
}

/**
* Type-aware comparison that returns undefined (rather than throwing or
* returning NaN) when the two values cannot be meaningfully ordered, so the
* caller keeps the file.
*
* @param {any} a
* @param {any} b
* @param {IcebergType} type
* @returns {number | undefined}
*/
function safeCompare(a, b, type) {
if (a === null || a === undefined || b === null || b === undefined) return undefined
try {
const c = compare(a, b, type)
return Number.isNaN(c) ? undefined : c
} catch {
return undefined
}
}
29 changes: 17 additions & 12 deletions src/sql/icebergDataSource.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { fetchDeleteMaps, urlResolver } from '../fetch.js'
import { icebergManifests, splitManifestEntries } from '../manifest.js'
import { icebergMetadata } from '../metadata.js'
import { readDataFile } from '../read.js'
import { partitionMightMatch } from '../prune.js'
import { fileMightMatch, partitionMightMatch } from '../prune.js'
import { whereToParquetFilter } from './whereFilter.js'

/**
Expand All @@ -22,11 +22,13 @@ import { whereToParquetFilter } from './whereFilter.js'
* - Column projection (`columns`) is pushed into the parquet read so only the
* requested columns are decoded. Equality-delete predicate columns and row
* lineage columns are read regardless when needed.
* - WHERE is pushed down to hyparquet (row-group pruning via statistics and
* bloom filters, plus per-row matching) when the expression can be fully
* converted to a parquet filter (comparisons, IN, AND/OR/NOT on identifier
* vs literal). Unsupported nodes (LIKE, functions, arithmetic, identifier
* vs identifier) leave WHERE for the engine to apply.
* - WHERE prunes whole data files before they are opened, using each manifest
* entry's partition tuple and per-column `lower_bounds`/`upper_bounds`, and
* is pushed down to hyparquet (row-group pruning via statistics and bloom
* filters, plus per-row matching) when the expression can be fully converted
* to a parquet filter (comparisons, IN, AND/OR/NOT on identifier vs literal).
* Unsupported nodes (LIKE, functions, arithmetic, identifier vs identifier)
* leave WHERE for the engine to apply.
* - When WHERE is resolved at scan time (either absent or fully pushed) we
* cap the scan at `offset + limit` rows so the source terminates early.
* OFFSET is also pushed into the parquet seek when there are no deletes;
Expand Down Expand Up @@ -89,13 +91,16 @@ export async function icebergDataSource({ tableUrl, metadataFileName, metadata,
// the engine must re-apply it.
const filter = whereToParquetFilter(where)
const appliedWhere = where !== undefined && filter !== undefined
// Partition-level scan pruning: drop data files whose partition tuple
// proves no row can match the filter. Manifest entries are already in
// memory, so this is a cheap synchronous pre-filter that avoids opening
// the pruned files entirely. Pruning never drops a file with a matching
// row, so query results are unchanged.
// Scan pruning: drop data files whose partition tuple OR per-column
// manifest bounds prove no row can match the filter. Manifest entries are
// already in memory, so this is a cheap synchronous pre-filter that
// avoids opening the pruned files entirely. Both pruners are inclusive
// projections (they never drop a file with a matching row), so query
// results are unchanged.
const scanEntries = filter
? dataEntries.filter(entry => partitionMightMatch(filter, entry, schema, tableMetadata))
? dataEntries.filter(entry =>
partitionMightMatch(filter, entry, schema, tableMetadata) &&
fileMightMatch(filter, entry, schema))
: dataEntries
const pruned = scanEntries.length < dataEntries.length
// Treat a fully-pushed-down WHERE the same as "no WHERE" for the
Expand Down
5 changes: 4 additions & 1 deletion src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ export interface Snapshot {
// 'spark.app.id'?: string
'added-data-files'?: string
'added-records'?: string
'deleted-data-files'?: string
'deleted-records'?: string
'removed-files-size'?: string
'added-delete-files'?: string
'removed-delete-files'?: string
'added-position-deletes'?: string
Expand Down Expand Up @@ -298,7 +301,7 @@ export type TableUpdate =
* accumulated updates ship in one commit when the callback resolves.
*/
export interface IcebergTransaction {
append(options: { records: Record<string, any>[] }): Promise<void>
append(options: { records: Record<string, any>[], sortOrderId?: number }): Promise<void>
delete(options: {
deletes: { file_path: string, pos: bigint | number }[]
mode?: 'puffin' | 'parquet'
Expand Down
Loading