diff --git a/vindex/map.go b/vindex/map.go index 7c8b340..c83d8b0 100644 --- a/vindex/map.go +++ b/vindex/map.go @@ -274,7 +274,11 @@ func (m *inputLogMapper) syncFromInputLog(ctx context.Context) error { } workDone() - storeCompactRange := m.r.End()%256 == 0 || m.r.End() == cp.Size + // This is a performance tradeoff between flushing often, which allows data to be indexed quickly, + // and flushing too often, which leaves things blocked on syscalls. One full level-1 tile seems to be a good tradeoff. + const storeInterval = 256 * 256 + + storeCompactRange := m.r.End()%storeInterval == 0 || m.r.End() == cp.Size if len(hashes) == 0 && !storeCompactRange { // We can skip writing out values with no hashes, as long as we're // not at the end of the log. @@ -287,8 +291,6 @@ func (m *inputLogMapper) syncFromInputLog(ctx context.Context) error { } if storeCompactRange { // Periodically store the validated compact range consumed so far. - // The choice to align with every 256 entries is an implicit bias towards - // supporting tlog-tiles. if err := m.walWriter.flush(); err != nil { return fmt.Errorf("failed to flush the WAL: %v", err) } diff --git a/vindex/wal.go b/vindex/wal.go index 2630598..58483ef 100644 --- a/vindex/wal.go +++ b/vindex/wal.go @@ -57,6 +57,7 @@ func newWalWriter(walPath string, treeSize uint64) (*walWriter, error) { if err != nil { return nil, fmt.Errorf("failed to open file for writing: %s", err) } + w.bw = bufio.NewWriter(w.f) return w, err } @@ -66,10 +67,15 @@ func newWalWriter(walPath string, treeSize uint64) (*walWriter, error) { type walWriter struct { walPath string f *os.File + bw *bufio.Writer } func (l *walWriter) close() error { - return l.f.Close() + err := l.bw.Flush() + if cerr := l.f.Close(); err == nil { + err = cerr + } + return err } // validate reads the file and determines what the last mapped log index was, and returns it. 
@@ -160,6 +166,9 @@ func validate(walPath string, lastIdx uint64) error { } func (l *walWriter) flush() error { + if err := l.bw.Flush(); err != nil { + return err + } return l.f.Sync() } @@ -168,7 +177,7 @@ func (l *walWriter) append(idx uint64, hashes [][sha256.Size]byte) error { if err != nil { return fmt.Errorf("failed to marshal entry: %v", err) } - _, err = fmt.Fprintf(l.f, "%s\n", e) + _, err = fmt.Fprintf(l.bw, "%s\n", e) return err } diff --git a/vindex/wal_test.go b/vindex/wal_test.go index 06dc83f..40166b2 100644 --- a/vindex/wal_test.go +++ b/vindex/wal_test.go @@ -207,7 +207,7 @@ func TestWriteAheadLog_roundtrip(t *testing.T) { } } -func TestWriteAndWriteLog(t *testing.T) { +func TestWriteAndReadLog(t *testing.T) { f, err := os.CreateTemp("", "testWal") if err != nil { t.Fatal(err) @@ -237,8 +237,13 @@ func TestWriteAndWriteLog(t *testing.T) { if err := wal.append(uint64(i), [][sha256.Size]byte{hash}); err != nil { return err } + if i%256 == 0 { + if err := wal.flush(); err != nil { + return err + } + } } - return nil + return wal.flush() }) eg.Go(func() error { var expect uint64