Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 197 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ScenarioProgressMessage struct {
Code string `json:"code"`
OverallStatus string `json:"overall_status,omitempty"`
FailedCount int64 `json:"failed_count,omitempty"`
CancelledCount int64 `json:"cancelled_count,omitempty"`
FailedScenarios []string `json:"failed_scenarios,omitempty"`
CommitSHA string `json:"commit_sha,omitempty"`
Repository string `json:"repository,omitempty"`
Expand Down Expand Up @@ -388,8 +389,93 @@ type appctx struct {
rpub *lspubsub.PubsubPublisher // topic to publish reports
mtx *sync.Mutex
topicArn *string

activeRunsMu sync.RWMutex
activeRuns map[string]map[string]context.CancelFunc
pendingCancelsMu sync.RWMutex
pendingCancels map[string]struct{}
}

// registerRun records the cancel function for an in-flight scenario run
// keyed by commitSHA and returns a freshly generated instance ID. An empty
// commitSHA is a no-op and yields "".
func (a *appctx) registerRun(commitSHA string, cancel context.CancelFunc) string {
	if commitSHA == "" {
		return ""
	}
	id := uniuri.NewLen(12)

	a.activeRunsMu.Lock()
	defer a.activeRunsMu.Unlock()

	// Lazily create the outer and per-commit maps on first use.
	if a.activeRuns == nil {
		a.activeRuns = map[string]map[string]context.CancelFunc{}
	}
	runs, ok := a.activeRuns[commitSHA]
	if !ok {
		runs = map[string]context.CancelFunc{}
		a.activeRuns[commitSHA] = runs
	}
	runs[id] = cancel
	return id
}

// unregisterRun drops the cancel function stored under commitSHA/instanceID,
// pruning the per-commit map once its last entry is removed. Empty arguments
// are ignored.
func (a *appctx) unregisterRun(commitSHA, instanceID string) {
	if commitSHA == "" || instanceID == "" {
		return
	}

	a.activeRunsMu.Lock()
	defer a.activeRunsMu.Unlock()

	// delete on a nil inner map is a safe no-op.
	runs := a.activeRuns[commitSHA]
	delete(runs, instanceID)
	if len(runs) == 0 {
		delete(a.activeRuns, commitSHA)
	}
}

// cancelRun cancels every in-flight scenario for commitSHA and returns true
// if at least one was found.
func (a *appctx) cancelRun(commitSHA string) bool {
if commitSHA == "" {
return false
}
a.activeRunsMu.RLock()
funcs := a.activeRuns[commitSHA]
a.activeRunsMu.RUnlock()
if len(funcs) == 0 {
return false
}
for _, cancel := range funcs {
cancel()
}
log.Printf("cancelRun: commit_sha=%s cancelled %d in-flight scenario(s)", commitSHA, len(funcs))
Comment on lines +435 to +443
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancelRun reads the per-commit map under RLock, releases the lock, and then later iterates that same map. Because registerRun/unregisterRun can mutate this map concurrently, this can trigger a data race and even panic (concurrent map iteration and map write). Copy the cancel funcs into a slice while holding the lock (or keep the lock held during iteration) before calling them.

Suggested change
funcs := a.activeRuns[commitSHA]
a.activeRunsMu.RUnlock()
if len(funcs) == 0 {
return false
}
for _, cancel := range funcs {
cancel()
}
log.Printf("cancelRun: commit_sha=%s cancelled %d in-flight scenario(s)", commitSHA, len(funcs))
funcsMap := a.activeRuns[commitSHA]
if len(funcsMap) == 0 {
a.activeRunsMu.RUnlock()
return false
}
cancels := make([]context.CancelFunc, 0, len(funcsMap))
for _, cancel := range funcsMap {
cancels = append(cancels, cancel)
}
a.activeRunsMu.RUnlock()
for _, cancel := range cancels {
cancel()
}
log.Printf("cancelRun: commit_sha=%s cancelled %d in-flight scenario(s)", commitSHA, len(cancels))

Copilot uses AI. Check for mistakes.
return true
}

// markCancelled tombstones commitSHA so that any scenario message arriving
// after the cancel event is short-circuited instead of executed. An empty
// commitSHA is ignored.
func (a *appctx) markCancelled(commitSHA string) {
	if commitSHA == "" {
		return
	}

	a.pendingCancelsMu.Lock()
	defer a.pendingCancelsMu.Unlock()

	// Lazily allocate the tombstone set on first use.
	if a.pendingCancels == nil {
		a.pendingCancels = map[string]struct{}{}
	}
	a.pendingCancels[commitSHA] = struct{}{}
	log.Printf("markCancelled: commit_sha=%s tombstoned", commitSHA)
}

// isCancelled reports whether commitSHA has been tombstoned by a prior
// markCancelled call. An empty commitSHA is never considered cancelled.
func (a *appctx) isCancelled(commitSHA string) bool {
	if commitSHA == "" {
		return false
	}

	a.pendingCancelsMu.RLock()
	_, tombstoned := a.pendingCancels[commitSHA]
	a.pendingCancelsMu.RUnlock()
	return tombstoned
}

// unmarkCancelled clears the tombstone for commitSHA so future runs on the
// same commit are no longer short-circuited. An empty commitSHA is ignored.
func (a *appctx) unmarkCancelled(commitSHA string) {
	if commitSHA == "" {
		return
	}

	a.pendingCancelsMu.Lock()
	// delete on a nil or missing key is a safe no-op.
	delete(a.pendingCancels, commitSHA)
	a.pendingCancelsMu.Unlock()
	log.Printf("unmarkCancelled: commit_sha=%s tombstone cleared", commitSHA)
}
// Our message processing callback.
func process(ctx any, data []byte) error {
app := ctx.(*appctx)
Expand Down Expand Up @@ -484,15 +570,47 @@ func process(ctx any, data []byte) error {
}
case "process":
log.Printf("process: %+v", c)
doScenario(&doScenarioInput{
commitSHA, _ := c.Metadata["commit_sha"].(string)
if commitSHA != "" && app.isCancelled(commitSHA) {
log.Printf("process: commit_sha=%s is tombstoned, publishing cancelled result for %s", commitSHA, c.Scenario)
in := &doScenarioInput{
app: app,
ScenarioFiles: []string{c.Scenario},
ReportPubsub: reppubsub,
Metadata: c.Metadata,
RunID: c.ID,
}
publishCancelledResult(app, c.Scenario, in)
return nil
}

runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
var instanceID string
if commitSHA != "" {
instanceID = app.registerRun(commitSHA, runCancel)
defer app.unregisterRun(commitSHA, instanceID)
}
Comment on lines +587 to +593
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s a race where a "closed" event can tombstone commitSHA after the initial app.isCancelled(commitSHA) check but before registerRun. In that case cancelRun won’t find this instance (not registered yet), and the scenario will run even though the PR is closed. Re-check app.isCancelled(commitSHA) immediately after registering (or make registerRun aware of tombstones and auto-cancel) and publish a cancelled result when the SHA is tombstoned.

Copilot uses AI. Check for mistakes.

in := &doScenarioInput{
app: app,
ScenarioFiles: []string{c.Scenario},
ReportSlack: repslack,
ReportPubsub: reppubsub,
Verbose: verbose,
Metadata: c.Metadata,
RunID: c.ID,
})
cancelCtx: runCtx,
}
select {
case <-runCtx.Done():
log.Printf("process: commit_sha=%s cancelled just after register, publishing cancelled result for %s", commitSHA, c.Scenario)
publishCancelledResult(app, c.Scenario, in)
return nil
default:
}

doScenario(in)
}

return nil
Expand All @@ -506,8 +624,73 @@ func handleScenarioCompletion(ctx any, data []byte) error {
}

log.Printf("scenario progress: run_id=%s code=%s progress=%s", msg.RunID, msg.Code, msg.TotalScenarios)
var app *appctx
if ctx != nil {
app, _ = ctx.(*appctx)
}

switch msg.Code {
case "closed":
log.Printf("received closed event: commit_sha=%s repo=%s pr=%s run_id=%s",
msg.CommitSHA, msg.Repository, msg.PRNumber, msg.RunID)

if msg.CommitSHA == "" {
log.Printf("cancel: missing commit_sha, skipping")
return nil
}
if app != nil {
app.markCancelled(msg.CommitSHA)
}

cancelled := false
if app != nil {
cancelled = app.cancelRun(msg.CommitSHA)
}

if cancelled {
log.Printf("cancel: commit_sha=%s cancelled successfully", msg.CommitSHA)
} else {
log.Printf("cancel: commit_sha=%s not found in active runs (may have already finished or not yet registered)", msg.CommitSHA)
}

if msg.Repository != "" {
if err := postCommitStatus(githubtoken, msg.CommitSHA, msg.Repository, msg.RunURL, "error", "PR closed — test run cancelled"); err != nil {
log.Printf("cancel: postCommitStatus failed: %v", err)
}
}

if repslack != "" {
env := "dev"
if strings.Contains(pubsub, "prod") {
env = "prod"
} else if strings.Contains(pubsub, "next") {
env = "next"
}

text := fmt.Sprintf("*Environment:* %s\n*Repository:* %s\n*PR:* #%s\n*Commit:* %s",
env, msg.Repository, msg.PRNumber, msg.CommitSHA)
if msg.RunURL != "" {
text += fmt.Sprintf("\n\n<%s|View run>", msg.RunURL)
}

payload := SlackMessage{
Attachments: []SlackAttachment{
{
Color: "warning",
Title: "Test Run Cancelled (PR Closed)",
Text: text,
Footer: fmt.Sprintf("oops • sha: %s", msg.CommitSHA),
Timestamp: time.Now().Unix(),
MrkdwnIn: []string{"text"},
},
},
}

if err := payload.Notify(repslack); err != nil {
log.Printf("cancel: Notify (slack) failed: %v", err)
}
}

case "approve":
log.Printf("received approve event: repo=%s sha=%s approvals=%d reviewers=%s",
msg.Repository, msg.CommitSHA, msg.ApprovalCount, msg.Reviewers)
Expand Down Expand Up @@ -559,8 +742,16 @@ func handleScenarioCompletion(ctx any, data []byte) error {
}

case "completed":
log.Printf("run completed: run_id=%s overall_status=%s failed=%d repo=%s sha=%s",
msg.RunID, msg.OverallStatus, msg.FailedCount, msg.Repository, msg.CommitSHA)
log.Printf("run completed: run_id=%s overall_status=%s failed=%d cancelled=%d repo=%s sha=%s",
msg.RunID, msg.OverallStatus, msg.FailedCount, msg.CancelledCount, msg.Repository, msg.CommitSHA)
if app != nil {
app.unmarkCancelled(msg.CommitSHA)
}

if msg.CancelledCount > 0 {
log.Printf("completed: run_id=%s has %d cancelled scenario(s), skipping dispatch and notifications", msg.RunID, msg.CancelledCount)
return nil
}

if err := sendRepositoryDispatch(githubtoken, &msg); err != nil {
log.Printf("sendRepositoryDispatch failed: %v", err)
Expand Down Expand Up @@ -694,8 +885,6 @@ func run(ctx context.Context, done chan error) {
}

go func() {
// Messages should be payer level. We will subdivide linked accts to separate messages for
// linked-acct-level processing.
ls := lspubsub.NewLengthySubscriber(app, project, pubsub, process)
err = ls.Start(ctx0, done0)
if err != nil {
Expand Down Expand Up @@ -754,7 +943,7 @@ func run(ctx context.Context, done chan error) {

done1 := make(chan error, 1)
go func() {
ls := lspubsub.NewLengthySubscriber(nil, project, scenariopubsub, handleScenarioCompletion)
ls := lspubsub.NewLengthySubscriber(app, project, scenariopubsub, handleScenarioCompletion)
err := ls.Start(ctx0, done1)
if err != nil {
log.Fatalf("listener for scenario progress failed: %v", err)
Expand Down Expand Up @@ -824,4 +1013,4 @@ func main() {
log.SetPrefix("[oops] ")
log.SetOutput(os.Stdout)
rootcmd.Execute()
}
}
Loading
Loading