From f177927ac3b82258cc353c6341c6dc5de8736eca Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Thu, 19 Mar 2026 12:29:15 +0000 Subject: [PATCH] [VIndex] Parallel computation of the mapped leaves This is a CPU bottleneck, so let's do it in parallel. ``` Before: 254.58user 15.41system 3:44.19elapsed 120%CPU (0avgtext+0avgdata 1409964maxresident)k 3031616inputs+2965888outputs (4major+756304minor)pagefaults 0swaps After: 282.40user 15.61system 1:57.16elapsed 254%CPU (0avgtext+0avgdata 1405804maxresident)k 9141704inputs+3113128outputs (5915major+1263537minor)pagefaults 0swaps ``` --- vindex/map.go | 216 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 171 insertions(+), 45 deletions(-) diff --git a/vindex/map.go b/vindex/map.go index c83d8b0..fc30e27 100644 --- a/vindex/map.go +++ b/vindex/map.go @@ -30,6 +30,7 @@ import ( "iter" "os" "path" + "runtime" "slices" "strconv" "sync" @@ -42,6 +43,7 @@ import ( "github.com/transparency-dev/incubator/vindex/api" "github.com/transparency-dev/merkle/compact" "github.com/transparency-dev/merkle/rfc6962" + "golang.org/x/sync/errgroup" "k8s.io/klog/v2" ) @@ -243,62 +245,186 @@ func (m *inputLogMapper) syncFromInputLog(ctx context.Context) error { } } }() - for l, err := range m.inputLog.Leaves(ctx, m.r.End(), cp.Size) { - idx := m.r.End() - workDone := r.trackWork(idx) - if err != nil { - return fmt.Errorf("failed to read leaf at index %d: %v", idx, err) - } - if idx >= cp.Size { - return fmt.Errorf("expected stop at cp.Size=%d, but got leaf at index=%d", cp.Size, idx) - } + g, gCtx := errgroup.WithContext(ctx) + producerCtx, cancelProducer := context.WithCancel(gCtx) + defer cancelProducer() + + const chunkSize = 512 + type chunk struct { + firstIdx uint64 + leaves [][]byte + backing []byte + leafHashes [][]byte + hashes [][][sha256.Size]byte + err error + done chan struct{} + } + + workers := runtime.NumCPU() + if workers < 2 { + workers = 2 + } else if workers > 16 { + workers = 16 + } + + workerCh := make(chan *chunk, workers*2) + promises := make(chan *chunk, workers*4) + + for i := 0; i < workers; i++ { + g.Go(func() error { + for c := range workerCh { + c.leafHashes = make([][]byte, len(c.leaves)) + c.hashes = make([][][sha256.Size]byte, len(c.leaves)) + + for j, l := range c.leaves { + c.leafHashes[j] = rfc6962.DefaultHasher.HashLeaf(l) + + var mapErr error + func() { + defer func() { + if r := recover(); r != nil { + mapErr = fmt.Errorf("panic detected mapping index %d: %s", c.firstIdx+uint64(j), r) + } + }() + c.hashes[j] = m.mapFn(l) + }() + + if mapErr != nil { + c.err = mapErr + break + } + } + close(c.done) + + select { + case <-producerCtx.Done(): + return producerCtx.Err() + default: + } + } + return nil + }) + } + + g.Go(func() error { + defer close(promises) + defer close(workerCh) - if err := m.r.Append(rfc6962.DefaultHasher.HashLeaf(l), nil); err != nil { - return fmt.Errorf("failed to update compact range: %v", err) + readIdx := m.r.End() + var currentChunk *chunk + + flushChunk := func() error { + if currentChunk == nil || len(currentChunk.leaves) == 0 { + return nil + } + select { + case promises <- currentChunk: + case <-producerCtx.Done(): + return producerCtx.Err() + } + + select { + case workerCh <- currentChunk: + case <-producerCtx.Done(): + return producerCtx.Err() + } + + currentChunk = nil + return nil } - // Apply the MapFn in as safe a way as possible. This involves trapping any panics - // and failing gracefully. - var hashes [][sha256.Size]byte - var mapErr error - func() { - defer func() { - if r := recover(); r != nil { - mapErr = fmt.Errorf("panic detected mapping index %d: %s", idx, r) + // Typical leaf length is a few hundred bytes, allocating chunk blocks for zero-alloc copies. + for lRaw, err := range m.inputLog.Leaves(producerCtx, readIdx, cp.Size) { + if err != nil { + return fmt.Errorf("failed to read leaf at index %d: %v", readIdx, err) + } + if readIdx >= cp.Size { + return fmt.Errorf("expected stop at cp.Size=%d, but got leaf at index=%d", cp.Size, readIdx) + } + + if currentChunk == nil { + currentChunk = &chunk{ + firstIdx: readIdx, + leaves: make([][]byte, 0, chunkSize), + backing: make([]byte, 0, chunkSize*256), + done: make(chan struct{}), } - }() - hashes = m.mapFn(l) - }() - if mapErr != nil { - return mapErr + } + + start := len(currentChunk.backing) + currentChunk.backing = append(currentChunk.backing, lRaw...) + currentChunk.leaves = append(currentChunk.leaves, currentChunk.backing[start:len(currentChunk.backing)]) + + if len(currentChunk.leaves) == chunkSize { + if err := flushChunk(); err != nil { + return err + } + } + readIdx++ } - workDone() - - // This is a performance tradeoff between flushing very often and allowing data to be indexed quickly, - // and too often, and having things block on syscalls. One full level-1 tile seems to be a good tradeoff. - const storeInterval = 256 * 256 - - storeCompactRange := m.r.End()%storeInterval == 0 || m.r.End() == cp.Size - if len(hashes) == 0 && !storeCompactRange { - // We can skip writing out values with no hashes, as long as we're - // not at the end of the log. - // If we are at the end of the log, we need to write out a value as a sentinel - // even if there are no hashes. - continue + return flushChunk() + }) + + var consumerErr error + ConsumerLoop: + for c := range promises { + select { + case <-c.done: + case <-gCtx.Done(): + consumerErr = gCtx.Err() + break ConsumerLoop } - if err := m.walWriter.append(idx, hashes); err != nil { - return fmt.Errorf("failed to add index to entry for leaf %d: %v", idx, err) + + if c.err != nil { + consumerErr = c.err + break } - if storeCompactRange { - // Periodically store the validated compact range consumed so far. - if err := m.walWriter.flush(); err != nil { - return fmt.Errorf("failed to flush the WAL: %v", err) + + for i := range c.leaves { + idx := c.firstIdx + uint64(i) + workDone := r.trackWork(idx) + + if err := m.r.Append(c.leafHashes[i], nil); err != nil { + consumerErr = fmt.Errorf("failed to update compact range: %v", err) + break ConsumerLoop } - if err := m.storeState(); err != nil { - return fmt.Errorf("failed to store incremental state: %v", err) + + workDone() + + hashes := c.hashes[i] + const storeInterval = 256 * 256 + + storeCompactRange := m.r.End()%storeInterval == 0 || m.r.End() == cp.Size + if len(hashes) == 0 && !storeCompactRange { + continue + } + if err := m.walWriter.append(idx, hashes); err != nil { + consumerErr = fmt.Errorf("failed to add index to entry for leaf %d: %v", idx, err) + break ConsumerLoop + } + if storeCompactRange { + if err := m.walWriter.flush(); err != nil { + consumerErr = fmt.Errorf("failed to flush the WAL: %v", err) + break ConsumerLoop + } + if err := m.storeState(); err != nil { + consumerErr = fmt.Errorf("failed to store incremental state: %v", err) + break ConsumerLoop + } } } } + + cancelProducer() + + if err := g.Wait(); err != nil { + if consumerErr == nil { + return err + } + } + if consumerErr != nil { + return consumerErr + } } if err := m.walWriter.flush(); err != nil { return fmt.Errorf("failed to flush: %v", err)