From 9fefb2ca371ca8de8bb1b12ff2e7b2cf66f7f9af Mon Sep 17 00:00:00 2001 From: yuteng Date: Sat, 21 Mar 2026 07:42:44 +0800 Subject: [PATCH 1/8] rebase Signed-off-by: yuteng --- runs/service/run_service.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/runs/service/run_service.go b/runs/service/run_service.go index 4b15104d36..c29e8e06aa 100644 --- a/runs/service/run_service.go +++ b/runs/service/run_service.go @@ -847,9 +847,7 @@ func (s *RunService) WatchRunDetails( } resp := &workflow.WatchRunDetailsResponse{ - Details: &workflow.RunDetails{ - // Would populate from run model - }, + Details: s.runModelToDetails(run, req.Msg.RunId), } if err := stream.Send(resp); err != nil { @@ -1185,8 +1183,30 @@ func actionModelToClusterEvents(action *models.Action) []*workflow.ClusterEvent }} } +// runModelToDetails converts a DB Run model to a RunDetails proto. +func (s *RunService) runModelToDetails(run *models.Run, runID *common.RunIdentifier) *workflow.RunDetails { + var runSpec *task.RunSpec + if len(run.ActionSpec) > 0 { + var actionSpec workflow.ActionSpec + if err := json.Unmarshal(run.ActionSpec, &actionSpec); err == nil { + runSpec = actionSpec.RunSpec + } + } + + return &workflow.RunDetails{ + RunSpec: runSpec, + Action: s.actionModelToDetails(run, &common.ActionIdentifier{ + Run: runID, + Name: run.Name, + }), + } +} + // actionModelToDetails converts a DB Action model to an ActionDetails proto. func (s *RunService) actionModelToDetails(action *models.Action, actionID *common.ActionIdentifier) *workflow.ActionDetails { + details := &workflow.ActionDetails{ + Id: actionID, + } status := &workflow.ActionStatus{ Phase: common.ActionPhase(action.Phase), StartTime: timestamppb.New(action.CreatedAt), @@ -1195,6 +1215,8 @@ func (s *RunService) actionModelToDetails(action *models.Action, actionID *commo } if action.EndedAt.Valid { status.EndTime = timestamppb.New(action.EndedAt.Time) + durationMs := uint64(status.EndTime.AsTime().Sub(status.StartTime.AsTime()).Milliseconds()) + status.DurationMs = &durationMs } if action.DurationMs.Valid { durationMs := uint64(action.DurationMs.Int64) From 353cd54a07d44b6aa20b9babb1471bd7c93c36b5 Mon Sep 17 00:00:00 2001 From: yuteng Date: Fri, 20 Mar 2026 23:44:44 +0800 Subject: [PATCH 2/8] fix potential bug Signed-off-by: yuteng --- runs/service/run_service.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/runs/service/run_service.go b/runs/service/run_service.go index c29e8e06aa..2639113be3 100644 --- a/runs/service/run_service.go +++ b/runs/service/run_service.go @@ -1185,25 +1185,34 @@ func actionModelToClusterEvents(action *models.Action) []*workflow.ClusterEvent // runModelToDetails converts a DB Run model to a RunDetails proto. func (s *RunService) runModelToDetails(run *models.Run, runID *common.RunIdentifier) *workflow.RunDetails { + if run == nil && runID == nil { + return nil + } var runSpec *task.RunSpec - if len(run.ActionSpec) > 0 { + if run != nil && len(run.ActionSpec) > 0 { var actionSpec workflow.ActionSpec if err := json.Unmarshal(run.ActionSpec, &actionSpec); err == nil { runSpec = actionSpec.RunSpec } } + id := &common.ActionIdentifier{ + Run: runID, + } + if run != nil { + id.Name = run.Name + } return &workflow.RunDetails{ RunSpec: runSpec, - Action: s.actionModelToDetails(run, &common.ActionIdentifier{ - Run: runID, - Name: run.Name, - }), + Action: s.actionModelToDetails(run, id), } } // actionModelToDetails converts a DB Action model to an ActionDetails proto. func (s *RunService) actionModelToDetails(action *models.Action, actionID *common.ActionIdentifier) *workflow.ActionDetails { + if action == nil && actionID == nil { + return nil + } details := &workflow.ActionDetails{ Id: actionID, } From a0ca3bd1d290dfa0d6bef0e8e7133a87d500e7d6 Mon Sep 17 00:00:00 2001 From: yuteng Date: Wed, 25 Mar 2026 08:56:09 +0800 Subject: [PATCH 3/8] show last error when total attempt > 1 but status attempt is 0 Signed-off-by: yuteng --- runs/service/run_service.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runs/service/run_service.go b/runs/service/run_service.go index 2639113be3..def597771e 100644 --- a/runs/service/run_service.go +++ b/runs/service/run_service.go @@ -413,8 +413,11 @@ func (s *RunService) buildActionDetails(ctx context.Context, model *models.Actio case common.ActionPhase_ACTION_PHASE_FAILED: // Get action error from last attempt. Events are eventually consistent, so we may not have // information from the latest attempt yet. + // When status.Attempts is 0, attempt tracking was not provided by the caller, so we use + // the last attempt's error info unconditionally. numAttempts := len(action.GetAttempts()) - if numAttempts > 0 && action.GetAttempts()[numAttempts-1].GetAttempt() == action.GetStatus().GetAttempts() { + statusAttempts := action.GetStatus().GetAttempts() + if numAttempts > 0 && (statusAttempts == 0 || action.GetAttempts()[numAttempts-1].GetAttempt() == statusAttempts) { action.Result = &workflow.ActionDetails_ErrorInfo{ ErrorInfo: action.GetAttempts()[numAttempts-1].GetErrorInfo(), } @@ -1213,9 +1216,6 @@ func (s *RunService) actionModelToDetails(action *models.Action, actionID *commo if action == nil && actionID == nil { return nil } - details := &workflow.ActionDetails{ - Id: actionID, - } status := &workflow.ActionStatus{ Phase: common.ActionPhase(action.Phase), StartTime: timestamppb.New(action.CreatedAt), From 944175899e9e55b719cd182929a9deacab1e156c Mon Sep 17 00:00:00 2001 From: yuteng Date: Wed, 25 Mar 2026 09:18:18 +0800 Subject: [PATCH 4/8] Add actionMetadataFromModel test Signed-off-by: yuteng --- runs/service/run_service_test.go | 136 +++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/runs/service/run_service_test.go b/runs/service/run_service_test.go index 4da44baee1..53e22680fd 100644 --- a/runs/service/run_service_test.go +++ b/runs/service/run_service_test.go @@ -1039,3 +1039,139 @@ func TestGetActionData_NonSucceededSkipsOutputs(t *testing.T) { // Verify ReadProtobuf was only called once (for inputs, not outputs) store.AssertNumberOfCalls(t, "ReadProtobuf", 1) } + +func TestActionModelToDetails(t *testing.T) { + svc := &RunService{} + now := time.Now().UTC().Truncate(time.Millisecond) + end := now.Add(5 * time.Second) + + tests := []struct { + name string + action *models.Action + actionID *common.ActionIdentifier + verify func(t *testing.T, result *workflow.ActionDetails) + }{ + { + name: "BothNilReturnsNil", + action: nil, + actionID: nil, + verify: func(t *testing.T, result *workflow.ActionDetails) { + assert.Nil(t, result) + }, + }, + { + name: "BasicStatusFields", + action: &models.Action{ + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + CreatedAt: now, + Attempts: 2, + CacheStatus: core.CatalogCacheStatus_CACHE_HIT, + }, + actionID: testActionID, + verify: func(t *testing.T, result *workflow.ActionDetails) { + require.NotNil(t, result) + assert.Equal(t, testActionID, result.Id) + assert.Equal(t, common.ActionPhase_ACTION_PHASE_RUNNING, result.Status.Phase) + assert.Equal(t, now, result.Status.StartTime.AsTime()) + assert.Equal(t, uint32(2), result.Status.Attempts) + assert.Equal(t, core.CatalogCacheStatus_CACHE_HIT, result.Status.CacheStatus) + assert.Nil(t, result.Status.EndTime) + assert.Nil(t, result.Status.DurationMs) + }, + }, + { + name: "EndedAtSetsDurationFromTimestamps", + action: &models.Action{ + Phase: int32(common.ActionPhase_ACTION_PHASE_SUCCEEDED), + CreatedAt: now, + EndedAt: sql.NullTime{Time: end, Valid: true}, + }, + actionID: testActionID, + verify: func(t *testing.T, result *workflow.ActionDetails) { + require.NotNil(t, result.Status.EndTime) + assert.Equal(t, end, result.Status.EndTime.AsTime()) + require.NotNil(t, result.Status.DurationMs) + assert.Equal(t, uint64(5000), *result.Status.DurationMs) + }, + }, + { + name: "MetadataOptionalFields", + action: &models.Action{ + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + ActionType: int32(workflow.ActionType_ACTION_TYPE_TASK), + FunctionName: "my_func", + ParentActionName: sql.NullString{String: "parent-action", Valid: true}, + ActionGroup: sql.NullString{String: "group-1", Valid: true}, + EnvironmentName: sql.NullString{String: "prod", Valid: true}, + }, + actionID: testActionID, + verify: func(t *testing.T, result *workflow.ActionDetails) { + require.NotNil(t, result.Metadata) + assert.Equal(t, workflow.ActionType_ACTION_TYPE_TASK, result.Metadata.ActionType) + assert.Equal(t, "my_func", result.Metadata.FuntionName) + assert.Equal(t, "parent-action", result.Metadata.Parent) + assert.Equal(t, "group-1", result.Metadata.Group) + assert.Equal(t, "prod", result.Metadata.EnvironmentName) + }, + }, + { + name: "TaskMetadataWithFullID", + action: &models.Action{ + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + ActionType: int32(workflow.ActionType_ACTION_TYPE_TASK), + TaskType: "python", + TaskOrg: sql.NullString{String: "org", Valid: true}, + TaskProject: sql.NullString{String: "proj", Valid: true}, + TaskDomain: sql.NullString{String: "dev", Valid: true}, + TaskName: sql.NullString{String: "my_task", Valid: true}, + TaskVersion: sql.NullString{String: "v1", Valid: true}, + TaskShortName: sql.NullString{String: "short", Valid: true}, + }, + actionID: testActionID, + verify: func(t *testing.T, result *workflow.ActionDetails) { + taskMeta := result.Metadata.GetTask() + require.NotNil(t, taskMeta) + assert.Equal(t, "python", taskMeta.TaskType) + assert.Equal(t, "short", taskMeta.ShortName) + require.NotNil(t, taskMeta.Id) + assert.Equal(t, "org", taskMeta.Id.Org) + assert.Equal(t, "proj", taskMeta.Id.Project) + assert.Equal(t, "dev", taskMeta.Id.Domain) + assert.Equal(t, "my_task", taskMeta.Id.Name) + assert.Equal(t, "v1", taskMeta.Id.Version) + }, + }, + { + name: "TraceMetadata", + action: &models.Action{ + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + ActionType: int32(workflow.ActionType_ACTION_TYPE_TRACE), + FunctionName: "trace_func", + }, + actionID: testActionID, + verify: func(t *testing.T, result *workflow.ActionDetails) { + traceMeta := result.Metadata.GetTrace() + require.NotNil(t, traceMeta) + assert.Equal(t, "trace_func", traceMeta.Name) + assert.Nil(t, result.Metadata.GetTask()) + }, + }, + { + name: "NilActionID", + action: &models.Action{ + Phase: int32(common.ActionPhase_ACTION_PHASE_QUEUED), + }, + actionID: nil, + verify: func(t *testing.T, result *workflow.ActionDetails) { + require.NotNil(t, result) + assert.Nil(t, result.Id) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.verify(t, svc.actionModelToDetails(tc.action, tc.actionID)) + }) + } +} From 0a17e6ed866f4f2354d55ccc1d9da8cec0d0d042 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Thu, 26 Mar 2026 09:23:00 +0800 Subject: [PATCH 5/8] remove comment Signed-off-by: Yuteng Chen --- runs/service/run_service.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/runs/service/run_service.go b/runs/service/run_service.go index def597771e..c6d5db8f58 100644 --- a/runs/service/run_service.go +++ b/runs/service/run_service.go @@ -413,8 +413,6 @@ func (s *RunService) buildActionDetails(ctx context.Context, model *models.Actio case common.ActionPhase_ACTION_PHASE_FAILED: // Get action error from last attempt. Events are eventually consistent, so we may not have // information from the latest attempt yet. - // When status.Attempts is 0, attempt tracking was not provided by the caller, so we use - // the last attempt's error info unconditionally. numAttempts := len(action.GetAttempts()) statusAttempts := action.GetStatus().GetAttempts() if numAttempts > 0 && (statusAttempts == 0 || action.GetAttempts()[numAttempts-1].GetAttempt() == statusAttempts) { From a5c694e21cdda7ecb78e1c86fa7cef31827f14e3 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Fri, 27 Mar 2026 20:52:47 +0800 Subject: [PATCH 6/8] add unit test of RunModelToDetails Signed-off-by: Yuteng Chen --- runs/service/run_service_test.go | 113 +++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/runs/service/run_service_test.go b/runs/service/run_service_test.go index 53e22680fd..0f23ccd2d3 100644 --- a/runs/service/run_service_test.go +++ b/runs/service/run_service_test.go @@ -1175,3 +1175,116 @@ func TestActionModelToDetails(t *testing.T) { }) } } + +func TestRunModelToDetails(t *testing.T) { + svc := &RunService{} + + testRunID := &common.RunIdentifier{ + Org: "test-org", + Project: "test-project", + Domain: "test-domain", + Name: "test-run", + } + + validActionSpecBytes, err := json.Marshal(&workflow.ActionSpec{ + RunSpec: &task.RunSpec{ + Labels: &task.Labels{ + Values: map[string]string{"env": "prod"}, + }, + }, + }) + require.NoError(t, err) + + tests := []struct { + name string + run *models.Run + runID *common.RunIdentifier + verify func(t *testing.T, result *workflow.RunDetails) + }{ + { + name: "BothNilReturnsNil", + run: nil, + runID: nil, + verify: func(t *testing.T, result *workflow.RunDetails) { + assert.Nil(t, result) + }, + }, + { + name: "EmptyActionSpecNilRunSpec", + run: &models.Run{ + Name: "test-run", + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + }, + runID: testRunID, + verify: func(t *testing.T, result *workflow.RunDetails) { + require.NotNil(t, result) + assert.Nil(t, result.RunSpec) + }, + }, + { + name: "ValidActionSpecExtractsRunSpec", + run: &models.Run{ + Name: "test-run", + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + ActionSpec: validActionSpecBytes, + }, + runID: testRunID, + verify: func(t *testing.T, result *workflow.RunDetails) { + require.NotNil(t, result) + require.NotNil(t, result.RunSpec) + require.NotNil(t, result.RunSpec.Labels) + assert.Equal(t, "prod", result.RunSpec.Labels.Values["env"]) + }, + }, + { + name: "MalformedActionSpecNilRunSpec", + run: &models.Run{ + Name: "test-run", + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + ActionSpec: []byte("not-valid-json{{"), + }, + runID: testRunID, + verify: func(t *testing.T, result *workflow.RunDetails) { + require.NotNil(t, result) + assert.Nil(t, result.RunSpec) + }, + }, + { + name: "RunNamePropagatedToActionID", + run: &models.Run{ + Name: "my-run-name", + Phase: int32(common.ActionPhase_ACTION_PHASE_QUEUED), + }, + runID: testRunID, + verify: func(t *testing.T, result *workflow.RunDetails) { + require.NotNil(t, result) + require.NotNil(t, result.Action) + require.NotNil(t, result.Action.Id) + assert.Equal(t, "my-run-name", result.Action.Id.Name) + }, + }, + { + name: "RunIDPropagatedToActionID", + run: &models.Run{ + Name: testRunID.Name, + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + }, + runID: testRunID, + verify: func(t *testing.T, result *workflow.RunDetails) { + require.NotNil(t, result) + require.NotNil(t, result.Action) + require.NotNil(t, result.Action.Id) + assert.Equal(t, testRunID.Org, result.Action.Id.Run.Org) + assert.Equal(t, testRunID.Project, result.Action.Id.Run.Project) + assert.Equal(t, testRunID.Domain, result.Action.Id.Run.Domain) + assert.Equal(t, testRunID.Name, result.Action.Id.Run.Name) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.verify(t, svc.runModelToDetails(tc.run, tc.runID)) + }) + } +} From 2f60c7dd72e10fceb39002b7ab3a333e515445d2 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Fri, 27 Mar 2026 23:26:32 +0800 Subject: [PATCH 7/8] Add WatchRunDetails unit test Signed-off-by: Yuteng Chen --- runs/service/run_service_test.go | 167 +++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/runs/service/run_service_test.go b/runs/service/run_service_test.go index 0f23ccd2d3..a528f14bce 100644 --- a/runs/service/run_service_test.go +++ b/runs/service/run_service_test.go @@ -10,6 +10,9 @@ import ( "testing" "time" + "net/http" + "net/http/httptest" + "connectrpc.com/connect" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" @@ -24,6 +27,7 @@ import ( "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/task" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow" + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow/workflowconnect" repoMocks "github.com/flyteorg/flyte/v2/runs/repository/mocks" "github.com/flyteorg/flyte/v2/runs/repository/models" ) @@ -99,6 +103,12 @@ func matchActionID(expected *common.ActionIdentifier) interface{} { }) } +func matchRunID(expected *common.RunIdentifier) interface{} { + return mock.MatchedBy(func(actual *common.RunIdentifier) bool { + return proto.Equal(actual, expected) + }) +} + func TestGetRunDetails_WithTaskSpec(t *testing.T) { actionRepo := &repoMocks.ActionRepo{} taskRepo := &repoMocks.TaskRepo{} @@ -1288,3 +1298,160 @@ func TestRunModelToDetails(t *testing.T) { }) } } + +func newWatchRunDetailsTestClient(t *testing.T, actionRepo *repoMocks.ActionRepo) workflowconnect.RunServiceClient { + t.Helper() + taskRepo := &repoMocks.TaskRepo{} + repo := &repoMocks.Repository{} + repo.On("ActionRepo").Return(actionRepo) + repo.On("TaskRepo").Maybe().Return(taskRepo) + + svc := &RunService{repo: repo, actionsClient: &mockActionsClient{}} + path, handler := workflowconnect.NewRunServiceHandler(svc) + + mux := http.NewServeMux() + mux.Handle(path, handler) + server := httptest.NewServer(mux) + t.Cleanup(server.Close) + + return workflowconnect.NewRunServiceClient(http.DefaultClient, server.URL) +} + +func TestWatchRunDetails(t *testing.T) { + runID := &common.RunIdentifier{ + Org: "test-org", + Project: "test-project", + Domain: "test-domain", + Name: "rtest-watch-1", + } + + runModel := &models.Run{ + Org: runID.Org, + Project: runID.Project, + Domain: runID.Domain, + Name: runID.Name, + Phase: int32(common.ActionPhase_ACTION_PHASE_RUNNING), + } + + tests := []struct { + name string + setupMocks func(actionRepo *repoMocks.ActionRepo) + test func(t *testing.T, client workflowconnect.RunServiceClient) + }{ + { + name: "GetRun_Error_Returns_NotFound", + setupMocks: func(actionRepo *repoMocks.ActionRepo) { + actionRepo.EXPECT().GetRun(mock.Anything, matchRunID(runID)).Return(nil, fmt.Errorf("run not found")) + }, + test: func(t *testing.T, client workflowconnect.RunServiceClient) { + ctx := context.Background() + stream, err := client.WatchRunDetails(ctx, connect.NewRequest(&workflow.WatchRunDetailsRequest{ + RunId: runID, + })) + require.NoError(t, err) + assert.False(t, stream.Receive()) + require.Error(t, stream.Err()) + var connectErr *connect.Error + require.True(t, errors.As(stream.Err(), &connectErr)) + assert.Equal(t, connect.CodeNotFound, connectErr.Code()) + }, + }, + { + name: "Initial_State_Sent_On_Success", + setupMocks: func(actionRepo *repoMocks.ActionRepo) { + actionRepo.EXPECT().GetRun(mock.Anything, matchRunID(runID)).Return(runModel, nil) + actionRepo.EXPECT().WatchRunUpdates(mock.Anything, matchRunID(runID), mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, _ *common.RunIdentifier, _ chan<- *models.Run, _ chan<- error) { + <-ctx.Done() + }) + }, + test: func(t *testing.T, client workflowconnect.RunServiceClient) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.WatchRunDetails(ctx, connect.NewRequest(&workflow.WatchRunDetailsRequest{ + RunId: runID, + })) + require.NoError(t, err) + + assert.True(t, stream.Receive()) + require.NotNil(t, stream.Msg().Details) + + cancel() + assert.False(t, stream.Receive()) + }, + }, + { + name: "Watch_Error_Returns_Internal_Error", + setupMocks: func(actionRepo *repoMocks.ActionRepo) { + actionRepo.EXPECT().GetRun(mock.Anything, matchRunID(runID)).Return(runModel, nil) + actionRepo.EXPECT().WatchRunUpdates(mock.Anything, matchRunID(runID), mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, _ *common.RunIdentifier, _ chan<- *models.Run, errs chan<- error) { + errs <- fmt.Errorf("db watch failed") + }) + }, + test: func(t *testing.T, client workflowconnect.RunServiceClient) { + ctx := context.Background() + stream, err := client.WatchRunDetails(ctx, connect.NewRequest(&workflow.WatchRunDetailsRequest{ + RunId: runID, + })) + require.NoError(t, err) + + // First message: initial state + assert.True(t, stream.Receive()) + + // Watch error terminates the stream + assert.False(t, stream.Receive()) + require.Error(t, stream.Err()) + var connectErr *connect.Error + require.True(t, errors.As(stream.Err(), &connectErr)) + assert.Equal(t, connect.CodeInternal, connectErr.Code()) + }, + }, + { + name: "Run_Update_Sends_New_Response", + setupMocks: func(actionRepo *repoMocks.ActionRepo) { + actionRepo.EXPECT().GetRun(mock.Anything, matchRunID(runID)).Return(runModel, nil) + actionRepo.EXPECT().WatchRunUpdates(mock.Anything, matchRunID(runID), mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, _ *common.RunIdentifier, updates chan<- *models.Run, _ chan<- error) { + updates <- &models.Run{ + Org: runID.Org, + Project: runID.Project, + Domain: runID.Domain, + Name: runID.Name, + Phase: int32(common.ActionPhase_ACTION_PHASE_SUCCEEDED), + } + <-ctx.Done() + }) + }, + test: func(t *testing.T, client workflowconnect.RunServiceClient) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.WatchRunDetails(ctx, connect.NewRequest(&workflow.WatchRunDetailsRequest{ + RunId: runID, + })) + require.NoError(t, err) + + // First message: initial state + assert.True(t, stream.Receive()) + // Second message: run update + assert.True(t, stream.Receive()) + assert.NotNil(t, stream.Msg().Details) + + cancel() + assert.False(t, stream.Receive()) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actionRepo := &repoMocks.ActionRepo{} + tc.setupMocks(actionRepo) + client := newWatchRunDetailsTestClient(t, actionRepo) + tc.test(t, client) + actionRepo.AssertExpectations(t) + }) + } +} From 72144db04be4535fd58e61768dfc39945a17a093 Mon Sep 17 00:00:00 2001 From: YuTeng Chen Date: Sat, 28 Mar 2026 21:29:42 +0800 Subject: [PATCH 8/8] Update runs/service/run_service.go Co-authored-by: Kevin Su Signed-off-by: YuTeng Chen --- runs/service/run_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runs/service/run_service.go b/runs/service/run_service.go index c6d5db8f58..8e89b4d5b7 100644 --- a/runs/service/run_service.go +++ b/runs/service/run_service.go @@ -415,7 +415,7 @@ func (s *RunService) buildActionDetails(ctx context.Context, model *models.Actio // information from the latest attempt yet. numAttempts := len(action.GetAttempts()) statusAttempts := action.GetStatus().GetAttempts() - if numAttempts > 0 && (statusAttempts == 0 || action.GetAttempts()[numAttempts-1].GetAttempt() == statusAttempts) { + if numAttempts > 0 && action.GetAttempts()[numAttempts-1].GetAttempt() == statusAttempts { action.Result = &workflow.ActionDetails_ErrorInfo{ ErrorInfo: action.GetAttempts()[numAttempts-1].GetErrorInfo(), }