CHANGELOG.md (3 changes: 3 additions & 0 deletions)
@@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes:

### Added

- 🛠 `pinning/pinner`: `Pinner` now has a `Close() error` method. Close blocks until every in-flight operation, including the streaming goroutines behind `RecursiveKeys`, `DirectKeys`, and `InternalPins`, has returned. After Close, every other method fails fast with the new `ErrClosed` sentinel; streaming methods surface it as the `Err` field of a single entry on the returned channel. Close is idempotent and safe to call from any goroutine. Downstream implementations of `Pinner` must add a `Close` method.
- `pinning/pinner/dspinner`: implements the new `Close`. Stream goroutines now also select on the pinner shutdown signal when they send, so Close never stalls behind a consumer that has stopped reading. Hosts that own the backing datastore should call `Close` on the pinner before closing the datastore to avoid the panic-on-use-after-close path in datastores such as pebble.

### Changed

### Removed
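The two changelog entries above ask hosts to close the pinner before closing the backing datastore. The following is a minimal sketch of that shutdown ordering, not part of this diff; the package name, the function name `shutdownExample`, and the way the datastore and DAG service are obtained are illustrative assumptions only.

```go
package hostexample

import (
	"context"
	"log"

	"github.com/ipfs/boxo/pinning/pinner/dspinner"
	ds "github.com/ipfs/go-datastore"
	ipld "github.com/ipfs/go-ipld-format"
)

// shutdownExample shows the ordering only: close the pinner first so every
// in-flight and streaming operation has returned, then close the datastore.
func shutdownExample(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService) {
	p, err := dspinner.New(ctx, dstore, dserv)
	if err != nil {
		log.Fatal(err)
	}

	// ... Pin, Unpin, and enumerate as usual ...

	// Close waits for admitted operations, including streaming
	// goroutines, before returning.
	if err := p.Close(); err != nil {
		log.Printf("closing pinner: %v", err)
	}
	// Only now close the datastore: no pinner goroutine can touch it
	// after this point, so pebble's use-after-close panic cannot fire.
	if err := dstore.Close(); err != nil {
		log.Printf("closing datastore: %v", err)
	}
}
```

Closing in this order is what keeps the panic-recovery path in streamIndex below a defensive measure rather than something that fires during normal shutdown.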
pinning/pinner/dspinner/pin.go (135 changes: 125 additions & 10 deletions)
@@ -100,6 +100,15 @@ type pinner struct {

rootsProvider provider.MultihashProvider
pinnedProvider provider.MultihashProvider

// Lifecycle state. closedMu serialises the isClosed check with
// wg.Add so Close reliably waits for every admitted operation.
// done is closed by Close and unblocks streaming goroutines parked
// on a send.
closedMu sync.Mutex
isClosed bool
done chan struct{}
wg sync.WaitGroup
}

var _ ipfspinner.Pinner = (*pinner)(nil)
@@ -166,6 +175,7 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts .
nameIndex: dsindex.New(dstore, ds.NewKey(pinNameIndexPath)),
dserv: dserv,
dstore: dstore,
done: make(chan struct{}),
}

for _, o := range opts {
@@ -191,9 +201,56 @@
return p, nil
}

// begin admits a caller. It returns [ipfspinner.ErrClosed] if the
// pinner has been closed; otherwise it increments the in-flight
// counter so that Close waits for the caller to finish. Successful
// callers MUST pair begin with a deferred p.wg.Done.
func (p *pinner) begin() error {
p.closedMu.Lock()
defer p.closedMu.Unlock()
if p.isClosed {
return ipfspinner.ErrClosed
}
p.wg.Add(1)
return nil
}

// errClosedChan returns a buffered, already-closed channel holding a
// single [ipfspinner.StreamedPin] whose Err field is err. The buffer
// means no sending goroutine is needed, so nothing leaks even if the
// caller never reads from the channel and never cancels its context.
func errClosedChan(err error) <-chan ipfspinner.StreamedPin {
out := make(chan ipfspinner.StreamedPin, 1)
out <- ipfspinner.StreamedPin{Err: err}
close(out)
return out
}

// Close releases resources held by the pinner and blocks until every
// admitted operation has returned. Close does not close the backing
// datastore. After Close returns, every other method fails fast with
// [ipfspinner.ErrClosed]. Close is idempotent.
func (p *pinner) Close() error {
p.closedMu.Lock()
if p.isClosed {
p.closedMu.Unlock()
return nil
}
p.isClosed = true
close(p.done)
p.closedMu.Unlock()

p.wg.Wait()
return nil
}

// SetAutosync allows auto-syncing to be enabled or disabled during runtime.
// This may be used to turn off autosync before doing many repeated pinning
// operations, and then turn it on after. Returns the previous value.
//
// SetAutosync is not part of the [ipfspinner.Pinner] interface and is
// not gated by Close: it mutates an in-memory flag only, never touches
// the datastore, and so is safe to call on a closed pinner.
func (p *pinner) SetAutosync(auto bool) bool {
p.lock.Lock()
defer p.lock.Unlock()
@@ -204,6 +261,11 @@ func (p *pinner) SetAutosync(auto bool) bool {

// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, name string) error {
if err := p.begin(); err != nil {
return err
}
defer p.wg.Done()

err := p.dserv.Add(ctx, node)
if err != nil {
return err
@@ -440,6 +502,11 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error {

// Unpin a given key
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
if err := p.begin(); err != nil {
return err
}
defer p.wg.Done()

cidKey := c.KeyString()

p.lock.Lock()
@@ -485,6 +552,11 @@ func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
// IsPinned returns whether or not the given key is pinned
// and an explanation of why it's pinned
func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) {
if err := p.begin(); err != nil {
return "", false, err
}
defer p.wg.Done()

p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(ctx, c, ipfspinner.Any)
@@ -493,6 +565,11 @@ func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error)
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin it's pinned with.
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) {
if err := p.begin(); err != nil {
return "", false, err
}
defer p.wg.Done()

p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(ctx, c, mode)
@@ -606,6 +683,11 @@ func (p *pinner) loadPinName(ctx context.Context, pin *ipfspinner.Pinned, pinID
// CheckIfPinnedWithType implements the Pinner interface, checking specific pin types.
// This method is optimized to only check the requested pin type(s).
func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode, includeNames bool, cids ...cid.Cid) ([]ipfspinner.Pinned, error) {
if err := p.begin(); err != nil {
return nil, err
}
defer p.wg.Done()

p.lock.RLock()
defer p.lock.RUnlock()

Expand Down Expand Up @@ -944,29 +1026,38 @@ func (p *pinner) snapshotIndex(ctx context.Context, index dsindex.Indexer) ([]in
}

func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin {
if err := p.begin(); err != nil {
return errClosedChan(err)
}

out := make(chan ipfspinner.StreamedPin)

go func() {
defer p.wg.Done()
defer close(out)

// send delivers sp and reports whether the consumer is still
// listening. A closed pinner unblocks send via p.done so that
// Close can wait on wg without stalling on a parked send.
send := func(sp ipfspinner.StreamedPin) (ok bool) {
select {
case <-ctx.Done():
return false
case <-p.done:
return false
case out <- sp:
return true
}
}

// Defense in depth: if the backing datastore panics during
// enumeration, recover and surface the panic as an error on
// the output channel. Close on this pinner runs before the
// datastore is closed by the caller's lifecycle, so this path
// should no longer fire in practice. It remains here for
// callers that do not wire Close correctly; any datastore may
// panic on use after Close (pebble being the prominent case),
// and the pinner does not own the datastore's lifecycle.
defer func() {
if r := recover(); r != nil {
send(ipfspinner.StreamedPin{Err: fmt.Errorf("pin stream interrupted by datastore panic (likely shutdown): %v", r)})
@@ -1020,8 +1111,17 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile
}

// InternalPins returns all cids kept pinned for the internal state of the
// pinner. dspinner does not keep internal pins, so the returned channel
// is normally closed without carrying any entries; after Close it carries
// a single entry whose Err is [ipfspinner.ErrClosed].
func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin {
if err := p.begin(); err != nil {
return errClosedChan(err)
}
// Everything here is synchronous: the channel is closed before we
// return, so the waitgroup entry taken by begin is released right away
// and Close has no background work to wait on.
defer p.wg.Done()

c := make(chan ipfspinner.StreamedPin)
close(c)
return c
@@ -1032,6 +1132,11 @@ func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspin
//
// TODO: This will not work when multiple pins are supported
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error {
if err := p.begin(); err != nil {
return err
}
defer p.wg.Done()

p.lock.Lock()
defer p.lock.Unlock()

@@ -1112,6 +1217,11 @@ func (p *pinner) flushPins(ctx context.Context, force bool) error {

// Flush encodes and writes pinner keysets to the datastore
func (p *pinner) Flush(ctx context.Context) error {
if err := p.begin(); err != nil {
return err
}
defer p.wg.Done()

p.lock.Lock()
defer p.lock.Unlock()

@@ -1126,6 +1236,11 @@
// PinWithMode allows the user to have fine-grained control over pin
// counts
func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, name string) error {
if err := p.begin(); err != nil {
return err
}
defer p.wg.Done()

// TODO: remove this to support multiple pins per CID
switch mode {
case ipfspinner.Recursive:
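On the consumer side, the changelog notes that a closed pinner delivers a single entry whose Err is ErrClosed on any streaming channel. Below is a sketch of handling that, assuming `RecursiveKeys` shares the `(ctx, detailed bool) <-chan StreamedPin` shape that `InternalPins` shows in this diff, and that `StreamedPin` exposes `Pin` and `Err` fields as used above; the package and function names are illustrative.

```go
package hostexample

import (
	"context"
	"errors"
	"fmt"

	ipfspinner "github.com/ipfs/boxo/pinning/pinner"
)

// listRecursive drains a pin stream and treats ErrClosed as a clean
// shutdown rather than a failure.
func listRecursive(ctx context.Context, p ipfspinner.Pinner) error {
	for sp := range p.RecursiveKeys(ctx, false) {
		if sp.Err != nil {
			if errors.Is(sp.Err, ipfspinner.ErrClosed) {
				// The pinner was closed before or during the stream.
				return nil
			}
			return sp.Err
		}
		fmt.Println(sp.Pin.Key)
	}
	return nil
}
```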