Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ test:

.PHONY: run
run:
go run main.go
go run main.go
7 changes: 3 additions & 4 deletions file/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"os"

"github.com/askiada/external-sort/vector"

"github.com/pkg/errors"
)

// chunkInfo describes a chunk.
type chunkInfo struct {
file *os.File
scanner *bufio.Scanner
buffer vector.Vector
buffer *vector.Vector
filename string
}

Expand All @@ -37,7 +36,7 @@ type chunks struct {
}

// new creates a new chunk and initializes it.
func (c *chunks) new(chunkPath string, allocate func(size int) vector.Vector, size int) error {
func (c *chunks) new(chunkPath string, allocate func(size int) *vector.Vector, size int) error {
f, err := os.Open(chunkPath)
if err != nil {
return err
Expand Down Expand Up @@ -92,7 +91,7 @@ func (c *chunks) len() int {
}

// min checks the first element of each chunk and returns the smallest value.
func (c chunks) min() (minChunk *chunkInfo, minValue interface{}, minIdx int) {
func (c chunks) min() (minChunk *chunkInfo, minValue vector.Element, minIdx int) {
for i, chunk := range c.list {
currValue := chunk.buffer.Get(0)
if i == 0 {
Expand Down
18 changes: 9 additions & 9 deletions file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package file

import (
"bufio"

"io"
"path"
"strconv"

"github.com/askiada/external-sort/vector"

"github.com/pkg/errors"
)

type Info struct {
Reader io.Reader
Allocate func(int) vector.Vector
Separator string
Pos int
Reader io.Reader
Allocate func(int) *vector.Vector
}

// Sort performs a naive sort of a reader and puts the results in ascending order in a Vector.
Expand All @@ -23,9 +23,9 @@ func (f *Info) Sort(file io.Reader) error {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
text := scanner.Text()
err := vector.Sort(ans, text)
err := vector.Sort(ans, text, f.Separator, f.Pos)
if err != nil {
return err
return errors.Wrap(err, "sorting file")
}
}
if scanner.Err() != nil {
Expand All @@ -50,7 +50,7 @@ func (f *Info) CreateSortedChunks(chunkFolder string, dumpSize int) ([]string, e
chunkIdx := 0
chunkPaths := []string{}
scanner := bufio.NewScanner(f.Reader)
var ans vector.Vector
var ans *vector.Vector
for scanner.Scan() {
if row%dumpSize == 0 {
if row != 0 {
Expand All @@ -64,7 +64,7 @@ func (f *Info) CreateSortedChunks(chunkFolder string, dumpSize int) ([]string, e
ans = f.Allocate(dumpSize)
}
text := scanner.Text()
err := vector.Sort(ans, text)
err := vector.Sort(ans, text, f.Separator, f.Pos)
if err != nil {
return nil, errors.Wrap(err, fn)
}
Expand All @@ -85,7 +85,7 @@ func (f *Info) CreateSortedChunks(chunkFolder string, dumpSize int) ([]string, e
return chunkPaths, nil
}

func dumpChunk(ans vector.Vector, folder string, chunkIdx int) (string, error) {
func dumpChunk(ans *vector.Vector, folder string, chunkIdx int) (string, error) {
fn := "dump chunk"
chunkPath := path.Join(folder, "chunk_"+strconv.Itoa(chunkIdx)+".tsv")
err := ans.Dump(chunkPath)
Expand Down
13 changes: 6 additions & 7 deletions file/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package file
import (
"fmt"
"runtime"

"github.com/askiada/external-sort/vector"
)

type MemUsage struct {
Expand Down Expand Up @@ -34,7 +36,7 @@ func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

func (f *Info) MergeSort(chunkPaths []string, k int) (output []interface{}, err error) {
func (f *Info) MergeSort(chunkPaths []string, k int) (output []vector.Element, err error) {
mu := &MemUsage{}
// create a chunk per file path
chunks := &chunks{list: make([]*chunkInfo, 0, len(chunkPaths))}
Expand All @@ -44,24 +46,21 @@ func (f *Info) MergeSort(chunkPaths []string, k int) (output []interface{}, err
return nil, err
}
}
for {
if chunks.len() == 0 {
break
}
for chunks.len() > 0 {
mu.Collect()
toShrink := []int{}
// search the smallest value across chunk buffers by comparing first elements only
minChunk, minValue, minIdx := chunks.min()
output = append(output, minValue)
// remove the first element from the chunk we pulled the smallest value from
minChunk.buffer.FrontShift()
if minChunk.buffer.End() == 0 {
if minChunk.buffer.Len() == 0 {
err = minChunk.pullSubset(k)
if err != nil {
return nil, err
}
// if after pulling data the chunk buffer is still empty then we can remove it
if minChunk.buffer.End() == 0 {
if minChunk.buffer.Len() == 0 {
toShrink = append(toShrink, minIdx)
err = chunks.shrink(toShrink)
if err != nil {
Expand Down
15 changes: 10 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,28 @@ func main() {
}
defer f.Close()
fI := &file.Info{
Reader: f,
Allocate: vector.AllocateIntVector,
Reader: f,
Allocate: vector.AllocateTableVector("\t", 0),
Separator: "\t",
}

// create small sorted chunk files, each holding at most the number of rows passed as the chunk size below
chunkPaths, err := fI.CreateSortedChunks("data/chunks", 4)
chunkPaths, err := fI.CreateSortedChunks("data/chunks", 1000)
if err != nil {
panic(err)
}
// TODO: remove files if the process crashes.

// perform a merge sort on all the chunks files.
// we sort using a buffer so we don't have to load the entire chunks when merging
output, err := fI.MergeSort(chunkPaths, 3)
output, err := fI.MergeSort(chunkPaths, 10_000)
if err != nil {
panic(err)
}
// this output could be saved on hard drive
// or we could imagine sending an event every time an element is added to it;
// of course, that would require MergeSort to return a channel
fmt.Println(output)
for _, line := range output {
fmt.Println(line.Value())
}
}
3 changes: 1 addition & 2 deletions main_bench_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main_test

import (
"io/ioutil"
"os"
"path"
"testing"
Expand Down Expand Up @@ -31,7 +30,7 @@ func BenchmarkMergeSort(b *testing.B) {
_ = err
}
f.Close()
dir, err := ioutil.ReadDir("testdata/chunks")
dir, err := os.ReadDir("testdata/chunks")
assert.NoError(b, err)
for _, d := range dir {
err = os.RemoveAll(path.Join("testdata/chunks", d.Name()))
Expand Down
32 changes: 23 additions & 9 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package main_test

import (
"errors"
"io/ioutil"
"os"
"path"
"strings"
"testing"

"github.com/askiada/external-sort/file"
"github.com/askiada/external-sort/vector"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func prepareChunks(t *testing.T, filname string, chunkSize int) (*file.Info, []string) {
Expand All @@ -20,14 +20,14 @@ func prepareChunks(t *testing.T, filname string, chunkSize int) (*file.Info, []s

fI := &file.Info{
Reader: f,
Allocate: vector.AllocateIntVector,
Allocate: vector.AllocateTableVector("", 0),
}
chunkPaths, err := fI.CreateSortedChunks("testdata/chunks", chunkSize)
assert.NoError(t, err)

t.Cleanup(func() {
defer f.Close()
dir, err := ioutil.ReadDir("testdata/chunks")
dir, err := os.ReadDir("testdata/chunks")
assert.NoError(t, err)
for _, d := range dir {
err = os.RemoveAll(path.Join("testdata/chunks", d.Name()))
Expand All @@ -38,22 +38,32 @@ func prepareChunks(t *testing.T, filname string, chunkSize int) (*file.Info, []s
return fI, chunkPaths
}

func Test(t *testing.T) {
func TestMergeSort(t *testing.T) {
got, err := os.ReadFile("testdata/table.sorted.tsv")
require.NoError(t, err)

tableSorted := strings.Split(string(got), "\n")
tableSorted = tableSorted[:len(tableSorted)-1]

tcs := map[string]struct {
filename string
expectedErr error
expectedOutput []interface{}
expectedOutput []string
}{
"empty file": {
filename: "testdata/emptyfile.tsv",
},
"one elem": {
filename: "testdata/oneelem.tsv",
expectedOutput: []interface{}{1},
expectedOutput: []string{"1"},
},
"100 elems": {
filename: "testdata/100elems.tsv",
expectedOutput: []interface{}{3, 4, 5, 6, 6, 7, 7, 7, 8, 8, 9, 9, 10, 10, 15, 18, 18, 18, 18, 21, 22, 22, 25, 25, 25, 25, 25, 26, 26, 27, 27, 28, 28, 29, 29, 29, 30, 30, 31, 31, 33, 33, 34, 36, 37, 39, 39, 39, 40, 41, 41, 42, 43, 43, 47, 47, 49, 50, 50, 52, 52, 53, 54, 55, 55, 55, 56, 57, 57, 59, 60, 61, 62, 63, 67, 71, 71, 72, 72, 73, 74, 75, 78, 79, 80, 80, 82, 89, 89, 89, 91, 91, 92, 92, 93, 93, 94, 97, 97, 99},
expectedOutput: []string{"3", "4", "5", "6", "6", "7", "7", "7", "8", "8", "9", "9", "10", "10", "15", "18", "18", "18", "18", "21", "22", "22", "25", "25", "25", "25", "25", "26", "26", "27", "27", "28", "28", "29", "29", "29", "30", "30", "31", "31", "33", "33", "34", "36", "37", "39", "39", "39", "40", "41", "41", "42", "43", "43", "47", "47", "49", "50", "50", "52", "52", "53", "54", "55", "55", "55", "56", "57", "57", "59", "60", "61", "62", "63", "67", "71", "71", "72", "72", "73", "74", "75", "78", "79", "80", "80", "82", "89", "89", "89", "91", "91", "92", "92", "93", "93", "94", "97", "97", "99"},
},
"table file": {
filename: "testdata/table.shuffled.tsv",
expectedOutput: tableSorted,
},
}

Expand All @@ -65,7 +75,11 @@ func Test(t *testing.T) {
for chunkSize := 1; chunkSize < 152; chunkSize += 10 {
for bufferSize := 1; bufferSize < 152; bufferSize += 10 {
fI, chunkPaths := prepareChunks(t, filename, chunkSize)
got, err := fI.MergeSort(chunkPaths, bufferSize)
elements, err := fI.MergeSort(chunkPaths, bufferSize)
got := make([]string, 0, len(elements))
for _, e := range elements {
got = append(got, e.Value())
}
assert.ElementsMatch(t, got, expectedOutput)
assert.True(t, errors.Is(err, expectedErr))
}
Expand Down
Loading