From 0b9a0ab73652a140aad5a589c563bbcc2cc3e652 Mon Sep 17 00:00:00 2001 From: Sergey Vilgelm Date: Fri, 17 Apr 2026 09:10:36 -0700 Subject: [PATCH 1/2] fix(runs): make TestWatchActionUpdates_OnlyStreamsTargetAction deterministic Start watcher before creating actions so the subscriber is registered when the creation NOTIFY arrives. Drain the creation notification before the real assertions to eliminate the race that caused spurious "unexpected update for action target" failures. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Sergey Vilgelm --- runs/repository/impl/action_test.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/runs/repository/impl/action_test.go b/runs/repository/impl/action_test.go index 019ba8d8c0..edad3272da 100644 --- a/runs/repository/impl/action_test.go +++ b/runs/repository/impl/action_test.go @@ -133,11 +133,10 @@ func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { otherActionID := &common.ActionIdentifier{Run: runID, Name: "other"} ctx := context.Background() - _, err = actionRepo.CreateAction(ctx, models.NewActionModel(targetActionID), false) - require.NoError(t, err) - _, err = actionRepo.CreateAction(ctx, models.NewActionModel(otherActionID), false) - require.NoError(t, err) + // Start watcher before creating actions so we can deterministically + // drain the creation notification and avoid a race where the async + // NOTIFY arrives after the subscriber registers. watchCtx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -145,8 +144,21 @@ func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { errs := make(chan error, 1) go actionRepo.WatchActionUpdates(watchCtx, targetActionID, updates, errs) - time.Sleep(1100 * time.Millisecond) + time.Sleep(100 * time.Millisecond) // let the subscriber register + + _, err = actionRepo.CreateAction(ctx, models.NewActionModel(targetActionID), false) + require.NoError(t, err) + _, err = actionRepo.CreateAction(ctx, models.NewActionModel(otherActionID), false) + require.NoError(t, err) + + // Drain the creation notification for the target action. + select { + case <-updates: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for creation notification") + } + // Update "other" — should NOT produce an update for "target". err = actionRepo.UpdateActionPhase(ctx, otherActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) require.NoError(t, err) @@ -158,6 +170,7 @@ func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { case <-time.After(1200 * time.Millisecond): } + // Update "target" — should produce an update. err = actionRepo.UpdateActionPhase(ctx, targetActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) require.NoError(t, err) From 88eaa7eb11aa0bedda043b7e35e5462b2a7929cb Mon Sep 17 00:00:00 2001 From: Sergey Vilgelm Date: Fri, 17 Apr 2026 09:32:50 -0700 Subject: [PATCH 2/2] fix(runs): replace sleep with deterministic subscriber check Replace time.Sleep with require.Eventually polling actionSubscribers to ensure the watcher is registered before creating actions. This removes all timing dependencies from the test. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Sergey Vilgelm --- runs/repository/impl/action_test.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/runs/repository/impl/action_test.go b/runs/repository/impl/action_test.go index edad3272da..c3ef62e6c5 100644 --- a/runs/repository/impl/action_test.go +++ b/runs/repository/impl/action_test.go @@ -120,8 +120,9 @@ func TestUpdateActionPhasePersistsAttemptsAndCacheStatus(t *testing.T) { func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { db := setupActionDB(t) defer func() { db.Exec("DELETE FROM actions") }() - actionRepo, err := NewActionRepo(db, testDbConfig) + repo, err := NewActionRepo(db, testDbConfig) require.NoError(t, err) + repoImpl := repo.(*actionRepo) runID := &common.RunIdentifier{ Org: "org1", @@ -142,13 +143,17 @@ func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { updates := make(chan *models.Action, 2) errs := make(chan error, 1) - go actionRepo.WatchActionUpdates(watchCtx, targetActionID, updates, errs) + go repo.WatchActionUpdates(watchCtx, targetActionID, updates, errs) - time.Sleep(100 * time.Millisecond) // let the subscriber register + require.Eventually(t, func() bool { + repoImpl.mu.RLock() + defer repoImpl.mu.RUnlock() + return len(repoImpl.actionSubscribers) > 0 + }, 2*time.Second, 10*time.Millisecond, "timed out waiting for watcher registration") - _, err = actionRepo.CreateAction(ctx, models.NewActionModel(targetActionID), false) + _, err = repo.CreateAction(ctx, models.NewActionModel(targetActionID), false) require.NoError(t, err) - _, err = actionRepo.CreateAction(ctx, models.NewActionModel(otherActionID), false) + _, err = repo.CreateAction(ctx, models.NewActionModel(otherActionID), false) require.NoError(t, err) // Drain the creation notification for the target action. @@ -159,7 +164,7 @@ func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { } // Update "other" — should NOT produce an update for "target". - err = actionRepo.UpdateActionPhase(ctx, otherActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) + err = repo.UpdateActionPhase(ctx, otherActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) require.NoError(t, err) select { @@ -171,7 +176,7 @@ func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { } // Update "target" — should produce an update. - err = actionRepo.UpdateActionPhase(ctx, targetActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) + err = repo.UpdateActionPhase(ctx, targetActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) require.NoError(t, err) select {