From 98ba0ba3bedcb2a078921e90bbe7ecb979431a90 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Fri, 5 Jun 2026 22:36:52 -0400 Subject: [PATCH 1/2] use ast.FileScan of stdin instead of ast.DefaultScan When its extInput parameter is true, compiler/semantic.Analyze prepends an ast.DefaultScan to the AST. Change it to prepend an ast.FileScan for standard input instead. To allow that FileScan to access sio.Readers passed to compiler.CompileWithAST, add a new runtime/exec.Environment.Stdin field, set the field in CompileWithAST if the length of the readers paramter is nonzero, and return a scanner for it in Environment.Open if the field is set when opening standard input. --- compiler/package.go | 6 +++++- compiler/semantic/analyzer.go | 7 +++---- fuzz/fuzz.go | 7 ++----- runtime/exec/environment.go | 5 +++++ runtime/sam/expr/filter_test.go | 4 ++-- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/compiler/package.go b/compiler/package.go index 0dfe777f88..db877cb104 100644 --- a/compiler/package.go +++ b/compiler/package.go @@ -62,6 +62,10 @@ func BuildWithBuilder(rctx *runtime.Context, main *dag.Main, env *exec.Environme } func CompileWithAST(rctx *runtime.Context, ast *parser.AST, env *exec.Environment, optimize bool, parallel int, readers []sio.Reader) (*exec.Query, error) { + if len(readers) > 0 { + env = new(*env) + env.Stdin = sio.ConcatReader(readers...) + } main, err := Analyze(rctx, ast, env, len(readers) > 0) if err != nil { return nil, err @@ -72,7 +76,7 @@ func CompileWithAST(rctx *runtime.Context, ast *parser.AST, env *exec.Environmen return nil, err } } - outputs, debugs, meter, err := Build(rctx, main, env, readers) + outputs, debugs, meter, err := Build(rctx, main, env, nil) if err != nil { return nil, err } diff --git a/compiler/semantic/analyzer.go b/compiler/semantic/analyzer.go index e9852a7979..89d5c58543 100644 --- a/compiler/semantic/analyzer.go +++ b/compiler/semantic/analyzer.go @@ -19,13 +19,12 @@ import ( // to DAG form, resolving syntax ambiguities, and performing constant propagation. // After semantic analysis, the DAG is ready for either optimization or compilation. func Analyze(ctx context.Context, p *parser.AST, env *exec.Environment, extInput bool) (*dag.Main, error) { - t := newTranslator(ctx, reporter{p.Files()}, env) - astseq := p.Parsed() if extInput { - astseq.Prepend(&ast.DefaultScan{Kind: "DefaultScan"}) + p.PrependFileScan([]string{"stdio:stdin"}) } + t := newTranslator(ctx, reporter{p.Files()}, env) t.checker.pushErrs() - seq, _ := t.seq(astseq, super.TypeNull) + seq, _ := t.seq(p.Parsed(), super.TypeNull) errs := t.checker.popErrs() errs.flushErrs(t.reporter) if err := t.Error(); err != nil { diff --git a/fuzz/fuzz.go b/fuzz/fuzz.go index ce885f1e2e..63f255efab 100644 --- a/fuzz/fuzz.go +++ b/fuzz/fuzz.go @@ -19,7 +19,6 @@ import ( "github.com/brimdata/super/csup" "github.com/brimdata/super/pkg/field" "github.com/brimdata/super/pkg/nano" - "github.com/brimdata/super/pkg/storage/mock" "github.com/brimdata/super/runtime" "github.com/brimdata/super/runtime/exec" "github.com/brimdata/super/sbuf" @@ -30,7 +29,6 @@ import ( "github.com/brimdata/super/sup" "github.com/stretchr/testify/require" "github.com/x448/float16" - "go.uber.org/mock/gomock" ) func ReadBSUP(bs []byte) ([]super.Value, error) { @@ -91,8 +89,7 @@ func RunQueryCSUP(t testing.TB, buf *bytes.Buffer, querySource string) []super.V func RunQuery(t testing.TB, sctx *super.Context, readers []sio.Reader, querySource string, useDemand func(demandIn demand.Demand)) []super.Value { // Compile query - engine := mock.NewMockEngine(gomock.NewController(t)) - comp := compiler.NewCompiler(engine) + comp := compiler.NewCompiler(nil) ast, err := parser.ParseText(querySource) if err != nil { t.Skipf("%v", err) @@ -105,7 +102,7 @@ func RunQuery(t testing.TB, sctx *super.Context, readers []sio.Reader, querySour // Infer demand // TODO This is a hack and should be replaced by a cleaner interface in CompileQuery. - env := exec.NewEnvironment(engine, nil) + env := exec.NewEnvironment(nil, nil) main, err := semantic.Analyze(t.Context(), ast, env, true) if err != nil { t.Skipf("%v", err) diff --git a/runtime/exec/environment.go b/runtime/exec/environment.go index a3f495ed04..3da60bb54e 100644 --- a/runtime/exec/environment.go +++ b/runtime/exec/environment.go @@ -14,6 +14,7 @@ import ( "github.com/brimdata/super/pkg/field" "github.com/brimdata/super/pkg/storage" "github.com/brimdata/super/sbuf" + "github.com/brimdata/super/sio" "github.com/brimdata/super/sio/anyio" "github.com/brimdata/super/sio/csupio" "github.com/brimdata/super/sio/fjsonio" @@ -56,6 +57,7 @@ type Environment struct { ReaderOpts anyio.ReaderOpts Runtime Runtime SampleSize int + Stdin sio.Reader } func NewEnvironment(engine storage.Engine, d *db.Root) *Environment { @@ -114,6 +116,9 @@ func (e *Environment) Open(ctx context.Context, sctx *super.Context, path, forma fields = proj.Paths() } } + if path == "stdio:stdin" && e.Stdin != nil { + return sbuf.NewScanner(ctx, e.Stdin, pushdown) + } file, err := anyio.Open(ctx, sctx, e.engine, path, e.readerOpts(fields, format)) if err != nil { return nil, fmt.Errorf("%s: %w", path, err) diff --git a/runtime/sam/expr/filter_test.go b/runtime/sam/expr/filter_test.go index 7b478d281e..255c4e0dc5 100644 --- a/runtime/sam/expr/filter_test.go +++ b/runtime/sam/expr/filter_test.go @@ -65,9 +65,9 @@ func runCasesHelper(t *testing.T, record string, cases []testcase, expectBufferF require.NoError(t, err, "filter: %q", c.filter) _, _, builder, err := compiler.BuildWithBuilder(rctx, main, env, nil) require.NoError(t, err, "filter: %q", c.filter) - scan, ok := main.Body[0].(*dag.DefaultScan) + scan, ok := main.Body[0].(*dag.FileScan) require.True(t, ok) - filterMaker := rungen.NewPushdown(builder, scan.Filter) + filterMaker := rungen.NewPushdown(builder, scan.Pushdown.DataFilter.Expr) f, err := filterMaker.DataFilter() assert.NoError(t, err, "filter: %q", c.filter) if f != nil { From b126cb2eba05355fb9c82815f46f47991d719454 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Fri, 5 Jun 2026 22:41:48 -0400 Subject: [PATCH 2/2] remove ast.DefaultScan, dag.DefaultScan, and sem.DefaultScan They're unneeded since ast.DefaultScan is no longer used. --- compiler/ast/op.go | 8 -------- compiler/ast/unpack.go | 1 - compiler/dag/op.go | 7 ------- compiler/dag/unpack.go | 1 - compiler/describe/analyze.go | 2 -- compiler/optimizer/demand.go | 2 +- compiler/optimizer/optimizer.go | 10 ++-------- compiler/rungen/op.go | 16 +--------------- compiler/rungen/vop.go | 6 ------ compiler/semantic/analyzer.go | 2 +- compiler/semantic/checker.go | 2 -- compiler/semantic/compare.go | 3 --- compiler/semantic/dagen.go | 4 ---- compiler/semantic/evaluator.go | 2 -- compiler/semantic/fmt.go | 2 -- compiler/semantic/op.go | 2 -- compiler/semantic/sem/op.go | 8 -------- compiler/sfmt/dag.go | 8 -------- 18 files changed, 5 insertions(+), 81 deletions(-) diff --git a/compiler/ast/op.go b/compiler/ast/op.go index 829aa0d055..17af59472a 100644 --- a/compiler/ast/op.go +++ b/compiler/ast/op.go @@ -239,9 +239,6 @@ type ( Meta *Text `json:"meta"` Loc `json:"loc"` } - DefaultScan struct { - Kind string `json:"kind" unpack:""` - } Delete struct { Kind string `json:"kind" unpack:""` Pool string `json:"pool"` @@ -284,9 +281,6 @@ type FromItem struct { Loc `json:"loc"` } -func (d *DefaultScan) Pos() int { return -1 } -func (d *DefaultScan) End() int { return -1 } - type ArgExpr struct { Kind string `json:"kind" unpack:""` Key string `json:"key"` @@ -374,5 +368,3 @@ func (*UniqOp) opNode() {} func (*UnnestOp) opNode() {} func (*ValuesOp) opNode() {} func (*WhereOp) opNode() {} - -func (*DefaultScan) opNode() {} diff --git a/compiler/ast/unpack.go b/compiler/ast/unpack.go index 6a759ed656..4d119c93ae 100644 --- a/compiler/ast/unpack.go +++ b/compiler/ast/unpack.go @@ -29,7 +29,6 @@ var unpacker = unpack.New( DebugOp{}, DeclsValue{}, Decorated{}, - DefaultScan{}, Delete{}, DoubleQuoteExpr{}, DropOp{}, diff --git a/compiler/dag/op.go b/compiler/dag/op.go index 50af13bca3..f10915684d 100644 --- a/compiler/dag/op.go +++ b/compiler/dag/op.go @@ -12,7 +12,6 @@ import ( "reflect" "slices" - "github.com/brimdata/super/order" "github.com/brimdata/super/pkg/field" "github.com/segmentio/ksuid" ) @@ -251,11 +250,6 @@ type ( Kind string `json:"kind" unpack:""` Meta string `json:"meta"` } - DefaultScan struct { - Kind string `json:"kind" unpack:""` - Filter Expr `json:"filter"` - SortKeys order.SortKeys `json:"sort_keys"` - } DeleterScan struct { Kind string `json:"kind" unpack:""` Pool ksuid.KSUID `json:"pool"` @@ -349,7 +343,6 @@ var CommitMetas = map[string]struct{}{ func (*CommitMetaScan) opNode() {} func (*DBMetaScan) opNode() {} -func (*DefaultScan) opNode() {} func (*DeleterScan) opNode() {} func (*DeleteScan) opNode() {} func (*FileScan) opNode() {} diff --git a/compiler/dag/unpack.go b/compiler/dag/unpack.go index 9efe89055e..3b48472a03 100644 --- a/compiler/dag/unpack.go +++ b/compiler/dag/unpack.go @@ -20,7 +20,6 @@ var unpacker = unpack.New( CountOp{}, CutOp{}, DebugOp{}, - DefaultScan{}, DeleterScan{}, DeleteScan{}, DistinctOp{}, diff --git a/compiler/describe/analyze.go b/compiler/describe/analyze.go index 10a9f98d72..da562f822d 100644 --- a/compiler/describe/analyze.go +++ b/compiler/describe/analyze.go @@ -112,8 +112,6 @@ func describeSources(ctx context.Context, root *db.Root, o dag.Op) ([]Source, er s = append(s, out...) } return s, nil - case *dag.DefaultScan: - return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil case *dag.NullScan: return []Source{&Null{Kind: "Null"}}, nil case *dag.FileScan: diff --git a/compiler/optimizer/demand.go b/compiler/optimizer/demand.go index 565255fa59..a4b21ecc2d 100644 --- a/compiler/optimizer/demand.go +++ b/compiler/optimizer/demand.go @@ -114,7 +114,7 @@ func demandForSimpleOp(op dag.Op, downstream demand.Demand) demand.Demand { } return d - case *dag.CommitMetaScan, *dag.DefaultScan, *dag.DeleterScan, *dag.DeleteScan, *dag.DBMetaScan: + case *dag.CommitMetaScan, *dag.DeleterScan, *dag.DeleteScan, *dag.DBMetaScan: return demand.None() case *dag.FileScan: if mf := op.Pushdown.MetaFilter; mf != nil { diff --git a/compiler/optimizer/optimizer.go b/compiler/optimizer/optimizer.go index 7e6ddc12e7..5441a6cc33 100644 --- a/compiler/optimizer/optimizer.go +++ b/compiler/optimizer/optimizer.go @@ -281,10 +281,6 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) { } seq = dag.Seq{op, o} } - case *dag.DefaultScan: - o.nent++ - op.Filter = filter - seq = append(dag.Seq{op}, chain...) } return seq, nil }) @@ -363,7 +359,7 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKeys) ([]o sortKeys = nil } return []order.SortKeys{sortKeys}, nil - case *dag.PoolScan, *dag.ListerScan, *dag.SeqScan, *dag.DefaultScan: + case *dag.PoolScan, *dag.ListerScan, *dag.SeqScan: out, err := o.sortKeysOfSource(op) return []order.SortKeys{out}, err default: @@ -374,8 +370,6 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKeys) ([]o func (o *Optimizer) sortKeysOfSource(op dag.Op) (order.SortKeys, error) { switch op := op.(type) { - case *dag.DefaultScan: - return op.SortKeys, nil case *dag.FileScan: return nil, nil case *dag.HTTPScan: @@ -845,7 +839,7 @@ func setPushdownUnordered(seq dag.Seq, unordered bool) bool { for i := len(seq) - 1; i >= 0; i-- { switch op := seq[i].(type) { case *dag.AggregateOp, *dag.CombineOp, *dag.DistinctOp, *dag.HashJoinOp, *dag.JoinOp, *dag.SortOp, *dag.TopOp, - *dag.DefaultScan, *dag.HTTPScan, *dag.PoolScan, + *dag.HTTPScan, *dag.PoolScan, *dag.CommitMetaScan, *dag.DBMetaScan, *dag.PoolMetaScan: unordered = true case *dag.FileScan: diff --git a/compiler/rungen/op.go b/compiler/rungen/op.go index 2ceca1555f..f9defb5ef0 100644 --- a/compiler/rungen/op.go +++ b/compiler/rungen/op.go @@ -190,20 +190,6 @@ func (b *Builder) compileLeaf(o dag.Op, parent sbuf.Puller) (sbuf.Puller, error) return meta.NewCommitMetaScanner(b.rctx.Context, b.sctx(), b.env.DB(), v.Pool, v.Commit, v.Meta, pruner) case *dag.DBMetaScan: return meta.NewDBMetaScanner(b.rctx.Context, b.sctx(), b.env.DB(), v.Meta) - case *dag.DefaultScan: - pushdown := b.newPushdown(v.Filter, nil) - if len(b.readers) == 1 { - return sbuf.NewScanner(b.rctx.Context, b.readers[0], pushdown) - } - scanners := make([]sbuf.Scanner, 0, len(b.readers)) - for _, r := range b.readers { - scanner, err := sbuf.NewScanner(b.rctx.Context, r, pushdown) - if err != nil { - return nil, err - } - scanners = append(scanners, scanner) - } - return sbuf.MultiScanner(scanners...), nil case *dag.DeleterScan: pool, err := b.lookupPool(v.Pool) if err != nil { @@ -690,7 +676,7 @@ func isEntry(seq dag.Seq) bool { return false } switch op := seq[0].(type) { - case *dag.ListerScan, *dag.DefaultScan, *dag.FileScan, *dag.HTTPScan, *dag.PoolScan, *dag.DBMetaScan, *dag.PoolMetaScan, *dag.CommitMetaScan, *dag.NullScan: + case *dag.ListerScan, *dag.FileScan, *dag.HTTPScan, *dag.PoolScan, *dag.DBMetaScan, *dag.PoolMetaScan, *dag.CommitMetaScan, *dag.NullScan: return true case *dag.ForkOp: return len(op.Paths) > 0 && !slices.ContainsFunc(op.Paths, func(seq dag.Seq) bool { diff --git a/compiler/rungen/vop.go b/compiler/rungen/vop.go index 1c0f153103..a74ebd4961 100644 --- a/compiler/rungen/vop.go +++ b/compiler/rungen/vop.go @@ -217,12 +217,6 @@ func (b *Builder) compileVamLeaf(o dag.Op, parent vio.Puller) (vio.Puller, error } d := vamop.NewDebug(b.rctx, e, filter, b.debugs, parent) return d, nil - case *dag.DefaultScan: - sbufPuller, err := b.compileLeaf(o, nil) - if err != nil { - return nil, err - } - return sbuf.NewDematerializer(b.sctx(), sbufPuller), nil case *dag.DistinctOp: e, err := b.compileVamExpr(o.Expr) if err != nil { diff --git a/compiler/semantic/analyzer.go b/compiler/semantic/analyzer.go index 89d5c58543..40d9f22331 100644 --- a/compiler/semantic/analyzer.go +++ b/compiler/semantic/analyzer.go @@ -96,7 +96,7 @@ func HasSource(seq sem.Seq) bool { return false } switch op := seq[0].(type) { - case *sem.FileScan, *sem.HTTPScan, *sem.PoolScan, *sem.DBMetaScan, *sem.PoolMetaScan, *sem.CommitMetaScan, *sem.DeleteScan, *sem.NullScan, *sem.DefaultScan: + case *sem.FileScan, *sem.HTTPScan, *sem.PoolScan, *sem.DBMetaScan, *sem.PoolMetaScan, *sem.CommitMetaScan, *sem.DeleteScan, *sem.NullScan: return true case *sem.ForkOp: for _, path := range op.Paths { diff --git a/compiler/semantic/checker.go b/compiler/semantic/checker.go index 3ad5f47b20..e0b84996d1 100644 --- a/compiler/semantic/checker.go +++ b/compiler/semantic/checker.go @@ -56,8 +56,6 @@ func (c *checker) op(typ super.Type, op sem.Op) super.Type { // // Scanners first // - case *sem.DefaultScan: - return c.unknown case *sem.FileScan: if op.Type == nil { return c.unknown diff --git a/compiler/semantic/compare.go b/compiler/semantic/compare.go index 1ebcf0d1b1..827df7a2ac 100644 --- a/compiler/semantic/compare.go +++ b/compiler/semantic/compare.go @@ -37,9 +37,6 @@ func eqOp(aop, bop sem.Op) bool { case *sem.DBMetaScan: b, ok := bop.(*sem.DBMetaScan) return ok && a.Meta == b.Meta - case *sem.DefaultScan: - _, ok := bop.(*sem.DefaultScan) - return ok case *sem.FileScan: b, ok := bop.(*sem.FileScan) return ok && slices.Equal(a.Paths, b.Paths) && a.Format == b.Format diff --git a/compiler/semantic/dagen.go b/compiler/semantic/dagen.go index 2135379ef6..bb9563b6d6 100644 --- a/compiler/semantic/dagen.go +++ b/compiler/semantic/dagen.go @@ -73,10 +73,6 @@ func (d *dagen) op(op sem.Op) dag.Op { Kind: "DBMetaScan", Meta: op.Meta, } - case *sem.DefaultScan: - return &dag.DefaultScan{ - Kind: "DefaultScan", - } case *sem.DeleteScan: return &dag.DeleteScan{ Kind: "DeleteScan", diff --git a/compiler/semantic/evaluator.go b/compiler/semantic/evaluator.go index 4dcb12519e..03a696edf8 100644 --- a/compiler/semantic/evaluator.go +++ b/compiler/semantic/evaluator.go @@ -84,8 +84,6 @@ func (e *evaluator) op(op sem.Op) bool { // // Scanners first // - case *sem.DefaultScan: - return false case *sem.FileScan, *sem.HTTPScan, *sem.PoolScan, diff --git a/compiler/semantic/fmt.go b/compiler/semantic/fmt.go index 1c45cf839a..d635152d52 100644 --- a/compiler/semantic/fmt.go +++ b/compiler/semantic/fmt.go @@ -22,8 +22,6 @@ func clrOp(op sem.Op) { // // Scanners first // - case *sem.DefaultScan: - op.Node = nil case *sem.FileScan: op.Node = nil case *sem.HTTPScan: diff --git a/compiler/semantic/op.go b/compiler/semantic/op.go index b1e6f12543..7a05e7aba9 100644 --- a/compiler/semantic/op.go +++ b/compiler/semantic/op.go @@ -628,8 +628,6 @@ func (t *translator) semOp(o ast.Op, seq sem.Seq, inType super.Type) (sem.Seq, s case *ast.FromOp: seq, typ, _ := t.fromSource(o.Item.Source, o.Item.Args, seq) return seq, typ - case *ast.DefaultScan: - return append(seq, &sem.DefaultScan{Node: o}), t.checker.unknown case *ast.Delete: if len(seq) > 0 { panic("analyzer.SemOp: delete scan cannot have parent in AST") diff --git a/compiler/semantic/sem/op.go b/compiler/semantic/sem/op.go index 471731565a..83e6770e6a 100644 --- a/compiler/semantic/sem/op.go +++ b/compiler/semantic/sem/op.go @@ -36,9 +36,6 @@ type ( ast.Node Meta string } - DefaultScan struct { - ast.Node - } DeleteScan struct { ast.Node ID ksuid.KSUID @@ -80,7 +77,6 @@ type ( func (*CommitMetaScan) opNode() {} func (*DBMetaScan) opNode() {} -func (*DefaultScan) opNode() {} func (*DeleteScan) opNode() {} func (*FileScan) opNode() {} func (*HTTPScan) opNode() {} @@ -303,10 +299,6 @@ func CopyOp(op Op) Op { Node: op.Node, Meta: op.Meta, } - case *DefaultScan: - return &DefaultScan{ - Node: op.Node, - } case *DeleteScan: return &DeleteScan{ Node: op.Node, diff --git a/compiler/sfmt/dag.go b/compiler/sfmt/dag.go index 79bee77a54..84d3fdd16a 100644 --- a/compiler/sfmt/dag.go +++ b/compiler/sfmt/dag.go @@ -298,14 +298,6 @@ func (c *canonDAG) op(p dag.Op) { case *dag.DBMetaScan: c.next() c.write(":%s", p.Meta) - case *dag.DefaultScan: - c.next() - c.write("reader") - if p.Filter != nil { - c.write(" filter (") - c.expr(p.Filter, "") - c.write(")") - } case *dag.FileScan: c.next() c.write("file %s", strings.Join(p.Paths, ","))