Skip to content

wal: fix flaky TestConcurrentWritersWithManyRecords#6000

Open
xxmplus wants to merge 1 commit into cockroachdb:master from
xxmplus:xxm/deflake-concurrent-writers
Open

wal: fix flaky TestConcurrentWritersWithManyRecords#6000
xxmplus wants to merge 1 commit into cockroachdb:master from
xxmplus:xxm/deflake-concurrent-writers

Conversation

@xxmplus
Copy link
Copy Markdown
Contributor

@xxmplus xxmplus commented Apr 24, 2026

Summary

  • Fix off-by-one in logWriterCreated channel synchronization that caused TestConcurrentWritersWithManyRecords to flake on Windows nightlies.
  • Add stopper.stop() before file verification to ensure all async LogWriter closes complete.
  • Add diagnostic t.Logf for each file's interval so future flakes produce actionable data.

Fixes #5995. Informs #4754.

Root cause

newFailoverWriter internally calls switchToNewDir, which async-creates writer 0 and sends a signal to logWriterCreated (buffered channel, cap 100). The test never drained this initial signal. Each <-logWriterCreated in the loop therefore drained the wrong writer's signal (off-by-one):

switchToNewDir(dirs[1])   // starts async creation of writer 1, nextWriterIndex=2
<-logWriterCreated        // DRAINS SIGNAL FROM WRITER 0, not writer 1

This caused writer creation goroutines to run after nextWriterIndex had advanced past them, triggering the closeWriter path (writerIndex+1 != nextWriterIndex) and skipping snapshotAndSwitchWriter. When combined with a writer whose flusher could advance the queue tail via doneSyncCallback, a later writer's snapshot started from a non-zero tail, producing interval.first > 0.

Deterministic reproduction

The following test (not included in this PR) deterministically reproduces the bug by:

  1. Letting writer 0 write freely (no blockingWrite) so its flusher pops entries
  2. Blocking writer 1's file creation (blockingCreate) to force it to be skipped
  3. Not draining the initial logWriterCreated signal (the bug)
Reproduction test code
// TestConcurrentWritersOffByOneRepro deterministically reproduces the
// logWriterCreated off-by-one bug described in this PR: newFailoverWriter
// asynchronously creates writer 0 and sends a signal on the buffered
// logWriterCreated channel; if the caller never drains that initial signal,
// each subsequent <-logWriterCreated in the switch loop drains the signal
// belonging to the PREVIOUS writer rather than the one just requested.
func TestConcurrentWritersOffByOneRepro(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Generate 20K unique random records. recordsMap maps record content
	// back to its index so the verification phase below can recover which
	// contiguous range of records each log file contains.
	rng := rand.New(rand.NewPCG(0, 12345))
	records := make([][]byte, 20<<10)
	recordsMap := map[string]int{}
	for i := range records {
		records[i] = make([]byte, 50+rng.IntN(100))
		for {
			randStr(records[i], rng)
			if _, ok := recordsMap[string(records[i])]; ok {
				// Duplicate content; regenerate until unique so the
				// content->index mapping is unambiguous.
				continue
			} else {
				recordsMap[string(records[i])] = i
				break
			}
		}
	}
	const numLogWriters = 4
	// Crashable in-memory FS wrapped in a blockingFS so individual file
	// operations (create/write) can be stalled per filename.
	memFS := vfs.NewCrashableMem()
	dirs := [numDirIndices]dirAndFileHandle{{Dir: Dir{Dirname: "pri"}}, {Dir: Dir{Dirname: "sec"}}}
	for _, dir := range dirs {
		require.NoError(t, memFS.MkdirAll(dir.Dirname, 0755))
		f, err := memFS.OpenDir("")
		require.NoError(t, err)
		require.NoError(t, f.Sync())
		require.NoError(t, f.Close())
	}
	bFS := newBlockingFS(memFS)
	for i := range dirs {
		dirs[i].FS = bFS
		f, err := bFS.OpenDir(dirs[i].Dirname)
		require.NoError(t, err)
		dirs[i].File = f
	}
	// Block writes for writers 1..3 so their flushers cannot pop queue
	// entries; writer 0 is left free so its flusher CAN advance the queue
	// tail via doneSyncCallback (a precondition for the failure mode).
	for i := 1; i < numLogWriters; i++ {
		bFS.setConf(makeLogFilename(0, LogNameIndex(i)), blockingWrite)
	}
	// Writer 1 additionally blocks file creation: with the off-by-one, its
	// creation goroutine runs after nextWriterIndex has advanced past it,
	// so it takes the closeWriter path and is skipped entirely.
	bFS.setConf(makeLogFilename(0, LogNameIndex(1)), blockingCreate|blockingWrite)

	stopper := newStopper()
	// Buffered (cap 100) so async writer-creation goroutines never block
	// on the send even though the test under-drains it.
	logWriterCreated := make(chan struct{}, 100)
	queueSemChan := make(chan struct{}, len(records))
	dirIndex := 0
	// newFailoverWriter internally calls switchToNewDir, which async-creates
	// writer 0 and sends the initial signal on logWriterCreated.
	ww, err := newFailoverWriter(failoverWriterOpts{
		wn: 0, timeSource: defaultTime{}, logCreator: simpleLogCreator,
		preallocateSize: func() int { return 0 }, queueSemChan: queueSemChan,
		stopper: stopper,
		failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
		writerClosed: func(_ logicalLogWithSizesEtc) {},
		segmentClosed: func(_ logicalLogWithSizesEtc) {},
		writerCreatedForTest: logWriterCreated,
		writeWALSyncOffsets: func() bool { return false },
	}, dirs[dirIndex])
	require.NoError(t, err)
	// BUG: intentionally do NOT drain the initial logWriterCreated signal.
	wg := &sync.WaitGroup{}
	switchInterval := len(records) / 4
	for i := 0; i < len(records); i++ {
		queueSemChan <- struct{}{}
		wg.Add(1)
		synco := SyncOptions{Done: wg, Err: new(error)}
		_, err := ww.WriteRecord(records[i], synco, nil)
		require.NoError(t, err)
		if i > 0 && i%switchInterval == 0 {
			// Alternate pri/sec and request a new writer. Because the
			// initial signal was never drained, this receive consumes the
			// previous writer's creation signal, not the one just requested.
			dirIndex = (dirIndex + 1) % 2
			ww.switchToNewDir(dirs[dirIndex])
			<-logWriterCreated
		}
	}
	// Give in-flight async writer creations a moment to reach their blocked
	// state before lifting the blocking-FS configuration.
	time.Sleep(5 * time.Millisecond)
	for i := range numLogWriters {
		bFS.setConf(makeLogFilename(0, LogNameIndex(i)), 0)
	}
	_, err = ww.Close()
	require.NoError(t, err)
	wg.Wait()
	// Poll (up to ~10s) until all queue semaphore slots are released,
	// i.e. every record has been popped by some writer's flusher.
	func() {
		for range 100 {
			if len(queueSemChan) == 0 { return }
			time.Sleep(100 * time.Millisecond)
		}
		require.Equal(t, 0, len(queueSemChan))
	}()
	// Ensure all async LogWriter closes complete before reading files.
	stopper.stop()

	// Verification: for each log file, compute the contiguous interval
	// [first, last) of record indices found in it. Under the bug, a later
	// writer's snapshot starts from a non-zero queue tail, producing a file
	// whose interval.first > 0.
	type indexInterval struct { first, last int }
	foundNonZeroFirst := false
	for i := range numLogWriters {
		func() {
			// Writers alternate pri/sec, so writer i lives in dirs[i%2].
			f, err := memFS.Open(memFS.PathJoin(dirs[i%2].Dirname, makeLogFilename(0, LogNameIndex(i))))
			if err != nil {
				// File may legitimately not exist (skipped writer); log and move on.
				t.Logf("file %d: %s", i, err.Error())
				return
			}
			defer f.Close()
			rr := record.NewReader(f, base.DiskFileNum(0))
			interval := indexInterval{}
			for {
				r, err := rr.Next()
				if err != nil {
					if err != io.EOF { t.Logf("file %d: read error: %s", i, err.Error()) }
					break
				}
				var bb strings.Builder
				_, err = io.Copy(&bb, r)
				if err != nil { break }
				index, ok := recordsMap[bb.String()]
				if !ok { break }
				if interval.first == interval.last {
					// Empty interval so far: start it at this record's index.
					interval.first = index
					interval.last = index + 1
				} else if interval.last == index {
					// Contiguous with the interval; extend it.
					interval.last++
				} else {
					// Non-contiguous index; stop scanning this file.
					break
				}
			}
			// Diagnostic output so future flakes produce actionable data.
			t.Logf("file %d: interval [%d, %d)", i, interval.first, interval.last)
			if interval.first != interval.last && interval.first > 0 {
				foundNonZeroFirst = true
			}
		}()
	}
	// The repro succeeds iff at least one non-empty file starts past
	// record 0 — the exact symptom of the off-by-one.
	if !foundNonZeroFirst {
		t.Fatal("expected at least one file with interval.first > 0")
	}
}

Output (without fix, demonstrating the bug):

file 0: interval [0, 10241)       // writer 0 wrote freely, received all records while q.writer
file 1: interval [0, 0)           // writer 1 skipped (blockingCreate + off-by-one)
file 2: interval [10241, 15361)   // snapshot started at 10241 (queue tail advanced by writer 0 pops)
file 3: interval [10241, 20480)   // same: snapshot started from non-zero tail

Output (with fix):

file 0: interval [0, 5121)        // writer 0 only got records 0-5120 (before first switch)
file 1: interval [0, 10241)       // writer 1 created properly, snapshot from 0
file 2: interval [0, 15361)       // writer 2 created properly, snapshot from 0
file 3: interval [0, 20480)       // all records accounted for

Test plan

  • go test ./wal/ -run TestConcurrentWritersWithManyRecords -count=50 -tags invariants passes (50/50)

newFailoverWriter internally calls switchToNewDir, which async-creates
writer 0 and sends a signal to the logWriterCreated channel. The test
never drained this initial signal, so each <-logWriterCreated in the
loop drained the wrong writer's signal (off-by-one). This caused writer
creation goroutines to run after nextWriterIndex had advanced past them,
triggering the closeWriter path (writerIndex+1 != nextWriterIndex) and
skipping snapshotAndSwitchWriter entirely.

When combined with a writer whose flusher could make progress (advancing
the queue tail via doneSyncCallback), a later writer's snapshot would
start from a non-zero tail, producing the observed interval.first > 0
failures.

Fix by draining the initial writer creation signal before the record
pushing loop. Also add stopper.stop() before file verification to ensure
all async LogWriter closes complete, and add diagnostic logging for each
file's interval.

Fixes cockroachdb#5995.
Informs cockroachdb#4754.

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
@cockroach-teamcity
Copy link
Copy Markdown
Member

This change is Reviewable

@xxmplus xxmplus marked this pull request as ready for review April 24, 2026 19:00
@xxmplus xxmplus requested a review from a team as a code owner April 24, 2026 19:00
@xxmplus xxmplus requested a review from RaduBerinde April 24, 2026 19:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

crl-release-25.2: nightly windows failed

2 participants