61 changes: 60 additions & 1 deletion docs/src/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -499,4 +499,63 @@ Lance Spark maintains index and metadata caches to minimize redundant I/O. Cache
| `LANCE_INDEX_CACHE_SIZE` | 6GB | Index cache size in bytes. |
| `LANCE_METADATA_CACHE_SIZE`| 1GB | Metadata cache size in bytes. |

For details on how caching works and tuning recommendations, see [Performance Tuning - Caching](performance.md#caching).

## Blob v2 Reads

Lance datasets that contain a blob v2 column expose that column to Spark as the native 5-field descriptor struct: `struct<kind:short, position:long, size:long, blob_id:long, blob_uri:string>`. Querying the descriptor never fetches the blob bytes, so `SELECT payload.size` and `SELECT payload.blob_uri` are cheap.

```sql
-- Query metadata only (no byte fetch):
SELECT id, payload.size, payload.kind FROM lance.ns.tbl;
```

A column is treated as blob v2 when the Arrow field carries `ARROW:extension:name = lance.blob.v2`, matching lance-core's blob v2 extension type.
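
The detection rule above can be sketched in a few lines. This is an illustrative stand-in, not the connector's actual code (which lives in its Java sources): `is_blob_v2_field` is a hypothetical helper, and it assumes the field metadata arrives as a mapping whose keys and values may be `str` or `bytes`.

```python
# Hypothetical helper for illustration only; not part of lance-spark.
EXTENSION_NAME_KEY = "ARROW:extension:name"
BLOB_V2_EXTENSION_NAME = "lance.blob.v2"


def _to_text(value):
    """Decode bytes to str so both metadata representations compare equal."""
    return value.decode("utf-8") if isinstance(value, bytes) else value


def is_blob_v2_field(field_metadata):
    """Return True when Arrow field metadata marks the column as blob v2."""
    normalized = {
        _to_text(key): _to_text(value)
        for key, value in (field_metadata or {}).items()
    }
    return normalized.get(EXTENSION_NAME_KEY) == BLOB_V2_EXTENSION_NAME
```

A field carrying only the legacy `lance-encoding:blob = true` key is not treated as blob v2 by this check.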

Filter pushdown for SQL `WHERE` is disabled on blob v2 tables; Spark evaluates predicates after the scan. Zonemap-based fragment pruning still runs.

The connector does not materialize blob bytes on read; queries against descriptor fields fetch metadata only. See [Blob v2 Writes](#blob-v2-writes) below for the write path.

## Blob v2 Writes

To write blob v2 columns, set `file_format_version` to `2.2` or higher and set
`<column>.lance.encoding = blob` in `TBLPROPERTIES`.

Spark still sees the column as `BINARY` when writing. The connector converts that binary
value into the Arrow blob write struct during encoding.

On reads, blob v2 columns are exposed as descriptor structs. See
[Blob v2 Reads](#blob-v2-reads). For writes, `INSERT` and DataFrame append still take
`BINARY`.

```sql
CREATE TABLE lance.mydb.users (
    id INT NOT NULL,
    content BINARY
) USING lance
TBLPROPERTIES (
    'content.lance.encoding' = 'blob',
    'file_format_version' = '2.2'
);
```
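
Because writes still take `BINARY`, a SQL `INSERT` can pass the payload as a hex binary literal. The sketch below is modeled on the `_sql_binary_literal` helper in this change's integration tests; `build_insert` is a hypothetical name introduced here for illustration, and the table name is reused from the DDL above.

```python
def sql_binary_literal(data: bytes) -> str:
    """Render bytes as a Spark SQL hex binary literal, e.g. X'6869'."""
    return "X'" + data.hex().upper() + "'"


def build_insert(table: str, row_id: int, content: bytes) -> str:
    """Hypothetical helper: build an INSERT for an (id, content) blob table."""
    return f"INSERT INTO {table} VALUES ({row_id}, {sql_binary_literal(content)})"


statement = build_insert("lance.mydb.users", 1, b"hi")
# Pass `statement` to spark.sql(...) in a session with the Lance catalog configured.
```

String-built SQL like this suits tests and small payloads; for larger data, a DataFrame append with a `BINARY` column avoids embedding bytes in the statement text.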

With `file_format_version = '2.2'` or higher, blob columns are written using blob v2
encoding and carry `ARROW:extension:name = lance.blob.v2` metadata.

With an older version, or when `file_format_version` is not set, blob columns use the
legacy v1 encoding with `lance-encoding:blob = true` metadata.

Blob encoding requires a numeric `file_format_version`, such as `2.2`.

Blob v2 writes must go through the catalog path. Use SQL DDL with `TBLPROPERTIES`, as
shown above, or use the `DataFrameWriterV2` API:

```python
df.writeTo("lance.ns.users") \
    .tableProperty("content.lance.encoding", "blob") \
    .tableProperty("file_format_version", "2.2") \
    .create()
```

Setting only `file_format_version` does not enable blob encoding. Without
`<column>.lance.encoding = blob`, the column is written as plain `BINARY`.
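
The rules in this section combine into a small decision table. The sketch below is a reading of the prose above, not the connector's implementation: `select_blob_encoding` and its return labels are hypothetical, and raising `ValueError` for a non-numeric version is an assumption about how the "numeric `file_format_version`" requirement is enforced.

```python
def select_blob_encoding(column, table_properties):
    """Return 'binary', 'blob_v1', or 'blob_v2' for a column (illustrative only)."""
    # Without the per-column property the column stays plain BINARY,
    # regardless of file_format_version.
    if table_properties.get(f"{column}.lance.encoding") != "blob":
        return "binary"

    version = table_properties.get("file_format_version")
    if version is None:
        # Version not set: legacy v1 blob encoding.
        return "blob_v1"

    parts = version.split(".")
    try:
        major = int(parts[0])
        minor = int(parts[1]) if len(parts) > 1 else 0
    except ValueError:
        # Assumption: non-numeric versions are rejected for blob columns.
        raise ValueError(
            f"blob encoding requires a numeric file_format_version, got {version!r}"
        )

    # Compare as integers so that '2.10' sorts after '2.2'.
    return "blob_v2" if (major, minor) >= (2, 2) else "blob_v1"
```

For example, the properties from the `CREATE TABLE` statement above select `blob_v2`, while dropping `file_format_version` falls back to `blob_v1`.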
27 changes: 27 additions & 0 deletions docs/src/operations/ddl/create-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,33 @@ To create a table with blob columns, use the table property pattern `<column_nam
.createOrReplace();
```

### Blob v2 Columns

To create blob v2 columns, set the blob encoding property and use `file_format_version = '2.2'` or higher.

Spark writes blob v2 columns as `BINARY`. On reads, the same columns are exposed as
descriptor structs. See [Blob v2 Reads](../../config.md#blob-v2-reads).

=== "SQL"
    ```sql
    CREATE TABLE documents (
        id INT NOT NULL,
        content BINARY
    ) USING lance
    TBLPROPERTIES (
        'content.lance.encoding' = 'blob',
        'file_format_version' = '2.2'
    );
    ```

=== "Python"
    ```python
    df.writeTo("documents") \
        .tableProperty("content.lance.encoding", "blob") \
        .tableProperty("file_format_version", "2.2") \
        .createOrReplace()
    ```

## Large String Columns

Lance supports large string columns for storing very large text data. By default, Arrow uses `Utf8` (VarChar) type with 32-bit offsets, which limits total string data to 2GB per batch. For columns containing very large strings (e.g., document content, base64-encoded data), you can use `LargeUtf8` (LargeVarChar) with 64-bit offsets.
Expand Down
70 changes: 70 additions & 0 deletions integration-tests/test_lance_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def _lance_storage_options(spark):
}


def _sql_binary_literal(data: bytes) -> str:
    return "X'" + data.hex().upper() + "'"


def _table_location(spark, table_name):
    rows = (
        spark.sql(f"DESCRIBE EXTENDED {table_name}")
Expand Down Expand Up @@ -705,6 +709,72 @@ def test_compression_metadata_reaches_lance_file(self, spark):
assert b"lance-encoding:compression" not in (id_meta or {})


class TestDDLBlobV2:
    def test_blob_v2_table_reads_content_as_descriptor(self, spark):
        spark.sql("""
            CREATE TABLE default.test_blob_v2 (
                id INT NOT NULL,
                content BINARY
            ) USING lance
            TBLPROPERTIES (
                'content.lance.encoding' = 'blob',
                'file_format_version' = '2.2'
            )
        """)

        first_content = b"SQL insert content 1"
        second_content = b"SQL insert content 2"

        spark.sql(
            f"INSERT INTO default.test_blob_v2 VALUES (1, {_sql_binary_literal(first_content)})"
        )
        spark.sql(
            f"INSERT INTO default.test_blob_v2 VALUES (2, {_sql_binary_literal(second_content)})"
        )

        describe_rows = spark.sql("DESCRIBE default.test_blob_v2").collect()
        content_field = next(row for row in describe_rows if row.col_name == "content")
        content_type = content_field.data_type.lower()

        assert "struct" in content_type
        assert "kind" in content_type
        assert "blob_uri" in content_type

        rows = spark.sql("""
            SELECT id, content.size, content.kind, content.blob_id, content.blob_uri
            FROM default.test_blob_v2
            ORDER BY id
        """).collect()

        assert len(rows) == 2

        assert rows[0].id == 1
        assert rows[0].size == len(first_content)
        assert rows[0].kind == 0

        assert rows[1].id == 2
        assert rows[1].size == len(second_content)
        assert rows[1].kind == 0

    def test_blob_v2_insert_rejects_non_binary_content(self, spark):
        spark.sql("""
            CREATE TABLE default.test_blob_v2_bad_insert (
                id INT NOT NULL,
                content BINARY
            ) USING lance
            TBLPROPERTIES (
                'content.lance.encoding' = 'blob',
                'file_format_version' = '2.2'
            )
        """)

        with pytest.raises(Exception, match="got string"):
            spark.sql("""
                INSERT INTO default.test_blob_v2_bad_insert
                VALUES (1, 'not-binary')
            """)


class TestDDLIndex:
"""Test DDL index operations: CREATE INDEX (BTree, FTS)."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,9 @@ public Table createTable(
// Build the table ID for credential vending
List<String> tableIdList = buildTableId(actualIdent);

StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
StructType processedSchema =
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);

// Create dataset using namespace - WriteDatasetBuilder handles declareTable internally
// and properly leverages namespace client for credential vending
Expand All @@ -613,7 +615,6 @@ public Table createTable(
.mode(WriteParams.WriteMode.CREATE)
.enableStableRowIds(catalogConfig.isEnableStableRowIds(properties))
.storageOptions(catalogConfig.getStorageOptions());
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
if (fileFormatVersion != null) {
writeBuilder.dataStorageVersion(fileFormatVersion);
}
Expand Down Expand Up @@ -672,12 +673,13 @@ private Table createTableAtPath(
throws TableAlreadyExistsException {
String datasetUri = getDatasetUri(ident);

StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
StructType processedSchema =
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
LanceSparkReadOptions readOptions =
createReadOptions(
datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name);

String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
Map<String, String> tableProperties = copyUserTableProperties(properties);
try {
WriteDatasetBuilder writeBuilder =
Expand Down Expand Up @@ -895,7 +897,9 @@ public StagedTable stageCreate(

Identifier actualIdent = transformIdentifierForApi(ident);
List<String> tableIdList = buildTableId(actualIdent);
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
StructType processedSchema =
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);

DeclareTableRequest declareRequest = new DeclareTableRequest();
tableIdList.forEach(declareRequest::addIdItem);
Expand Down Expand Up @@ -925,7 +929,6 @@ public StagedTable stageCreate(
managedVersioning);
StagedCommit stagedCommit = StagedCommit.forNewTable(arrowSchema, location, commitOptions);
stagedCommit.setShardingSpec(shardingSpec);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
return createStagedDataset(
readOptions,
processedSchema,
Expand All @@ -946,7 +949,9 @@ private StagedTable stageCreateAtPath(
Map<String, String> properties,
ShardingSpec shardingSpec) {
String datasetUri = getDatasetUri(ident);
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
StructType processedSchema =
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);

LanceSparkReadOptions readOptions =
createReadOptions(
Expand All @@ -958,7 +963,6 @@ private StagedTable stageCreateAtPath(
catalogConfig.getStorageOptions(), catalogConfig.isEnableStableRowIds(properties));
StagedCommit stagedCommit = StagedCommit.forNewTable(arrowSchema, datasetUri, commitOptions);
stagedCommit.setShardingSpec(shardingSpec);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
return createStagedDataset(
readOptions,
processedSchema,
Expand Down Expand Up @@ -986,7 +990,9 @@ public StagedTable stageReplace(

ResolvedTable resolved = resolveIdentifier(ident);
DescribeTableResponse describeResponse = resolved.describeResponse;
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
StructType processedSchema =
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);
Map<String, String> initialStorageOptions = describeResponse.getStorageOptions();
boolean managedVersioning = Boolean.TRUE.equals(describeResponse.getManagedVersioning());

Expand All @@ -1004,7 +1010,6 @@ public StagedTable stageReplace(
StagedCommit stagedCommit = StagedCommit.forExistingTable(ds, arrowSchema, commitOptions);
stagedCommit.setShardingSpec(shardingSpec);
// Use specified file format version, or fall back to existing table's version
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
if (fileFormatVersion == null) {
fileFormatVersion = ds.getLanceFileFormatVersion();
}
Expand All @@ -1029,7 +1034,9 @@ private StagedTable stageReplaceAtPath(
ShardingSpec shardingSpec)
throws NoSuchTableException {
String datasetUri = getDatasetUri(ident);
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
StructType processedSchema =
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);

LanceSparkReadOptions readOptions =
createReadOptions(
Expand All @@ -1049,7 +1056,6 @@ private StagedTable stageReplaceAtPath(
StagedCommit stagedCommit = StagedCommit.forExistingTable(ds, arrowSchema, commitOptions);
stagedCommit.setShardingSpec(shardingSpec);
// Use specified file format version, or fall back to existing table's version
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
if (fileFormatVersion == null) {
fileFormatVersion = ds.getLanceFileFormatVersion();
}
Expand Down Expand Up @@ -1086,7 +1092,9 @@ public StagedTable stageCreateOrReplace(

Identifier actualIdent = transformIdentifierForApi(ident);
List<String> tableIdList = buildTableId(actualIdent);
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
StructType processedSchema =
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);

boolean exists = tableExists(ident);
String location;
Expand Down Expand Up @@ -1120,7 +1128,6 @@ public StagedTable stageCreateOrReplace(

Schema arrowSchema = LanceArrowUtils.toArrowSchema(processedSchema, "UTC", true);
// Use specified file format version, or fall back to existing table's version
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
Map<String, String> merged =
LanceRuntime.mergeStorageOptions(catalogConfig.getStorageOptions(), initialStorageOptions);
final StagedCommitOptions commitOptions =
Expand Down Expand Up @@ -1161,7 +1168,9 @@ private StagedTable stageCreateOrReplaceAtPath(
Map<String, String> properties,
ShardingSpec shardingSpec) {
String datasetUri = getDatasetUri(ident);
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
StructType processedSchema =
SchemaConverter.processSchemaWithProperties(schema, properties, fileFormatVersion);

LanceSparkReadOptions readOptions =
createReadOptions(
Expand All @@ -1174,7 +1183,6 @@ private StagedTable stageCreateOrReplaceAtPath(
catalogConfig.getStorageOptions(), catalogConfig.isEnableStableRowIds(properties));
StagedCommit stagedCommit;
// Use specified file format version, or fall back to existing table's version
String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
if (exists) {
Dataset ds = Utils.openDatasetBuilder(readOptions).build();
stagedCommit = StagedCommit.forExistingTable(ds, arrowSchema, commitOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.lance.spark.utils.BlobSourceContext;
import org.lance.spark.utils.BlobUtils;
import org.lance.spark.write.AddColumnsBackfillWrite;
import org.lance.spark.write.LanceWriteSchemaValidator;
import org.lance.spark.write.SparkWrite;
import org.lance.spark.write.StagedCommit;
import org.lance.spark.write.UpdateColumnsBackfillWrite;
Expand Down Expand Up @@ -60,6 +61,14 @@ public class LanceDataset
ImmutableSet.of(
TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE);

// Blob v2 is read as descriptor structs, but written as BINARY from sparkSchema.
private static final Set<TableCapability> CAPABILITIES_WITH_BLOB_V2 =
ImmutableSet.of(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
TableCapability.TRUNCATE,
TableCapability.ACCEPT_ANY_SCHEMA);

public static final MetadataColumn FRAGMENT_ID_COLUMN =
new MetadataColumn() {
@Override
Expand Down Expand Up @@ -303,7 +312,7 @@ public String name() {

@Override
public StructType schema() {
return sparkSchema;
return BlobUtils.applyBlobV2DescriptorSchema(sparkSchema);
}

@Override
Expand All @@ -318,11 +327,14 @@ public Map<String, String> properties() {

@Override
public Set<TableCapability> capabilities() {
return CAPABILITIES;
return BlobUtils.hasBlobV2Fields(sparkSchema) ? CAPABILITIES_WITH_BLOB_V2 : CAPABILITIES;
}

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
if (capabilities().contains(TableCapability.ACCEPT_ANY_SCHEMA)) {
LanceWriteSchemaValidator.validate(sparkSchema, logicalWriteInfo.schema());
}
// Merge write-time options with the base options from read options
CaseInsensitiveStringMap sparkWriteOptions = logicalWriteInfo.options();
Map<String, BlobSourceContext> blobSourceContexts = decodeBlobSourceContexts(sparkWriteOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.lance.schema.LanceSchema;
import org.lance.spark.LanceSparkReadOptions;
import org.lance.spark.sharding.SparkLanceShardingUtils;
import org.lance.spark.utils.BlobUtils;
import org.lance.spark.utils.Optional;
import org.lance.spark.utils.Utils;

Expand Down Expand Up @@ -117,8 +118,8 @@ public LanceScanBuilder(
String namespaceImpl,
java.util.Map<String, String> namespaceProperties,
ShardingSpec shardingSpec) {
this.fullSchema = schema;
this.schema = schema;
this.fullSchema = BlobUtils.applyBlobV2DescriptorSchema(schema);
this.schema = this.fullSchema;
this.readOptions = readOptions;
this.initialStorageOptions = initialStorageOptions;
this.namespaceImpl = namespaceImpl;
Expand Down