Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions vindex/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,11 @@ func (m *inputLogMapper) syncFromInputLog(ctx context.Context) error {
}
workDone()

storeCompactRange := m.r.End()%256 == 0 || m.r.End() == cp.Size
// This is a performance tradeoff between flushing very often and allowing data to be indexed quickly,
// and too often, and having things block on syscalls. One full level-1 tile seems to be a good tradeoff.
const storeInterval = 256 * 256

storeCompactRange := m.r.End()%storeInterval == 0 || m.r.End() == cp.Size
if len(hashes) == 0 && !storeCompactRange {
// We can skip writing out values with no hashes, as long as we're
// not at the end of the log.
Expand All @@ -287,8 +291,6 @@ func (m *inputLogMapper) syncFromInputLog(ctx context.Context) error {
}
if storeCompactRange {
// Periodically store the validated compact range consumed so far.
// The choice to align with every 256 entries is an implicit bias towards
// supporting tlog-tiles.
if err := m.walWriter.flush(); err != nil {
return fmt.Errorf("failed to flush the WAL: %v", err)
}
Expand Down
13 changes: 11 additions & 2 deletions vindex/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func newWalWriter(walPath string, treeSize uint64) (*walWriter, error) {
if err != nil {
return nil, fmt.Errorf("failed to open file for writing: %s", err)
}
w.bw = bufio.NewWriter(w.f)
return w, err
}

Expand All @@ -66,10 +67,15 @@ func newWalWriter(walPath string, treeSize uint64) (*walWriter, error) {
type walWriter struct {
walPath string
f *os.File
bw *bufio.Writer
}

func (l *walWriter) close() error {
return l.f.Close()
err := l.bw.Flush()
if cerr := l.f.Close(); err == nil {
err = cerr
}
return err
}

// validate reads the file and determines what the last mapped log index was, and returns it.
Expand Down Expand Up @@ -160,6 +166,9 @@ func validate(walPath string, lastIdx uint64) error {
}

func (l *walWriter) flush() error {
if err := l.bw.Flush(); err != nil {
return err
}
return l.f.Sync()
}

Expand All @@ -168,7 +177,7 @@ func (l *walWriter) append(idx uint64, hashes [][sha256.Size]byte) error {
if err != nil {
return fmt.Errorf("failed to marshal entry: %v", err)
}
_, err = fmt.Fprintf(l.f, "%s\n", e)
_, err = fmt.Fprintf(l.bw, "%s\n", e)
return err
}

Expand Down
9 changes: 7 additions & 2 deletions vindex/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestWriteAheadLog_roundtrip(t *testing.T) {
}
}

func TestWriteAndWriteLog(t *testing.T) {
func TestWriteAndReadLog(t *testing.T) {
f, err := os.CreateTemp("", "testWal")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -237,8 +237,13 @@ func TestWriteAndWriteLog(t *testing.T) {
if err := wal.append(uint64(i), [][sha256.Size]byte{hash}); err != nil {
return err
}
if i%256 == 0 {
if err := wal.flush(); err != nil {
return err
}
}
}
return nil
return wal.flush()
})
eg.Go(func() error {
var expect uint64
Expand Down
Loading