Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dist
node_modules
tests/data

# Vendored third-party source — keep byte-identical to upstream
src/vendor
9 changes: 1 addition & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@
"license": "MIT",
"dependencies": {
"@msgpack/msgpack": "^3.1.3",
"flatbuffers": "^25.9.23",
"fzstd": "^0.1.1"
"flatbuffers": "^25.9.23"
},
"devDependencies": {
"@types/node": "^20.0.0",
Expand Down
15 changes: 15 additions & 0 deletions src/format/flatbuffers/generated/snapshot-info.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,19 @@ export class SnapshotInfo {
const offset = this.bb!.__offset(this.bb_pos, 12);
return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0;
}

prunedAncestorTxLogs(index: number, obj?: ObjectId12): ObjectId12 | null {
const offset = this.bb!.__offset(this.bb_pos, 14);
return offset
? (obj || new ObjectId12()).__init(
this.bb!.__vector(this.bb_pos + offset) + index * 12,
this.bb!,
)
: null;
}

prunedAncestorTxLogsLength(): number {
const offset = this.bb!.__offset(this.bb_pos, 14);
return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0;
}
}
190 changes: 178 additions & 12 deletions src/format/flatbuffers/manifest-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import { ByteBuffer } from "flatbuffers";
import { decompress } from "../../vendor/fzstd/index.js";
import { Manifest as FbsManifest } from "./generated/manifest.js";
import { ArrayManifest as FbsArrayManifest } from "./generated/array-manifest.js";
import { ChunkRef as FbsChunkRef } from "./generated/chunk-ref.js";
Expand All @@ -18,6 +19,99 @@ import {
type ObjectId8,
} from "./types.js";

/** compression_algorithm value for zstd dictionary-compressed locations. */
const COMPRESSION_ALG_ZSTD_DICT = 1;

/** Upper bound on a decompressed location, matching the Rust
* `MAX_DECOMPRESSED_LOCATION_SIZE`. */
const MAX_DECOMPRESSED_LOCATION_SIZE = 1024;

/**
* Bytes that a zstd decoder allocates up front for `frame`, read from the frame
* header: the advertised frame content size, falling back to the window size.
* Mirrors the allocation in the vendored fzstd `rzfh`. Requires `frame` to be a
* single complete frame covering the whole input; returns `Infinity` for a
* missing/short/unrecognized header, a malformed block, or any trailing bytes
* (a concatenated second frame), so callers reject rather than trust it.
*
* Used to bound allocation *before* decompressing an untrusted
* `compressed_location`: the size check on the decompressed output alone is too
* late, since the decoder would already have allocated per the advertised size,
* and it decodes every concatenated frame.
*/
function zstdFrameAllocSize(frame: Uint8Array): number {
// Magic number 0xFD2FB528 (little-endian), then the frame header descriptor.
if (
frame.length < 5 ||
frame[0] !== 0x28 ||
frame[1] !== 0xb5 ||
frame[2] !== 0x2f ||
frame[3] !== 0xfd
) {
return Infinity;
}
const flg = frame[4];
const singleSegment = (flg >> 5) & 1;
const contentChecksum = (flg >> 2) & 1;
const dictIdFlag = flg & 3;
const contentSizeFlag = flg >> 6;

let pos = 5;
let windowSize = 0;
if (!singleSegment) {
if (pos >= frame.length) return Infinity;
const wd = frame[pos++];
// Arithmetic (not bitwise `1 <<`/`>> 3`): the exponent can reach 41, which
// would overflow 32-bit JS shifts and let a crafted window descriptor
// compute a bogus small size that slips past the bound below.
const base = 2 ** (10 + (wd >> 3));
windowSize = base + (base / 8) * (wd & 7);
}
pos += dictIdFlag === 3 ? 4 : dictIdFlag; // skip dictionary id

// Frame content size: 0/1/2/4/8 bytes per the flags (matching fzstd's rzfh).
const fcsBytes = contentSizeFlag ? 1 << contentSizeFlag : singleSegment;
let contentSize = 0;
if (fcsBytes) {
if (pos + fcsBytes > frame.length) return Infinity;
for (let i = 0; i < fcsBytes; i++) {
contentSize += frame[pos + i] * 2 ** (8 * i);
}
if (contentSizeFlag === 1) contentSize += 256;
if (singleSegment) windowSize = contentSize;
}
pos += fcsBytes;

// Walk the data blocks to the frame end. `decompress()` decodes *every*
// concatenated frame and allocates per frame, so a first-frame-only size
// check is bypassable by appending a frame that advertises a huge size.
// Require exactly one complete frame covering the whole input; reject
// trailing bytes (a second frame) or any malformed block.
for (;;) {
if (pos + 3 > frame.length) return Infinity;
const blockHeader =
frame[pos] | (frame[pos + 1] << 8) | (frame[pos + 2] << 16);
pos += 3;
const lastBlock = blockHeader & 1;
const blockType = (blockHeader >> 1) & 3;
if (blockType === 3) return Infinity; // reserved block type
pos += blockType === 1 ? 1 : blockHeader >> 3; // RLE block content is 1 byte
if (pos > frame.length) return Infinity;
if (lastBlock) break;
}
if (contentChecksum) pos += 4;
if (pos !== frame.length) return Infinity; // trailing/concatenated frame

return contentSize || windowSize;
}

/**
* Decode a ChunkRef's `compressed_location` byte vector into a location string.
* Returns `null` when this manifest has no location dictionary (locations are
* stored uncompressed).
*/
type LocationDecoder = (compressed: Uint8Array) => string;

/** Parse a Manifest from FlatBuffer data */
export function parseManifest(data: Uint8Array): Manifest {
const bb = new ByteBuffer(data);
Expand All @@ -30,20 +124,73 @@ export function parseManifest(data: Uint8Array): Manifest {
idObj.bb!.bytes().slice(idObj.bb_pos, idObj.bb_pos + 12),
);

// Build the location decoder once per manifest from its zstd dictionary.
const decodeLocation = makeLocationDecoder(fbsManifest);

// Parse arrays
const arraysLength = fbsManifest.arraysLength();
const arrays: ArrayManifest[] = [];
for (let i = 0; i < arraysLength; i++) {
const fbsArray = fbsManifest.arrays(i);
if (fbsArray) {
arrays.push(parseArrayManifest(fbsArray));
arrays.push(parseArrayManifest(fbsArray, decodeLocation));
}
}

return { id, arrays };
}

function parseArrayManifest(fbsArray: FbsArrayManifest): ArrayManifest {
/**
* Build the function that turns a ChunkRef's `compressed_location` into a
* location string, using the manifest's zstd dictionary.
*
* Mirrors the Rust `Manifest::decompressor`: a dictionary is only used when
* `compression_algorithm == ZSTD_DICT` and `location_dictionary` is present.
* When a `compressed_location` is encountered without a dictionary, decoding
* throws (matching the Rust `MissingLocationCompressionDictionary` error).
*
* Note: `manifest.fbs` prose says `compression_algorithm == 0` means
* `compressed_location` holds raw (uncompressed) bytes, but that path is not
* implemented in the Rust reference — its writer only emits `compressed_location`
* for ZSTD_DICT (otherwise it writes the plain `location` field) and its reader
* errors here. We follow the reference behavior, not the (inconsistent) schema
* comment. See earth-mover/icechunk manifest.fbs vs manifest.rs.
*/
function makeLocationDecoder(fbsManifest: FbsManifest): LocationDecoder {
const dictionary =
fbsManifest.compressionAlgorithm() === COMPRESSION_ALG_ZSTD_DICT
? fbsManifest.locationDictionaryArray()
: null;
// `fatal` makes invalid UTF-8 throw, matching the Rust `String::from_utf8`.
const textDecoder = new TextDecoder("utf-8", { fatal: true });
return (compressed: Uint8Array): string => {
if (!dictionary) {
throw new Error(
"ChunkRef has a compressed_location but the manifest has no location dictionary",
);
}
// Reject before decompressing: a malformed frame can advertise a huge size
// and make the decoder allocate far beyond the bound (or OOM) before the
// post-decompress length check below would run.
if (zstdFrameAllocSize(compressed) > MAX_DECOMPRESSED_LOCATION_SIZE) {
throw new Error(
`Compressed location declares a size over ${MAX_DECOMPRESSED_LOCATION_SIZE} bytes`,
);
}
const decompressed = decompress(compressed, undefined, dictionary);
if (decompressed.length > MAX_DECOMPRESSED_LOCATION_SIZE) {
throw new Error(
`Decompressed location exceeds ${MAX_DECOMPRESSED_LOCATION_SIZE} bytes`,
);
}
return textDecoder.decode(decompressed);
};
}

function parseArrayManifest(
fbsArray: FbsArrayManifest,
decodeLocation: LocationDecoder,
): ArrayManifest {
// Parse node_id (required)
const nodeIdObj = fbsArray.nodeId();
if (!nodeIdObj)
Expand All @@ -58,14 +205,17 @@ function parseArrayManifest(fbsArray: FbsArrayManifest): ArrayManifest {
for (let i = 0; i < refsLength; i++) {
const fbsRef = fbsArray.refs(i);
if (fbsRef) {
refs.push(parseChunkRef(fbsRef));
refs.push(parseChunkRef(fbsRef, decodeLocation));
}
}

return { nodeId, refs };
}

function parseChunkRef(fbsRef: FbsChunkRef): ChunkRef {
function parseChunkRef(
fbsRef: FbsChunkRef,
decodeLocation: LocationDecoder,
): ChunkRef {
// Parse index (required, vector of uint32)
const indexLength = fbsRef.indexLength();
const index: number[] = [];
Expand All @@ -89,8 +239,18 @@ function parseChunkRef(fbsRef: FbsChunkRef): ChunkRef {
)
: null;

// Parse location (optional string)
const location = fbsRef.location();
// Resolve the virtual location. Per the Rust `ref_to_payload` priority
// (chunk_id -> compressed_location -> location), a dictionary-compressed
// location takes precedence over the plain `location` string and is decoded
// here at parse time. `chunk_id` refs are native, not virtual, so they never
// carry a location — skip decoding for them.
let location = fbsRef.location();
if (chunkId === null) {
const compressed = fbsRef.compressedLocationArray();
if (compressed) {
location = decodeLocation(compressed);
}
}

// Parse checksum fields
const checksumEtag = fbsRef.checksumEtag();
Expand Down Expand Up @@ -193,12 +353,14 @@ function binarySearchChunkRef(
return null;
}

/** Extract the payload type from a ChunkRef */
/**
* Extract the payload type from a ChunkRef.
*
* Priority matches the Rust `ref_to_payload`: chunk_id (native) ->
* location (virtual; already decoded from compressed_location at parse time)
* -> inline.
*/
export function getChunkPayload(ref: ChunkRef): ChunkPayload {
if (ref.inline !== null) {
return { type: "inline", data: ref.inline };
}

if (ref.chunkId !== null) {
return {
type: "native",
Expand All @@ -219,5 +381,9 @@ export function getChunkPayload(ref: ChunkRef): ChunkPayload {
};
}

throw new Error("Invalid ChunkRef: no inline, chunkId, or location");
if (ref.inline !== null) {
return { type: "inline", data: ref.inline };
}

throw new Error("Invalid ChunkRef: no chunkId, location, or inline");
}
2 changes: 1 addition & 1 deletion src/format/flatbuffers/repo-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import { ByteBuffer } from "flatbuffers";
import * as flexbuffers from "flatbuffers/js/flexbuffers.js";
import { Repo as FbsRepo } from "./generated/repo.js";
import { decompress } from "fzstd";
import { decompress } from "../../vendor/fzstd/index.js";
import {
parseHeader,
getDataAfterHeader,
Expand Down
2 changes: 1 addition & 1 deletion src/reader/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type {
Storage,
RequestOptions,
} from "../storage/storage.js";
import { decompress } from "fzstd";
import { decompress } from "../vendor/fzstd/index.js";
import { LRUCache } from "../cache/lru.js";
import { singleFlight, type SingleFlight } from "../cache/single-flight.js";
import {
Expand Down
21 changes: 21 additions & 0 deletions src/vendor/fzstd/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2020 Arjun Barrett

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Loading
Loading