diff --git a/docs/src/config.md b/docs/src/config.md index 302cf761e..21e3b15ac 100644 --- a/docs/src/config.md +++ b/docs/src/config.md @@ -499,4 +499,19 @@ Lance Spark maintains index and metadata caches to minimize redundant I/O. Cache | `LANCE_INDEX_CACHE_SIZE` | 6GB | Index cache size in bytes. | | `LANCE_METADATA_CACHE_SIZE`| 1GB | Metadata cache size in bytes. | -For details on how caching works and tuning recommendations, see [Performance Tuning - Caching](performance.md#caching). \ No newline at end of file +For details on how caching works and tuning recommendations, see [Performance Tuning - Caching](performance.md#caching). + +## Blob v2 Reads + +Lance datasets that contain a blob v2 column expose that column to Spark as the native 5-field descriptor struct: `struct`. Querying the descriptor never fetches the blob bytes, so `SELECT payload.size` and `SELECT payload.blob_uri` are cheap. + +```sql +-- Query metadata only (no byte fetch): +SELECT id, payload.size, payload.kind FROM lance.ns.tbl; +``` + +A column is treated as blob v2 when the Arrow field carries `ARROW:extension:name = lance.blob.v2`, matching lance-core's blob v2 extension type. + +Filter pushdown for SQL `WHERE` is disabled on blob v2 tables; Spark evaluates predicates after the scan. Zonemap-based fragment pruning still runs. + +The connector does not materialize blob bytes on read; queries against descriptor fields fetch metadata only. diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java index c3e817dc9..730a95160 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java @@ -303,7 +303,7 @@ public String name() { @Override public StructType schema() { - return sparkSchema; + return BlobUtils.applyBlobV2DescriptorSchema(sparkSchema); } @Override diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java index 4df430989..834e9dd0b 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java @@ -26,6 +26,7 @@ import org.lance.schema.LanceSchema; import org.lance.spark.LanceSparkReadOptions; import org.lance.spark.sharding.SparkLanceShardingUtils; +import org.lance.spark.utils.BlobUtils; import org.lance.spark.utils.Optional; import org.lance.spark.utils.Utils; @@ -117,8 +118,8 @@ public LanceScanBuilder( String namespaceImpl, java.util.Map namespaceProperties, ShardingSpec shardingSpec) { - this.fullSchema = schema; - this.schema = schema; + this.fullSchema = BlobUtils.applyBlobV2DescriptorSchema(schema); + this.schema = this.fullSchema; this.readOptions = readOptions; this.initialStorageOptions = initialStorageOptions; this.namespaceImpl = namespaceImpl; diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/BlobUtils.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/BlobUtils.java index 2a94557a3..af387dd5b 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/BlobUtils.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/BlobUtils.java @@ -13,13 +13,36 @@ */ package org.lance.spark.utils; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; public class BlobUtils { public static final String LANCE_ENCODING_BLOB_KEY = "lance-encoding:blob"; public static final String LANCE_ENCODING_BLOB_VALUE = "true"; + public static final String ARROW_EXTENSION_NAME_KEY = "ARROW:extension:name"; + public static final String ARROW_EXTENSION_BLOB_V2 = "lance.blob.v2"; + + /** + * Spark struct type for a Lance blob v2 descriptor: {@code kind, position, size, blob_id, + * blob_uri}. + */ + public static final StructType BLOB_DESCRIPTOR_STRUCT = + new StructType() + .add("kind", DataTypes.ShortType) + .add("position", DataTypes.LongType) + .add("size", DataTypes.LongType) + .add("blob_id", DataTypes.LongType) + .add("blob_uri", DataTypes.StringType); + /** * Check if a Spark field is a blob field based on its metadata. * @@ -66,4 +89,83 @@ public static boolean isBlobArrowField(org.apache.arrow.vector.types.pojo.Field String value = metadata.get(LANCE_ENCODING_BLOB_KEY); return LANCE_ENCODING_BLOB_VALUE.equalsIgnoreCase(value); } + + /** Returns true when a Spark field carries the lance-core blob v2 Arrow extension. */ + public static boolean isBlobV2SparkField(StructField field) { + return field != null && isBlobV2SparkMetadata(field.metadata()); + } + + public static boolean isBlobV2SparkMetadata(Metadata metadata) { + if (metadata == null) { + return false; + } + + return metadata.contains(ARROW_EXTENSION_NAME_KEY) + && ARROW_EXTENSION_BLOB_V2.equals(metadata.getString(ARROW_EXTENSION_NAME_KEY)); + } + + /** + * Arrow-side counterpart of {@link #isBlobV2SparkField} used inside the columnar batch scanner. + */ + public static boolean isBlobV2ArrowField(Field field) { + if (field == null) { + return false; + } + + Map metadata = field.getMetadata(); + if (metadata != null + && ARROW_EXTENSION_BLOB_V2.equals(metadata.get(ARROW_EXTENSION_NAME_KEY))) { + return true; + } + + // lance-core scan batches expose the unloaded descriptor struct (no extension metadata). + return isBlobV2DescriptorArrowField(field); + } + + private static boolean isBlobV2DescriptorArrowField(Field field) { + if (!(field.getType() instanceof ArrowType.Struct)) { + return false; + } + List children = field.getChildren(); + if (children == null || children.size() != BLOB_DESCRIPTOR_STRUCT.fields().length) { + return false; + } + StructField[] expected = BLOB_DESCRIPTOR_STRUCT.fields(); + for (int i = 0; i < expected.length; i++) { + if (!expected[i].name().equals(children.get(i).getName())) { + return false; + } + } + return true; + } + + /** Returns true if any field in {@code schema} is a blob v2 column. */ + public static boolean hasBlobV2Fields(StructType schema) { + for (StructField field : schema.fields()) { + if (isBlobV2SparkField(field)) { + return true; + } + } + + return false; + } + + /** Rewrites blob v2 columns to the descriptor struct returned by Lance. */ + public static StructType applyBlobV2DescriptorSchema(StructType schema) { + StructField[] fields = new StructField[schema.fields().length]; + boolean changed = false; + for (int i = 0; i < schema.fields().length; i++) { + StructField field = schema.fields()[i]; + if (!isBlobV2SparkField(field)) { + fields[i] = field; + continue; + } + + fields[i] = + new StructField(field.name(), BLOB_DESCRIPTOR_STRUCT, field.nullable(), field.metadata()); + changed = true; + } + + return changed ? new StructType(fields) : schema; + } } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java index 8d30f9783..2414666cb 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java @@ -38,6 +38,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.StructVector; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.util.LanceArrowUtils; import org.apache.spark.sql.vectorized.ArrowColumnVector; @@ -71,7 +72,7 @@ public LanceArrowColumnVector(ValueVector vector) { } public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) { - super(LanceArrowUtils.fromArrowField(vector.getField())); + super(computeDataType(vector)); this.closeVectorOnClose = closeVectorOnClose; if (vector instanceof UInt1Vector) { @@ -86,6 +87,8 @@ public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) { fixedSizeBinaryAccessor = new FixedSizeBinaryAccessor((FixedSizeBinaryVector) vector); } else if (vector instanceof FixedSizeListVector) { fixedSizeListAccessor = new FixedSizeListAccessor((FixedSizeListVector) vector); + } else if (vector instanceof StructVector && BlobUtils.isBlobV2ArrowField(vector.getField())) { + structAccessor = new LanceStructAccessor((StructVector) vector); } else if (vector instanceof StructVector && BlobUtils.isBlobArrowField(vector.getField())) { blobStructAccessor = new BlobStructAccessor((StructVector) vector); } else if (vector instanceof StructVector) { @@ -522,4 +525,11 @@ public ColumnVector getChild(int ordinal) { public BlobStructAccessor getBlobStructAccessor() { return blobStructAccessor; } + + private static DataType computeDataType(ValueVector vector) { + if (vector instanceof StructVector && BlobUtils.isBlobV2ArrowField(vector.getField())) { + return BlobUtils.BLOB_DESCRIPTOR_STRUCT; + } + return LanceArrowUtils.fromArrowField(vector.getField()); + } } diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala index 3a07a3106..f5e40705d 100644 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala @@ -44,6 +44,8 @@ object LanceArrowUtils { val ARROW_FIXED_SIZE_LIST_SIZE_KEY = VectorUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY val ARROW_FLOAT16_KEY = Float16Utils.ARROW_FLOAT16_KEY val ENCODING_BLOB = BlobUtils.LANCE_ENCODING_BLOB_KEY + val ARROW_EXT_NAME_KEY = BlobUtils.ARROW_EXTENSION_NAME_KEY + val BLOB_V2_EXT_NAME = BlobUtils.ARROW_EXTENSION_BLOB_V2 val ARROW_LARGE_VAR_CHAR_KEY = LargeVarCharUtils.ARROW_LARGE_VAR_CHAR_KEY val ARROW_DATE_MILLISECOND_KEY = DateMilliUtils.ARROW_DATE_MILLISECOND_KEY @@ -82,6 +84,8 @@ object LanceArrowUtils { val elementType = fromArrowField(elementField) val containsNull = elementField.isNullable ArrayType(elementType, containsNull) + case _: ArrowType.Struct if isBlobField(field) => + BinaryType case _: ArrowType.Struct => // Always recurse through LanceArrowUtils for struct children so special cases // like Date(MILLISECOND), FixedSizeBinary, etc. are applied in nested schemas too. @@ -529,8 +533,10 @@ object LanceArrowUtils { private def isBlobField(field: Field): Boolean = { val metadata = field.getMetadata - metadata != null && metadata.containsKey(ENCODING_BLOB) && - "true".equalsIgnoreCase(metadata.get(ENCODING_BLOB)) + if (metadata == null) return false + (metadata.containsKey(ENCODING_BLOB) && + "true".equalsIgnoreCase(metadata.get(ENCODING_BLOB))) || + BLOB_V2_EXT_NAME.equals(metadata.get(ARROW_EXT_NAME_KEY)) } } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/utils/BlobUtilsTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/utils/BlobUtilsTest.java new file mode 100644 index 000000000..24edf3735 --- /dev/null +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/utils/BlobUtilsTest.java @@ -0,0 +1,152 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.utils; + +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlobUtilsTest { + + @Test + public void testBlobV2FieldWithArrowExtensionName() { + assertTrue(BlobUtils.isBlobV2SparkField(blobV2Field())); + } + + @Test + public void testBlobV2FieldNullSafety() { + assertFalse(BlobUtils.isBlobV2SparkField(null)); + } + + @Test + public void testV1Field() { + assertTrue(BlobUtils.isBlobSparkField(blobV1Field())); + } + + @Test + public void testBlobV2ArrowFieldRejectsUnrelated() { + Field f = + new Field( + "payload", + new FieldType(true, ArrowType.Binary.INSTANCE, null, Collections.emptyMap()), + null); + assertFalse(BlobUtils.isBlobV2ArrowField(f)); + assertFalse(BlobUtils.isBlobV2ArrowField(null)); + } + + @Test + public void testHasBlobV2FieldsInSchema() { + StructType schema = + new StructType( + new StructField[] { + field("id", DataTypes.IntegerType), blobV2Field(), + }); + assertTrue(BlobUtils.hasBlobV2Fields(schema)); + } + + @Test + public void testDescriptorStructShape() { + StructType s = BlobUtils.BLOB_DESCRIPTOR_STRUCT; + assertEquals(5, s.fields().length); + assertEquals(DataTypes.ShortType, s.apply("kind").dataType()); + assertEquals(DataTypes.LongType, s.apply("position").dataType()); + assertEquals(DataTypes.LongType, s.apply("size").dataType()); + assertEquals(DataTypes.LongType, s.apply("blob_id").dataType()); + assertEquals(DataTypes.StringType, s.apply("blob_uri").dataType()); + } + + @Test + public void testBlobV2DescriptorSchemaRewrite() { + StructType schema = + new StructType( + new StructField[] { + field("id", DataTypes.IntegerType), blobV2Field(), + }); + StructType rewritten = BlobUtils.applyBlobV2DescriptorSchema(schema); + assertEquals(DataTypes.IntegerType, rewritten.apply("id").dataType()); + assertEquals(BlobUtils.BLOB_DESCRIPTOR_STRUCT, rewritten.apply("payload").dataType()); + } + + @Test + public void testV1FieldsPreservedInRewrite() { + StructType schema = + new StructType( + new StructField[] { + field("id", DataTypes.IntegerType), blobV1Field(), + }); + StructType rewritten = BlobUtils.applyBlobV2DescriptorSchema(schema); + assertEquals(DataTypes.BinaryType, rewritten.apply("payload").dataType()); + } + + @Test + public void testUnloadedDescriptorStructRecognizedAsBlobV2() { + Field f = + new Field( + "payload", + new FieldType(true, ArrowType.Struct.INSTANCE, null, Collections.emptyMap()), + Arrays.asList( + intChild("kind"), + intChild("position"), + intChild("size"), + intChild("blob_id"), + utf8Child("blob_uri"))); + assertTrue(BlobUtils.isBlobV2ArrowField(f)); + assertFalse(BlobUtils.isBlobArrowField(f)); + } + + private static Field intChild(String name) { + return new Field( + name, + new FieldType(true, new ArrowType.Int(64, false), null, Collections.emptyMap()), + null); + } + + private static Field utf8Child(String name) { + return new Field( + name, new FieldType(true, ArrowType.Utf8.INSTANCE, null, Collections.emptyMap()), null); + } + + private static StructField field(String name, org.apache.spark.sql.types.DataType dt) { + return new StructField(name, dt, true, Metadata.empty()); + } + + private static StructField blobV2Field() { + Metadata md = + new MetadataBuilder() + .putString(BlobUtils.ARROW_EXTENSION_NAME_KEY, BlobUtils.ARROW_EXTENSION_BLOB_V2) + .build(); + return new StructField("payload", DataTypes.BinaryType, true, md); + } + + private static StructField blobV1Field() { + Metadata md = + new MetadataBuilder() + .putString(BlobUtils.LANCE_ENCODING_BLOB_KEY, BlobUtils.LANCE_ENCODING_BLOB_VALUE) + .build(); + return new StructField("payload", DataTypes.BinaryType, true, md); + } +} diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/vectorized/BlobV2DescriptorColumnVectorTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/vectorized/BlobV2DescriptorColumnVectorTest.java new file mode 100644 index 000000000..17451ec66 --- /dev/null +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/vectorized/BlobV2DescriptorColumnVectorTest.java @@ -0,0 +1,121 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.vectorized; + +import org.lance.spark.utils.BlobUtils; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.UInt1Vector; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.UInt8Vector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class BlobV2DescriptorColumnVectorTest { + + @Test + public void testDescriptorChildrenRead() { + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + StructVector struct = buildDescriptor(allocator, (short) 3, 12345L, 678L, 99L, "s3://b/o"); + LanceArrowColumnVector wrapper = new LanceArrowColumnVector(struct)) { + + assertEquals(1, struct.getValueCount()); + assertNotNull(wrapper.getChild(0)); + assertEquals((short) 3, wrapper.getChild(0).getShort(0)); + assertEquals(12345L, wrapper.getChild(1).getLong(0)); + assertEquals(678L, wrapper.getChild(2).getLong(0)); + assertEquals(99L, wrapper.getChild(3).getLong(0)); + assertEquals("s3://b/o", wrapper.getChild(4).getUTF8String(0).toString()); + } + } + + @Test + public void testDescriptorMaxUnsignedValues() { + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + StructVector struct = + buildDescriptor(allocator, (short) 255, 1L << 40, 1L << 30, 4294967295L, "x"); + LanceArrowColumnVector wrapper = new LanceArrowColumnVector(struct)) { + + assertEquals((short) 255, wrapper.getChild(0).getShort(0)); + assertEquals(1L << 40, wrapper.getChild(1).getLong(0)); + assertEquals(1L << 30, wrapper.getChild(2).getLong(0)); + assertEquals(4294967295L, wrapper.getChild(3).getLong(0)); + } + } + + private static StructVector buildDescriptor( + BufferAllocator allocator, + short kind, + long position, + long size, + long blobId, + String blobUri) { + Map structMd = new HashMap<>(); + structMd.put(BlobUtils.ARROW_EXTENSION_NAME_KEY, BlobUtils.ARROW_EXTENSION_BLOB_V2); + Field structField = + new Field( + "payload", + new FieldType(true, ArrowType.Struct.INSTANCE, null, structMd), + Arrays.asList( + intChild("kind", 8), + intChild("position", 64), + intChild("size", 64), + intChild("blob_id", 32), + utf8Child("blob_uri"))); + StructVector struct = (StructVector) structField.createVector(allocator); + struct.allocateNew(); + + UInt1Vector kindV = (UInt1Vector) struct.getChild("kind"); + UInt8Vector posV = (UInt8Vector) struct.getChild("position"); + UInt8Vector sizeV = (UInt8Vector) struct.getChild("size"); + UInt4Vector idV = (UInt4Vector) struct.getChild("blob_id"); + VarCharVector uriV = (VarCharVector) struct.getChild("blob_uri"); + + struct.setIndexDefined(0); + kindV.setSafe(0, kind); + posV.setSafe(0, position); + sizeV.setSafe(0, size); + idV.setSafe(0, (int) blobId); + uriV.setSafe(0, blobUri.getBytes(StandardCharsets.UTF_8)); + + struct.setValueCount(1); + return struct; + } + + private static Field intChild(String name, int bitWidth) { + return new Field( + name, + new FieldType(true, new ArrowType.Int(bitWidth, false), null, Collections.emptyMap()), + null); + } + + private static Field utf8Child(String name) { + return new Field( + name, new FieldType(true, ArrowType.Utf8.INSTANCE, null, Collections.emptyMap()), null); + } +}