diff --git a/Makefile b/Makefile index acd6107..c7566ae 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,6 @@ docker_image=askiada/external-sort include ./env.list export $(shell sed 's/=.*//' ./env.list) - .PHONY: test test: go test ./... diff --git a/env.list b/env.list index 169be0f..b3b733f 100644 --- a/env.list +++ b/env.list @@ -3,4 +3,5 @@ OUTPUT_PATH=./output.tsv CHUNK_FOLDER=./data/chunks/ CHUNK_SIZE=1000000 MAX_WORKERS=10 -OUTPUT_BUFFER_SIZE=1000 \ No newline at end of file +OUTPUT_BUFFER_SIZE=1000 +MMAP_SIZE=10 diff --git a/file/batchingchannels/batching_channel.go b/file/batchingchannels/batching_channel.go index 6826685..7e035f6 100644 --- a/file/batchingchannels/batching_channel.go +++ b/file/batchingchannels/batching_channel.go @@ -89,9 +89,9 @@ func (ch *BatchingChannel) Close() { func (ch *BatchingChannel) batchingBuffer() { ch.buffer = ch.allocate.Vector(ch.size, ch.allocate.Key) for { - elem, open := <-ch.input + text, open := <-ch.input if open { - err := ch.buffer.PushBack(elem) + err := ch.buffer.PushBack(text) if err != nil { ch.g.Go(func() error { return err @@ -104,8 +104,10 @@ func (ch *BatchingChannel) batchingBuffer() { break } if ch.buffer.Len() == ch.size { + curr := ch.buffer.Get(ch.buffer.Len()-1).Offset + int64(ch.buffer.Get(ch.buffer.Len()-1).Len) ch.output <- ch.buffer ch.buffer = ch.allocate.Vector(ch.size, ch.allocate.Key) + ch.buffer.SetCurrOffet(curr) } } diff --git a/file/chunk.go b/file/chunk.go index 77966c0..32b7541 100644 --- a/file/chunk.go +++ b/file/chunk.go @@ -2,19 +2,27 @@ package file import ( "bufio" + "context" "os" "sort" + "strconv" + "strings" + "sync" + "github.com/askiada/external-sort/file/circularqueue" "github.com/askiada/external-sort/vector" - + "github.com/askiada/external-sort/vector/key" "github.com/pkg/errors" + "golang.org/x/exp/mmap" + "golang.org/x/sync/errgroup" ) -// chunkInfo Describe a chunk. +// chunkInfo Describe a chunk . type chunkInfo struct { file *os.File scanner *bufio.Scanner - buffer vector.Vector + buffer []*vector.Element + emptyKey func() key.Key filename string } @@ -24,7 +32,27 @@ func (c *chunkInfo) pullSubset(size int) (err error) { i := 0 for i < size && c.scanner.Scan() { text := c.scanner.Text() - c.buffer.PushBack(text) + splitted := strings.Split(text, "\t") + keyVal := splitted[0] + offset, err := strconv.ParseInt(splitted[1], 10, 64) + if err != nil { + return err + } + + keyStruct := c.emptyKey() + err = keyStruct.FromString(keyVal) + if err != nil { + return err + } + l, err := strconv.Atoi(splitted[2]) + if err != nil { + return err + } + c.buffer = append(c.buffer, &vector.Element{ + Key: keyStruct, + Offset: offset, + Len: l, + }) i++ } if c.scanner.Err() != nil { @@ -35,21 +63,32 @@ func (c *chunkInfo) pullSubset(size int) (err error) { // chunks Pull of chunks. type chunks struct { - list []*chunkInfo + list []*chunkInfo + originalFiles *circularqueue.Circularqueue } // new Create a new chunk and initialize it. -func (c *chunks) new(chunkPath string, allocate *vector.Allocate, size int) error { +func (c *chunks) new(inputPath, chunkPath string, emptyKey func() key.Key, size int, circQueueSize int) error { f, err := os.Open(chunkPath) if err != nil { return err } + c.originalFiles = circularqueue.New(circQueueSize) + for i := 0; i < circQueueSize; i++ { + originalFile, err := mmap.Open(inputPath) + if err != nil { + return err + } + c.originalFiles.Add(originalFile) + } + scanner := bufio.NewScanner(f) elem := &chunkInfo{ filename: chunkPath, file: f, scanner: scanner, - buffer: allocate.Vector(size, allocate.Key), + buffer: make([]*vector.Element, 0, size), + emptyKey: emptyKey, } err = elem.pullSubset(size) if err != nil { @@ -99,7 +138,7 @@ func (c *chunks) len() int { func (c *chunks) resetOrder() { if len(c.list) > 1 { sort.Slice(c.list, func(i, j int) bool { - return vector.Less(c.list[i].buffer.Get(0), c.list[j].buffer.Get(0)) + return vector.Less(c.list[i].buffer[0], c.list[j].buffer[0]) }) } } @@ -109,16 +148,61 @@ func (c *chunks) moveFirstChunkToCorrectIndex() { elem := c.list[0] c.list = c.list[1:] pos := sort.Search(len(c.list), func(i int) bool { - return !vector.Less(c.list[i].buffer.Get(0), elem.buffer.Get(0)) + return !vector.Less(c.list[i].buffer[0], elem.buffer[0]) }) // TODO: c.list = c.list[1:] and the following line create an unecessary allocation. c.list = append(c.list[:pos], append([]*chunkInfo{elem}, c.list[pos:]...)...) } // min Check all the first elements of all the chunks and returns the smallest value. -func (c *chunks) min() (minChunk *chunkInfo, minValue *vector.Element, minIdx int) { - minValue = c.list[0].buffer.Get(0) - minIdx = 0 +func (c *chunks) min() (minChunk *chunkInfo, minElem *vector.Element, err error) { minChunk = c.list[0] - return minChunk, minValue, minIdx + minElem = c.list[0].buffer[0] + return minChunk, minElem, err +} + +func (c *chunks) WriteBuffer(buffer *bufio.Writer, elems []*vector.Element) error { + // var rows [][]byte + + type rows struct { + data [][]byte + } + + out := &rows{ + data: make([][]byte, len(elems), len(elems)), + } + mu := &sync.Mutex{} + + g, _ := errgroup.WithContext(context.Background()) + for i, elem := range elems { + i := i + elem := elem + out := out + g.Go(func() error { + return c.originalFiles.Run(func(originalFile *mmap.ReaderAt) error { + data := make([]byte, elem.Len) + _, err := originalFile.ReadAt(data, elem.Offset) + if err != nil { + return err + } + if data[len(data)-1] != '\n' { + data = append(data, '\n') + } + mu.Lock() + out.data[i] = data + mu.Unlock() + return nil + }) + }) + } + if err := g.Wait(); err != nil { + return err + } + for _, row := range out.data { + _, err := buffer.Write(row) + if err != nil { + return err + } + } + return nil } diff --git a/file/circularqueue/circularqueue.go b/file/circularqueue/circularqueue.go new file mode 100644 index 0000000..86bf783 --- /dev/null +++ b/file/circularqueue/circularqueue.go @@ -0,0 +1,34 @@ +package circularqueue + +import ( + "golang.org/x/exp/mmap" +) + +type Circularqueue struct { + next chan *mmap.ReaderAt + data []*mmap.ReaderAt + Size int +} + +func New(size int) *Circularqueue { + return &Circularqueue{ + data: make([]*mmap.ReaderAt, 0, size), + next: make(chan *mmap.ReaderAt, size), + Size: size, + } +} + +func (cq *Circularqueue) Add(elem *mmap.ReaderAt) { + cq.next <- elem + cq.data = append(cq.data, elem) +} + +func (cq *Circularqueue) Run(do func(elem *mmap.ReaderAt) error) error { + elem := <-cq.next + err := do(elem) + if err != nil { + return err + } + cq.next <- elem + return nil +} diff --git a/file/file.go b/file/file.go index 5bc397f..7bc691b 100644 --- a/file/file.go +++ b/file/file.go @@ -19,6 +19,7 @@ type Info struct { mu *MemUsage Reader io.Reader Allocate *vector.Allocate + InputPath string OutputPath string totalRows int PrintMemUsage bool @@ -43,6 +44,7 @@ func (f *Info) CreateSortedChunks(ctx context.Context, chunkFolder string, dumpS row := 0 chunkPaths := []string{} scanner := bufio.NewScanner(f.Reader) + scanner.Split(ScanLines) mu := sync.Mutex{} wg := &sync.WaitGroup{} wg.Add(1) @@ -67,7 +69,7 @@ func (f *Info) CreateSortedChunks(ctx context.Context, chunkFolder string, dumpS chunkPath := path.Join(chunkFolder, "chunk_"+strconv.Itoa(chunkIdx)+".tsv") mu.Unlock() v.Sort() - err := vector.Dump(v, chunkPath) + err := vector.Dump(v /* f.InputPath,*/, chunkPath) if err != nil { return err } diff --git a/file/scan.go b/file/scan.go new file mode 100644 index 0000000..3f5ab6f --- /dev/null +++ b/file/scan.go @@ -0,0 +1,25 @@ +package file + +import "bytes" + +// ScanLines is a split function for a Scanner that returns each line of +// text, stripped of any trailing end-of-line marker. The returned line may +// be empty. The end-of-line marker is one optional carriage return followed +// by one mandatory newline. In regular expression notation, it is `\r?\n`. +// The last non-empty line of input will be returned even if it has no +// newline. +func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + if i := bytes.IndexByte(data, '\n'); i >= 0 { + // We have a full newline-terminated line. + return i + 1, data[0 : i+1], nil + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), data, nil + } + // Request more data. + return 0, nil, nil +} diff --git a/file/sort.go b/file/sort.go index fd90f33..fe7c69f 100644 --- a/file/sort.go +++ b/file/sort.go @@ -39,15 +39,18 @@ func bToMb(b uint64) uint64 { return b / 1024 / 1024 } -func (f *Info) MergeSort(chunkPaths []string, k int) (err error) { - output := f.Allocate.Vector(k, f.Allocate.Key) +func (f *Info) MergeSort(chunkPaths []string, k, circQueueSize int) (err error) { + output := []*vector.Element{} if f.PrintMemUsage && f.mu == nil { f.mu = &MemUsage{} } // create a chunk per file path chunks := &chunks{list: make([]*chunkInfo, 0, len(chunkPaths))} + if err != nil { + return err + } for _, chunkPath := range chunkPaths { - err := chunks.new(chunkPath, f.Allocate, k) + err := chunks.new(f.InputPath, chunkPath, f.Allocate.EmptyKey, k, circQueueSize) if err != nil { return err } @@ -68,34 +71,35 @@ func (f *Info) MergeSort(chunkPaths []string, k int) (err error) { if f.PrintMemUsage { f.mu.Collect() } - if chunks.len() == 0 || output.Len() == k { - err = WriteBuffer(outputBuffer, output) + if chunks.len() == 0 || len(output) == k { + err = chunks.WriteBuffer(outputBuffer, output) if err != nil { return err } + output = nil } if chunks.len() == 0 { break } toShrink := []int{} // search the smallest value across chunk buffers by comparing first elements only - minChunk, minValue, minIdx := chunks.min() - err = output.PushBack(minValue.Line) + minChunk, minElem, err := chunks.min() if err != nil { return err } + output = append(output, minElem) // remove the first element from the chunk we pulled the smallest value - minChunk.buffer.FrontShift() + minChunk.buffer = minChunk.buffer[1:] isEmpty := false - if minChunk.buffer.Len() == 0 { + if len(minChunk.buffer) == 0 { err = minChunk.pullSubset(k) if err != nil { return err } // if after pulling data the chunk buffer is still empty then we can remove it - if minChunk.buffer.Len() == 0 { + if len(minChunk.buffer) == 0 { isEmpty = true - toShrink = append(toShrink, minIdx) + toShrink = append(toShrink, 0) err = chunks.shrink(toShrink) if err != nil { return err @@ -118,14 +122,3 @@ func (f *Info) MergeSort(chunkPaths []string, k int) (err error) { } return chunks.close() } - -func WriteBuffer(buffer *bufio.Writer, rows vector.Vector) error { - for i := 0; i < rows.Len(); i++ { - _, err := buffer.WriteString(rows.Get(i).Line + "\n") - if err != nil { - return err - } - } - rows.Reset() - return nil -} diff --git a/go.mod b/go.mod index 42f2d65..ab88375 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,8 @@ require ( github.com/spf13/cobra v1.2.1 github.com/spf13/viper v1.8.1 github.com/stretchr/testify v1.7.0 - golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2 + golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 + golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) @@ -17,25 +18,25 @@ require ( github.com/VividCortex/ewma v1.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/kr/fs v0.1.0 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect - github.com/mattn/go-runewidth v0.0.13 // indirect - github.com/mitchellh/mapstructure v1.4.1 // indirect - github.com/pelletier/go-toml v1.9.3 // indirect + github.com/mattn/go-runewidth v0.0.12 // indirect + github.com/mitchellh/mapstructure v1.4.3 // indirect + github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/spf13/afero v1.6.0 // indirect - github.com/spf13/cast v1.3.1 // indirect + github.com/spf13/cast v1.4.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.2.0 // indirect - golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect - golang.org/x/text v0.3.6 // indirect + golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect + golang.org/x/text v0.3.7 // indirect gopkg.in/ini.v1 v1.62.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index 45da8db..530c4c9 100644 --- a/go.sum +++ b/go.sum @@ -39,9 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= -github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= -github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -75,8 +74,9 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= +github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -197,9 +197,8 @@ github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-runewidth v0.0.12 h1:Y41i/hVW3Pgwr8gV+J23B9YEY0zxjptBuCWEaxmAOow= github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= -github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= -github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -208,14 +207,16 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= -github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= +github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= -github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= +github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -241,8 +242,9 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= -github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA= +github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v1.2.1 h1:+KmjbUw1hriSNMF55oPrkZcb27aECyrj8V2ytv7kWDw= github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= @@ -287,8 +289,8 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2 h1:XdAboW3BNMv9ocSCOk/u1MFioZGzCNkiJZ19v9Oe3Ig= -golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE= +golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -298,6 +300,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= +golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -432,8 +435,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= -golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211210111614-af8b64212486 h1:5hpz5aRr+W1erYCL5JRhSUBJRph7l9XkNveoExlrKYk= +golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -443,8 +446,9 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/internal/env.go b/internal/env.go index 852adde..a526622 100644 --- a/internal/env.go +++ b/internal/env.go @@ -14,6 +14,7 @@ const ( ChunkSizeName = "chunk_size" MaxWorkersName = "max_workers" OutputBufferSizeName = "output_buffer_size" + MmapSizeName = "mmap_size" ) // Environment variables. @@ -24,6 +25,7 @@ var ( ChunkSize int MaxWorkers int64 OutputBufferSize int + MmapSize int ) func init() { @@ -34,4 +36,5 @@ func init() { viper.SetDefault(ChunkSizeName, 0) viper.SetDefault(MaxWorkersName, 0) viper.SetDefault(OutputBufferSizeName, 0) + viper.SetDefault(MmapSizeName, 0) } diff --git a/main.go b/main.go index b44da1c..9dfdb10 100644 --- a/main.go +++ b/main.go @@ -28,6 +28,7 @@ func main() { rootCmd.PersistentFlags().IntVarP(&internal.ChunkSize, internal.ChunkSizeName, "s", viper.GetInt(internal.ChunkSizeName), "chunk size.") rootCmd.PersistentFlags().Int64VarP(&internal.MaxWorkers, internal.MaxWorkersName, "w", viper.GetInt64(internal.MaxWorkersName), "max worker.") rootCmd.PersistentFlags().IntVarP(&internal.OutputBufferSize, internal.OutputBufferSizeName, "b", viper.GetInt(internal.OutputBufferSizeName), "output buffer size.") + rootCmd.PersistentFlags().IntVarP(&internal.MmapSize, internal.MmapSizeName, "m", viper.GetInt(internal.MmapSizeName), "mmap size.") fmt.Println("Input file", internal.InputFile) fmt.Println("Output file", internal.OutputFile) @@ -60,7 +61,7 @@ func rootRun(cmd *cobra.Command, args []string) error { } // perform a merge sort on all the chunks files. // we sort using a buffer so we don't have to load the entire chunks when merging - err = fI.MergeSort(chunkPaths, internal.OutputBufferSize) + err = fI.MergeSort(chunkPaths, internal.OutputBufferSize, internal.MmapSize) if err != nil { return err } diff --git a/main_bench_test.go b/main_bench_test.go index a600863..57a4ae3 100644 --- a/main_bench_test.go +++ b/main_bench_test.go @@ -29,7 +29,7 @@ func BenchmarkMergeSort(b *testing.B) { assert.NoError(b, err) b.ResetTimer() for i := 0; i < b.N; i++ { - err = fI.MergeSort(chunkPaths, bufferSize) + err = fI.MergeSort(chunkPaths, bufferSize, bufferSize) _ = err } f.Close() diff --git a/main_test.go b/main_test.go index 69b54d8..28cff81 100644 --- a/main_test.go +++ b/main_test.go @@ -25,6 +25,7 @@ func prepareChunks(ctx context.Context, t *testing.T, allocate *vector.Allocate, fI := &file.Info{ Reader: f, Allocate: allocate, + InputPath: filename, OutputPath: outputFilename, } chunkPaths, err := fI.CreateSortedChunks(ctx, "testdata/chunks", chunkSize, 10) @@ -79,7 +80,7 @@ func TestBasics(t *testing.T) { ctx := context.Background() fI, chunkPaths := prepareChunks(ctx, t, allocate, filename, outputFilename, chunkSize) fI.OutputPath = outputFilename - err := fI.MergeSort(chunkPaths, bufferSize) + err := fI.MergeSort(chunkPaths, bufferSize, bufferSize) assert.NoError(t, err) outputFile, err := os.Open(outputFilename) assert.NoError(t, err) @@ -120,8 +121,48 @@ func Test100Elems(t *testing.T) { expectedErr := tc.expectedErr t.Run(name, func(t *testing.T) { ctx := context.Background() - fI, chunkPaths := prepareChunks(ctx, t, allocate, filename, outputFilename, 21) - err := fI.MergeSort(chunkPaths, 10) + fI, chunkPaths := prepareChunks(ctx, t, allocate, filename, outputFilename, 50) + err := fI.MergeSort(chunkPaths, 10, 10) + assert.NoError(t, err) + outputFile, err := os.Open(outputFilename) + assert.NoError(t, err) + outputScanner := bufio.NewScanner(outputFile) + count := 0 + for outputScanner.Scan() { + assert.Equal(t, expectedOutput[count], outputScanner.Text()) + count++ + } + assert.NoError(t, outputScanner.Err()) + assert.Equal(t, len(expectedOutput), count) + assert.True(t, errors.Is(err, expectedErr)) + outputFile.Close() + }) + } +} + +func Test10Elems(t *testing.T) { + tcs := map[string]struct { + filename string + outputFilename string + expectedErr error + expectedOutput []string + }{ + "100 elems": { + filename: "testdata/10elems.tsv", + expectedOutput: []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}, + outputFilename: "testdata/chunks/output.tsv", + }, + } + allocate := vector.DefaultVector(key.AllocateInt) + for name, tc := range tcs { + filename := tc.filename + outputFilename := tc.outputFilename + expectedOutput := tc.expectedOutput + expectedErr := tc.expectedErr + t.Run(name, func(t *testing.T) { + ctx := context.Background() + fI, chunkPaths := prepareChunks(ctx, t, allocate, filename, outputFilename, 50) + err := fI.MergeSort(chunkPaths, 10, 10) assert.NoError(t, err) outputFile, err := os.Open(outputFilename) assert.NoError(t, err) @@ -161,9 +202,13 @@ func TestTsvKey(t *testing.T) { outputFilename: "testdata/chunks/output.tsv", }, } - allocate := vector.DefaultVector(func(line string) (key.Key, error) { - return key.AllocateTsv(line, 1) - }) + allocate := &vector.Allocate{ + Vector: vector.AllocateSlice, + Key: func(line string) (key.Key, error) { + return key.AllocateTsv(line, 1) + }, + EmptyKey: key.AllocateEmptyTsv, + } for name, tc := range tcs { filename := tc.filename outputFilename := tc.outputFilename @@ -172,7 +217,7 @@ func TestTsvKey(t *testing.T) { t.Run(name, func(t *testing.T) { ctx := context.Background() fI, chunkPaths := prepareChunks(ctx, t, allocate, filename, outputFilename, 21) - err := fI.MergeSort(chunkPaths, 10) + err := fI.MergeSort(chunkPaths, 10, 10) assert.NoError(t, err) outputFile, err := os.Open(outputFilename) assert.NoError(t, err) diff --git a/testdata/100elems.tsv b/testdata/100elems.tsv index 629cd95..5ec0d10 100644 --- a/testdata/100elems.tsv +++ b/testdata/100elems.tsv @@ -97,4 +97,4 @@ 39 9 18 -29 \ No newline at end of file +29 diff --git a/testdata/10elems.tsv b/testdata/10elems.tsv new file mode 100644 index 0000000..20fb88c --- /dev/null +++ b/testdata/10elems.tsv @@ -0,0 +1,10 @@ +4 +8 +9 +7 +3 +6 +5 +1 +10 +2 diff --git a/vector/element.go b/vector/element.go index e49fb3e..12b70e5 100644 --- a/vector/element.go +++ b/vector/element.go @@ -3,8 +3,9 @@ package vector import "github.com/askiada/external-sort/vector/key" type Element struct { - Key key.Key - Line string + Key key.Key + Offset int64 + Len int } // Less returns wether v1 is smaller than v2 based on the keys. diff --git a/vector/key/int_key.go b/vector/key/int_key.go index 0744e82..1bcddcd 100644 --- a/vector/key/int_key.go +++ b/vector/key/int_key.go @@ -1,19 +1,56 @@ package key -import "strconv" +import ( + "bytes" + "fmt" + "strconv" +) type Int struct { + text string value int } +// dropCR drops a terminal \r from the data. +func dropCR(data []byte) []byte { + if len(data) > 0 && data[len(data)-1] == '\r' { + return data[0 : len(data)-1] + } + return data +} + +func AllocateEmptyInt() Key { + return &Int{} +} + func AllocateInt(line string) (Key, error) { - num, err := strconv.Atoi(line) + fmt.Println(line) + bline := []byte(line) + if i := bytes.IndexByte(bline, '\n'); i >= 0 { + // We have a full newline-terminated line. + bline = dropCR(bline[0:i]) + } + num, err := strconv.Atoi(string(bline)) if err != nil { return nil, err } - return &Int{num}, nil + return &Int{value: num, text: string(bline)}, nil } func (k *Int) Less(other Key) bool { return k.value < other.(*Int).value } + +func (k *Int) String() string { + return k.text +} + +func (k *Int) FromString(text string) error { + value, err := strconv.Atoi(text) + if err != nil { + return err + } + k.value = value + k.text = text + return nil +} diff --git a/vector/key/key.go b/vector/key/key.go index eb05ce1..9e0e90a 100644 --- a/vector/key/key.go +++ b/vector/key/key.go @@ -3,4 +3,6 @@ package key type Key interface { // Less returns wether the key is smaller than v2 Less(v2 Key) bool + String() string + FromString(string) error } diff --git a/vector/key/string_key.go b/vector/key/string_key.go index d774e0c..1dcdde5 100644 --- a/vector/key/string_key.go +++ b/vector/key/string_key.go @@ -7,7 +7,19 @@ type String struct { func AllocateString(line string) (Key, error) { return &String{line}, nil } +func AllocateEmptyString() Key { + return &String{} +} func (k *String) Less(other Key) bool { return k.value < other.(*String).value } + +func (k *String) String() string { + return k.value +} + +func (k *String) FromString(text string) error { + k.value = text + return nil +} diff --git a/vector/key/tsv_key.go b/vector/key/tsv_key.go index d3d3f8e..3465a9a 100644 --- a/vector/key/tsv_key.go +++ b/vector/key/tsv_key.go @@ -13,3 +13,24 @@ func AllocateTsv(line string, pos int) (Key, error) { } return &String{splitted[pos]}, nil } + +func AllocateEmptyTsv() Key { + return &Tsv{} +} + +func (k *Tsv) Get() interface{} { + return k.value +} + +func (k *Tsv) Less(other Key) bool { + return k.value < other.(*Tsv).value +} + +func (k *Tsv) String() string { + return k.value +} + +func (k *Tsv) FromString(text string) error { + k.value = text + return nil +} diff --git a/vector/slice_vector.go b/vector/slice_vector.go index 270015d..9e845e8 100644 --- a/vector/slice_vector.go +++ b/vector/slice_vector.go @@ -18,6 +18,7 @@ func AllocateSlice(size int, allocateKey func(line string) (key.Key, error)) Vec type SliceVec struct { allocateKey func(line string) (key.Key, error) s []*Element + currOffset int64 } func (v *SliceVec) Reset() { @@ -33,11 +34,17 @@ func (v *SliceVec) Len() int { } func (v *SliceVec) PushBack(line string) error { - k, err := v.allocateKey(line) + keyValue, err := v.allocateKey(line) if err != nil { return err } - v.s = append(v.s, &Element{Line: line, Key: k}) + v.s = append(v.s, &Element{ + //Line: line, + Key: keyValue, + Offset: v.currOffset, + Len: len(line), + }) + v.currOffset += int64(len(line)) return nil } @@ -50,3 +57,7 @@ func (v *SliceVec) Sort() { func (v *SliceVec) FrontShift() { v.s = v.s[1:] } + +func (v *SliceVec) SetCurrOffet(curr int64) { + v.currOffset = curr +} diff --git a/vector/vector.go b/vector/vector.go index cc5471e..71f00b3 100644 --- a/vector/vector.go +++ b/vector/vector.go @@ -3,14 +3,16 @@ package vector import ( "bufio" "os" + "strconv" "github.com/askiada/external-sort/vector/key" "github.com/pkg/errors" ) type Allocate struct { - Vector func(int, func(line string) (key.Key, error)) Vector - Key func(line string) (key.Key, error) + Vector func(int, func(line string) (key.Key, error)) Vector + Key func(line string) (key.Key, error) + EmptyKey func() key.Key } func DefaultVector(allocateKey func(line string) (key.Key, error)) *Allocate { @@ -33,16 +35,19 @@ type Vector interface { Reset() // Sort sort the vector in ascending order Sort() + + SetCurrOffet(curr int64) } func Dump(v Vector, filename string) error { file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { - return errors.Errorf("failed creating file: %s", err) + return errors.Errorf("failed creating file (%s): %s", filename, err) } datawriter := bufio.NewWriter(file) for i := 0; i < v.Len(); i++ { - _, err = datawriter.WriteString(v.Get(i).Line + "\n") + text := v.Get(i).Key.String() + "\t" + strconv.FormatInt(v.Get(i).Offset, 10) + "\t" + strconv.Itoa(v.Get(i).Len) + "\n" + _, err = datawriter.WriteString(text) if err != nil { return errors.Errorf("failed writing file: %s", err) }