diff --git a/runs/repository/impl/action_test.go b/runs/repository/impl/action_test.go index 019ba8d8c0..c3ef62e6c5 100644 --- a/runs/repository/impl/action_test.go +++ b/runs/repository/impl/action_test.go @@ -120,8 +120,9 @@ func TestUpdateActionPhasePersistsAttemptsAndCacheStatus(t *testing.T) { func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { db := setupActionDB(t) defer func() { db.Exec("DELETE FROM actions") }() - actionRepo, err := NewActionRepo(db, testDbConfig) + repo, err := NewActionRepo(db, testDbConfig) require.NoError(t, err) + repoImpl := repo.(*actionRepo) runID := &common.RunIdentifier{ Org: "org1", @@ -133,21 +134,37 @@ func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { otherActionID := &common.ActionIdentifier{Run: runID, Name: "other"} ctx := context.Background() - _, err = actionRepo.CreateAction(ctx, models.NewActionModel(targetActionID), false) - require.NoError(t, err) - _, err = actionRepo.CreateAction(ctx, models.NewActionModel(otherActionID), false) - require.NoError(t, err) + // Start watcher before creating actions so we can deterministically + // drain the creation notification and avoid a race where the async + // NOTIFY arrives after the subscriber registers. watchCtx, cancel := context.WithCancel(context.Background()) defer cancel() updates := make(chan *models.Action, 2) errs := make(chan error, 1) - go actionRepo.WatchActionUpdates(watchCtx, targetActionID, updates, errs) + go repo.WatchActionUpdates(watchCtx, targetActionID, updates, errs) + + require.Eventually(t, func() bool { + repoImpl.mu.RLock() + defer repoImpl.mu.RUnlock() + return len(repoImpl.actionSubscribers) > 0 + }, 2*time.Second, 10*time.Millisecond, "timed out waiting for watcher registration") + + _, err = repo.CreateAction(ctx, models.NewActionModel(targetActionID), false) + require.NoError(t, err) + _, err = repo.CreateAction(ctx, models.NewActionModel(otherActionID), false) + require.NoError(t, err) - time.Sleep(1100 * time.Millisecond) + // Drain the creation notification for the target action. + select { + case <-updates: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for creation notification") + } - err = actionRepo.UpdateActionPhase(ctx, otherActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) + // Update "other" — should NOT produce an update for "target". + err = repo.UpdateActionPhase(ctx, otherActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) require.NoError(t, err) select { @@ -158,7 +175,8 @@ func TestWatchActionUpdates_OnlyStreamsTargetAction(t *testing.T) { case <-time.After(1200 * time.Millisecond): } - err = actionRepo.UpdateActionPhase(ctx, targetActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) + // Update "target" — should produce an update. + err = repo.UpdateActionPhase(ctx, targetActionID, common.ActionPhase_ACTION_PHASE_RUNNING, 1, core.CatalogCacheStatus_CACHE_DISABLED, nil) require.NoError(t, err) select {