From 3cdde219473f9b33e389034e5e8ca56b90509427 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Fri, 24 Apr 2026 14:30:57 +0200 Subject: [PATCH] feat(pinner): add Close with ErrClosed lifecycle Pinner gains Close() error. Close waits for every in-flight operation, including streaming goroutines from RecursiveKeys, DirectKeys, and InternalPins, to finish. After Close, every other Pinner method fails fast with a new ErrClosed sentinel; streaming methods surface it as the Err of a single entry on the returned channel, which is then closed. dspinner gains the matching implementation: Close is idempotent, admission is serialised with sync.Mutex + sync.WaitGroup, and stream sends select on a shutdown channel so a parked consumer cannot stall Close. The panic-recovery and context guards added in #1146 stay in place as defence in depth for hosts that do not wire Close correctly. Hosts that own the backing datastore (e.g. kubo) should call Close on the pinner before closing the datastore to avoid use-after-close panics in pebble and similar stores. --- CHANGELOG.md | 3 + pinning/pinner/dspinner/pin.go | 135 +++++++++++++++++-- pinning/pinner/dspinner/pin_test.go | 198 ++++++++++++++++++++++++++++ pinning/pinner/pin.go | 18 +++ 4 files changed, 344 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3ec532ac..085c73e36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes: ### Added +- 🛠 `pinning/pinner`: `Pinner` now has a `Close() error` method. Close waits for every in-flight operation, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, to return before unblocking. After Close, every other method fails fast with the new `ErrClosed` sentinel; streaming methods surface it as the `Err` field of a single entry on the returned channel. Close is idempotent and safe to call from any goroutine. 
Downstream implementations of `Pinner` must add a `Close` method. +- `pinning/pinner/dspinner`: implements the new `Close`. Stream goroutines now also select on the pinner shutdown signal when they send, so Close never stalls on a parked consumer. Hosts that own the backing datastore should call `Close` on the pinner before closing the datastore to avoid the panic-on-use-after-close path in datastores such as pebble. + ### Changed ### Removed diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 3dc44863d..22ddb56f5 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -100,6 +100,15 @@ type pinner struct { rootsProvider provider.MultihashProvider pinnedProvider provider.MultihashProvider + + // Lifecycle state. closedMu serialises the isClosed check with + // wg.Add so Close reliably waits for every admitted operation. + // done is closed by Close and unblocks streaming goroutines parked + // on a send. + closedMu sync.Mutex + isClosed bool + done chan struct{} + wg sync.WaitGroup } var _ ipfspinner.Pinner = (*pinner)(nil) @@ -166,6 +175,7 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . nameIndex: dsindex.New(dstore, ds.NewKey(pinNameIndexPath)), dserv: dserv, dstore: dstore, + done: make(chan struct{}), } for _, o := range opts { @@ -191,9 +201,56 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . return p, nil } +// begin admits a caller. It returns [ipfspinner.ErrClosed] if the +// pinner has been closed; otherwise it increments the in-flight +// counter so that Close waits for the caller to finish. Successful +// callers MUST pair begin with a deferred p.wg.Done. 
+func (p *pinner) begin() error { + p.closedMu.Lock() + defer p.closedMu.Unlock() + if p.isClosed { + return ipfspinner.ErrClosed + } + p.wg.Add(1) + return nil +} + +// errClosedChan returns a pre-filled buffered channel carrying a single +// [ipfspinner.StreamedPin] with err, followed by close. Buffered so +// callers that never read the channel and never cancel their context +// do not leak the send goroutine. +func errClosedChan(err error) <-chan ipfspinner.StreamedPin { + out := make(chan ipfspinner.StreamedPin, 1) + out <- ipfspinner.StreamedPin{Err: err} + close(out) + return out +} + +// Close releases resources held by the pinner and blocks until every +// admitted operation has returned. Close does not close the backing +// datastore. After Close returns, every other method fails fast with +// [ipfspinner.ErrClosed]. Close is idempotent. +func (p *pinner) Close() error { + p.closedMu.Lock() + if p.isClosed { + p.closedMu.Unlock() + return nil + } + p.isClosed = true + close(p.done) + p.closedMu.Unlock() + + p.wg.Wait() + return nil +} + // SetAutosync allows auto-syncing to be enabled or disabled during runtime. // This may be used to turn off autosync before doing many repeated pinning // operations, and then turn it on after. Returns the previous value. +// +// SetAutosync is not part of the [ipfspinner.Pinner] interface and is +// not gated by Close: it mutates an in-memory flag only, never touches +// the datastore, and so is safe to call on a closed pinner. 
func (p *pinner) SetAutosync(auto bool) bool { p.lock.Lock() defer p.lock.Unlock() @@ -204,6 +261,11 @@ func (p *pinner) SetAutosync(auto bool) bool { // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, name string) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + err := p.dserv.Add(ctx, node) if err != nil { return err @@ -440,6 +502,11 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error { // Unpin a given key func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + cidKey := c.KeyString() p.lock.Lock() @@ -485,6 +552,11 @@ func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { // IsPinned returns whether or not the given key is pinned // and an explanation of why its pinned func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) { + if err := p.begin(); err != nil { + return "", false, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(ctx, c, ipfspinner.Any) @@ -493,6 +565,11 @@ func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) // IsPinnedWithType returns whether or not the given cid is pinned with the // given pin type, as well as returning the type of pin its pinned with. func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) { + if err := p.begin(); err != nil { + return "", false, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(ctx, c, mode) @@ -606,6 +683,11 @@ func (p *pinner) loadPinName(ctx context.Context, pin *ipfspinner.Pinned, pinID // CheckIfPinnedWithType implements the Pinner interface, checking specific pin types. // This method is optimized to only check the requested pin type(s). 
func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode, includeNames bool, cids ...cid.Cid) ([]ipfspinner.Pinned, error) { + if err := p.begin(); err != nil { + return nil, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() @@ -944,29 +1026,38 @@ func (p *pinner) snapshotIndex(ctx context.Context, index dsindex.Indexer) ([]in } func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin { + if err := p.begin(); err != nil { + return errClosedChan(err) + } + out := make(chan ipfspinner.StreamedPin) go func() { + defer p.wg.Done() defer close(out) + // send delivers sp and reports whether the consumer is still + // listening. A closed pinner unblocks send via p.done so that + // Close can wait on wg without stalling on a parked send. send := func(sp ipfspinner.StreamedPin) (ok bool) { select { case <-ctx.Done(): return false + case <-p.done: + return false case out <- sp: return true } } - // If the backing datastore panics during enumeration, - // recover and surface the panic as an error on the output - // channel instead of crashing the process. This is - // datastore-implementation agnostic: any datastore may - // panic on use after Close (pebble being the prominent - // case), and the pinner does not own the datastore's - // lifecycle. The wording below gives the caller enough - // context to treat it as an expected shutdown-time - // interruption rather than a real failure. + // Defense in depth: if the backing datastore panics during + // enumeration, recover and surface the panic as an error on + // the output channel. Close on this pinner runs before the + // datastore is closed by the caller's lifecycle, so this path + // should no longer fire in practice. 
It remains here for + callers that do not wire Close correctly; any datastore may + panic on use after Close (pebble being the prominent case), + and the pinner does not own the datastore's lifecycle. defer func() { if r := recover(); r != nil { send(ipfspinner.StreamedPin{Err: fmt.Errorf("pin stream interrupted by datastore panic (likely shutdown): %v", r)}) @@ -1020,8 +1111,17 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile } // InternalPins returns all cids kept pinned for the internal state of the -// pinner +// pinner. dspinner does not keep internal pins, so the returned channel +// is always empty; it carries a single [ipfspinner.ErrClosed] entry if +// Close has been called. func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin { + if err := p.begin(); err != nil { + return errClosedChan(err) + } + // begin already incremented p.wg, so the deferred Done below is required + even though the channel is closed synchronously before we return. 
+ defer p.wg.Done() + c := make(chan ipfspinner.StreamedPin) close(c) return c @@ -1032,6 +1132,11 @@ func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspin // // TODO: This will not work when multiple pins are supported func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + p.lock.Lock() defer p.lock.Unlock() @@ -1112,6 +1217,11 @@ func (p *pinner) flushPins(ctx context.Context, force bool) error { // Flush encodes and writes pinner keysets to the datastore func (p *pinner) Flush(ctx context.Context) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + p.lock.Lock() defer p.lock.Unlock() @@ -1126,6 +1236,11 @@ func (p *pinner) Flush(ctx context.Context) error { // PinWithMode allows the user to have fine grained control over pin // counts func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, name string) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + // TODO: remove his to support multiple pins per CID switch mode { case ipfspinner.Recursive: diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index b7a254657..df4d8acb2 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "path" + "sync" "sync/atomic" "testing" "testing/synctest" @@ -1590,3 +1591,200 @@ func TestStreamIndexDoesNotBlockWriters(t *testing.T) { }) } } + +// TestCloseIdempotent asserts Close can be called repeatedly and from +// multiple goroutines without error or panic. 
+func TestCloseIdempotent(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + require.NoError(t, p.Close()) + require.NoError(t, p.Close()) + + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, p.Close()) + }() + } + wg.Wait() +} + +// TestCloseErrClosedAllMethods asserts every public Pinner method +// returns ErrClosed after Close has returned. Streaming methods +// surface ErrClosed as StreamedPin.Err on the first (and only) +// channel entry. +func TestCloseErrClosedAllMethods(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + // Seed one pin so the index is not empty; Close still succeeds + // but we want post-close streaming methods to emit ErrClosed + // rather than an empty stream, which would be indistinguishable + // from "no pins". + n, k := randNode() + require.NoError(t, dserv.Add(ctx, n)) + require.NoError(t, p.Pin(ctx, n, true, "")) + + require.NoError(t, p.Close()) + + // Scalar methods. + require.ErrorIs(t, p.Pin(ctx, n, true, ""), ipfspin.ErrClosed) + require.ErrorIs(t, p.Unpin(ctx, k, true), ipfspin.ErrClosed) + require.ErrorIs(t, p.Update(ctx, k, k, true), ipfspin.ErrClosed) + require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) + require.ErrorIs(t, p.PinWithMode(ctx, k, ipfspin.Recursive, ""), ipfspin.ErrClosed) + + _, _, err = p.IsPinned(ctx, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + _, _, err = p.IsPinnedWithType(ctx, k, ipfspin.Recursive) + require.ErrorIs(t, err, ipfspin.ErrClosed) + + _, err = p.CheckIfPinned(ctx, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + _, err = p.CheckIfPinnedWithType(ctx, ipfspin.Recursive, false, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + + // Streaming methods. 
+ assertStreamedErrClosed := func(t *testing.T, name string, ch <-chan ipfspin.StreamedPin) { + t.Helper() + got, ok := <-ch + require.True(t, ok, "%s: expected one entry before close, channel already closed", name) + require.ErrorIs(t, got.Err, ipfspin.ErrClosed, "%s: want ErrClosed on first entry", name) + _, ok = <-ch + require.False(t, ok, "%s: channel must be closed after ErrClosed entry", name) + } + assertStreamedErrClosed(t, "RecursiveKeys", p.RecursiveKeys(ctx, false)) + assertStreamedErrClosed(t, "DirectKeys", p.DirectKeys(ctx, false)) + assertStreamedErrClosed(t, "InternalPins", p.InternalPins(ctx, false)) +} + +// TestCloseWaitsForInFlightOperation asserts Close blocks until an +// operation admitted before Close was called has finished. We hold +// the pinner's read lock from the test to stall an in-flight Pin +// past its begin() admission, then verify Close does not return +// while the Pin is still running. +func TestCloseWaitsForInFlightOperation(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + n, _ := randNode() + require.NoError(t, dserv.Add(ctx, n)) + + // Acquire the index read lock so Pin's internal Lock() call + blocks after begin() has already incremented wg. + p.lock.RLock() + + pinStarted := make(chan struct{}) + pinReturned := make(chan error, 1) + go func() { + close(pinStarted) + pinReturned <- p.Pin(ctx, n, true, "") + }() + <-pinStarted + // Give Pin time to pass begin() and block on p.lock.Lock(). + time.Sleep(20 * time.Millisecond) + + closeReturned := make(chan error, 1) + go func() { + closeReturned <- p.Close() + }() + + select { + case <-closeReturned: + t.Fatal("Close returned while Pin was still in flight") + case <-time.After(50 * time.Millisecond): + } + + // Releasing the RLock lets Pin acquire Lock and finish; Close + then unblocks. 
+ p.lock.RUnlock() + + require.NoError(t, <-pinReturned) + require.NoError(t, <-closeReturned) +} + +// TestCloseUnblocksParkedStream asserts Close unblocks a streamIndex +// goroutine that is parked on a send because no consumer is reading. +// Without the p.done signal in send(), Close would stall on wg.Wait +// for the lifetime of the caller's context. +func TestCloseUnblocksParkedStream(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + // Seed a few pins so streamIndex has entries to emit (and + // will therefore park on the first send against an absent + // consumer). + pinNodes(makeNodes(4, dserv), p, true) + + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + kch := p.RecursiveKeys(streamCtx, false) + + // Wait for the streamIndex goroutine to park on send. + synctest.Wait() + + require.NoError(t, p.Close()) + + // Channel must close after the goroutine exits. + for range kch { + } + }) +} + +// TestCloseConcurrent hammers Pin, RecursiveKeys, and Close from many +// goroutines. Run with -race to catch lifecycle races. Every admitted +// operation completes and every post-close caller sees ErrClosed. 
+func TestCloseConcurrent(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + nodes := makeNodes(4, dserv) + + var wg sync.WaitGroup + for i := 0; i < 32; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + _ = p.Pin(ctx, nodes[i%len(nodes)], true, "") + }(i) + } + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + defer wg.Done() + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + for range p.RecursiveKeys(streamCtx, false) { + } + }() + } + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = p.Close() + }() + } + wg.Wait() + + require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) +} diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index efcb1c771..2d4a91dcc 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -79,6 +79,12 @@ func StringToMode(s string) (Mode, bool) { // ErrNotPinned is returned when trying to unpin items that are not pinned. var ErrNotPinned = errors.New("not pinned or pinned indirectly") +// ErrClosed is returned by [Pinner] methods after [Pinner.Close] has been +// called. Streaming methods ([Pinner.DirectKeys], [Pinner.RecursiveKeys], +// [Pinner.InternalPins]) surface it as the [StreamedPin.Err] of a single +// entry on the returned channel, which is then closed. +var ErrClosed = errors.New("pinner closed") + // A Pinner provides the necessary methods to keep track of Nodes which are // to be kept locally, according to a pin mode. 
In practice, a Pinner is in // charge of keeping the list of items from the local storage that should @@ -148,6 +154,18 @@ type Pinner interface { // InternalPins returns all cids kept pinned for the internal state of the // pinner InternalPins(ctx context.Context, detailed bool) <-chan StreamedPin + + // Close releases resources held by the pinner and blocks until every + // in-flight operation, including streaming goroutines from DirectKeys, + // RecursiveKeys, and InternalPins, has returned. + // + // Close does not close the backing datastore; the caller owns that + // lifecycle and must keep the datastore open until Close returns. + // + // After Close returns, every other Pinner method fails fast with + // [ErrClosed]. Close is idempotent and safe to call from any + // goroutine. + Close() error } // Pinned represents CID which has been pinned with a pinning strategy.