Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/outputflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (f *Flags) SetFlagsWithFormat(fs *flag.FlagSet, format string) {

func (f *Flags) SetFormatFlags(fs *flag.FlagSet) {
if f.DefaultFormat == "" {
f.DefaultFormat = "bsup"
f.DefaultFormat = "csup"
}
fs.StringVar(&f.Format, "f", f.DefaultFormat, "format for output data [arrows,bsup,csup,csv,db,json,line,parquet,sup,table,tsv,zeek]")
fs.BoolVar(&f.forceBinary, "B", false, "allow Super Binary to be sent to a terminal output")
Expand Down Expand Up @@ -100,7 +100,7 @@ func (f *Flags) Init() error {
if f.outputFile == "-" {
f.outputFile = ""
}
if f.outputFile == "" && f.split == "" && f.Format == "bsup" && !f.forceBinary &&
if f.outputFile == "" && f.split == "" && f.Format == f.DefaultFormat && !f.forceBinary &&
terminal.IsTerminalFile(os.Stdout) {
f.Format = "sup"
f.SUP.Pretty = 0
Expand Down
1 change: 1 addition & 0 deletions cmd/super/db/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
}

func (c *Command) SetLeafFlags(f *flag.FlagSet) {
c.outputFlags.DefaultFormat = "bsup"
c.outputFlags.SetFlags(f)
c.queryFlags.SetFlags(f)
c.runtimeFlags.SetFlags(f)
Expand Down
4 changes: 2 additions & 2 deletions cmd/super/dev/bsup/ztests/frames.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
script: |
super a.sup > c.bsup
super b.sup >> c.bsup
super -f bsup a.sup > c.bsup
super -f bsup b.sup >> c.bsup
super dev bsup frames -s c.bsup

inputs:
Expand Down
6 changes: 5 additions & 1 deletion csup/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ func (w *Serializer) Push(vec vector.Any) error {
}

func (w *Serializer) finalizeObject() error {
enc := NewDynamicEncoder(w.dynamic.BuildDynamic())
vec := w.dynamic.BuildDynamic()
if vec.Len() == 0 {
return nil
}
enc := NewDynamicEncoder(vec)
root, dataSize, err := enc.Encode()
if err != nil {
return fmt.Errorf("system error: could not encode CSUP metadata: %w", err)
Expand Down
16 changes: 13 additions & 3 deletions csup/ztests/empty-file.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
runtime: sam

script: |
! super -i csup /dev/null
super -i csup /dev/null

outputs:
- name: stdout
data: ""
- name: stderr
data: |
/dev/null: EOF
data: ""

---

script: |
super -f csup -o test.csup -c 'values false | where this'

outputs:
- name: test.csup
data: ""
2 changes: 1 addition & 1 deletion db/ztests/appmeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ script: |
super db load -q -use logs -meta '"original"' babble.sup
super db load -q -use logs -meta '"normalized-v1"' babble.sup
super db load -q -use logs -meta '"normalized-v2"' babble.sup
super db -c "from logs@main:log | meta:=defuse(meta) | grep('normalized.*', meta) | sort date | cut meta" | super -s -
super db -s -c "from logs@main:log | meta:=defuse(meta) | grep('normalized.*', meta) | sort date | cut meta"

inputs:
- name: babble.sup
Expand Down
2 changes: 1 addition & 1 deletion db/ztests/db-version.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ script: |
super db init -q
super db create -q test
mv test/superdb.bsup superdb-orig.bsup
super -o test/superdb.bsup -c 'version:=1' superdb-orig.bsup
super -f bsup -o test/superdb.bsup -c 'version:=1' superdb-orig.bsup
! super db serve

outputs:
Expand Down
2 changes: 1 addition & 1 deletion service/ztests/curl-load-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ inputs:
outputs:
- name: stdout
data: |
{"type":"Error","kind":"invalid operation","error":"format detection error\n\tarrows: schema message length exceeds 1 MiB\n\tbsup: BSUP version mismatch: expected 5, found 0\n\tcsup: auto-detection requires seekable input\n\tcsv: line 1: EOF\n\tjson: invalid character 'T' looking for beginning of value\n\tline: auto-detection not supported\n\tparquet: auto-detection requires seekable input\n\tsup: line 1: syntax error\n\ttsv: line 1: EOF\n\tzeek: line 1: bad types/fields definition in zeek header"}
{"type":"Error","kind":"invalid operation","error":"format detection error\n\tarrows: schema message length exceeds 1 MiB\n\tbsup: BSUP version mismatch: expected 5, found 0\n\tcsup: file size too small\n\tcsv: line 1: EOF\n\tjson: invalid character 'T' looking for beginning of value\n\tline: auto-detection not supported\n\tparquet: invalid header\n\tsup: line 1: syntax error\n\ttsv: line 1: EOF\n\tzeek: line 1: bad types/fields definition in zeek header"}
code 400
{"type":"Error","kind":"invalid operation","error":"unsupported MIME type: unsupported"}
code 400
4 changes: 2 additions & 2 deletions service/ztests/load-garbage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ outputs:
stdio:stdin: format detection error
arrows: schema message length exceeds 1 MiB
bsup: BSUP version mismatch: expected 5, found 0
csup: auto-detection requires seekable input
csup: file size too small
csv: line 1: delimiter ',' not found
json: invalid character 'T' looking for beginning of value
line: auto-detection not supported
parquet: auto-detection requires seekable input
parquet: invalid header
sup: line 1: syntax error
tsv: line 1: delimiter '\t' not found
zeek: line 1: bad types/fields definition in zeek header
Expand Down
90 changes: 62 additions & 28 deletions sio/anyio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/brimdata/super"
"github.com/brimdata/super/csup"
"github.com/brimdata/super/pkg/field"
"github.com/brimdata/super/sio"
"github.com/brimdata/super/sio/arrowio"
Expand All @@ -34,37 +35,25 @@ func NewReader(sctx *super.Context, r io.Reader, opts ReaderOpts) (sio.ReadClose
return lookupReader(sctx, r, opts)
}

var parquetErr, csupErr error
if rs, ok := r.(io.ReadSeeker); ok {
if n, err := rs.Seek(0, io.SeekCurrent); err == nil {
var rc sio.ReadCloser
rc, parquetErr = parquetio.NewReader(sctx, rs, opts.Fields)
if parquetErr == nil {
return rc, nil
}
if _, err := rs.Seek(n, io.SeekStart); err != nil {
return nil, err
}
var zr sio.Reader
zr, csupErr = csupio.NewReader(sctx, rs, opts.Fields)
if csupErr == nil {
return sio.NopReadCloser(zr), nil
}
if _, err := rs.Seek(n, io.SeekStart); err != nil {
return nil, err
}
} else {
parquetErr = err
csupErr = err
track := NewTrack(r)

csupErr := isCSUPStream(track)
if csupErr == nil {
cr, err := csupio.NewReader(sctx, track.Reader(), opts.Fields)
if err != nil {
return nil, err
}
parquetErr = fmt.Errorf("parquet: %w", parquetErr)
csupErr = fmt.Errorf("csup: %w", csupErr)
} else {
parquetErr = errors.New("parquet: auto-detection requires seekable input")
csupErr = errors.New("csup: auto-detection requires seekable input")
return sio.NopReadCloser(cr), nil
}
csupErr = fmt.Errorf("csup: %w", csupErr)
track.Reset()

track := NewTrack(r)
parquetErr := isParquetStream(track)
if parquetErr == nil {
return parquetio.NewReader(sctx, track.Reader(), opts.Fields)
}
parquetErr = fmt.Errorf("parquet: %w", parquetErr)
track.Reset()

arrowsErr := isArrowStream(track)
if arrowsErr == nil {
Expand Down Expand Up @@ -165,6 +154,28 @@ func isArrowStream(track *Track) error {
return err
}

// isCSUPStream reports whether track begins with a valid CSUP header.
// It returns nil when the header deserializes cleanly and a descriptive
// error otherwise.
//
// On success, if the track has a recorder, the whole stream is read into
// memory and *track is replaced by a Track over that in-memory copy.
// NOTE(review): presumably this is done because the CSUP reader needs
// seekable input while the recorded source is not seekable; confirm against
// Track and csupio.NewReader.
func isCSUPStream(track *Track) error {
	var hdr [csup.HeaderSize]byte
	_, readErr := io.ReadFull(track, hdr[:])
	switch {
	case errors.Is(readErr, io.ErrUnexpectedEOF):
		// Some bytes were available, but fewer than a full header.
		return errors.New("file size too small")
	case readErr != nil:
		return readErr
	}
	header := csup.Header{}
	if err := header.Deserialize(hdr[:]); err != nil {
		return err
	}
	if track.recorder == nil {
		return nil
	}
	// Rewind to the start so the in-memory copy includes the header bytes
	// consumed above.
	track.Reset()
	data, err := io.ReadAll(track)
	if err != nil {
		return err
	}
	*track = *NewTrack(bytes.NewReader(data))
	return nil
}

func isCSVStream(track *Track, delim rune, name string) error {
if line, err := bufio.NewReader(track).ReadSlice('\n'); err != nil {
return fmt.Errorf("%s: line 1: %w", name, err)
Expand All @@ -175,6 +186,29 @@ func isCSVStream(track *Track, delim rune, name string) error {
return match(csvio.NewReader(super.NewContext(), track, csvio.ReaderOpts{Delim: delim}), name, 1)
}

// isParquetStream reports whether track holds a Parquet file. It returns
// nil when a Parquet reader can be opened on the stream and an error
// otherwise.
func isParquetStream(track *Track) error {
	// Parquet files open with a 4-byte magic, either PAR1 or PARE. A match
	// only makes Parquet likely; confirming it means consuming the stream
	// through EOF so that the footer can be checked as well.
	var magic [4]byte
	if _, err := io.ReadFull(track, magic[:]); err != nil {
		return err
	}
	switch string(magic[:]) {
	case "PAR1", "PARE":
		// Plausible Parquet header; continue with full validation.
	default:
		return errors.New("invalid header")
	}
	if track.recorder != nil {
		// Rewind, read the whole input into memory, and swap in a Track
		// backed by the in-memory copy before opening the Parquet reader.
		track.Reset()
		data, err := io.ReadAll(track)
		if err != nil {
			return err
		}
		*track = *NewTrack(bytes.NewReader(data))
	}
	// Only the error from opening the reader matters here; the reader
	// itself is discarded.
	_, openErr := parquetio.NewReader(super.NewContext(), track.Reader(), nil)
	return openErr
}

func joinErrs(errs []error) error {
var b strings.Builder
b.WriteString("format detection error")
Expand Down
2 changes: 1 addition & 1 deletion sio/anyio/ztests/detector-dev-zero.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ outputs:
csv: line 1: bufio: buffer full
json: invalid character '\x00' looking for beginning of value
line: auto-detection not supported
parquet: parquet: file too small (size=0)
parquet: invalid header
sup: short buffer
tsv: line 1: bufio: buffer full
zeek: line 1: line too long
4 changes: 2 additions & 2 deletions sio/anyio/ztests/huge.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ outputs:
stdio:stdin: format detection error
arrows: schema message length exceeds 1 MiB
bsup: BSUP version mismatch: expected 5, found 0
csup: auto-detection requires seekable input
csup: invalid CSUP header
csv: line 1: delimiter ',' not found
json: buffer exceeded max size trying to infer input format
line: auto-detection not supported
parquet: auto-detection requires seekable input
parquet: invalid header
sup: buffer exceeded max size trying to infer input format
tsv: line 1: delimiter '\t' not found
zeek: line 1: bad types/fields definition in zeek header
4 changes: 2 additions & 2 deletions sio/bsupio/ztests/big-value.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
script: |
super -o out.bsup in.sup
super -f bsup -o out.bsup in.sup
for i in {1..7}; do
cat out.bsup out.bsup out.bsup out.bsup > out2.bsup
mv out2.bsup out.bsup
done
super -bsup.compress=false -o bigrow.bsup -c "collect:=collect(s)" out.bsup
super -f bsup -bsup.compress=false -o bigrow.bsup -c "collect:=collect(s)" out.bsup
! super -i bsup -o /dev/null -bsup.readmax 10KB -c "count:=count()" bigrow.bsup

inputs:
Expand Down
2 changes: 1 addition & 1 deletion sio/bsupio/ztests/primitive.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
script: |
super -i sup - | super -i bsup -s -
super -f bsup -i sup - | super -i bsup -s -

inputs:
- name: stdin
Expand Down
2 changes: 1 addition & 1 deletion sio/bsupio/ztests/pushdown-filter.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Test that union true values get de-unioned in filter pushdowns.
script: |
super in.sup > test.bsup
super -f bsup in.sup > test.bsup
super -s -c 'from test.bsup | where this'

inputs:
Expand Down
4 changes: 0 additions & 4 deletions sio/csupio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ func NewReader(sctx *super.Context, r io.Reader, fields []field.Path) (sio.Reade
if !ok {
return nil, errors.New("Super Columnar requires a seekable input")
}
// CSUP autodetection requires that we return error on open if invalid format.
if _, err := csup.ReadHeader(ra); err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &reader{
sctx: sctx,
Expand Down
Loading