Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion docs/src/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -499,4 +499,19 @@ Lance Spark maintains index and metadata caches to minimize redundant I/O. Cache
| `LANCE_INDEX_CACHE_SIZE` | 6GB | Index cache size in bytes. |
| `LANCE_METADATA_CACHE_SIZE`| 1GB | Metadata cache size in bytes. |

For details on how caching works and tuning recommendations, see [Performance Tuning - Caching](performance.md#caching).

## Blob v2 Reads

Lance datasets that contain a blob v2 column expose that column to Spark as the native 5-field descriptor struct: `struct<kind:short, position:long, size:long, blob_id:long, blob_uri:string>`. Querying the descriptor never fetches the blob bytes, so `SELECT payload.size` and `SELECT payload.blob_uri` are cheap.

```sql
-- Query metadata only (no byte fetch):
SELECT id, payload.size, payload.kind FROM lance.ns.tbl;
```

A column is treated as blob v2 when its Arrow field metadata sets `ARROW:extension:name` to `lance.blob.v2`, matching lance-core's blob v2 extension type.

Filter pushdown for SQL `WHERE` is disabled on blob v2 tables; Spark evaluates predicates after the scan. Zonemap-based fragment pruning still runs.

The connector does not materialize blob bytes on read; queries against descriptor fields fetch metadata only.
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public String name() {

/**
 * Returns the table schema with every blob v2 column rewritten to the blob descriptor struct.
 *
 * <p>The rewrite is delegated to {@code BlobUtils.applyBlobV2DescriptorSchema}; the stale
 * {@code return sparkSchema;} that preceded it made this statement unreachable.
 */
@Override
public StructType schema() {
  return BlobUtils.applyBlobV2DescriptorSchema(sparkSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.lance.schema.LanceSchema;
import org.lance.spark.LanceSparkReadOptions;
import org.lance.spark.sharding.SparkLanceShardingUtils;
import org.lance.spark.utils.BlobUtils;
import org.lance.spark.utils.Optional;
import org.lance.spark.utils.Utils;

Expand Down Expand Up @@ -117,8 +118,8 @@ public LanceScanBuilder(
String namespaceImpl,
java.util.Map<String, String> namespaceProperties,
ShardingSpec shardingSpec) {
this.fullSchema = schema;
this.schema = schema;
this.fullSchema = BlobUtils.applyBlobV2DescriptorSchema(schema);
this.schema = this.fullSchema;
this.readOptions = readOptions;
this.initialStorageOptions = initialStorageOptions;
this.namespaceImpl = namespaceImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,36 @@
*/
package org.lance.spark.utils;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.List;
import java.util.Map;

public class BlobUtils {

/** Field-metadata key checked when detecting blob columns flagged via Lance encoding metadata. */
public static final String LANCE_ENCODING_BLOB_KEY = "lance-encoding:blob";
/** Value expected under {@link #LANCE_ENCODING_BLOB_KEY}; compared case-insensitively. */
public static final String LANCE_ENCODING_BLOB_VALUE = "true";

/** Field-metadata key that holds the Arrow extension type name. */
public static final String ARROW_EXTENSION_NAME_KEY = "ARROW:extension:name";
/** Extension name under {@link #ARROW_EXTENSION_NAME_KEY} that marks a blob v2 column. */
public static final String ARROW_EXTENSION_BLOB_V2 = "lance.blob.v2";

/**
* Spark struct type for a Lance blob v2 descriptor: {@code kind, position, size, blob_id,
* blob_uri}.
*
* <p>Field order matters: Arrow descriptor detection in this class compares child names
* positionally against this struct.
*/
public static final StructType BLOB_DESCRIPTOR_STRUCT =
new StructType()
.add("kind", DataTypes.ShortType)
.add("position", DataTypes.LongType)
.add("size", DataTypes.LongType)
.add("blob_id", DataTypes.LongType)
.add("blob_uri", DataTypes.StringType);

/**
* Check if a Spark field is a blob field based on its metadata.
*
Expand Down Expand Up @@ -66,4 +89,83 @@ public static boolean isBlobArrowField(org.apache.arrow.vector.types.pojo.Field
String value = metadata.get(LANCE_ENCODING_BLOB_KEY);
return LANCE_ENCODING_BLOB_VALUE.equalsIgnoreCase(value);
}

/**
 * Reports whether a Spark field is tagged as a blob v2 column.
 *
 * @param field the Spark field to inspect; may be {@code null}
 * @return {@code true} when {@code field} is non-null and its metadata carries the blob v2 Arrow
 *     extension name
 */
public static boolean isBlobV2SparkField(StructField field) {
  if (field == null) {
    return false;
  }
  return isBlobV2SparkMetadata(field.metadata());
}

/**
 * Reports whether Spark field metadata names the blob v2 Arrow extension.
 *
 * @param metadata the metadata to inspect; may be {@code null}
 * @return {@code true} when the extension-name key is present and equals the blob v2 name
 */
public static boolean isBlobV2SparkMetadata(Metadata metadata) {
  // Check presence first: the string lookup is only attempted for keys that exist.
  if (metadata == null || !metadata.contains(ARROW_EXTENSION_NAME_KEY)) {
    return false;
  }

  String extensionName = metadata.getString(ARROW_EXTENSION_NAME_KEY);
  return ARROW_EXTENSION_BLOB_V2.equals(extensionName);
}

/**
 * Arrow-side counterpart of {@link #isBlobV2SparkField} used inside the columnar batch scanner.
 *
 * <p>A field matches when its metadata names the blob v2 extension, or, failing that, when it is
 * a struct whose children line up with the descriptor struct.
 *
 * @param field the Arrow field to inspect; may be {@code null}
 * @return {@code true} when the field is recognized as a blob v2 column
 */
public static boolean isBlobV2ArrowField(Field field) {
  if (field == null) {
    return false;
  }

  Map<String, String> fieldMetadata = field.getMetadata();
  boolean taggedAsBlobV2 =
      fieldMetadata != null
          && ARROW_EXTENSION_BLOB_V2.equals(fieldMetadata.get(ARROW_EXTENSION_NAME_KEY));

  // lance-core scan batches expose the unloaded descriptor struct (no extension metadata),
  // so fall back to recognizing the descriptor by its shape.
  return taggedAsBlobV2 || isBlobV2DescriptorArrowField(field);
}

/**
 * Reports whether an Arrow field has the shape of the blob v2 descriptor struct.
 *
 * <p>The field must be a struct with the same number of children as
 * {@link #BLOB_DESCRIPTOR_STRUCT}, and the child names must match in order. Child types are not
 * compared.
 */
private static boolean isBlobV2DescriptorArrowField(Field field) {
  boolean isStruct = field.getType() instanceof ArrowType.Struct;
  if (!isStruct) {
    return false;
  }

  StructField[] descriptorFields = BLOB_DESCRIPTOR_STRUCT.fields();
  List<Field> childFields = field.getChildren();
  if (childFields == null || childFields.size() != descriptorFields.length) {
    return false;
  }

  int position = 0;
  for (Field child : childFields) {
    String expectedName = descriptorFields[position].name();
    if (!expectedName.equals(child.getName())) {
      return false;
    }
    position++;
  }
  return true;
}

/**
 * Returns true if any top-level field in {@code schema} is a blob v2 column.
 *
 * <p>A {@code null} schema is treated as having no blob v2 columns, matching the null-safety of
 * {@link #isBlobV2SparkField}.
 *
 * @param schema the Spark schema to scan; may be {@code null}
 * @return {@code true} when at least one top-level field is tagged as blob v2
 */
public static boolean hasBlobV2Fields(StructType schema) {
  if (schema == null) {
    return false;
  }

  for (StructField field : schema.fields()) {
    if (isBlobV2SparkField(field)) {
      return true;
    }
  }

  return false;
}

/**
 * Rewrites blob v2 columns to the descriptor struct returned by Lance.
 *
 * <p>Only top-level fields are examined. Each blob v2 field keeps its name, nullability and
 * metadata; only its data type is replaced with {@link #BLOB_DESCRIPTOR_STRUCT}. Fields that
 * already have the descriptor type are left alone, so re-applying the rewrite returns the same
 * schema instance instead of an equal copy.
 *
 * @param schema the Spark schema to rewrite; may be {@code null}
 * @return {@code schema} itself when it is {@code null} or nothing needed rewriting, otherwise a
 *     new schema with the blob v2 columns rewritten
 */
public static StructType applyBlobV2DescriptorSchema(StructType schema) {
  if (schema == null) {
    // Pass null through so callers that tolerated a missing schema keep working.
    return null;
  }

  StructField[] original = schema.fields();
  StructField[] rewritten = null; // allocated lazily, only once a field actually changes
  for (int i = 0; i < original.length; i++) {
    StructField field = original[i];
    if (!isBlobV2SparkField(field) || BLOB_DESCRIPTOR_STRUCT.equals(field.dataType())) {
      continue;
    }

    if (rewritten == null) {
      rewritten = original.clone();
    }
    rewritten[i] =
        new StructField(field.name(), BLOB_DESCRIPTOR_STRUCT, field.nullable(), field.metadata());
  }

  return rewritten == null ? schema : new StructType(rewritten);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.util.LanceArrowUtils;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
Expand Down Expand Up @@ -71,7 +72,7 @@ public LanceArrowColumnVector(ValueVector vector) {
}

public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) {
super(LanceArrowUtils.fromArrowField(vector.getField()));
super(computeDataType(vector));
this.closeVectorOnClose = closeVectorOnClose;

if (vector instanceof UInt1Vector) {
Expand All @@ -86,6 +87,8 @@ public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) {
fixedSizeBinaryAccessor = new FixedSizeBinaryAccessor((FixedSizeBinaryVector) vector);
} else if (vector instanceof FixedSizeListVector) {
fixedSizeListAccessor = new FixedSizeListAccessor((FixedSizeListVector) vector);
} else if (vector instanceof StructVector && BlobUtils.isBlobV2ArrowField(vector.getField())) {
structAccessor = new LanceStructAccessor((StructVector) vector);
} else if (vector instanceof StructVector && BlobUtils.isBlobArrowField(vector.getField())) {
blobStructAccessor = new BlobStructAccessor((StructVector) vector);
} else if (vector instanceof StructVector) {
Expand Down Expand Up @@ -522,4 +525,11 @@ public ColumnVector getChild(int ordinal) {
public BlobStructAccessor getBlobStructAccessor() {
return blobStructAccessor;
}

/**
 * Chooses the Spark data type reported for {@code vector}.
 *
 * <p>Struct vectors recognized as blob v2 columns report the descriptor struct; every other
 * vector is converted from its Arrow field by {@code LanceArrowUtils.fromArrowField}.
 */
private static DataType computeDataType(ValueVector vector) {
  boolean isBlobV2Descriptor =
      vector instanceof StructVector && BlobUtils.isBlobV2ArrowField(vector.getField());
  return isBlobV2Descriptor
      ? BlobUtils.BLOB_DESCRIPTOR_STRUCT
      : LanceArrowUtils.fromArrowField(vector.getField());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ object LanceArrowUtils {
val ARROW_FIXED_SIZE_LIST_SIZE_KEY = VectorUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY
val ARROW_FLOAT16_KEY = Float16Utils.ARROW_FLOAT16_KEY
val ENCODING_BLOB = BlobUtils.LANCE_ENCODING_BLOB_KEY
val ARROW_EXT_NAME_KEY = BlobUtils.ARROW_EXTENSION_NAME_KEY
val BLOB_V2_EXT_NAME = BlobUtils.ARROW_EXTENSION_BLOB_V2
val ARROW_LARGE_VAR_CHAR_KEY = LargeVarCharUtils.ARROW_LARGE_VAR_CHAR_KEY
val ARROW_DATE_MILLISECOND_KEY = DateMilliUtils.ARROW_DATE_MILLISECOND_KEY

Expand Down Expand Up @@ -82,6 +84,8 @@ object LanceArrowUtils {
val elementType = fromArrowField(elementField)
val containsNull = elementField.isNullable
ArrayType(elementType, containsNull)
case _: ArrowType.Struct if isBlobField(field) =>
BinaryType
case _: ArrowType.Struct =>
// Always recurse through LanceArrowUtils for struct children so special cases
// like Date(MILLISECOND), FixedSizeBinary, etc. are applied in nested schemas too.
Expand Down Expand Up @@ -529,8 +533,10 @@ object LanceArrowUtils {

/**
 * Returns true when the Arrow field is a blob column: either its metadata sets the
 * `ENCODING_BLOB` key to "true" (case-insensitive), or its Arrow extension name equals
 * `BLOB_V2_EXT_NAME`. A field without metadata is never a blob field.
 */
private def isBlobField(field: Field): Boolean = {
  val metadata = field.getMetadata
  // equalsIgnoreCase(null) and equals(null) are both false, so absent keys need no extra check.
  metadata != null && (
    "true".equalsIgnoreCase(metadata.get(ENCODING_BLOB)) ||
      BLOB_V2_EXT_NAME.equals(metadata.get(ARROW_EXT_NAME_KEY))
  )
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark.utils;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Unit tests for the blob v1/v2 detection and schema-rewrite helpers in {@link BlobUtils}. */
public class BlobUtilsTest {

  @Test
  public void testBlobV2FieldWithArrowExtensionName() {
    StructField tagged = blobV2Field();
    assertTrue(BlobUtils.isBlobV2SparkField(tagged));
  }

  @Test
  public void testBlobV2FieldNullSafety() {
    assertFalse(BlobUtils.isBlobV2SparkField(null));
  }

  @Test
  public void testV1Field() {
    StructField tagged = blobV1Field();
    assertTrue(BlobUtils.isBlobSparkField(tagged));
  }

  @Test
  public void testBlobV2ArrowFieldRejectsUnrelated() {
    // A plain binary column has neither the extension metadata nor the descriptor shape.
    FieldType binaryType =
        new FieldType(true, ArrowType.Binary.INSTANCE, null, Collections.emptyMap());
    Field plainBinary = new Field("payload", binaryType, null);

    assertFalse(BlobUtils.isBlobV2ArrowField(plainBinary));
    assertFalse(BlobUtils.isBlobV2ArrowField(null));
  }

  @Test
  public void testHasBlobV2FieldsInSchema() {
    StructType withBlobV2 = schemaWithIdAnd(blobV2Field());
    assertTrue(BlobUtils.hasBlobV2Fields(withBlobV2));
  }

  @Test
  public void testDescriptorStructShape() {
    StructType descriptor = BlobUtils.BLOB_DESCRIPTOR_STRUCT;

    assertEquals(5, descriptor.fields().length);
    assertEquals(DataTypes.ShortType, descriptor.apply("kind").dataType());
    assertEquals(DataTypes.LongType, descriptor.apply("position").dataType());
    assertEquals(DataTypes.LongType, descriptor.apply("size").dataType());
    assertEquals(DataTypes.LongType, descriptor.apply("blob_id").dataType());
    assertEquals(DataTypes.StringType, descriptor.apply("blob_uri").dataType());
  }

  @Test
  public void testBlobV2DescriptorSchemaRewrite() {
    StructType result = BlobUtils.applyBlobV2DescriptorSchema(schemaWithIdAnd(blobV2Field()));

    assertEquals(DataTypes.IntegerType, result.apply("id").dataType());
    assertEquals(BlobUtils.BLOB_DESCRIPTOR_STRUCT, result.apply("payload").dataType());
  }

  @Test
  public void testV1FieldsPreservedInRewrite() {
    StructType result = BlobUtils.applyBlobV2DescriptorSchema(schemaWithIdAnd(blobV1Field()));

    assertEquals(DataTypes.BinaryType, result.apply("payload").dataType());
  }

  @Test
  public void testUnloadedDescriptorStructRecognizedAsBlobV2() {
    // No extension metadata here: recognition must come from the descriptor child names alone.
    FieldType structType =
        new FieldType(true, ArrowType.Struct.INSTANCE, null, Collections.emptyMap());
    Field descriptor =
        new Field(
            "payload",
            structType,
            Arrays.asList(
                unsignedLongChild("kind"),
                unsignedLongChild("position"),
                unsignedLongChild("size"),
                unsignedLongChild("blob_id"),
                stringChild("blob_uri")));

    assertTrue(BlobUtils.isBlobV2ArrowField(descriptor));
    assertFalse(BlobUtils.isBlobArrowField(descriptor));
  }

  /** Builds a two-column schema: a nullable integer {@code id} followed by {@code second}. */
  private static StructType schemaWithIdAnd(StructField second) {
    StructField[] columns = {field("id", DataTypes.IntegerType), second};
    return new StructType(columns);
  }

  /** Nullable Arrow child with no metadata and the given type. */
  private static Field nullableChild(String name, ArrowType type) {
    FieldType fieldType = new FieldType(true, type, null, Collections.emptyMap());
    return new Field(name, fieldType, null);
  }

  /** Nullable unsigned 64-bit integer Arrow child. */
  private static Field unsignedLongChild(String name) {
    return nullableChild(name, new ArrowType.Int(64, false));
  }

  /** Nullable UTF-8 string Arrow child. */
  private static Field stringChild(String name) {
    return nullableChild(name, ArrowType.Utf8.INSTANCE);
  }

  /** Nullable Spark field with empty metadata. */
  private static StructField field(String name, org.apache.spark.sql.types.DataType dt) {
    return new StructField(name, dt, true, Metadata.empty());
  }

  /** Nullable binary {@code payload} field carrying a single string metadata entry. */
  private static StructField payloadFieldWith(String metadataKey, String metadataValue) {
    Metadata tagged = new MetadataBuilder().putString(metadataKey, metadataValue).build();
    return new StructField("payload", DataTypes.BinaryType, true, tagged);
  }

  /** Binary {@code payload} field tagged with the blob v2 Arrow extension name. */
  private static StructField blobV2Field() {
    return payloadFieldWith(BlobUtils.ARROW_EXTENSION_NAME_KEY, BlobUtils.ARROW_EXTENSION_BLOB_V2);
  }

  /** Binary {@code payload} field tagged with the v1 blob encoding metadata. */
  private static StructField blobV1Field() {
    return payloadFieldWith(
        BlobUtils.LANCE_ENCODING_BLOB_KEY, BlobUtils.LANCE_ENCODING_BLOB_VALUE);
  }
}
Loading
Loading