diff --git a/CHANGELOG.md b/CHANGELOG.md index a3ec532ac..085c73e36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes: ### Added +- 🛠 `pinning/pinner`: `Pinner` now has a `Close() error` method. Close waits for every in-flight operation, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, to return before unblocking. After Close, every other method fails fast with the new `ErrClosed` sentinel; streaming methods surface it as the `Err` field of a single entry on the returned channel. Close is idempotent and safe to call from any goroutine. Downstream implementations of `Pinner` must add a `Close` method. +- `pinning/pinner/dspinner`: implements the new `Close`. Stream goroutines now also select on the pinner shutdown signal when they send, so Close never stalls on a parked consumer. Hosts that own the backing datastore should call `Close` on the pinner before closing the datastore to avoid the panic-on-use-after-close path in datastores such as pebble. + ### Changed ### Removed diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 3dc44863d..22ddb56f5 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -100,6 +100,15 @@ type pinner struct { rootsProvider provider.MultihashProvider pinnedProvider provider.MultihashProvider + + // Lifecycle state. closedMu serialises the isClosed check with + // wg.Add so Close reliably waits for every admitted operation. + // done is closed by Close and unblocks streaming goroutines parked + // on a send. + closedMu sync.Mutex + isClosed bool + done chan struct{} + wg sync.WaitGroup } var _ ipfspinner.Pinner = (*pinner)(nil) @@ -166,6 +175,7 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . 
nameIndex: dsindex.New(dstore, ds.NewKey(pinNameIndexPath)), dserv: dserv, dstore: dstore, + done: make(chan struct{}), } for _, o := range opts { @@ -191,9 +201,56 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . return p, nil } +// begin admits a caller. It returns [ipfspinner.ErrClosed] if the +// pinner has been closed; otherwise it increments the in-flight +// counter so that Close waits for the caller to finish. Successful +// callers MUST pair begin with a deferred p.wg.Done. +func (p *pinner) begin() error { + p.closedMu.Lock() + defer p.closedMu.Unlock() + if p.isClosed { + return ipfspinner.ErrClosed + } + p.wg.Add(1) + return nil +} + +// errClosedChan returns a pre-filled buffered channel carrying a single +// [ipfspinner.StreamedPin] with err, followed by close. Buffered so +// callers that never read the channel and never cancel their context +// do not leak the send goroutine. +func errClosedChan(err error) <-chan ipfspinner.StreamedPin { + out := make(chan ipfspinner.StreamedPin, 1) + out <- ipfspinner.StreamedPin{Err: err} + close(out) + return out +} + +// Close releases resources held by the pinner and blocks until every +// admitted operation has returned. Close does not close the backing +// datastore. After Close returns, every other method fails fast with +// [ipfspinner.ErrClosed]. Close is idempotent. +func (p *pinner) Close() error { + p.closedMu.Lock() + if p.isClosed { + p.closedMu.Unlock() + return nil + } + p.isClosed = true + close(p.done) + p.closedMu.Unlock() + + p.wg.Wait() + return nil +} + // SetAutosync allows auto-syncing to be enabled or disabled during runtime. // This may be used to turn off autosync before doing many repeated pinning // operations, and then turn it on after. Returns the previous value. 
+// +// SetAutosync is not part of the [ipfspinner.Pinner] interface and is +// not gated by Close: it mutates an in-memory flag only, never touches +// the datastore, and so is safe to call on a closed pinner. func (p *pinner) SetAutosync(auto bool) bool { p.lock.Lock() defer p.lock.Unlock() @@ -204,6 +261,11 @@ func (p *pinner) SetAutosync(auto bool) bool { // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, name string) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + err := p.dserv.Add(ctx, node) if err != nil { return err @@ -440,6 +502,11 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error { // Unpin a given key func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + cidKey := c.KeyString() p.lock.Lock() @@ -485,6 +552,11 @@ func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { // IsPinned returns whether or not the given key is pinned // and an explanation of why its pinned func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) { + if err := p.begin(); err != nil { + return "", false, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(ctx, c, ipfspinner.Any) @@ -493,6 +565,11 @@ func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) // IsPinnedWithType returns whether or not the given cid is pinned with the // given pin type, as well as returning the type of pin its pinned with. 
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) { + if err := p.begin(); err != nil { + return "", false, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(ctx, c, mode) @@ -606,6 +683,11 @@ func (p *pinner) loadPinName(ctx context.Context, pin *ipfspinner.Pinned, pinID // CheckIfPinnedWithType implements the Pinner interface, checking specific pin types. // This method is optimized to only check the requested pin type(s). func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode, includeNames bool, cids ...cid.Cid) ([]ipfspinner.Pinned, error) { + if err := p.begin(); err != nil { + return nil, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() @@ -944,29 +1026,38 @@ func (p *pinner) snapshotIndex(ctx context.Context, index dsindex.Indexer) ([]in } func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin { + if err := p.begin(); err != nil { + return errClosedChan(err) + } + out := make(chan ipfspinner.StreamedPin) go func() { + defer p.wg.Done() defer close(out) + // send delivers sp and reports whether the consumer is still + // listening. A closed pinner unblocks send via p.done so that + // Close can wait on wg without stalling on a parked send. send := func(sp ipfspinner.StreamedPin) (ok bool) { select { case <-ctx.Done(): return false + case <-p.done: + return false case out <- sp: return true } } - // If the backing datastore panics during enumeration, - // recover and surface the panic as an error on the output - // channel instead of crashing the process. This is - // datastore-implementation agnostic: any datastore may - // panic on use after Close (pebble being the prominent - // case), and the pinner does not own the datastore's - // lifecycle. 
The wording below gives the caller enough - context to treat it as an expected shutdown-time - interruption rather than a real failure. + // Defense in depth: if the backing datastore panics during + // enumeration, recover and surface the panic as an error on + // the output channel. Close on this pinner runs before the + // datastore is closed by the caller's lifecycle, so this path + // should no longer fire in practice. It remains here for + // callers that do not wire Close correctly; any datastore may + // panic on use after Close (pebble being the prominent case), + // and the pinner does not own the datastore's lifecycle. defer func() { if r := recover(); r != nil { send(ipfspinner.StreamedPin{Err: fmt.Errorf("pin stream interrupted by datastore panic (likely shutdown): %v", r)}) @@ -1020,8 +1111,17 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile } // InternalPins returns all cids kept pinned for the internal state of the -// pinner +// pinner. dspinner does not keep internal pins, so the returned channel +// is always empty; it carries a single [ipfspinner.ErrClosed] entry if +// Close has been called. func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin { + if err := p.begin(); err != nil { + return errClosedChan(err) + } + // The channel is closed synchronously before we return, so the + // deferred Done only releases the admission taken by begin().
+ defer p.wg.Done() + c := make(chan ipfspinner.StreamedPin) close(c) return c @@ -1032,6 +1132,11 @@ func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspin // // TODO: This will not work when multiple pins are supported func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + p.lock.Lock() defer p.lock.Unlock() @@ -1112,6 +1217,11 @@ func (p *pinner) flushPins(ctx context.Context, force bool) error { // Flush encodes and writes pinner keysets to the datastore func (p *pinner) Flush(ctx context.Context) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + p.lock.Lock() defer p.lock.Unlock() @@ -1126,6 +1236,11 @@ func (p *pinner) Flush(ctx context.Context) error { // PinWithMode allows the user to have fine grained control over pin // counts func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, name string) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + // TODO: remove his to support multiple pins per CID switch mode { case ipfspinner.Recursive: diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index b7a254657..df4d8acb2 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "path" + "sync" "sync/atomic" "testing" "testing/synctest" @@ -1590,3 +1591,200 @@ func TestStreamIndexDoesNotBlockWriters(t *testing.T) { }) } } + +// TestCloseIdempotent asserts Close can be called repeatedly and from +// multiple goroutines without error or panic. 
+func TestCloseIdempotent(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + require.NoError(t, p.Close()) + require.NoError(t, p.Close()) + + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, p.Close()) + }() + } + wg.Wait() +} + +// TestCloseErrClosedAllMethods asserts every public Pinner method +// returns ErrClosed after Close has returned. Streaming methods +// surface ErrClosed as StreamedPin.Err on the first (and only) +// channel entry. +func TestCloseErrClosedAllMethods(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + // Seed one pin so the index is not empty; Close still succeeds + // but we want post-close streaming methods to emit ErrClosed + // rather than an empty stream, which would be indistinguishable + // from "no pins". + n, k := randNode() + require.NoError(t, dserv.Add(ctx, n)) + require.NoError(t, p.Pin(ctx, n, true, "")) + + require.NoError(t, p.Close()) + + // Scalar methods. + require.ErrorIs(t, p.Pin(ctx, n, true, ""), ipfspin.ErrClosed) + require.ErrorIs(t, p.Unpin(ctx, k, true), ipfspin.ErrClosed) + require.ErrorIs(t, p.Update(ctx, k, k, true), ipfspin.ErrClosed) + require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) + require.ErrorIs(t, p.PinWithMode(ctx, k, ipfspin.Recursive, ""), ipfspin.ErrClosed) + + _, _, err = p.IsPinned(ctx, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + _, _, err = p.IsPinnedWithType(ctx, k, ipfspin.Recursive) + require.ErrorIs(t, err, ipfspin.ErrClosed) + + _, err = p.CheckIfPinned(ctx, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + _, err = p.CheckIfPinnedWithType(ctx, ipfspin.Recursive, false, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + + // Streaming methods. 
+ assertStreamedErrClosed := func(t *testing.T, name string, ch <-chan ipfspin.StreamedPin) { + t.Helper() + got, ok := <-ch + require.True(t, ok, "%s: expected one entry before close, channel already closed", name) + require.ErrorIs(t, got.Err, ipfspin.ErrClosed, "%s: want ErrClosed on first entry", name) + _, ok = <-ch + require.False(t, ok, "%s: channel must be closed after ErrClosed entry", name) + } + assertStreamedErrClosed(t, "RecursiveKeys", p.RecursiveKeys(ctx, false)) + assertStreamedErrClosed(t, "DirectKeys", p.DirectKeys(ctx, false)) + assertStreamedErrClosed(t, "InternalPins", p.InternalPins(ctx, false)) +} + +// TestCloseWaitsForInFlightOperation asserts Close blocks until an +// operation admitted before Close was called has finished. We hold +// the pinner's read lock from the test to stall an in-flight Pin +// past its begin() admission, then verify Close does not return +// while the Pin is still running. +func TestCloseWaitsForInFlightOperation(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + n, _ := randNode() + require.NoError(t, dserv.Add(ctx, n)) + + // Acquire the index read lock so Pin's internal Lock() call + // blocks after begin() has already incremented wg. + p.lock.RLock() + + pinStarted := make(chan struct{}) + pinReturned := make(chan error, 1) + go func() { + close(pinStarted) + pinReturned <- p.Pin(ctx, n, true, "") + }() + <-pinStarted + // Give Pin time to pass begin() and block on p.lock.Lock(). + time.Sleep(20 * time.Millisecond) + + closeReturned := make(chan error, 1) + go func() { + closeReturned <- p.Close() + }() + + select { + case <-closeReturned: + t.Fatal("Close returned while Pin was still in flight") + case <-time.After(50 * time.Millisecond): + } + + // Releasing the RLock lets Pin acquire Lock and finish; Close + // then unblocks.
+ p.lock.RUnlock() + + require.NoError(t, <-pinReturned) + require.NoError(t, <-closeReturned) +} + +// TestCloseUnblocksParkedStream asserts Close unblocks a streamIndex +// goroutine that is parked on a send because no consumer is reading. +// Without the p.done signal in send(), Close would stall on wg.Wait +// for the lifetime of the caller's context. +func TestCloseUnblocksParkedStream(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + // Seed a few pins so streamIndex has entries to emit (and + // will therefore park on the first send against an absent + // consumer). + pinNodes(makeNodes(4, dserv), p, true) + + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + kch := p.RecursiveKeys(streamCtx, false) + + // Wait for the streamIndex goroutine to park on send. + synctest.Wait() + + require.NoError(t, p.Close()) + + // Channel must close after the goroutine exits. + for range kch { + } + }) +} + +// TestCloseConcurrent hammers Pin, RecursiveKeys, and Close from many +// goroutines. Run with -race to catch lifecycle races. Every admitted +// operation completes and every post-close caller sees ErrClosed. 
+func TestCloseConcurrent(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + nodes := makeNodes(4, dserv) + + var wg sync.WaitGroup + for i := 0; i < 32; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + _ = p.Pin(ctx, nodes[i%len(nodes)], true, "") + }(i) + } + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + defer wg.Done() + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + for range p.RecursiveKeys(streamCtx, false) { + } + }() + } + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = p.Close() + }() + } + wg.Wait() + + require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) +} diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index efcb1c771..2d4a91dcc 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -79,6 +79,12 @@ func StringToMode(s string) (Mode, bool) { // ErrNotPinned is returned when trying to unpin items that are not pinned. var ErrNotPinned = errors.New("not pinned or pinned indirectly") +// ErrClosed is returned by [Pinner] methods after [Pinner.Close] has been +// called. Streaming methods ([Pinner.DirectKeys], [Pinner.RecursiveKeys], +// [Pinner.InternalPins]) surface it as the [StreamedPin.Err] of a single +// entry on the returned channel, which is then closed. +var ErrClosed = errors.New("pinner closed") + // A Pinner provides the necessary methods to keep track of Nodes which are // to be kept locally, according to a pin mode. 
In practice, a Pinner is in // charge of keeping the list of items from the local storage that should @@ -148,6 +154,18 @@ type Pinner interface { // InternalPins returns all cids kept pinned for the internal state of the // pinner InternalPins(ctx context.Context, detailed bool) <-chan StreamedPin + + // Close releases resources held by the pinner and blocks until every + // in-flight operation, including streaming goroutines from DirectKeys, + // RecursiveKeys, and InternalPins, has returned. + // + // Close does not close the backing datastore; the caller owns that + // lifecycle and must keep the datastore open until Close returns. + // + // After Close returns, every other Pinner method fails fast with + // [ErrClosed]. Close is idempotent and safe to call from any + // goroutine. + Close() error } // Pinned represents CID which has been pinned with a pinning strategy.