From dd665c3b195f529ecfbb790b6423ca99e1ac5a3c Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 30 Apr 2026 13:12:38 -0400 Subject: [PATCH 1/4] fix(arrow/cdata): make nativeCRecordBatchReader deterministic --- arrow/cdata/cdata.go | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go index 352dfd9a..5e1f44aa 100644 --- a/arrow/cdata/cdata.go +++ b/arrow/cdata/cdata.go @@ -44,15 +44,16 @@ import ( "errors" "fmt" "io" - "runtime" "strconv" "strings" + "sync/atomic" "syscall" "unsafe" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/bitutil" + "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/memory" ) @@ -906,15 +907,6 @@ func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) error rdr.stream = C.get_stream() C.ArrowArrayStreamMove(stream, rdr.stream) rdr.arr = C.get_arr() - runtime.SetFinalizer(rdr, func(r *nativeCRecordBatchReader) { - if r.cur != nil { - r.cur.Release() - } - C.ArrowArrayStreamRelease(r.stream) - C.ArrowArrayRelease(r.arr) - C.free(unsafe.Pointer(r.stream)) - C.free(unsafe.Pointer(r.arr)) - }) var sc CArrowSchema errno := C.stream_get_schema(rdr.stream, &sc) @@ -940,12 +932,28 @@ type nativeCRecordBatchReader struct { cur arrow.RecordBatch err error + + refCount atomic.Int64 } -// No need to implement retain and release here as we used runtime.SetFinalizer when constructing -// the reader to free up the ArrowArrayStream memory when the garbage collector cleans it up. -func (n *nativeCRecordBatchReader) Retain() {} -func (n *nativeCRecordBatchReader) Release() {} +func (n *nativeCRecordBatchReader) Retain() { + n.refCount.Add(1) +} + +func (n *nativeCRecordBatchReader) Release() { + debug.Assert(n.refCount.Load() > 0, "too many releases") + + if n.refCount.Add(-1) == 0 { + if n.cur != nil { + n.cur.Release() + } + + C.ArrowArrayStreamRelease(n.stream) + C.ArrowArrayRelease(n.arr) + C.free(unsafe.Pointer(n.stream)) + C.free(unsafe.Pointer(n.arr)) + } +} func (n *nativeCRecordBatchReader) Err() error { return n.err } func (n *nativeCRecordBatchReader) RecordBatch() arrow.RecordBatch { return n.cur } From 8618907649f2ad72c86cb752fce159008ac21bb0 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 30 Apr 2026 13:18:19 -0400 Subject: [PATCH 2/4] cleanup on error --- arrow/cdata/cdata.go | 1 + arrow/cdata/interface.go | 1 + 2 files changed, 2 insertions(+) diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go index 5e1f44aa..0fd6121f 100644 --- a/arrow/cdata/cdata.go +++ b/arrow/cdata/cdata.go @@ -904,6 +904,7 @@ func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, er } func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) error { + rdr.refCount.Store(1) rdr.stream = C.get_stream() C.ArrowArrayStreamMove(stream, rdr.stream) rdr.arr = C.get_arr() diff --git a/arrow/cdata/interface.go b/arrow/cdata/interface.go index f776d7f7..fcd9ae73 100644 --- a/arrow/cdata/interface.go +++ b/arrow/cdata/interface.go @@ -189,6 +189,7 @@ func ImportCArrayStream(stream *CArrowArrayStream, schema *arrow.Schema) arrio.R func ImportCRecordReader(stream *CArrowArrayStream, schema *arrow.Schema) (arrio.Reader, error) { out := &nativeCRecordBatchReader{schema: schema} if err := initReader(out, stream); err != nil { + out.Release() return nil, err } return out, nil From e9034e6a8fb4c7bebcde3876dc7a42085a00feb2 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 30 Apr 2026 13:33:53 -0400 Subject: [PATCH 3/4] use runtime.AddCleanup as fallback --- arrow/cdata/cdata.go | 13 +++++++++++++ arrow/cdata/interface.go | 1 + 2 files changed, 14 insertions(+) diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go index 0fd6121f..261d1925 100644 --- a/arrow/cdata/cdata.go +++ b/arrow/cdata/cdata.go @@ -44,6 +44,7 @@ import ( "errors" "fmt" "io" + "runtime" "strconv" "strings" "sync/atomic" @@ -909,6 +910,15 @@ func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) error C.ArrowArrayStreamMove(stream, rdr.stream) rdr.arr = C.get_arr() + rdr.cleanUps[0] = runtime.AddCleanup(rdr, func(s *CArrowArrayStream) { + C.ArrowArrayStreamRelease(s) + C.free(unsafe.Pointer(s)) + }, rdr.stream) + rdr.cleanUps[1] = runtime.AddCleanup(rdr, func(a *CArrowArray) { + C.ArrowArrayRelease(a) + C.free(unsafe.Pointer(a)) + }, rdr.arr) + var sc CArrowSchema errno := C.stream_get_schema(rdr.stream, &sc) if errno != 0 { @@ -935,6 +945,7 @@ type nativeCRecordBatchReader struct { err error refCount atomic.Int64 + cleanUps [2]runtime.Cleanup } func (n *nativeCRecordBatchReader) Retain() { @@ -945,6 +956,8 @@ func (n *nativeCRecordBatchReader) Release() { debug.Assert(n.refCount.Load() > 0, "too many releases") if n.refCount.Add(-1) == 0 { + n.cleanUps[0].Stop() + n.cleanUps[1].Stop() if n.cur != nil { n.cur.Release() } diff --git a/arrow/cdata/interface.go b/arrow/cdata/interface.go index fcd9ae73..a3690662 100644 --- a/arrow/cdata/interface.go +++ b/arrow/cdata/interface.go @@ -192,6 +192,7 @@ func ImportCRecordReader(stream *CArrowArrayStream, schema *arrow.Schema) (arrio out.Release() return nil, err } + return out, nil } From 429f95f6bf9d176ed2ad715cdb73921b5b0e7d0e Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 1 May 2026 12:14:36 -0400 Subject: [PATCH 4/4] avoid an extra atomic op --- arrow/cdata/cdata.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go index 261d1925..68c6f7e3 100644 --- a/arrow/cdata/cdata.go +++ b/arrow/cdata/cdata.go @@ -953,9 +953,10 @@ func (n *nativeCRecordBatchReader) Retain() { } func (n *nativeCRecordBatchReader) Release() { - debug.Assert(n.refCount.Load() > 0, "too many releases") + rc := n.refCount.Add(-1) + debug.Assert(rc >= 0, "too many releases") - if n.refCount.Add(-1) == 0 { + if rc == 0 { n.cleanUps[0].Stop() n.cleanUps[1].Stop() if n.cur != nil {