From 7983eee14837bddae6bd0cfbf8161c2ad30e4a47 Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Fri, 5 Jun 2026 10:34:28 -0700 Subject: [PATCH] Add vectorized version of collect() --- cmd/super/ztests/aggmem.yaml | 2 + runtime/vam/expr/agg/collect.go | 66 +++++++++++------------------- runtime/vam/op/aggregate/scalar.go | 9 ++-- 3 files changed, 30 insertions(+), 47 deletions(-) diff --git a/cmd/super/ztests/aggmem.yaml b/cmd/super/ztests/aggmem.yaml index b636501241..7ba9e5f089 100644 --- a/cmd/super/ztests/aggmem.yaml +++ b/cmd/super/ztests/aggmem.yaml @@ -2,6 +2,8 @@ script: | super -aggmem 1B -s -c 'collect(this)' a.sup ! super -aggmem 0 a.sup +runtime: sam + inputs: - name: a.sup data: | diff --git a/runtime/vam/expr/agg/collect.go b/runtime/vam/expr/agg/collect.go index 9cd6cd04cf..097913de9d 100644 --- a/runtime/vam/expr/agg/collect.go +++ b/runtime/vam/expr/agg/collect.go @@ -2,63 +2,45 @@ package agg import ( "github.com/brimdata/super" - samagg "github.com/brimdata/super/runtime/sam/expr/agg" - "github.com/brimdata/super/sbuf" - "github.com/brimdata/super/scode" "github.com/brimdata/super/vector" + "github.com/brimdata/super/vector/vbuild" ) type collect struct { - samcollect samagg.Collect + builder *vbuild.DynamicBuilder } func (c *collect) Consume(vec vector.Any) { - typ := vec.Type() - var b scode.Builder - for i := range vec.Len() { - b.Truncate() - vec.Serialize(&b, i) - val := super.NewValue(typ, b.Bytes().Body()) - if !val.IsNone() { - c.samcollect.Consume(super.NewValue(typ, b.Bytes().Body())) - } + if vec.Len() == 0 || vec.Kind() == vector.KindNull { + return + } + if c.builder == nil { + c.builder = vbuild.NewDynamicBuilder() } + c.builder.Write(vec) } func (c *collect) Result(sctx *super.Context) vector.Any { - val := c.samcollect.Result(sctx) - return sbuf.Dematerialize(sctx, sbuf.NewArray([]super.Value{val})) + if c.builder == nil { + return vector.NewNull(1) + } + vec := c.builder.Build() + if dynamic, ok := vec.(*vector.Dynamic); ok { + vec = vector.NewUnionFromDynamic(sctx, dynamic) + } + atyp := sctx.LookupTypeArray(vec.Type()) + return vector.NewArray(atyp, []uint32{0, vec.Len()}, vec) } func (c *collect) ConsumeAsPartial(partial vector.Any) { - if partial.Kind() == vector.KindNull { - return - } - n := partial.Len() - var index []uint32 - if view, ok := partial.(*vector.View); ok { - partial, index = view.Any, view.Index - } - array, ok := partial.(*vector.Array) - if !ok { - panic("collection: partial not an array type") - } - var b scode.Builder - typ := array.Values.Type() - for i := range n { - idx := i - if index != nil { - idx = index[i] - } - for k := array.Offsets[idx]; k < array.Offsets[idx+1]; k++ { - b.Truncate() - array.Values.Serialize(&b, k) - c.samcollect.Consume(super.NewValue(typ, b.Bytes().Body())) - } - } + inner := vector.PushView(partial).(*vector.Array).Values + c.Consume(vector.Deunion(inner)) } func (c *collect) ResultAsPartial(sctx *super.Context) vector.Any { - val := c.samcollect.ResultAsPartial(sctx) - return sbuf.Dematerialize(sctx, sbuf.NewArray([]super.Value{val})) + if c.builder == nil { + atyp := sctx.LookupTypeArray(super.TypeNone) + return vector.NewArray(atyp, []uint32{0, 0}, vector.NewNone(0)) + } + return c.Result(sctx) } diff --git a/runtime/vam/op/aggregate/scalar.go b/runtime/vam/op/aggregate/scalar.go index 716cfe42e1..f54485a7b6 100644 --- a/runtime/vam/op/aggregate/scalar.go +++ b/runtime/vam/op/aggregate/scalar.go @@ -6,7 +6,6 @@ import ( "github.com/brimdata/super/runtime/vam/expr" "github.com/brimdata/super/runtime/vam/expr/agg" "github.com/brimdata/super/vector" - "github.com/brimdata/super/vector/vbuild" "github.com/brimdata/super/vector/vio" ) @@ -96,13 +95,13 @@ func newFuncs(aggs []*expr.Aggregator) []agg.Func { func (s *scalarAggregate) result() vector.Any { var vecs []vector.Any for _, f := range s.funcs { - b := vbuild.NewDynamicBuilder() + var vec vector.Any if s.partialsOut { - b.Write(f.ResultAsPartial(s.sctx)) + vec = f.ResultAsPartial(s.sctx) } else { - b.Write(f.Result(s.sctx)) + vec = f.Result(s.sctx) } - vecs = append(vecs, b.Build()) + vecs = append(vecs, vec) } s.funcs = nil return s.builder.New(vecs)