69 commits
34ea689  chore: add script to regenerate golden files for plan stability tests (andygrove, Jan 16, 2026)
6d43d52  docs: update contributor guide to reference golden files script (andygrove, Jan 16, 2026)
ee2ffec  fix: ensure native code is built before installing (andygrove, Jan 16, 2026)
c2c8ba0  feat: add experimental native columnar to row conversion (andygrove, Jan 19, 2026)
49a5b20  cargo fmt (andygrove, Jan 19, 2026)
e558073  cargo clippy (andygrove, Jan 19, 2026)
a44066f  docs (andygrove, Jan 19, 2026)
fd58cba  update benchmark [skip ci] (andygrove, Jan 19, 2026)
bac9164  fix: use correct element sizes in native columnar to row for array/map (andygrove, Jan 19, 2026)
3ca5553  test: add fuzz test with nested types to native C2R suite (andygrove, Jan 19, 2026)
7f2e64d  test: add deeply nested type tests to native C2R suite (andygrove, Jan 19, 2026)
7afc4ba  test: add fuzz test with generateNestedSchema for native C2R (andygrove, Jan 20, 2026)
adc13a6  format (andygrove, Jan 20, 2026)
56df742  fix: handle LargeList and improve error handling in native C2R (andygrove, Jan 20, 2026)
461c625  fix (andygrove, Jan 20, 2026)
8b8741c  fix: add Dictionary-encoded array support to native C2R (andygrove, Jan 20, 2026)
b8ed2e7  format (andygrove, Jan 20, 2026)
330dbb2  clippy [skip ci] (andygrove, Jan 20, 2026)
8231a75  test: add benchmark comparing JVM and native columnar to row conversion (andygrove, Jan 20, 2026)
f2cc61c  perf: optimize native C2R by eliminating Vec allocations for strings (andygrove, Jan 20, 2026)
3ebcaca  perf: add fixed-width fast path for native C2R (andygrove, Jan 20, 2026)
ed72c29  test: add fixed-width-only benchmark and refactor C2R benchmark (andygrove, Jan 20, 2026)
17d83d5  perf: optimize complex types in native C2R by eliminating intermediat… (andygrove, Jan 20, 2026)
5f26a81  perf: add bulk copy optimization for primitive arrays in native C2R (andygrove, Jan 20, 2026)
e5b2c61  perf: add pre-downcast optimization for native C2R general path (andygrove, Jan 20, 2026)
7743138  fix: correct array element bulk copy for Date32, Timestamp, Boolean (andygrove, Jan 20, 2026)
9c66ef6  perf: Velox-style optimization for array/map C2R (40-52% faster) (andygrove, Jan 20, 2026)
64c5212  perf: inline type dispatch for struct fields in native C2R (andygrove, Jan 20, 2026)
04c49fb  perf: pre-downcast struct fields for native C2R (andygrove, Jan 20, 2026)
47d4c50  perf: optimize general path for mixed fixed/variable-length columns (andygrove, Jan 20, 2026)
081b3ed  revert (andygrove, Jan 20, 2026)
f696595  upmerge (andygrove, Jan 20, 2026)
92e1abb  revert doc format change (andygrove, Jan 20, 2026)
e735434  fix: address clippy warnings and remove dead code in native C2R (andygrove, Jan 20, 2026)
ab074bd  Remove #[inline] hint from bulk_copy_range (andygrove, Jan 20, 2026)
a4d5eeb  enable native c2r by default (andygrove, Jan 20, 2026)
01b5dd0  fix (andygrove, Jan 20, 2026)
691fb4c  fix (andygrove, Jan 21, 2026)
5687f79  fix (andygrove, Jan 21, 2026)
1dc720b  Fix dictionary type mismatch in columnar_to_row conversion (andygrove, Jan 21, 2026)
5c7da07  Merge remote-tracking branch 'origin/dev/regenerate-golden-files-scri… (andygrove, Jan 21, 2026)
2581ea5  Add doExecuteBroadcast support to CometNativeColumnarToRowExec (andygrove, Jan 21, 2026)
90cc9ba  update golden files (andygrove, Jan 21, 2026)
d46b56a  Add NullVector/NullArray support for native columnar-to-row conversion (andygrove, Jan 22, 2026)
537d62e  Fix clippy warnings for Rust 1.93 (andygrove, Jan 22, 2026)
90d06d5  Merge branch 'fix-clippy-rust-1.93' into native-c2r-enabled (andygrove, Jan 22, 2026)
46733bf  Fix dictionary-encoded decimal handling in native columnar-to-row con… (andygrove, Jan 22, 2026)
26cfa2b  Handle NullArray in native columnar-to-row conversion (andygrove, Jan 22, 2026)
67383ad  Add FixedSizeBinary support for native columnar-to-row conversion (andygrove, Jan 23, 2026)
9e60ca5  Fix dictionary-encoded decimal cast to use schema type (andygrove, Jan 23, 2026)
61b03b7  Disable native C2R when query contains native_comet scan (andygrove, Jan 24, 2026)
4c29171  format (andygrove, Jan 24, 2026)
4e53cc8  upmerge (andygrove, Jan 24, 2026)
ad1d2cc  update golden files (andygrove, Jan 24, 2026)
f64c6f4  fix: Handle Int32/Int64 to Decimal128 conversion in native C2R (andygrove, Jan 24, 2026)
2a7bfdc  reimplement native_comet fallback and revert nodeName changes (andygrove, Jan 24, 2026)
3f8991b  update golden files (andygrove, Jan 24, 2026)
5ad33cc  fix: Fallback to JVM C2R for scans using mutable buffers (andygrove, Jan 24, 2026)
3c41493  enable (andygrove, Jan 24, 2026)
990b1f0  update golden files (andygrove, Jan 24, 2026)
aabc6d1  disable by default, enable in comet tests (andygrove, Jan 24, 2026)
3cdb03c  Merge apache/main into native-c2r-comet-tests (andygrove, Jan 25, 2026)
95327ac  refactor: Reduce duplicate code in columnar_to_row.rs (andygrove, Jan 25, 2026)
d3417b6  disable in stability suite (andygrove, Jan 25, 2026)
993c7c6  update golden files (andygrove, Jan 25, 2026)
ee05d03  update golden files (andygrove, Jan 25, 2026)
44581c2  lint (andygrove, Jan 25, 2026)
a585707  fix: Close ColumnarBatch after native columnar to row conversion (andygrove, Jan 25, 2026)
fa54060  fix: Remove redundant closure (clippy warning) (andygrove, Jan 25, 2026)
@@ -26,7 +26,7 @@ import java.nio.channels.Channels
import scala.jdk.CollectionConverters._

import org.apache.arrow.c.CDataDictionaryProvider
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.arrow.vector.ipc.ArrowStreamWriter
@@ -282,7 +282,7 @@ object Utils extends CometTypeShim {
_: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
_: FixedSizeBinaryVector | _: TimeStampMicroVector | _: StructVector | _: ListVector |
_: MapVector) =>
_: MapVector | _: NullVector) =>
v.asInstanceOf[FieldVector]
case _ =>
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
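The hunk above extends the accepted-vector match in object Utils with Arrow's NullVector, mirroring the NullVector/NullArray support added to the native converter. For reference, a minimal sketch of what such a vector looks like through the Arrow Java API (illustrative only, not part of this change):

```scala
import org.apache.arrow.vector.NullVector

// A NullVector carries no data buffers; every position simply reads as null.
val allNulls = new NullVector("all_nulls")
allNulls.setValueCount(3)
assert(allNulls.isNull(0))
```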
859 changes: 427 additions & 432 deletions native/core/src/execution/columnar_to_row.rs

Large diffs are not rendered by default.
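The Rust implementation in columnar_to_row.rs is the bulk of this PR but is not rendered in this view. On the JVM side it is driven through NativeColumnarToRowConverter, whose visible surface (a constructor taking a schema and batch size, convert, and close) appears in the executor code further down. A minimal usage sketch under those assumptions; copying each row before closing the converter is a conservative choice for this sketch, since the produced rows may reference converter-owned memory:

```scala
import org.apache.comet.NativeColumnarToRowConverter
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Convert a single Arrow-backed batch to Spark InternalRows via the native path.
// `schema`, `batchSize`, and `batch` are assumed to be supplied by the caller.
def convertOneBatch(schema: StructType, batchSize: Int, batch: ColumnarBatch): Seq[InternalRow] = {
  val converter = new NativeColumnarToRowConverter(schema, batchSize)
  try {
    // convert() returns an iterator of rows; materialize copies before close().
    converter.convert(batch).map(_.copy()).toSeq
  } finally {
    converter.close()
  }
}
```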

@@ -22,13 +22,14 @@ package org.apache.comet.rules
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.{CometBatchScanExec, CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

import org.apache.comet.CometConf
import org.apache.comet.parquet.CometParquetScan

// This rule is responsible for eliminating redundant transitions between row-based and
// columnar-based operators for Comet. Currently, three potential redundant transitions are:
@@ -139,12 +140,39 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
private def createColumnarToRowExec(child: SparkPlan): SparkPlan = {
val schema = child.schema
val useNative = CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.get() &&
CometNativeColumnarToRowExec.supportsSchema(schema)
CometNativeColumnarToRowExec.supportsSchema(schema) &&
!hasScanUsingMutableBuffers(child)

if (useNative) {
CometNativeColumnarToRowExec(child)
} else {
CometColumnarToRowExec(child)
}
}

/**
* Checks if the plan contains a scan that uses mutable buffers. Native C2R is not compatible
* with such scans because the buffers may be modified after C2R reads them.
*
* This includes:
* - CometScanExec with native_comet scan implementation (V1 path) - uses BatchReader
* - CometScanExec with native_iceberg_compat and partition columns - uses
* ConstantColumnReader
* - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader
*/
private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = {
op match {
case c: QueryStageExec => hasScanUsingMutableBuffers(c.plan)
case c: ReusedExchangeExec => hasScanUsingMutableBuffers(c.child)
case _ =>
op.exists {
case scan: CometScanExec =>
scan.scanImpl == CometConf.SCAN_NATIVE_COMET ||
(scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
scan.relation.partitionSchema.nonEmpty)
case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan]
case _ => false
}
}
}
}
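With this change the rule chooses the native transition only when the feature flag is enabled, the output schema is supported, and no scan in the subtree hands out mutable buffers; otherwise it keeps the JVM CometColumnarToRowExec. A hedged sketch of opting in for a session (the parquet path is hypothetical; the flag is the one also set below in CometTestBase):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.comet.CometConf

val spark = SparkSession.builder().appName("native-c2r-demo").getOrCreate()

// Opt in to the native columnar-to-row transition for this session.
spark.conf.set(CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key, "true")

// Any action that returns rows to the driver forces a columnar-to-row
// transition at the top of the plan, exercising the rule above.
spark.read.parquet("/tmp/example.parquet").collect()
```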
@@ -19,15 +19,25 @@

package org.apache.spark.sql.comet

import org.apache.spark.TaskContext
import java.util.UUID
import java.util.concurrent.{Future, TimeoutException, TimeUnit}

import scala.concurrent.Promise
import scala.util.control.NonFatal

import org.apache.spark.{broadcast, SparkException, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
import org.apache.spark.sql.comet.util.{Utils => CometUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
import org.apache.spark.util.{SparkFatalException, Utils}

import org.apache.comet.{CometConf, NativeColumnarToRowConverter}

@@ -64,6 +74,116 @@ case class CometNativeColumnarToRowExec(child: SparkPlan)
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
"convertTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time in conversion"))

@transient
private lazy val promise = Promise[broadcast.Broadcast[Any]]()

@transient
private val timeout: Long = conf.broadcastTimeout

private val runId: UUID = UUID.randomUUID

private lazy val cometBroadcastExchange = findCometBroadcastExchange(child)

@transient
lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
session,
CometBroadcastExchangeExec.executionContext) {
try {
// Setup a job group here so later it may get cancelled by groupId if necessary.
sparkContext.setJobGroup(
runId.toString,
s"CometNativeColumnarToRow broadcast exchange (runId $runId)",
interruptOnCancel = true)

val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
val localSchema = this.schema
val batchSize = CometConf.COMET_BATCH_SIZE.get()
val broadcastColumnar = child.executeBroadcast()
val serializedBatches =
broadcastColumnar.value.asInstanceOf[Array[org.apache.spark.util.io.ChunkedByteBuffer]]

// Use native converter to convert columnar data to rows
val converter = new NativeColumnarToRowConverter(localSchema, batchSize)
try {
val rows = serializedBatches.iterator
.flatMap(CometUtils.decodeBatches(_, this.getClass.getSimpleName))
.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
val result = converter.convert(batch)
// Wrap iterator to close batch after consumption
new Iterator[InternalRow] {
override def hasNext: Boolean = {
val hasMore = result.hasNext
if (!hasMore) {
batch.close()
}
hasMore
}
override def next(): InternalRow = result.next()
}
}

val mode = cometBroadcastExchange.get.mode
val relation = mode.transform(rows, Some(numOutputRows.value))
val broadcasted = sparkContext.broadcastInternal(relation, serializedOnly = true)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
promise.trySuccess(broadcasted)
broadcasted
} finally {
converter.close()
}
} catch {
// SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw
// SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
// will catch this exception and re-throw the wrapped fatal throwable.
case oe: OutOfMemoryError =>
val ex = new SparkFatalException(oe)
promise.tryFailure(ex)
throw ex
case e if !NonFatal(e) =>
val ex = new SparkFatalException(e)
promise.tryFailure(ex)
throw ex
case e: Throwable =>
promise.tryFailure(e)
throw e
}
}
}

override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
if (cometBroadcastExchange.isEmpty) {
throw new SparkException(
"CometNativeColumnarToRowExec only supports doExecuteBroadcast when child contains a " +
"CometBroadcastExchange, but got " + child)
}

try {
relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]]
} catch {
case ex: TimeoutException =>
logError(s"Could not execute broadcast in $timeout secs.", ex)
if (!relationFuture.isDone) {
sparkContext.cancelJobGroup(runId.toString)
relationFuture.cancel(true)
}
throw QueryExecutionErrors.executeBroadcastTimeoutError(timeout, Some(ex))
}
}

private def findCometBroadcastExchange(op: SparkPlan): Option[CometBroadcastExchangeExec] = {
op match {
case b: CometBroadcastExchangeExec => Some(b)
case b: BroadcastQueryStageExec => findCometBroadcastExchange(b.plan)
case b: ReusedExchangeExec => findCometBroadcastExchange(b.child)
case _ => op.children.collectFirst(Function.unlift(findCometBroadcastExchange))
}
}

override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
@@ -91,7 +211,17 @@ case class CometNativeColumnarToRowExec(child: SparkPlan)
val result = converter.convert(batch)
convertTime += System.nanoTime() - startTime

result
// Wrap iterator to close batch after consumption
new Iterator[InternalRow] {
override def hasNext: Boolean = {
val hasMore = result.hasNext
if (!hasMore) {
batch.close()
}
hasMore
}
override def next(): InternalRow = result.next()
}
}
}
}
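The batch-closing wrapper appears twice in this file (broadcast path and doExecute). Extracted as a standalone sketch, the pattern looks like the following; the helper name is hypothetical, and the `closed` flag is an addition here to make the close idempotent:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Yields the converted rows and closes the source batch exactly once, as soon
// as the iterator reports it has no more rows. The caller must fully drain it.
def closeWhenDrained(rows: Iterator[InternalRow], batch: ColumnarBatch): Iterator[InternalRow] =
  new Iterator[InternalRow] {
    private var closed = false
    override def hasNext: Boolean = {
      val hasMore = rows.hasNext
      if (!hasMore && !closed) {
        batch.close()
        closed = true
      }
      hasMore
    }
    override def next(): InternalRow = rows.next()
  }
```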
10 changes: 3 additions & 7 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp}
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec}
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometProjectExec}
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -1020,11 +1020,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val query = sql(s"select cast(id as string) from $table")
val (_, cometPlan) = checkSparkAnswerAndOperator(query)
val project = cometPlan
.asInstanceOf[WholeStageCodegenExec]
.child
.asInstanceOf[CometColumnarToRowExec]
.child
.asInstanceOf[InputAdapter]
.asInstanceOf[CometNativeColumnarToRowExec]
.child
.asInstanceOf[CometProjectExec]
val id = project.expressions.head
31 changes: 22 additions & 9 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, He
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, BloomFilterAggregate}
import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SparkPlan, SQLExecution, UnionExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
@@ -864,9 +864,11 @@ class CometExecSuite extends CometTestBase {
checkSparkAnswerAndOperator(df)

// Before AQE: one CometBroadcastExchange, no CometColumnarToRow
var columnarToRowExec = stripAQEPlan(df.queryExecution.executedPlan).collect {
case s: CometColumnarToRowExec => s
}
var columnarToRowExec: Seq[SparkPlan] =
stripAQEPlan(df.queryExecution.executedPlan).collect {
case s: CometColumnarToRowExec => s
case s: CometNativeColumnarToRowExec => s
}
assert(columnarToRowExec.isEmpty)

// Disable CometExecRule after the initial plan is generated. The CometSortMergeJoin and
@@ -880,14 +882,25 @@
// After AQE: CometBroadcastExchange has to be converted to rows to conform to Spark
// BroadcastHashJoin.
val plan = stripAQEPlan(df.queryExecution.executedPlan)
columnarToRowExec = plan.collect { case s: CometColumnarToRowExec =>
s
columnarToRowExec = plan.collect {
case s: CometColumnarToRowExec => s
case s: CometNativeColumnarToRowExec => s
}
assert(columnarToRowExec.length == 1)

// This ColumnarToRowExec should be the immediate child of BroadcastHashJoinExec
val parent = plan.find(_.children.contains(columnarToRowExec.head))
assert(parent.get.isInstanceOf[BroadcastHashJoinExec])
// This ColumnarToRowExec should be a descendant of BroadcastHashJoinExec (possibly
// wrapped by InputAdapter for codegen).
val broadcastJoins = plan.collect { case b: BroadcastHashJoinExec => b }
assert(broadcastJoins.nonEmpty, s"Expected BroadcastHashJoinExec in plan:\n$plan")
val hasC2RDescendant = broadcastJoins.exists { join =>
join.find {
case _: CometColumnarToRowExec | _: CometNativeColumnarToRowExec => true
case _ => false
}.isDefined
}
assert(
hasC2RDescendant,
"BroadcastHashJoinExec should have a columnar-to-row descendant")

// There should be a CometBroadcastExchangeExec under CometColumnarToRowExec
val broadcastQueryStage =
@@ -80,6 +80,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_ONHEAP_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key, "true")
conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
@@ -46,7 +46,7 @@ trait CometPlanChecker {
case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
_: CometIcebergNativeScanExec =>
case _: CometSinkPlaceHolder | _: CometScanWrapper =>
case _: CometColumnarToRowExec =>
case _: CometColumnarToRowExec | _: CometNativeColumnarToRowExec =>
case _: CometSparkToColumnarExec =>
case _: CometExec | _: CometShuffleExchangeExec =>
case _: CometBroadcastExchangeExec =>