1 change: 0 additions & 1 deletion Makefile
@@ -8,7 +8,6 @@ docker_image=askiada/external-sort
include ./env.list
export $(shell sed 's/=.*//' ./env.list)


.PHONY: test
test:
go test ./...
3 changes: 2 additions & 1 deletion env.list
@@ -3,4 +3,5 @@ OUTPUT_PATH=./output.tsv
CHUNK_FOLDER=./data/chunks/
CHUNK_SIZE=1000000
MAX_WORKERS=10
OUTPUT_BUFFER_SIZE=1000
OUTPUT_BUFFER_SIZE=1000
MMAP_SIZE=10
6 changes: 4 additions & 2 deletions file/batchingchannels/batching_channel.go
@@ -89,9 +89,9 @@ func (ch *BatchingChannel) Close() {
func (ch *BatchingChannel) batchingBuffer() {
ch.buffer = ch.allocate.Vector(ch.size, ch.allocate.Key)
for {
elem, open := <-ch.input
text, open := <-ch.input
if open {
err := ch.buffer.PushBack(elem)
err := ch.buffer.PushBack(text)
if err != nil {
ch.g.Go(func() error {
return err
@@ -104,8 +104,10 @@ func (ch *BatchingChannel) batchingBuffer() {
break
}
if ch.buffer.Len() == ch.size {
curr := ch.buffer.Get(ch.buffer.Len()-1).Offset + int64(ch.buffer.Get(ch.buffer.Len()-1).Len)
ch.output <- ch.buffer
ch.buffer = ch.allocate.Vector(ch.size, ch.allocate.Key)
ch.buffer.SetCurrOffet(curr)
}
}

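Reviewer note: the new lines in batchingBuffer seed each fresh vector with the byte position immediately after the last batched element (its Offset plus its Len; with the ScanLines split added in this PR the length includes the trailing newline, so that position is exactly the start of the next line in the input). A minimal sketch of that arithmetic, using a local struct that mirrors the Offset/Len fields of vector.Element rather than the real package:

package main

import "fmt"

// element mirrors the Offset/Len bookkeeping of vector.Element in this PR.
type element struct {
	Offset int64 // byte position of the line in the input file
	Len    int   // line length in bytes, trailing '\n' included
}

func main() {
	// Two consecutive 4-byte lines of a TSV input: "a\tb\n" at offset 0, "c\td\n" at offset 4.
	batch := []element{{Offset: 0, Len: 4}, {Offset: 4, Len: 4}}

	last := batch[len(batch)-1]
	next := last.Offset + int64(last.Len) // same computation as batchingBuffer

	// Prints 8: the byte right after the last batched line, i.e. where the
	// next batch starts in the original file.
	fmt.Println(next)
}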
110 changes: 97 additions & 13 deletions file/chunk.go
@@ -2,19 +2,27 @@ package file

import (
"bufio"
"context"
"os"
"sort"
"strconv"
"strings"
"sync"

"github.com/askiada/external-sort/file/circularqueue"
"github.com/askiada/external-sort/vector"

"github.com/askiada/external-sort/vector/key"
"github.com/pkg/errors"
"golang.org/x/exp/mmap"
"golang.org/x/sync/errgroup"
)

// chunkInfo Describe a chunk.
// chunkInfo Describe a chunk .
type chunkInfo struct {
file *os.File
scanner *bufio.Scanner
buffer vector.Vector
buffer []*vector.Element
emptyKey func() key.Key
filename string
}

@@ -24,7 +32,27 @@ func (c *chunkInfo) pullSubset(size int) (err error) {
i := 0
for i < size && c.scanner.Scan() {
text := c.scanner.Text()
c.buffer.PushBack(text)
splitted := strings.Split(text, "\t")
keyVal := splitted[0]
offset, err := strconv.ParseInt(splitted[1], 10, 64)
if err != nil {
return err
}

keyStruct := c.emptyKey()
err = keyStruct.FromString(keyVal)
if err != nil {
return err
}
l, err := strconv.Atoi(splitted[2])
if err != nil {
return err
}
c.buffer = append(c.buffer, &vector.Element{
Key: keyStruct,
Offset: offset,
Len: l,
})
i++
}
if c.scanner.Err() != nil {
@@ -35,21 +63,32 @@ func (c *chunkInfo) pullSubset(size int) (err error) {

// chunks Pull of chunks.
type chunks struct {
list []*chunkInfo
list []*chunkInfo
originalFiles *circularqueue.Circularqueue
}

// new Create a new chunk and initialize it.
func (c *chunks) new(chunkPath string, allocate *vector.Allocate, size int) error {
func (c *chunks) new(inputPath, chunkPath string, emptyKey func() key.Key, size int, circQueueSize int) error {
f, err := os.Open(chunkPath)
if err != nil {
return err
}
c.originalFiles = circularqueue.New(circQueueSize)
for i := 0; i < circQueueSize; i++ {
originalFile, err := mmap.Open(inputPath)
if err != nil {
return err
}
c.originalFiles.Add(originalFile)
}

scanner := bufio.NewScanner(f)
elem := &chunkInfo{
filename: chunkPath,
file: f,
scanner: scanner,
buffer: allocate.Vector(size, allocate.Key),
buffer: make([]*vector.Element, 0, size),
emptyKey: emptyKey,
}
err = elem.pullSubset(size)
if err != nil {
@@ -99,7 +138,7 @@ func (c *chunks) len() int {
func (c *chunks) resetOrder() {
if len(c.list) > 1 {
sort.Slice(c.list, func(i, j int) bool {
return vector.Less(c.list[i].buffer.Get(0), c.list[j].buffer.Get(0))
return vector.Less(c.list[i].buffer[0], c.list[j].buffer[0])
})
}
}
@@ -109,16 +148,61 @@ func (c *chunks) moveFirstChunkToCorrectIndex() {
elem := c.list[0]
c.list = c.list[1:]
pos := sort.Search(len(c.list), func(i int) bool {
return !vector.Less(c.list[i].buffer.Get(0), elem.buffer.Get(0))
return !vector.Less(c.list[i].buffer[0], elem.buffer[0])
})
// TODO: c.list = c.list[1:] and the following line create an unecessary allocation.
c.list = append(c.list[:pos], append([]*chunkInfo{elem}, c.list[pos:]...)...)
}

// min Check all the first elements of all the chunks and returns the smallest value.
func (c *chunks) min() (minChunk *chunkInfo, minValue *vector.Element, minIdx int) {
minValue = c.list[0].buffer.Get(0)
minIdx = 0
func (c *chunks) min() (minChunk *chunkInfo, minElem *vector.Element, err error) {
minChunk = c.list[0]
return minChunk, minValue, minIdx
minElem = c.list[0].buffer[0]
return minChunk, minElem, err
}

func (c *chunks) WriteBuffer(buffer *bufio.Writer, elems []*vector.Element) error {
// var rows [][]byte

type rows struct {
data [][]byte
}

out := &rows{
data: make([][]byte, len(elems), len(elems)),
}
mu := &sync.Mutex{}

g, _ := errgroup.WithContext(context.Background())
for i, elem := range elems {
i := i
elem := elem
out := out
g.Go(func() error {
return c.originalFiles.Run(func(originalFile *mmap.ReaderAt) error {
data := make([]byte, elem.Len)
_, err := originalFile.ReadAt(data, elem.Offset)
if err != nil {
return err
}
if data[len(data)-1] != '\n' {
data = append(data, '\n')
}
mu.Lock()
out.data[i] = data
mu.Unlock()
return nil
})
})
}
if err := g.Wait(); err != nil {
return err
}
for _, row := range out.data {
_, err := buffer.Write(row)
if err != nil {
return err
}
}
return nil
}
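Reviewer note: with this change a chunk file no longer stores full rows; each line carries key, byte offset and byte length separated by tabs, and the actual row bytes are only read back from the memory-mapped input when the merged output is written. A minimal sketch of that round trip, assuming a placeholder input.tsv path (in the PR it comes from Info.InputPath) and calling golang.org/x/exp/mmap directly, with error handling shortened:

package main

import (
	"fmt"
	"strconv"
	"strings"

	"golang.org/x/exp/mmap"
)

func main() {
	// One chunk line as written by this PR: key, byte offset, byte length.
	chunkLine := "apple\t128\t27"

	parts := strings.Split(chunkLine, "\t")
	offset, _ := strconv.ParseInt(parts[1], 10, 64)
	length, _ := strconv.Atoi(parts[2])

	// input.tsv is a placeholder; in the PR the path comes from Info.InputPath.
	reader, err := mmap.Open("input.tsv")
	if err != nil {
		panic(err)
	}
	defer reader.Close()

	// Pull the original row straight from the memory-mapped input, as WriteBuffer does.
	row := make([]byte, length)
	if _, err := reader.ReadAt(row, offset); err != nil {
		panic(err)
	}
	fmt.Printf("key=%s row=%q\n", parts[0], row)
}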
34 changes: 34 additions & 0 deletions file/circularqueue/circularqueue.go
@@ -0,0 +1,34 @@
package circularqueue

import (
"golang.org/x/exp/mmap"
)

type Circularqueue struct {
next chan *mmap.ReaderAt
data []*mmap.ReaderAt
Size int
}

func New(size int) *Circularqueue {
return &Circularqueue{
data: make([]*mmap.ReaderAt, 0, size),
next: make(chan *mmap.ReaderAt, size),
Size: size,
}
}

func (cq *Circularqueue) Add(elem *mmap.ReaderAt) {
cq.next <- elem
cq.data = append(cq.data, elem)
}

func (cq *Circularqueue) Run(do func(elem *mmap.ReaderAt) error) error {
elem := <-cq.next
err := do(elem)
if err != nil {
return err
}
cq.next <- elem
return nil
}
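Reviewer note: Circularqueue behaves as a fixed-size pool rather than a FIFO queue: Run blocks until one of the pre-opened readers is free, lends it to the callback, then puts it back on the channel, which caps how many goroutines touch an mmap reader at once. A usage sketch under that reading, with a hypothetical input.tsv and a pool of two readers shared by eight goroutines:

package main

import (
	"fmt"

	"github.com/askiada/external-sort/file/circularqueue"
	"golang.org/x/exp/mmap"
	"golang.org/x/sync/errgroup"
)

func main() {
	pool := circularqueue.New(2)
	for i := 0; i < 2; i++ {
		r, err := mmap.Open("input.tsv") // placeholder path, any existing file works
		if err != nil {
			panic(err)
		}
		pool.Add(r)
	}

	var g errgroup.Group
	for i := 0; i < 8; i++ {
		i := i
		g.Go(func() error {
			// At most two of these callbacks run at the same time, one per pooled reader.
			return pool.Run(func(r *mmap.ReaderAt) error {
				buf := make([]byte, 1)
				if _, err := r.ReadAt(buf, 0); err != nil {
					return err
				}
				fmt.Printf("goroutine %d read %q\n", i, buf)
				return nil
			})
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
}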
4 changes: 3 additions & 1 deletion file/file.go
@@ -19,6 +19,7 @@ type Info struct {
mu *MemUsage
Reader io.Reader
Allocate *vector.Allocate
InputPath string
OutputPath string
totalRows int
PrintMemUsage bool
@@ -43,6 +44,7 @@ func (f *Info) CreateSortedChunks(ctx context.Context, chunkFolder string, dumpS
row := 0
chunkPaths := []string{}
scanner := bufio.NewScanner(f.Reader)
scanner.Split(ScanLines)
mu := sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(1)
@@ -67,7 +69,7 @@
chunkPath := path.Join(chunkFolder, "chunk_"+strconv.Itoa(chunkIdx)+".tsv")
mu.Unlock()
v.Sort()
err := vector.Dump(v, chunkPath)
err := vector.Dump(v /* f.InputPath,*/, chunkPath)
if err != nil {
return err
}
25 changes: 25 additions & 0 deletions file/scan.go
@@ -0,0 +1,25 @@
package file

import "bytes"

// ScanLines is a split function for a Scanner that returns each line of
// text with its trailing newline preserved, so the byte length of every
// token matches the length of the line in the original file. Unlike
// bufio.ScanLines it does not strip the end-of-line marker; a carriage
// return, if present, also stays in the token. The returned line may be
// empty. The last line of input is returned even if it has no newline.
func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, data[0 : i+1], nil
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
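Reviewer note: unlike bufio.ScanLines, this split function keeps the trailing newline in the token, so one line's Offset plus its Len is exactly the next line's Offset, which is what the offset bookkeeping in the chunks relies on. A small, self-contained comparison (file.ScanLines refers to the function added above):

package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/askiada/external-sort/file"
)

func main() {
	input := "a\tb\nc\td\n"

	// Standard library split: the newline is stripped from each token.
	std := bufio.NewScanner(strings.NewReader(input))
	for std.Scan() {
		fmt.Printf("bufio.ScanLines: %q (len %d)\n", std.Text(), len(std.Bytes()))
	}

	// This PR's split keeps the newline, so token lengths sum to the input size.
	custom := bufio.NewScanner(strings.NewReader(input))
	custom.Split(file.ScanLines)
	total := 0
	for custom.Scan() {
		total += len(custom.Bytes())
		fmt.Printf("file.ScanLines:  %q (len %d)\n", custom.Text(), len(custom.Bytes()))
	}
	fmt.Println("total bytes scanned:", total, "of", len(input)) // 8 of 8
}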
37 changes: 15 additions & 22 deletions file/sort.go
@@ -39,15 +39,18 @@ func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

func (f *Info) MergeSort(chunkPaths []string, k int) (err error) {
output := f.Allocate.Vector(k, f.Allocate.Key)
func (f *Info) MergeSort(chunkPaths []string, k, circQueueSize int) (err error) {
output := []*vector.Element{}
if f.PrintMemUsage && f.mu == nil {
f.mu = &MemUsage{}
}
// create a chunk per file path
chunks := &chunks{list: make([]*chunkInfo, 0, len(chunkPaths))}
if err != nil {
return err
}
for _, chunkPath := range chunkPaths {
err := chunks.new(chunkPath, f.Allocate, k)
err := chunks.new(f.InputPath, chunkPath, f.Allocate.EmptyKey, k, circQueueSize)
if err != nil {
return err
}
@@ -68,34 +71,35 @@ func (f *Info) MergeSort(chunkPaths []string, k int) (err error) {
if f.PrintMemUsage {
f.mu.Collect()
}
if chunks.len() == 0 || output.Len() == k {
err = WriteBuffer(outputBuffer, output)
if chunks.len() == 0 || len(output) == k {
err = chunks.WriteBuffer(outputBuffer, output)
if err != nil {
return err
}
output = nil
}
if chunks.len() == 0 {
break
}
toShrink := []int{}
// search the smallest value across chunk buffers by comparing first elements only
minChunk, minValue, minIdx := chunks.min()
err = output.PushBack(minValue.Line)
minChunk, minElem, err := chunks.min()
if err != nil {
return err
}
output = append(output, minElem)
// remove the first element from the chunk we pulled the smallest value
minChunk.buffer.FrontShift()
minChunk.buffer = minChunk.buffer[1:]
isEmpty := false
if minChunk.buffer.Len() == 0 {
if len(minChunk.buffer) == 0 {
err = minChunk.pullSubset(k)
if err != nil {
return err
}
// if after pulling data the chunk buffer is still empty then we can remove it
if minChunk.buffer.Len() == 0 {
if len(minChunk.buffer) == 0 {
isEmpty = true
toShrink = append(toShrink, minIdx)
toShrink = append(toShrink, 0)
err = chunks.shrink(toShrink)
if err != nil {
return err
@@ -118,14 +122,3 @@ func (f *Info) MergeSort(chunkPaths []string, k int) (err error) {
}
return chunks.close()
}

func WriteBuffer(buffer *bufio.Writer, rows vector.Vector) error {
for i := 0; i < rows.Len(); i++ {
_, err := buffer.WriteString(rows.Get(i).Line + "\n")
if err != nil {
return err
}
}
rows.Reset()
return nil
}