diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3e3ac497c9..1aac49cac6 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -18,4 +18,6 @@ #### General +- Fix gateway nil-pointer panic on non-live AI requests when no orchestrators are available + #### CLI diff --git a/server/ai_process.go b/server/ai_process.go index 22b3fabbaa..fb77f6d8f3 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -1588,17 +1588,20 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface if monitor.Enabled { monitor.AIRequestError(errMsg, monitor.ToPipeline(capName), modelID, nil) } - monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{ - "type": "gateway_no_orchestrators_available", - "timestamp": time.Now().UnixMilli(), - "stream_id": params.liveParams.streamID, - "pipeline_id": params.liveParams.pipelineID, - "request_id": params.liveParams.requestID, - "orchestrator_info": map[string]interface{}{ - "address": "", - "url": "", - }, - }) + // liveParams is only set for realtime video requests; guard against nil for other pipelines + if params.liveParams != nil { + monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{ + "type": "gateway_no_orchestrators_available", + "timestamp": time.Now().UnixMilli(), + "stream_id": params.liveParams.streamID, + "pipeline_id": params.liveParams.pipelineID, + "request_id": params.liveParams.requestID, + "orchestrator_info": map[string]interface{}{ + "address": "", + "url": "", + }, + }) + } return nil, &ServiceUnavailableError{err: errors.New(errMsg)} } return resp, nil diff --git a/server/ai_process_test.go b/server/ai_process_test.go index d308d8d347..bd11617c89 100644 --- a/server/ai_process_test.go +++ b/server/ai_process_test.go @@ -4,9 +4,12 @@ import ( "context" "errors" "reflect" + "strconv" "testing" + "time" "github.com/livepeer/go-livepeer/ai/worker" + "github.com/livepeer/go-livepeer/core" ) func Test_submitLLM(t *testing.T) { @@ -125,6 +128,52 @@ func TestEncodeReqMetadata(t *testing.T) { } } +// Test_processAIRequest_NoOrchestrators_NonLive ensures that a non-live AI +// request (liveParams == nil) returns a ServiceUnavailableError instead of +// panicking when no orchestrators are available. +func Test_processAIRequest_NoOrchestrators_NonLive(t *testing.T) { + cap := core.Capability_AudioToText + modelID := "openai/whisper-large-v3" + + node := &core.LivepeerNode{ + OrchestratorPool: &stubDiscovery{}, + AIProcesssingRetryTimeout: time.Second, + } + + // Seed the selector cache with empty pools so Select returns nil without + // triggering a network refresh (empty OrchestratorPool keeps the refresh + // threshold at zero). + sel := &AISessionSelector{ + warmPool: NewAISessionPool(&LIFOSelector{}, newSuspender(), 0), + coldPool: NewAISessionPool(&LIFOSelector{}, newSuspender(), 0), + ttl: time.Hour, + lastRefreshTime: time.Now(), + cap: cap, + modelID: modelID, + node: node, + suspender: newSuspender(), + } + mgr := NewAISessionManager(node, time.Hour) + mgr.selectors[strconv.Itoa(int(cap))+"_"+modelID] = sel + + params := aiRequestParams{ + node: node, + sessManager: mgr, + // liveParams intentionally nil to mimic a non-live pipeline request. + } + + req := worker.GenAudioToTextMultipartRequestBody{ModelId: &modelID} + + resp, err := processAIRequest(context.Background(), params, req) + if resp != nil { + t.Fatalf("expected nil response, got %v", resp) + } + var unavailable *ServiceUnavailableError + if !errors.As(err, &unavailable) { + t.Fatalf("expected ServiceUnavailableError, got %v", err) + } +} + func Test_isNoCapacityError(t *testing.T) { tests := []struct { name string