Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions cmd/agc/internal/listener/goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,22 @@ type Config struct {
IdleThreshold int // consecutive 202s before idle shutdown; 0 means 50
// RenewInterval is the cadence of the per-job RenewJob loop. 0 means 60s.
RenewInterval time.Duration
JobHandler JobHandlerFunc
Clock Clock
Log *slog.Logger
// ControlPlaneTimeout bounds each non-long-poll broker call on the
// session-establishment path — the OAuth token exchange and CreateSession —
// so a slow or unresponsive broker cannot wedge the goroutine indefinitely.
// Without it those calls inherit only the long-lived manager context (the
// broker uses http.DefaultClient, which has no read timeout), so a broker
// that accepts the connection but is slow to respond — e.g. an overloaded
// shared fakegithub under parallel CI load — blocks the goroutine inside a
// single attempt and the RunnerGroup never registers a session (Q134). With
// a deadline the call fails fast and retriably, so the Multiplexer restarts
// the baseline and retries. Zero selects defaultControlPlaneTimeout. The
// GetMessage long-poll is deliberately excluded — it holds the connection
// open for the broker's poll interval by design.
ControlPlaneTimeout time.Duration
JobHandler JobHandlerFunc
Clock Clock
Log *slog.Logger

// RunnerOS is passed to AcquireJob (e.g. "Linux").
RunnerOS string
Expand Down Expand Up @@ -112,6 +125,22 @@ type Config struct {
// traffic.
const staleEOFThreshold = 3

// defaultControlPlaneTimeout is the per-call deadline applied to the listener's
// non-long-poll broker operations (OAuth token exchange, CreateSession) when
// Config.ControlPlaneTimeout is unset. Any non-positive value (zero or
// negative) counts as unset; see Config.controlPlaneTimeout. 30s is generous
// for a healthy round-trip to GitHub yet tight enough that several retries fit
// inside the e2e session-registration budget when the broker stalls (Q134).
const defaultControlPlaneTimeout = 30 * time.Second

// controlPlaneTimeout resolves the effective per-call deadline for the
// goroutine's non-long-poll broker operations. A positive
// Config.ControlPlaneTimeout is used as-is; zero or a negative value falls
// back to defaultControlPlaneTimeout.
func (cfg Config) controlPlaneTimeout() time.Duration {
	timeout := cfg.ControlPlaneTimeout
	if timeout <= 0 {
		timeout = defaultControlPlaneTimeout
	}
	return timeout
}

// Run executes the listener goroutine. It blocks until the context is cancelled
// or an unrecoverable error occurs (VersionTooOldError, unauthorized).
// The caller (Multiplexer) is responsible for restarting it after a recoverable exit.
Expand Down Expand Up @@ -321,7 +350,9 @@ func Run(ctx context.Context, cfg Config) error {

// refreshBrokerToken fetches a fresh OAuth token and sets it on cfg.Broker.
func refreshBrokerToken(ctx context.Context, cfg Config) error {
token, err := githubapp.FetchRunnerOAuthToken(ctx, cfg.Agent.Creds, cfg.Agent.PrivateKey, cfg.HTTPClient)
cctx, cancel := context.WithTimeout(ctx, cfg.controlPlaneTimeout())
defer cancel()
token, err := githubapp.FetchRunnerOAuthToken(cctx, cfg.Agent.Creds, cfg.Agent.PrivateKey, cfg.HTTPClient)
if err != nil {
return fmt.Errorf("refresh broker token: %w", err)
}
Expand Down Expand Up @@ -350,7 +381,9 @@ type sessionState struct {
// the AES-256-CBC message key from the server's RSA-encrypted session key.
func createSession(ctx context.Context, cfg Config, log *slog.Logger) (sessionState, error) {
agentName := fmt.Sprintf("%s-%d", cfg.Group, cfg.Agent.Index)
sess, err := cfg.Broker.CreateSession(ctx, cfg.Agent.AgentID, agentName, cfg.Agent.RunnerVersion)
cctx, cancel := context.WithTimeout(ctx, cfg.controlPlaneTimeout())
defer cancel()
sess, err := cfg.Broker.CreateSession(cctx, cfg.Agent.AgentID, agentName, cfg.Agent.RunnerVersion)
if err != nil {
var vtooOld *broker.VersionTooOldError
if errors.As(err, &vtooOld) {
Expand Down Expand Up @@ -421,13 +454,20 @@ func handleJob(ctx context.Context, cfg Config, log *slog.Logger, aesKey []byte,
runServiceURL = jobBody.RunServiceURL
)

// Call AcquireJob if we have a runServiceURL.
// Call AcquireJob if we have a runServiceURL. Bounded by the control-plane
// timeout for the same reason as createSession: it is a short request/response
// call (not the long-poll), so an unresponsive broker here must not wedge the
// goroutine — that would block job pickup and the worker pod would never spawn
// (Q134 class). A timeout surfaces as a recoverable AcquireJob error; the poll
// loop logs it and continues, re-acquiring on the next delivery.
if runServiceURL != "" {
resp, rawBytes, acqErr := cfg.Broker.AcquireJob(ctx, runServiceURL, broker.JobAcquisitionRequest{
acqCtx, cancelAcq := context.WithTimeout(ctx, cfg.controlPlaneTimeout())
resp, rawBytes, acqErr := cfg.Broker.AcquireJob(acqCtx, runServiceURL, broker.JobAcquisitionRequest{
JobMessageID: jobBody.RunnerRequestID,
RunnerOS: cfg.RunnerOS,
BillingOwnerID: jobBody.BillingOwnerID,
})
cancelAcq()
if acqErr != nil {
if cfg.Metrics != nil {
cfg.Metrics.JobAcquisitionErrors.WithLabelValues(cfg.Namespace, "acquirejob_failed").Inc()
Expand Down
117 changes: 117 additions & 0 deletions cmd/agc/internal/listener/goroutine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"crypto/sha1" //nolint:gosec // SHA-1 required by RSA-OAEP to match server side
"encoding/base64"
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -353,6 +354,64 @@ func TestListener_CreateSessionUnauthorized(t *testing.T) {
goleak.VerifyNone(t)
}

// TestListener_CreateSessionStallDoesNotWedge proves that a broker which
// accepts the connection but never responds to CreateSession cannot wedge the
// goroutine: the per-call ControlPlaneTimeout fires, Run returns a *retriable*
// error well before the outer context deadline, and the Multiplexer is free to
// restart the baseline and retry. This is the Q134 regression guard — before
// the control-plane timeout the goroutine inherited only the long-lived manager
// context and blocked inside a single CreateSession indefinitely, so the
// RunnerGroup never registered a session within the e2e budget.
//
// The test also fails (rather than hanging) if the stalled Create handler is
// never reached, so a Run that errors out early for an unrelated reason cannot
// pass as a false positive.
func TestListener_CreateSessionStallDoesNotWedge(t *testing.T) {
	oauthSrv := oauthStub()

	// The Create handler simulates an overloaded broker that accepts the
	// connection but is slow to respond: it blocks until the test releases it
	// (stop), with a hard-cap fallback so it can never wedge the run. Note we do
	// NOT rely on r.Context() cancellation here — server-side observation of a
	// client's context-deadline disconnect is not prompt or reliable, so the
	// test drives the handler's lifetime directly via stop.
	stop := make(chan struct{})
	var releaseOnce sync.Once
	release := func() { releaseOnce.Do(func() { close(stop) }) }
	// Deferred so the handler is unblocked even when a require below aborts the
	// test; otherwise it would sit in the select until the 30s safety cap.
	defer release()

	var once sync.Once
	handlerReturned := make(chan struct{})
	mux := &brokerMux{}
	mux.SetCreate(func(_ http.ResponseWriter, _ *http.Request) {
		select {
		case <-stop:
		case <-time.After(30 * time.Second): // safety cap; release() fires first
		}
		once.Do(func() { close(handlerReturned) })
	})
	brokerSrv := httptest.NewServer(mux)

	cfg := makeCfg(t, oauthSrv, brokerSrv)
	cfg.ControlPlaneTimeout = 200 * time.Millisecond

	// Generous outer deadline: the assertion is that Run returns far sooner,
	// proving the per-call timeout fired rather than the outer context.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	start := time.Now()
	err := listener.Run(ctx, cfg)
	elapsed := time.Since(start)

	require.Error(t, err, "Run must surface the stalled CreateSession, not hang")
	assert.Less(t, elapsed, 3*time.Second,
		"Run should fail fast on a stalled CreateSession (got %s)", elapsed)
	// The failure must be retriable so the Multiplexer restarts the baseline; a
	// NonRetriableError would permanently park it.
	var nre *listener.NonRetriableError
	assert.False(t, errors.As(err, &nre),
		"a control-plane timeout must be retriable, got %v", err)

	release() // release the blocked handler
	// Wait for the handler to return before the leak check, but bound the wait:
	// if the handler was never invoked an unbounded receive would hang the whole
	// test binary instead of reporting a failure.
	select {
	case <-handlerReturned:
	case <-time.After(5 * time.Second):
		t.Error("CreateSession handler never ran or did not return after release; the stall path was not exercised")
	}
	closeHTTP(oauthSrv)
	closeHTTP(brokerSrv)
	goleak.VerifyNone(t)
}

func TestListener_GetMessage202Loop(t *testing.T) {
oauthSrv := oauthStub()
var polls atomic.Int32
Expand Down Expand Up @@ -526,6 +585,64 @@ func TestListener_AcquireJobThenReuse(t *testing.T) {
goleak.VerifyNone(t)
}

// TestListener_AcquireJobStallDoesNotWedge proves the control-plane timeout also
// guards the job-pickup path: a broker that accepts the connection but never
// responds to AcquireJob must not wedge the goroutine inside handleJob (which
// would block job pickup so the worker pod never spawns — the Q134 class at the
// AcquireJob call site). The bounded AcquireJob fails fast, handleJob returns a
// recoverable error, and the poll loop re-enters GetMessage.
//
// The test also fails (rather than hanging) if the stalled Acquire handler is
// never reached, so re-polling that happens without an AcquireJob attempt
// cannot pass as a false positive.
func TestListener_AcquireJobStallDoesNotWedge(t *testing.T) {
	oauthSrv := oauthStub()
	var delivered atomic.Bool
	var pollsAfter atomic.Int32
	mux := &brokerMux{}
	brokerSrv := httptest.NewServer(mux)

	// The first poll delivers one job message; every later poll is counted and
	// answered with 202 so the test can observe the loop continuing.
	mux.SetGetMessage(func(w http.ResponseWriter, _ *http.Request) {
		if !delivered.Swap(true) {
			w.Header().Set("Content-Type", "application/json")
			_ = json.NewEncoder(w).Encode(jobMsgWithURL(brokerSrv.URL))
			return
		}
		pollsAfter.Add(1)
		w.WriteHeader(http.StatusAccepted)
	})

	// AcquireJob accepts the connection but is slow to respond. Without the
	// per-call timeout the listener would wedge in handleJob here and never poll
	// again; with it, AcquireJob fails fast and the poll loop continues.
	stop := make(chan struct{})
	var once sync.Once
	handlerReturned := make(chan struct{})
	mux.SetAcquire(func(_ http.ResponseWriter, _ *http.Request) {
		select {
		case <-stop:
		case <-time.After(30 * time.Second): // safety cap; close(stop) fires first
		}
		once.Do(func() { close(handlerReturned) })
	})

	cfg := makeCfg(t, oauthSrv, brokerSrv)
	cfg.IsLastListener = func() bool { return true }
	cfg.ControlPlaneTimeout = 200 * time.Millisecond
	cfg.JobHandler = func(_ context.Context, _, _ string, _ []byte, _ string) error { return nil }

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	done := runAndWait(ctx, cfg)
	assert.Eventually(t, func() bool { return pollsAfter.Load() > 0 }, 3*time.Second, 10*time.Millisecond,
		"listener should re-poll after a stalled AcquireJob times out, not wedge in handleJob")
	cancel()
	<-done

	close(stop)
	// Wait for the handler to return before the leak check, but bound the wait:
	// if AcquireJob was never called an unbounded receive would hang the whole
	// test binary instead of reporting a failure.
	select {
	case <-handlerReturned:
	case <-time.After(5 * time.Second):
		t.Error("AcquireJob handler never ran or did not return after release; the stall path was not exercised")
	}
	closeHTTP(oauthSrv)
	closeHTTP(brokerSrv)
	goleak.VerifyNone(t)
}

func TestListener_SpawnReplacementOnAcquire(t *testing.T) {
oauthSrv := oauthStub()
var spawnCalls atomic.Int32
Expand Down
5 changes: 4 additions & 1 deletion docs/STATUS.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ Specific actionable items in priority order. Pick from the top; skip 🚫 items

| ID | Item | Labels | St | Sz | Notes |
|---|---|---|---|---|---|
| <a id="Q134"></a>Q134 | Flake: e2e "no session registered" 180s timeout (WorkerPodAdmitted / SessionRegistered) | `tests` `bug` `1.0-gate` | 🔲 | S | Root-caused: listener createSession+OAuth fetch run on the manager ctx with no per-call deadline → a slow shared fakegithub wedges the goroutine past the 180s budget. Fix bounds control-plane broker calls; verify green post-merge. |
| <a id="Q135"></a>Q135 | Flake: e2e E2E_AGC_MultipleJobsQueued — 4min timeout waiting for 2nd worker pod | `tests` `bug` | 🔲 | S | job_lifecycle_test.go:179 timed out waiting for the 2nd queued job's worker pod on PR 227 run 27510232006; passed on rerun w/o code change. Timing-sensitive worker-pod spawn under load; same shared-brokerStub/session-recycle class as Q120. |
| <a id="Q134"></a>Q134 | Flake: e2e E2E_AGC_WorkerPodAdmittedWithNonNumericUserImage — "no session registered" 180s timeout | `tests` `bug` `1.0-gate` | 🔲 | M | Heaviest e2e spec (real worker image) times out waiting for an AGC session (worker_securitycontext_test.go:96). Hit main + 2 PRs ~2026-06-14 17:00; other runs passed — intermittent. Likely AGC session/image-pull timing; blocks PRs when it fires. |
| <a id="Q139"></a>Q139 | Flake: e2e E2E_GMC_TenantProvisioning_ProxyConnectWorks — curl through proxy gets HTTP 504 | `tests` `bug` `infra` | 🔲 | M | provisioning_test.go:282: curl CONNECT through the per-tenant egress proxy to GitHub returned 504 on PR 231 e2e; other specs passed — intermittent. Proxy upstream/tunnel timeout under CI load; independent of the AGC session flakes (Q134). |
| <a id="Q131"></a>Q131 | Flake: TestListener_IdleNotShutdownIfLast poll-count timing | `tests` `bug` | 🔲 | S | goroutine_test.go:419 asserted poll count ≥5 but got 3 ("poll past threshold when last listener") on a local `make check`; passed on rerun (-count=3) w/o code change. Timing-sensitive idle-shutdown test; tighten synchronization, not the count. |
| <a id="Q113"></a>Q113 | Flake: eviction integration tests time out in waitForWorkerPod | `tests` `bug` | 🔲 | S | EvictionTriggersRequeue + EvictionBudgetExhausted (failure_recovery_test.go:142) timed out (20s) on CI run 27383065643, passed on rerun w/o code change. Suspects: sessions[len-1] pick on shared brokerStub; 20s budget on 2-vCPU runner. |
| <a id="Q120"></a>Q120 | Flake: SIGTERM integration test misses session-delete budget | `tests` `bug` | 🔲 | S | TestAGC_SIGTERM_DeletesAllSessions: session-39 missed the 10s WaitForSessionDelete on CI run 27422248358 (PR 209), passed on rerun w/o code change. ~40 concurrent teardowns on a 2-vCPU runner; same shared-brokerStub/budget class as Q113. |
| <a id="Q136"></a>Q136 | node-local-dns (NodeLocal DNSCache) support for tenant DNS egress | `security` `infra` `1.0-gate` | 🔲 | S | Q105 DNS rule (kube-dns podSelector) breaks NodeLocal DNSCache: pods query link-local 169.254.20.10 (hostNetwork pod) → dropped on enforcing CNI, incl proxy. Fix: also allow port-53 to 169.254.0.0/16 (link-local, non-routable, keeps Q105). |
| <a id="Q137"></a>Q137 | AGC RunnerGroup not re-reconciled after a non-retriable baseline-listener exit | `bug` `infra` | 🔲 | S | runnergroup_controller.go returns RequeueAfter=reapAfter (0 with no worker pods); if the permanent baseline exits non-retriably the L317 restart only fires on the next watch event — up to the 10h resync. Requeue when ActiveCount<desired. |
| <a id="Q138"></a>Q138 | Bounded-by-default HTTP clients — retrofit http.DefaultClient fallbacks + lint gate | `infra` `bug` `tests` | 🔲 | M | ~8 prod clients default to http.DefaultClient (no read timeout); a slow peer wedges the goroutine (Q134 class). Add a bounded-by-default httpx client, make long-poll the explicit exception, and gate new uses with forbidigo+noctx. |
| <a id="Q98"></a>Q98 | Helm chart distribution/publishing pipeline | `infra` `1.0-gate` | 🔲 | M | Pipeline shipped: publish.yml chart-publish job packages, pushes (oci://ghcr/charts), and cosign-signs the chart. Remaining (first v* tag): live publish proof, flip prerelease annotation, oci:// in upgrade.md/README, Artifact Hub listing. |
| <a id="Q112"></a>Q112 | GMC Events silently 403'd: recorder writes events.k8s.io, RBAC grants core only | `bug` `infra` | 🔲 | S | Same root cause as the AGC fix in PR 202 (Q95): GMC uses mgr.GetEventRecorder (writes events.k8s.io/v1) but its kubebuilder marker grants only core "" events, so every GMC Event is dropped. Fix marker + `make manifests`; assert one event in e2e. |
| <a id="Q9"></a>Q9 | [M3-tests remaining items (H2/M/L)](plan/milestone-3-tests.md) | `milestone` `tests` | 🔲 | M | **Unblocked** — M3 metric assertions (H1) landed. Highest-leverage remaining: **H2** (rerun-API 5xx contract), **H3** (decryption-failure fallback), **M3** (`activePodCount` Pending branch). Worth picking up after 5c–5g. |
Expand Down
Loading
Loading