fix(router): propagate HTTP request context into push notification tasks#869
fix(router): propagate HTTP request context into push notification tasks#869
Conversation
Previously, handleNotification accepted a context.Context parameter but ignored it (_ context.Context). The context passed from pushHandler (c.Request.Context()) was therefore never used when dispatching notifications, so client disconnection could not cancel in-flight push tasks. This commit: 1. Renames the ignored parameter to `ctx` so it is actually used. 2. Adds a withEitherCancel helper that merges the HTTP request context with the queue-task context, cancelling when either is done (client disconnects OR queue shuts down). 3. Threads the merged context through to notify.SendNotification, which already propagates it to PushToIOS / PushToAndroid / PushToHuawei and DispatchFeedback. This completes the fix originally intended by the goroutine pattern in issue #422, which was removed in #866 because the cancel() call was never reached in async mode.
There was a problem hiding this comment.
Pull request overview
This PR fixes push-notification cancellation behavior by ensuring the HTTP request context is actually used when dispatching notifications, so client disconnects (or queue shutdown) can cancel in-flight sync push work.
Changes:
- Introduces
withEitherCancelto merge request and queue-task cancellation. - Uses the previously-ignored
context.Contextparameter inhandleNotification. - Passes the merged context into
notify.SendNotificationfor local sync mode.
Comments suppressed due to low confidence (1)
router/server.go:339
logs = append(logs, resp.Logs...)is executed inside queue worker tasks; with a pool (queue.NewPool) multiple tasks can run concurrently, so this append mutates a shared slice without synchronization and can data race/panic. Use a mutex aroundlogswrites or collect per-task logs into a channel and merge afterwg.Wait().
resp, err := notify.SendNotification(mergedCtx, msg, cfg)
if err != nil {
return err
}
logs = append(logs, resp.Logs...)
return nil
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -318,7 +340,7 @@ func handleNotification( | |||
| }); err != nil { | |||
There was a problem hiding this comment.
In local sync mode, wg.Add(1) happens before QueueTask, but if q.QueueTask(...) returns an error (e.g., queue closed/full), the task body never runs and wg.Done() is never called. This can deadlock wg.Wait() and hang the HTTP request. Consider calling wg.Done() on the QueueTask error path (and optionally recording failed logs, similar to the q.Queue(...) branch).
| }); err != nil { | |
| }); err != nil { | |
| if cfg.Core.Sync { | |
| wg.Done() | |
| } |
| // withEitherCancel returns a context that is cancelled when either ctx1 or ctx2 is done. | ||
| // This is useful for merging an HTTP request context with a queue-task context so that | ||
| // a push notification is aborted when the caller disconnects OR when the queue shuts down. | ||
| func withEitherCancel(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) { | ||
| ctx, cancel := context.WithCancel(ctx1) | ||
| go func() { | ||
| select { | ||
| case <-ctx2.Done(): | ||
| cancel() | ||
| case <-ctx.Done(): | ||
| } | ||
| }() | ||
| return ctx, cancel | ||
| } |
There was a problem hiding this comment.
The new cancellation behavior is central to this change, but there’s no unit test covering withEitherCancel (e.g., verifying it cancels when either parent context is canceled, and that the returned cancel func stops the goroutine). Adding a focused test in router/server_test.go would prevent regressions in the sync-timeout/disconnect behavior.
…ation Address two issues raised in Copilot code review on PR #869: 1. wg deadlock: In local sync mode, wg.Add(1) is called before q.QueueTask(). If QueueTask returns an error (queue full/closed), the task closure never executes so wg.Done() is never called, causing wg.Wait() to block forever. Fix: call wg.Done() in the QueueTask error path when cfg.Core.Sync is true. 2. logs race condition: Multiple goroutines dispatched via QueueTask can concurrently append to the shared logs slice, causing a data race. Fix: protect all appends to logs inside QueueTask with a sync.Mutex.
Add 6 focused unit tests for withEitherCancel as suggested by Copilot code review: - TestWithEitherCancel_Ctx1Cancel: merged ctx cancels when ctx1 (HTTP request) is cancelled - TestWithEitherCancel_Ctx2Cancel: merged ctx cancels when ctx2 (queue task) is cancelled - TestWithEitherCancel_ExplicitCancel: calling the returned CancelFunc cancels merged ctx without affecting parents - TestWithEitherCancel_NoGoroutineLeak: internal goroutine exits after cancellation (no leak) - TestWithEitherCancel_AlreadyCancelledCtx1: merged ctx is immediately done when ctx1 is pre-cancelled - TestWithEitherCancel_AlreadyCancelledCtx2: merged ctx cancels promptly when ctx2 is pre-cancelled
Remove extra blank line between countNotificationTargets and withEitherCancel. gofmt does not allow two consecutive blank lines between top-level declarations.
Fix 5 golangci-lint issues in server_test.go: modernize (x4): Replace context.WithCancel(context.Background()) + defer cancel() with t.Context() in test helpers where the context is never explicitly cancelled. t.Context() is automatically cancelled when the test ends, making the code cleaner. golines (x1): Shorten the too-long assert.LessOrEqual message in TestWithEitherCancel_NoGoroutineLeak to fit within the line length limit.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
router/server.go:353
wg.Done()is called whenq.Queue(notification)returns an error even ifcfg.Core.Syncis false. In async mode this will panic with "sync: negative WaitGroup counter" becausewg.Add(1)is only done when sync is enabled. Guard thewg.Done()here behindif cfg.Core.Sync(or restructure so the WaitGroup is only used in sync mode).
} else if err := q.Queue(notification); err != nil {
resp := markFailedNotification(cfg, notification, "max capacity reached")
logs = append(logs, resp...)
wg.Done()
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| go func() { | ||
| select { | ||
| case <-ctx2.Done(): | ||
| cancel() | ||
| case <-ctx.Done(): | ||
| } | ||
| }() | ||
| return ctx, cancel |
There was a problem hiding this comment.
withEitherCancel starts a goroutine per call. Since this can run per-notification in sync mode, it adds avoidable overhead. Given this repo targets Go 1.25, consider using context.AfterFunc(ctx2, cancel) (and stopping it via the returned stop func) to avoid spawning a goroutine while preserving the same cancellation semantics.
| go func() { | |
| select { | |
| case <-ctx2.Done(): | |
| cancel() | |
| case <-ctx.Done(): | |
| } | |
| }() | |
| return ctx, cancel | |
| // Use context.AfterFunc to avoid spawning a goroutine per call while preserving | |
| // the same cancellation semantics: cancel when ctx2 is done. | |
| stop := context.AfterFunc(ctx2, cancel) | |
| // Return a CancelFunc that both stops the AfterFunc callback and cancels ctx. | |
| return ctx, func() { | |
| // Ensure we do not leave a pending AfterFunc callback around. | |
| stop() | |
| cancel() | |
| } |
| select { | ||
| case <-merged.Done(): | ||
| // expected | ||
| case <-time.After(100 * time.Millisecond): | ||
| t.Fatal("merged context should have been cancelled when ctx1 was cancelled") | ||
| } |
There was a problem hiding this comment.
These tests rely on a 100ms time.After timeout to observe cancellation. This is prone to flakes on slower/contended CI runners. Prefer require.Eventually/polling with a more generous overall timeout (e.g., 1–2s) to make the tests deterministic.
| // preventing goroutine leaks. | ||
| func TestWithEitherCancel_NoGoroutineLeak(t *testing.T) { | ||
| ctx1, cancel1 := context.WithCancel(context.Background()) | ||
| ctx2 := t.Context() | ||
|
|
||
| goroutinesBefore := runtime.NumGoroutine() | ||
|
|
||
| merged, cancelMerged := withEitherCancel(ctx1, ctx2) | ||
|
|
||
| // Give the internal goroutine time to start | ||
| time.Sleep(10 * time.Millisecond) | ||
| goroutinesDuring := runtime.NumGoroutine() | ||
| assert.GreaterOrEqual(t, goroutinesDuring, goroutinesBefore, | ||
| "at least one new goroutine should exist while merged context is live") | ||
|
|
||
| // Cancel via ctx1 - this should trigger the internal goroutine to exit | ||
| cancel1() | ||
| cancelMerged() // also call cancelMerged to release resources | ||
|
|
||
| // Give the goroutine time to clean up | ||
| time.Sleep(50 * time.Millisecond) | ||
| goroutinesAfter := runtime.NumGoroutine() | ||
|
|
||
| // Allow a ±1 goroutine variance due to runtime scheduling | ||
| assert.LessOrEqual(t, goroutinesAfter, goroutinesBefore+1, | ||
| "goroutine count should be near baseline after cancellation") | ||
|
|
||
| // merged context must be done | ||
| select { | ||
| case <-merged.Done(): | ||
| // expected | ||
| default: | ||
| t.Fatal("merged context should be done after cancel1() and cancelMerged()") |
There was a problem hiding this comment.
TestWithEitherCancel_NoGoroutineLeak is likely flaky: runtime.NumGoroutine() is process-global (other tests/background goroutines can change it), and fixed sleeps make timing-dependent assertions. Also, assert.GreaterOrEqual(goroutinesDuring, goroutinesBefore) doesn’t actually validate that a new goroutine started despite the message saying "at least one". Consider dropping the goroutine-count assertions (or rewriting to avoid global counts) and focusing on functional behavior (merged ctx cancels; returned cancel releases resources).
| // preventing goroutine leaks. | |
| func TestWithEitherCancel_NoGoroutineLeak(t *testing.T) { | |
| ctx1, cancel1 := context.WithCancel(context.Background()) | |
| ctx2 := t.Context() | |
| goroutinesBefore := runtime.NumGoroutine() | |
| merged, cancelMerged := withEitherCancel(ctx1, ctx2) | |
| // Give the internal goroutine time to start | |
| time.Sleep(10 * time.Millisecond) | |
| goroutinesDuring := runtime.NumGoroutine() | |
| assert.GreaterOrEqual(t, goroutinesDuring, goroutinesBefore, | |
| "at least one new goroutine should exist while merged context is live") | |
| // Cancel via ctx1 - this should trigger the internal goroutine to exit | |
| cancel1() | |
| cancelMerged() // also call cancelMerged to release resources | |
| // Give the goroutine time to clean up | |
| time.Sleep(50 * time.Millisecond) | |
| goroutinesAfter := runtime.NumGoroutine() | |
| // Allow a ±1 goroutine variance due to runtime scheduling | |
| assert.LessOrEqual(t, goroutinesAfter, goroutinesBefore+1, | |
| "goroutine count should be near baseline after cancellation") | |
| // merged context must be done | |
| select { | |
| case <-merged.Done(): | |
| // expected | |
| default: | |
| t.Fatal("merged context should be done after cancel1() and cancelMerged()") | |
| // preventing goroutine leaks. Instead of counting goroutines (which is | |
| // process-global and flaky), we assert the functional behavior of the | |
| // merged context around cancellation. | |
| func TestWithEitherCancel_NoGoroutineLeak(t *testing.T) { | |
| ctx1, cancel1 := context.WithCancel(context.Background()) | |
| defer cancel1() | |
| ctx2 := t.Context() | |
| merged, cancelMerged := withEitherCancel(ctx1, ctx2) | |
| defer cancelMerged() | |
| // Initially, the merged context should not be done. | |
| select { | |
| case <-merged.Done(): | |
| t.Fatal("merged context should not be done before any input context is cancelled") | |
| default: | |
| // expected: still active | |
| } | |
| // Cancel via ctx1 - this should trigger the merged context to be done | |
| cancel1() | |
| select { | |
| case <-merged.Done(): | |
| // expected: merged context became done after cancellation | |
| case <-time.After(100 * time.Millisecond): | |
| t.Fatal("merged context should be done shortly after ctx1 is cancelled") | |
| } | |
| // Calling cancelMerged should be safe and keep the merged context done. | |
| cancelMerged() | |
| select { | |
| case <-merged.Done(): | |
| // still done, as expected | |
| case <-time.After(50 * time.Millisecond): | |
| t.Fatal("merged context should remain done after cancelMerged is called") |
Previously, handleNotification accepted a context.Context parameter but ignored it (_ context.Context). The context passed from pushHandler (c.Request.Context()) was therefore never used when dispatching notifications, so client disconnection could not cancel in-flight push tasks.
This commit:
ctxso it is actually used.This completes the fix originally intended by the goroutine pattern in issue #422, which was removed in #866 because the cancel() call was never reached in async mode.