From 2a65dc97b5cc1fcd4f6500e793c6cce8bcc58c92 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 17:06:46 +0800 Subject: [PATCH 01/10] test: force timeout repro for coze video target --- .../modules/evaluation/domain/service/target_impl.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index a4adbdfd2..464f3bac5 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -282,6 +282,18 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ if evalTargetDO == nil { return nil, errorx.NewByCode(errno.CommonInvalidParamCode, errorx.WithExtraMsg("[ExecuteTarget]evalTargetDO is nil")) } + if evalTargetDO.EvalTargetVersion != nil { + customRPC := evalTargetDO.EvalTargetVersion.CustomRPCServer + if customRPC != nil && + customRPC.Timeout != nil && + *customRPC.Timeout > 0 && + evalTargetDO.SourceTargetID == "7590095306227520514" && + gptr.Indirect(customRPC.ExecEnv) == "ppe_fornax_timeout_record_0525" { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(*customRPC.Timeout)*time.Millisecond) + defer cancel() + } + } defer func() { if e := recover(); e != nil { From 8c5ff8a9f73d831d13e024fcc7415e62ef71c633 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 17:41:13 +0800 Subject: [PATCH 02/10] test: force outer timeout repro for coze video item --- .../service/expt_run_item_event_impl.go | 25 +++++++++++++++++++ .../evaluation/domain/service/target_impl.go | 12 --------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go index 5267538b5..1db2ad46b 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go @@ -250,6 +250,11 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E } ctx = e.WithCtx(ctx, eiec) + if timeoutMS, ok := cozeVideoTimeoutRecordReproTimeoutMS(eiec); ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond) + defer cancel() + } mode, err := NewRecordEvalMode( eiec.Event, @@ -281,6 +286,26 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E return nil } +func cozeVideoTimeoutRecordReproTimeoutMS(eiec *entity.ExptItemEvalCtx) (int64, bool) { + if eiec == nil || eiec.Expt == nil || eiec.Expt.Target == nil || eiec.Expt.Target.EvalTargetVersion == nil { + return 0, false + } + + target := eiec.Expt.Target + customRPC := target.EvalTargetVersion.CustomRPCServer + if customRPC == nil || customRPC.Timeout == nil || *customRPC.Timeout <= 0 { + return 0, false + } + if target.SourceTargetID != "7590095306227520514" { + return 0, false + } + if gptr.Indirect(customRPC.ExecEnv) != "ppe_fornax_timeout_record_0525" { + return 0, false + } + + return *customRPC.Timeout, true +} + func (e *ExptItemEventEvalServiceImpl) WithCtx(ctx context.Context, eiec *entity.ExptItemEvalCtx) context.Context { return logs.SetLogID(ctx, eiec.GetRecordEvalLogID(ctx)) } diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index 464f3bac5..a4adbdfd2 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -282,18 +282,6 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ if evalTargetDO == nil { return nil, errorx.NewByCode(errno.CommonInvalidParamCode, errorx.WithExtraMsg("[ExecuteTarget]evalTargetDO is nil")) } - if evalTargetDO.EvalTargetVersion != nil { - customRPC := evalTargetDO.EvalTargetVersion.CustomRPCServer - if customRPC != nil && - customRPC.Timeout != nil && - *customRPC.Timeout > 0 && - evalTargetDO.SourceTargetID == "7590095306227520514" && - gptr.Indirect(customRPC.ExecEnv) == "ppe_fornax_timeout_record_0525" { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(*customRPC.Timeout)*time.Millisecond) - defer cancel() - } - } defer func() { if e := recover(); e != nil { From a066961962f5a67c69e2f3643c43e083eeae34d5 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 18:07:13 +0800 Subject: [PATCH 03/10] fix: persist eval failure records after context cancellation --- .../domain/service/expt_run_item_impl.go | 17 +++- .../domain/service/expt_run_item_impl_test.go | 49 ++++++++++ .../evaluation/domain/service/target_impl.go | 13 ++- .../domain/service/target_impl_test.go | 96 ++++++++++++++++++- 4 files changed, 162 insertions(+), 13 deletions(-) diff --git a/backend/modules/evaluation/domain/service/expt_run_item_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_impl.go index 8b410a89b..653f4b057 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_impl.go @@ -69,6 +69,8 @@ type ExptItemEvalCtxExecutor struct { evalSetItemSvc EvaluationSetItemService } +const exptRunLogPersistTimeout = 5 * time.Second + func (e *ExptItemEvalCtxExecutor) Eval(ctx context.Context, eiec *entity.ExptItemEvalCtx) error { event := eiec.Event @@ -130,6 +132,8 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if result == nil { return fmt.Errorf("StoreTurnRunResult with nil result") } + persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) + defer cancel() turn := etec.Turn turnResultLog := etec.GetExistTurnResultLogs()[turn.ID] @@ -173,7 +177,7 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if se, ok := errorx.FromStatusError(evalErr); ok && (se.Code() == errno.CustomEvalTargetInvokeFailCode || se.Code() == errno.CustomRPCEvaluatorRunFailedCode) { errMsg = errorx.ErrorWithoutStack(evalErr) } else { - errMsg = e.Configer.GetErrCtrl(ctx).ConvertErrMsg(evalErr.Error()) + errMsg = e.Configer.GetErrCtrl(persistCtx).ConvertErrMsg(evalErr.Error()) } logs.CtxWarn(ctx, "[ExptTurnEval] store turn run err, before: %v, after: %v", evalErr, errMsg) @@ -195,7 +199,7 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * result.SetEvalErr(evalErr) - if err := e.TurnResultRepo.SaveTurnRunLogs(ctx, []*entity.ExptTurnResultRunLog{clone}); err != nil { + if err := e.TurnResultRepo.SaveTurnRunLogs(persistCtx, []*entity.ExptTurnResultRunLog{clone}); err != nil { return err } @@ -275,8 +279,11 @@ func (e *ExptItemEvalCtxExecutor) buildExptTurnEvalCtx(ctx context.Context, turn } func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *entity.ExptItemEvalEvent, evalErr error) error { + persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) + defer cancel() + if evalErr != nil { - if retry, _ := e.evalErrNeedRetry(ctx, event, evalErr); retry { + if retry, _ := e.evalErrNeedRetry(persistCtx, event, evalErr); retry { return evalErr } } @@ -292,11 +299,11 @@ func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *en ufields["status"] = int32(entity.ItemRunState_Success) } - if err := e.ItemResultRepo.UpdateItemRunLog(ctx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { + if err := e.ItemResultRepo.UpdateItemRunLog(persistCtx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { return err } - if e.evalErrNeedTerminateExpt(ctx, event.SpaceID, evalErr) { + if e.evalErrNeedTerminateExpt(persistCtx, event.SpaceID, evalErr) { logs.CtxWarn(ctx, "[ExptTurnEval] found error which should terminate expt, expt_id: %v, expt_run_id: %v, item_id: %v, err: %v", event.ExptID, event.ExptRunID, event.EvalSetItemID, evalErr) return evalErr } diff --git a/backend/modules/evaluation/domain/service/expt_run_item_impl_test.go b/backend/modules/evaluation/domain/service/expt_run_item_impl_test.go index 166039c59..3ac898ad9 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_impl_test.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_impl_test.go @@ -11,6 +11,7 @@ import ( "github.com/bytedance/gg/gptr" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "github.com/coze-dev/coze-loop/backend/infra/external/benefit" @@ -371,6 +372,23 @@ func Test_ExptItemEvalCtxExecutor_CompleteSetItemRun(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "mock updateitemrunlog error") }) + + t.Run("ctx取消后仍落item失败状态", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + mockConfiger.EXPECT().GetErrRetryConf(gomock.Any(), int64(4), gomock.Any()).AnyTimes().Return(&entity.RetryConf{IsInDebt: false}) + mockItemResultRepo.EXPECT().UpdateItemRunLog(gomock.Any(), int64(1), int64(2), []int64{3}, gomock.Any(), int64(4)). + DoAndReturn(func(ctx context.Context, _, _ int64, _ []int64, ufields map[string]any, _ int64) error { + require.NoError(t, ctx.Err()) + assert.Equal(t, int32(entity.ItemRunState_Fail), ufields["status"]) + return nil + }) + + event := &entity.ExptItemEvalEvent{ExptID: 1, ExptRunID: 2, EvalSetItemID: 3, SpaceID: 4, RetryTimes: 1} + err := executor.CompleteItemRun(ctx, event, errors.New("target timeout")) + assert.NoError(t, err) + }) } func Test_ExptItemEvalCtxExecutor_storeTurnRunResult(t *testing.T) { @@ -438,6 +456,37 @@ func Test_ExptItemEvalCtxExecutor_storeTurnRunResult(t *testing.T) { err := executor.storeTurnRunResult(context.Background(), etec, result) assert.NoError(t, err) }) + + t.Run("ctx取消后仍落turn失败状态", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + turnResultLog := &entity.ExptTurnResultRunLog{ID: 1, TurnID: 1} + etec := &entity.ExptTurnEvalCtx{ + Turn: &entity.Turn{ID: 1}, + ExptItemEvalCtx: &entity.ExptItemEvalCtx{ + Expt: &entity.Experiment{ID: 1, SourceID: "src", SpaceID: 2}, + Event: &entity.ExptItemEvalEvent{ExptRunID: 3}, + EvalSetItem: &entity.EvaluationSetItem{ItemID: 2}, + ExistItemEvalResult: &entity.ExptItemEvalResult{TurnResultRunLogs: map[int64]*entity.ExptTurnResultRunLog{1: turnResultLog}}, + }, + } + result := &entity.ExptTurnRunResult{EvalErr: errors.New("target timeout")} + + mockConfiger.EXPECT().GetErrCtrl(gomock.Any()).DoAndReturn(func(ctx context.Context) *entity.ExptErrCtrl { + require.NoError(t, ctx.Err()) + return entity.DefaultExptErrCtrl() + }) + mockTurnResultRepo.EXPECT().SaveTurnRunLogs(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, logs []*entity.ExptTurnResultRunLog) error { + require.NoError(t, ctx.Err()) + require.Len(t, logs, 1) + assert.Equal(t, entity.TurnRunState_Fail, logs[0].Status) + return nil + }) + + err := executor.storeTurnRunResult(ctx, etec, result) + assert.NoError(t, err) + }) } func Test_buildExptTurnEvalCtx(t *testing.T) { diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index a4adbdfd2..29f8dcdc1 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -43,6 +43,8 @@ type EvalTargetServiceImpl struct { configer component.IConfiger } +const evalTargetRecordPersistTimeout = 5 * time.Second + func NewEvalTargetServiceImpl(evalTargetRepo repo.IEvalTargetRepo, idgen idgen.IIDGenerator, metric metrics.EvalTargetMetrics, @@ -354,12 +356,15 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ } } - recordID, err1 := e.idgen.GenID(ctx) + recordCtx, recordCancel := context.WithTimeout(context.WithoutCancel(ctx), evalTargetRecordPersistTimeout) + defer recordCancel() + + recordID, err1 := e.idgen.GenID(recordCtx) if err1 != nil { err = err1 return } - logID := logs.GetLogID(ctx) + logID := logs.GetLogID(recordCtx) record = &entity.EvalTargetRecord{ ID: recordID, @@ -385,9 +390,9 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ UpdatedAt: gptr.Of(time.Now().UnixMilli()), }, } - e.convEvalTargetRunErr(ctx, record) + e.convEvalTargetRunErr(recordCtx, record) - _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(ctx, record, nil) + _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(recordCtx, record, nil) if errCreate != nil { return } diff --git a/backend/modules/evaluation/domain/service/target_impl_test.go b/backend/modules/evaluation/domain/service/target_impl_test.go index 0ece1bc66..6f383125c 100755 --- a/backend/modules/evaluation/domain/service/target_impl_test.go +++ b/backend/modules/evaluation/domain/service/target_impl_test.go @@ -607,10 +607,14 @@ func TestEvalTargetServiceImpl_ExecuteTarget(t *testing.T) { deps.configer.EXPECT().GetTargetTrajectoryConf(gomock.Any()).AnyTimes().Return(&entity.TargetTrajectoryConf{}) // convEvalTargetRunErr (in ExecuteTarget defer) may call GetErrCtrl when record has EvalTargetRunError deps.configer.EXPECT().GetErrCtrl(gomock.Any()).AnyTimes().Return(entity.DefaultExptErrCtrl()) - deps.idgen.EXPECT().GenID(ctx).Return(int64(9999), nil) + deps.idgen.EXPECT().GenID(gomock.Any()).DoAndReturn(func(ctx context.Context) (int64, error) { + require.NoError(t, ctx.Err()) + return int64(9999), nil + }) var savedRecord *entity.EvalTargetRecord - deps.repo.EXPECT().CreateEvalTargetRecord(ctx, gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, rec *entity.EvalTargetRecord, _ *bool) (int64, error) { + deps.repo.EXPECT().CreateEvalTargetRecord(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, rec *entity.EvalTargetRecord, _ *bool) (int64, error) { + require.NoError(t, ctx.Err()) savedRecord = rec return rec.ID, nil }) @@ -651,6 +655,86 @@ func TestEvalTargetServiceImpl_ExecuteTarget(t *testing.T) { } } +func TestEvalTargetServiceImpl_ExecuteTarget_PersistsFailRecordAfterContextCanceled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + deps := &evalTargetServiceTestDeps{ + repo: repomocks.NewMockIEvalTargetRepo(ctrl), + idgen: idgenmocks.NewMockIIDGenerator(ctrl), + metric: metricsmocks.NewMockEvalTargetMetrics(ctrl), + operator: servicemocks.NewMockISourceEvalTargetOperateService(ctrl), + configer: componentmocks.NewMockIConfiger(ctrl), + } + + evalTarget := &entity.EvalTarget{ + ID: 200, + SpaceID: 100, + SourceTargetID: "src-id", + EvalTargetType: entity.EvalTargetTypeLoopPrompt, + EvalTargetVersion: &entity.EvalTargetVersion{ + ID: 300, + SourceTargetVersion: "v1", + InputSchema: []*entity.ArgsSchema{ + {Key: gptr.Of("field")}, + }, + }, + } + input := &entity.EvalTargetInputData{ + InputFields: map[string]*entity.Content{ + "field": { + ContentType: gptr.Of(entity.ContentTypeText), + Text: gptr.Of("hello"), + }, + }, + } + param := &entity.ExecuteTargetCtx{ + ExperimentRunID: gptr.Of(int64(555)), + ItemID: 777, + TurnID: 888, + } + + deps.repo.EXPECT().GetEvalTargetVersion(ctx, evalTarget.SpaceID, evalTarget.EvalTargetVersion.ID).Return(evalTarget, nil) + deps.metric.EXPECT().EmitRun(evalTarget.SpaceID, gomock.Any(), gomock.Any()).Times(1) + deps.configer.EXPECT().GetTargetTrajectoryConf(gomock.Any()).AnyTimes().Return(&entity.TargetTrajectoryConf{}) + deps.configer.EXPECT().GetErrCtrl(gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context) *entity.ExptErrCtrl { + require.NoError(t, ctx.Err()) + return entity.DefaultExptErrCtrl() + }) + deps.operator.EXPECT().ValidateInput(gomock.Any(), evalTarget.SpaceID, evalTarget.EvalTargetVersion.InputSchema, input).Return(nil) + deps.operator.EXPECT().Execute(gomock.Any(), evalTarget.SpaceID, gomock.Any()).DoAndReturn(func(context.Context, int64, *entity.ExecuteEvalTargetParam) (*entity.EvalTargetOutputData, entity.EvalTargetRunStatus, error) { + cancel() + return nil, entity.EvalTargetRunStatusFail, context.Canceled + }) + deps.idgen.EXPECT().GenID(gomock.Any()).DoAndReturn(func(ctx context.Context) (int64, error) { + require.NoError(t, ctx.Err()) + return int64(9999), nil + }) + deps.repo.EXPECT().CreateEvalTargetRecord(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, rec *entity.EvalTargetRecord, _ *bool) (int64, error) { + require.NoError(t, ctx.Err()) + require.NotNil(t, rec) + assert.Equal(t, entity.EvalTargetRunStatusFail, gptr.Indirect(rec.Status)) + return rec.ID, nil + }) + + svc := &EvalTargetServiceImpl{ + evalTargetRepo: deps.repo, + idgen: deps.idgen, + metric: deps.metric, + typedOperators: map[entity.EvalTargetType]ISourceEvalTargetOperateService{ + evalTarget.EvalTargetType: deps.operator, + }, + configer: deps.configer, + } + + record, err := svc.ExecuteTarget(ctx, evalTarget.SpaceID, evalTarget.ID, evalTarget.EvalTargetVersion.ID, param, input) + require.NoError(t, err) + require.NotNil(t, record) + assert.Equal(t, int64(9999), record.ID) + assert.Equal(t, entity.EvalTargetRunStatusFail, gptr.Indirect(record.Status)) +} + func TestEvalTargetServiceImpl_ExecuteTarget_TrajectoryExtraction(t *testing.T) { // do not run in parallel, this test involves time.Sleep @@ -752,7 +836,10 @@ func TestEvalTargetServiceImpl_ExecuteTarget_TrajectoryExtraction(t *testing.T) spaceID: 1, }, }) - deps.idgen.EXPECT().GenID(ctx).Return(int64(9999), nil) + deps.idgen.EXPECT().GenID(gomock.Any()).DoAndReturn(func(ctx context.Context) (int64, error) { + require.NoError(t, ctx.Err()) + return int64(9999), nil + }) trajectoryAdapter.EXPECT(). ListTrajectory(gomock.Any(), spaceID, gomock.Any(), gomock.AssignableToTypeOf((*int64)(nil))). @@ -770,7 +857,8 @@ func TestEvalTargetServiceImpl_ExecuteTarget_TrajectoryExtraction(t *testing.T) Return(outputData, entity.EvalTargetRunStatusSuccess, nil) var savedRecord *entity.EvalTargetRecord - deps.repo.EXPECT().CreateEvalTargetRecord(ctx, gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, rec *entity.EvalTargetRecord, _ *bool) (int64, error) { + deps.repo.EXPECT().CreateEvalTargetRecord(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, rec *entity.EvalTargetRecord, _ *bool) (int64, error) { + require.NoError(t, ctx.Err()) savedRecord = rec return rec.ID, nil }) From 33d3bf894d51349e36a39072c46abb48c23971b6 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 18:16:39 +0800 Subject: [PATCH 04/10] test: force target timeout repro after span setup --- .../evaluation/domain/service/target_impl.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index 29f8dcdc1..8355c6c37 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -402,6 +402,7 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ ctx, span = looptracer.GetTracer().StartSpan(ctx, "EvalTarget", "eval_target", looptracer.WithStartNewTrace(), looptracer.WithSpanWorkspaceID(strconv.FormatInt(spaceID, 10))) span.SetCallType("EvalTarget") ctx = looptracer.GetTracer().Inject(ctx) + cozeVideoTimeoutRecordReproSleep(evalTargetDO) if e.typedOperators[evalTargetDO.EvalTargetType] == nil { return nil, errorx.NewByCode(errno.CommonInvalidParamCode, errorx.WithExtraMsg("target type not support")) } @@ -440,6 +441,25 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ return record, nil } +func cozeVideoTimeoutRecordReproSleep(evalTargetDO *entity.EvalTarget) { + if evalTargetDO == nil || evalTargetDO.EvalTargetVersion == nil { + return + } + + customRPC := evalTargetDO.EvalTargetVersion.CustomRPCServer + if customRPC == nil || customRPC.Timeout == nil || *customRPC.Timeout <= 0 { + return + } + if evalTargetDO.SourceTargetID != "7590095306227520514" { + return + } + if gptr.Indirect(customRPC.ExecEnv) != "ppe_fornax_timeout_record_0525" { + return + } + + time.Sleep(time.Duration(*customRPC.Timeout+50) * time.Millisecond) +} + func (e *EvalTargetServiceImpl) ExtractTrajectory(ctx context.Context, spaceID int64, traceID string, startTimeMS *int64) (*entity.Trajectory, error) { if len(traceID) == 0 { return nil, errorx.New("ExtractTrajectory with null traceID") From a769529ed3b7a1ec170a1e98b7e6d56cebfaa059 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 18:34:06 +0800 Subject: [PATCH 05/10] chore: remove timeout repro hooks --- .../service/expt_run_item_event_impl.go | 25 ------------------- .../evaluation/domain/service/target_impl.go | 20 --------------- 2 files changed, 45 deletions(-) diff --git a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go index 1db2ad46b..5267538b5 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go @@ -250,11 +250,6 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E } ctx = e.WithCtx(ctx, eiec) - if timeoutMS, ok := cozeVideoTimeoutRecordReproTimeoutMS(eiec); ok { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond) - defer cancel() - } mode, err := NewRecordEvalMode( eiec.Event, @@ -286,26 +281,6 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E return nil } -func cozeVideoTimeoutRecordReproTimeoutMS(eiec *entity.ExptItemEvalCtx) (int64, bool) { - if eiec == nil || eiec.Expt == nil || eiec.Expt.Target == nil || eiec.Expt.Target.EvalTargetVersion == nil { - return 0, false - } - - target := eiec.Expt.Target - customRPC := target.EvalTargetVersion.CustomRPCServer - if customRPC == nil || customRPC.Timeout == nil || *customRPC.Timeout <= 0 { - return 0, false - } - if target.SourceTargetID != "7590095306227520514" { - return 0, false - } - if gptr.Indirect(customRPC.ExecEnv) != "ppe_fornax_timeout_record_0525" { - return 0, false - } - - return *customRPC.Timeout, true -} - func (e *ExptItemEventEvalServiceImpl) WithCtx(ctx context.Context, eiec *entity.ExptItemEvalCtx) context.Context { return logs.SetLogID(ctx, eiec.GetRecordEvalLogID(ctx)) } diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index 8355c6c37..29f8dcdc1 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -402,7 +402,6 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ ctx, span = looptracer.GetTracer().StartSpan(ctx, "EvalTarget", "eval_target", looptracer.WithStartNewTrace(), looptracer.WithSpanWorkspaceID(strconv.FormatInt(spaceID, 10))) span.SetCallType("EvalTarget") ctx = looptracer.GetTracer().Inject(ctx) - cozeVideoTimeoutRecordReproSleep(evalTargetDO) if e.typedOperators[evalTargetDO.EvalTargetType] == nil { return nil, errorx.NewByCode(errno.CommonInvalidParamCode, errorx.WithExtraMsg("target type not support")) } @@ -441,25 +440,6 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ return record, nil } -func cozeVideoTimeoutRecordReproSleep(evalTargetDO *entity.EvalTarget) { - if evalTargetDO == nil || evalTargetDO.EvalTargetVersion == nil { - return - } - - customRPC := evalTargetDO.EvalTargetVersion.CustomRPCServer - if customRPC == nil || customRPC.Timeout == nil || *customRPC.Timeout <= 0 { - return - } - if evalTargetDO.SourceTargetID != "7590095306227520514" { - return - } - if gptr.Indirect(customRPC.ExecEnv) != "ppe_fornax_timeout_record_0525" { - return - } - - time.Sleep(time.Duration(*customRPC.Timeout+50) * time.Millisecond) -} - func (e *EvalTargetServiceImpl) ExtractTrajectory(ctx context.Context, spaceID int64, traceID string, startTimeMS *int64) (*entity.Trajectory, error) { if len(traceID) == 0 { return nil, errorx.New("ExtractTrajectory with null traceID") From 8c8cfc94b760fab00b0cdbb965d4d01d78b4a307 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 18:48:49 +0800 Subject: [PATCH 06/10] test: repro canceled context without persistence fix --- .../service/expt_run_item_event_impl.go | 25 +++++++++++++++++++ .../domain/service/expt_run_item_impl.go | 17 ++++--------- .../evaluation/domain/service/target_impl.go | 13 +++------- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go index 5267538b5..1db2ad46b 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go @@ -250,6 +250,11 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E } ctx = e.WithCtx(ctx, eiec) + if timeoutMS, ok := cozeVideoTimeoutRecordReproTimeoutMS(eiec); ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond) + defer cancel() + } mode, err := NewRecordEvalMode( eiec.Event, @@ -281,6 +286,26 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E return nil } +func cozeVideoTimeoutRecordReproTimeoutMS(eiec *entity.ExptItemEvalCtx) (int64, bool) { + if eiec == nil || eiec.Expt == nil || eiec.Expt.Target == nil || eiec.Expt.Target.EvalTargetVersion == nil { + return 0, false + } + + target := eiec.Expt.Target + customRPC := target.EvalTargetVersion.CustomRPCServer + if customRPC == nil || customRPC.Timeout == nil || *customRPC.Timeout <= 0 { + return 0, false + } + if target.SourceTargetID != "7590095306227520514" { + return 0, false + } + if gptr.Indirect(customRPC.ExecEnv) != "ppe_fornax_timeout_record_0525" { + return 0, false + } + + return *customRPC.Timeout, true +} + func (e *ExptItemEventEvalServiceImpl) WithCtx(ctx context.Context, eiec *entity.ExptItemEvalCtx) context.Context { return logs.SetLogID(ctx, eiec.GetRecordEvalLogID(ctx)) } diff --git a/backend/modules/evaluation/domain/service/expt_run_item_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_impl.go index 653f4b057..8b410a89b 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_impl.go @@ -69,8 +69,6 @@ type ExptItemEvalCtxExecutor struct { evalSetItemSvc EvaluationSetItemService } -const exptRunLogPersistTimeout = 5 * time.Second - func (e *ExptItemEvalCtxExecutor) Eval(ctx context.Context, eiec *entity.ExptItemEvalCtx) error { event := eiec.Event @@ -132,8 +130,6 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if result == nil { return fmt.Errorf("StoreTurnRunResult with nil result") } - persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) - defer cancel() turn := etec.Turn turnResultLog := etec.GetExistTurnResultLogs()[turn.ID] @@ -177,7 +173,7 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if se, ok := errorx.FromStatusError(evalErr); ok && (se.Code() == errno.CustomEvalTargetInvokeFailCode || se.Code() == errno.CustomRPCEvaluatorRunFailedCode) { errMsg = errorx.ErrorWithoutStack(evalErr) } else { - errMsg = e.Configer.GetErrCtrl(persistCtx).ConvertErrMsg(evalErr.Error()) + errMsg = e.Configer.GetErrCtrl(ctx).ConvertErrMsg(evalErr.Error()) } logs.CtxWarn(ctx, "[ExptTurnEval] store turn run err, before: %v, after: %v", evalErr, errMsg) @@ -199,7 +195,7 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * result.SetEvalErr(evalErr) - if err := e.TurnResultRepo.SaveTurnRunLogs(persistCtx, []*entity.ExptTurnResultRunLog{clone}); err != nil { + if err := e.TurnResultRepo.SaveTurnRunLogs(ctx, []*entity.ExptTurnResultRunLog{clone}); err != nil { return err } @@ -279,11 +275,8 @@ func (e *ExptItemEvalCtxExecutor) buildExptTurnEvalCtx(ctx context.Context, turn } func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *entity.ExptItemEvalEvent, evalErr error) error { - persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) - defer cancel() - if evalErr != nil { - if retry, _ := e.evalErrNeedRetry(persistCtx, event, evalErr); retry { + if retry, _ := e.evalErrNeedRetry(ctx, event, evalErr); retry { return evalErr } } @@ -299,11 +292,11 @@ func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *en ufields["status"] = int32(entity.ItemRunState_Success) } - if err := e.ItemResultRepo.UpdateItemRunLog(persistCtx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { + if err := e.ItemResultRepo.UpdateItemRunLog(ctx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { return err } - if e.evalErrNeedTerminateExpt(persistCtx, event.SpaceID, evalErr) { + if e.evalErrNeedTerminateExpt(ctx, event.SpaceID, evalErr) { logs.CtxWarn(ctx, "[ExptTurnEval] found error which should terminate expt, expt_id: %v, expt_run_id: %v, item_id: %v, err: %v", event.ExptID, event.ExptRunID, event.EvalSetItemID, evalErr) return evalErr } diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index 29f8dcdc1..a4adbdfd2 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -43,8 +43,6 @@ type EvalTargetServiceImpl struct { configer component.IConfiger } -const evalTargetRecordPersistTimeout = 5 * time.Second - func NewEvalTargetServiceImpl(evalTargetRepo repo.IEvalTargetRepo, idgen idgen.IIDGenerator, metric metrics.EvalTargetMetrics, @@ -356,15 +354,12 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ } } - recordCtx, recordCancel := context.WithTimeout(context.WithoutCancel(ctx), evalTargetRecordPersistTimeout) - defer recordCancel() - - recordID, err1 := e.idgen.GenID(recordCtx) + recordID, err1 := e.idgen.GenID(ctx) if err1 != nil { err = err1 return } - logID := logs.GetLogID(recordCtx) + logID := logs.GetLogID(ctx) record = &entity.EvalTargetRecord{ ID: recordID, @@ -390,9 +385,9 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ UpdatedAt: gptr.Of(time.Now().UnixMilli()), }, } - e.convEvalTargetRunErr(recordCtx, record) + e.convEvalTargetRunErr(ctx, record) - _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(recordCtx, record, nil) + _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(ctx, record, nil) if errCreate != nil { return } From a729c3460af9421ea1c556ceef72c78fb950a613 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 19:01:07 +0800 Subject: [PATCH 07/10] test: log detached persistence context for timeout repro --- .../domain/service/expt_run_item_impl.go | 23 +++++++++++++++---- .../evaluation/domain/service/target_impl.go | 20 ++++++++++++---- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/backend/modules/evaluation/domain/service/expt_run_item_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_impl.go index 8b410a89b..6683c4a15 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_impl.go @@ -69,6 +69,8 @@ type ExptItemEvalCtxExecutor struct { evalSetItemSvc EvaluationSetItemService } +const exptRunLogPersistTimeout = 5 * time.Second + func (e *ExptItemEvalCtxExecutor) Eval(ctx context.Context, eiec *entity.ExptItemEvalCtx) error { event := eiec.Event @@ -130,6 +132,8 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if result == nil { return fmt.Errorf("StoreTurnRunResult with nil result") } + persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) + defer cancel() turn := etec.Turn turnResultLog := etec.GetExistTurnResultLogs()[turn.ID] @@ -173,7 +177,7 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if se, ok := errorx.FromStatusError(evalErr); ok && (se.Code() == errno.CustomEvalTargetInvokeFailCode || se.Code() == errno.CustomRPCEvaluatorRunFailedCode) { errMsg = errorx.ErrorWithoutStack(evalErr) } else { - errMsg = e.Configer.GetErrCtrl(ctx).ConvertErrMsg(evalErr.Error()) + errMsg = e.Configer.GetErrCtrl(persistCtx).ConvertErrMsg(evalErr.Error()) } logs.CtxWarn(ctx, "[ExptTurnEval] store turn run err, before: %v, after: %v", evalErr, errMsg) @@ -195,7 +199,10 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * result.SetEvalErr(evalErr) - if err := e.TurnResultRepo.SaveTurnRunLogs(ctx, []*entity.ExptTurnResultRunLog{clone}); err != nil { + logs.CtxInfo(persistCtx, "[CozeVideoTimeoutRecordE2E] persist turn run log with detached ctx, expt_id: %v, expt_run_id: %v, item_id: %v, turn_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", + etec.Expt.ID, etec.Event.ExptRunID, etec.EvalSetItem.ItemID, turn.ID, clone.Status, ctx.Err(), persistCtx.Err()) + + if err := e.TurnResultRepo.SaveTurnRunLogs(persistCtx, []*entity.ExptTurnResultRunLog{clone}); err != nil { return err } @@ -275,8 +282,11 @@ func (e *ExptItemEvalCtxExecutor) buildExptTurnEvalCtx(ctx context.Context, turn } func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *entity.ExptItemEvalEvent, evalErr error) error { + persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) + defer cancel() + if evalErr != nil { - if retry, _ := e.evalErrNeedRetry(ctx, event, evalErr); retry { + if retry, _ := e.evalErrNeedRetry(persistCtx, event, evalErr); retry { return evalErr } } @@ -292,11 +302,14 @@ func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *en ufields["status"] = int32(entity.ItemRunState_Success) } - if err := e.ItemResultRepo.UpdateItemRunLog(ctx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { + logs.CtxInfo(persistCtx, "[CozeVideoTimeoutRecordE2E] persist item run log with detached ctx, expt_id: %v, expt_run_id: %v, item_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", + event.ExptID, event.ExptRunID, event.EvalSetItemID, ufields["status"], ctx.Err(), persistCtx.Err()) + + if err := e.ItemResultRepo.UpdateItemRunLog(persistCtx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { return err } - if e.evalErrNeedTerminateExpt(ctx, event.SpaceID, evalErr) { + if e.evalErrNeedTerminateExpt(persistCtx, event.SpaceID, evalErr) { logs.CtxWarn(ctx, "[ExptTurnEval] found error which should terminate expt, expt_id: %v, expt_run_id: %v, item_id: %v, err: %v", event.ExptID, event.ExptRunID, event.EvalSetItemID, evalErr) return evalErr } diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index a4adbdfd2..3075d80f9 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -43,6 +43,8 @@ type EvalTargetServiceImpl struct { configer component.IConfiger } +const evalTargetRecordPersistTimeout = 5 * time.Second + func NewEvalTargetServiceImpl(evalTargetRepo repo.IEvalTargetRepo, idgen idgen.IIDGenerator, metric metrics.EvalTargetMetrics, @@ -354,12 +356,18 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ } } - recordID, err1 := e.idgen.GenID(ctx) + recordCtx, recordCancel := context.WithTimeout(context.WithoutCancel(ctx), evalTargetRecordPersistTimeout) + defer recordCancel() + + recordID, err1 := e.idgen.GenID(recordCtx) if err1 != nil { err = err1 return } - logID := logs.GetLogID(ctx) + logID := logs.GetLogID(recordCtx) + + logs.CtxInfo(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record with detached ctx, expt_id: %v, item_id: %v, target_id: %v, source_target_id: %v, parent_ctx_err: %v, persist_ctx_err: %v, log_id: %v, trace_id: %v", + gptr.Indirect(param.ExperimentID), param.ItemID, targetID, evalTargetDO.SourceTargetID, ctx.Err(), recordCtx.Err(), logID, span.GetTraceID()) record = &entity.EvalTargetRecord{ ID: recordID, @@ -385,12 +393,16 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ UpdatedAt: gptr.Of(time.Now().UnixMilli()), }, } - e.convEvalTargetRunErr(ctx, record) + e.convEvalTargetRunErr(recordCtx, record) - _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(ctx, record, nil) + _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(recordCtx, record, nil) if errCreate != nil { + logs.CtxError(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record failed, expt_id: %v, item_id: %v, target_record_id: %v, parent_ctx_err: %v, persist_ctx_err: %v, err: %v", + gptr.Indirect(param.ExperimentID), param.ItemID, record.ID, ctx.Err(), recordCtx.Err(), errCreate) return } + logs.CtxInfo(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record success, expt_id: %v, item_id: %v, target_record_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", + gptr.Indirect(param.ExperimentID), param.ItemID, record.ID, runStatus, ctx.Err(), recordCtx.Err()) err = nil }() From 73355649da7dea7febcd3a6b23ac865bf1733307 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 19:10:37 +0800 Subject: [PATCH 08/10] test: repro cancellation after pre-eval without persistence fix --- .../service/expt_run_item_event_impl.go | 11 +++++---- .../domain/service/expt_run_item_impl.go | 23 ++++--------------- .../evaluation/domain/service/target_impl.go | 20 ++++------------ 3 files changed, 15 insertions(+), 39 deletions(-) diff --git a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go index 1db2ad46b..93e95dd38 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go @@ -250,11 +250,6 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E } ctx = e.WithCtx(ctx, eiec) - if timeoutMS, ok := cozeVideoTimeoutRecordReproTimeoutMS(eiec); ok { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond) - defer cancel() - } mode, err := NewRecordEvalMode( eiec.Event, @@ -274,6 +269,12 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E return err } + if timeoutMS, ok := cozeVideoTimeoutRecordReproTimeoutMS(eiec); ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond) + defer cancel() + } + if err := NewExptItemEvaluation(e.exptTurnResultRepo, e.exptItemResultRepo, e.configer, e.metric, e.evaTargetService, e.evaluatorRecordService, e.evaluatorService, e.benefitService, e.evalAsyncRepo, e.evaluationSetItemService). Eval(ctx, eiec); err != nil { return err diff --git a/backend/modules/evaluation/domain/service/expt_run_item_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_impl.go index 6683c4a15..8b410a89b 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_impl.go @@ -69,8 +69,6 @@ type ExptItemEvalCtxExecutor struct { evalSetItemSvc EvaluationSetItemService } -const exptRunLogPersistTimeout = 5 * time.Second - func (e *ExptItemEvalCtxExecutor) Eval(ctx context.Context, eiec *entity.ExptItemEvalCtx) error { event := eiec.Event @@ -132,8 +130,6 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if result == nil { return fmt.Errorf("StoreTurnRunResult with nil result") } - persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) - defer cancel() turn := etec.Turn turnResultLog := etec.GetExistTurnResultLogs()[turn.ID] @@ -177,7 +173,7 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if se, ok := errorx.FromStatusError(evalErr); ok && (se.Code() == errno.CustomEvalTargetInvokeFailCode || se.Code() == errno.CustomRPCEvaluatorRunFailedCode) { errMsg = errorx.ErrorWithoutStack(evalErr) } else { - errMsg = e.Configer.GetErrCtrl(persistCtx).ConvertErrMsg(evalErr.Error()) + errMsg = e.Configer.GetErrCtrl(ctx).ConvertErrMsg(evalErr.Error()) } logs.CtxWarn(ctx, "[ExptTurnEval] store turn run err, before: %v, after: %v", evalErr, errMsg) @@ -199,10 +195,7 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * result.SetEvalErr(evalErr) - logs.CtxInfo(persistCtx, "[CozeVideoTimeoutRecordE2E] persist turn run log with detached ctx, expt_id: %v, expt_run_id: %v, item_id: %v, turn_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", - etec.Expt.ID, etec.Event.ExptRunID, etec.EvalSetItem.ItemID, turn.ID, clone.Status, ctx.Err(), persistCtx.Err()) - - if err := e.TurnResultRepo.SaveTurnRunLogs(persistCtx, []*entity.ExptTurnResultRunLog{clone}); err != nil { + if err := e.TurnResultRepo.SaveTurnRunLogs(ctx, []*entity.ExptTurnResultRunLog{clone}); err != nil { return err } @@ -282,11 +275,8 @@ func (e *ExptItemEvalCtxExecutor) buildExptTurnEvalCtx(ctx context.Context, turn } func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *entity.ExptItemEvalEvent, evalErr error) error { - persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) - defer cancel() - if evalErr != nil { - if retry, _ := e.evalErrNeedRetry(persistCtx, event, evalErr); retry { + if retry, _ := e.evalErrNeedRetry(ctx, event, evalErr); retry { return evalErr } } @@ -302,14 +292,11 @@ func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *en ufields["status"] = int32(entity.ItemRunState_Success) } - logs.CtxInfo(persistCtx, "[CozeVideoTimeoutRecordE2E] persist item run log with detached ctx, expt_id: %v, expt_run_id: %v, item_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", - event.ExptID, event.ExptRunID, event.EvalSetItemID, ufields["status"], ctx.Err(), persistCtx.Err()) - - if err := e.ItemResultRepo.UpdateItemRunLog(persistCtx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { + if err := e.ItemResultRepo.UpdateItemRunLog(ctx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { return err } - if e.evalErrNeedTerminateExpt(persistCtx, event.SpaceID, evalErr) { + if e.evalErrNeedTerminateExpt(ctx, event.SpaceID, evalErr) { logs.CtxWarn(ctx, "[ExptTurnEval] found error which should terminate expt, expt_id: %v, expt_run_id: %v, item_id: %v, err: %v", event.ExptID, event.ExptRunID, event.EvalSetItemID, evalErr) return evalErr } diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index 3075d80f9..a4adbdfd2 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -43,8 +43,6 @@ type EvalTargetServiceImpl struct { configer component.IConfiger } -const evalTargetRecordPersistTimeout = 5 * time.Second - func NewEvalTargetServiceImpl(evalTargetRepo repo.IEvalTargetRepo, idgen idgen.IIDGenerator, metric metrics.EvalTargetMetrics, @@ -356,18 +354,12 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ } } - recordCtx, recordCancel := context.WithTimeout(context.WithoutCancel(ctx), evalTargetRecordPersistTimeout) - defer recordCancel() - - recordID, err1 := e.idgen.GenID(recordCtx) + recordID, err1 := e.idgen.GenID(ctx) if err1 != nil { err = err1 return } - logID := logs.GetLogID(recordCtx) - - logs.CtxInfo(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record with detached ctx, expt_id: %v, item_id: %v, target_id: %v, source_target_id: %v, parent_ctx_err: %v, persist_ctx_err: %v, log_id: %v, trace_id: %v", - gptr.Indirect(param.ExperimentID), param.ItemID, targetID, evalTargetDO.SourceTargetID, ctx.Err(), recordCtx.Err(), logID, span.GetTraceID()) + logID := logs.GetLogID(ctx) record = &entity.EvalTargetRecord{ ID: recordID, @@ -393,16 +385,12 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ UpdatedAt: gptr.Of(time.Now().UnixMilli()), }, } - e.convEvalTargetRunErr(recordCtx, record) + e.convEvalTargetRunErr(ctx, record) - _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(recordCtx, record, nil) + _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(ctx, record, nil) if errCreate != nil { - logs.CtxError(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record failed, expt_id: %v, item_id: %v, target_record_id: %v, parent_ctx_err: %v, persist_ctx_err: %v, err: %v", - gptr.Indirect(param.ExperimentID), param.ItemID, record.ID, ctx.Err(), recordCtx.Err(), errCreate) return } - logs.CtxInfo(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record success, expt_id: %v, item_id: %v, target_record_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", - gptr.Indirect(param.ExperimentID), param.ItemID, record.ID, runStatus, ctx.Err(), recordCtx.Err()) err = nil }() From 0f105ae2c4d1353935cab5b2caf71bf1c5e0b0e4 Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 19:20:04 +0800 Subject: [PATCH 09/10] test: log detached persistence after post-preeval timeout --- .../domain/service/expt_run_item_impl.go | 23 +++++++++++++++---- .../evaluation/domain/service/target_impl.go | 20 ++++++++++++---- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/backend/modules/evaluation/domain/service/expt_run_item_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_impl.go index 8b410a89b..6683c4a15 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_impl.go @@ -69,6 +69,8 @@ type ExptItemEvalCtxExecutor struct { evalSetItemSvc EvaluationSetItemService } +const exptRunLogPersistTimeout = 5 * time.Second + func (e *ExptItemEvalCtxExecutor) Eval(ctx context.Context, eiec *entity.ExptItemEvalCtx) error { event := eiec.Event @@ -130,6 +132,8 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if result == nil { return fmt.Errorf("StoreTurnRunResult with nil result") } + persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) + defer cancel() turn := etec.Turn turnResultLog := etec.GetExistTurnResultLogs()[turn.ID] @@ -173,7 +177,7 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * if se, ok := errorx.FromStatusError(evalErr); ok && (se.Code() == errno.CustomEvalTargetInvokeFailCode || se.Code() == errno.CustomRPCEvaluatorRunFailedCode) { errMsg = errorx.ErrorWithoutStack(evalErr) } else { - errMsg = e.Configer.GetErrCtrl(ctx).ConvertErrMsg(evalErr.Error()) + errMsg = e.Configer.GetErrCtrl(persistCtx).ConvertErrMsg(evalErr.Error()) } logs.CtxWarn(ctx, "[ExptTurnEval] store turn run err, before: %v, after: %v", evalErr, errMsg) @@ -195,7 +199,10 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * result.SetEvalErr(evalErr) - if err := e.TurnResultRepo.SaveTurnRunLogs(ctx, []*entity.ExptTurnResultRunLog{clone}); err != nil { + logs.CtxInfo(persistCtx, "[CozeVideoTimeoutRecordE2E] persist turn run log with detached ctx, expt_id: %v, expt_run_id: %v, item_id: %v, turn_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", + etec.Expt.ID, etec.Event.ExptRunID, etec.EvalSetItem.ItemID, turn.ID, clone.Status, ctx.Err(), persistCtx.Err()) + + if err := e.TurnResultRepo.SaveTurnRunLogs(persistCtx, []*entity.ExptTurnResultRunLog{clone}); err != nil { return err } @@ -275,8 +282,11 @@ func (e *ExptItemEvalCtxExecutor) buildExptTurnEvalCtx(ctx context.Context, turn } func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *entity.ExptItemEvalEvent, evalErr error) error { + persistCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), exptRunLogPersistTimeout) + defer cancel() + if evalErr != nil { - if retry, _ := e.evalErrNeedRetry(ctx, event, evalErr); retry { + if retry, _ := e.evalErrNeedRetry(persistCtx, event, evalErr); retry { return evalErr } } @@ -292,11 +302,14 @@ func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *en ufields["status"] = int32(entity.ItemRunState_Success) } - if err := e.ItemResultRepo.UpdateItemRunLog(ctx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { + logs.CtxInfo(persistCtx, "[CozeVideoTimeoutRecordE2E] persist item run log with detached ctx, expt_id: %v, expt_run_id: %v, item_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", + event.ExptID, event.ExptRunID, event.EvalSetItemID, ufields["status"], ctx.Err(), persistCtx.Err()) + + if err := e.ItemResultRepo.UpdateItemRunLog(persistCtx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { return err } - if e.evalErrNeedTerminateExpt(ctx, event.SpaceID, evalErr) { + if e.evalErrNeedTerminateExpt(persistCtx, event.SpaceID, evalErr) { logs.CtxWarn(ctx, "[ExptTurnEval] found error which should terminate expt, expt_id: %v, expt_run_id: %v, item_id: %v, err: %v", event.ExptID, event.ExptRunID, event.EvalSetItemID, evalErr) return evalErr } diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index a4adbdfd2..3075d80f9 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -43,6 +43,8 @@ type EvalTargetServiceImpl struct { configer component.IConfiger } +const evalTargetRecordPersistTimeout = 5 * time.Second + func NewEvalTargetServiceImpl(evalTargetRepo repo.IEvalTargetRepo, idgen idgen.IIDGenerator, metric metrics.EvalTargetMetrics, @@ -354,12 +356,18 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ } } - recordID, err1 := e.idgen.GenID(ctx) + recordCtx, recordCancel := context.WithTimeout(context.WithoutCancel(ctx), evalTargetRecordPersistTimeout) + defer recordCancel() + + recordID, err1 := e.idgen.GenID(recordCtx) if err1 != nil { err = err1 return } - logID := logs.GetLogID(ctx) + logID := logs.GetLogID(recordCtx) + + logs.CtxInfo(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record with detached ctx, expt_id: %v, item_id: %v, target_id: %v, source_target_id: %v, parent_ctx_err: %v, persist_ctx_err: %v, log_id: %v, trace_id: %v", + gptr.Indirect(param.ExperimentID), param.ItemID, targetID, evalTargetDO.SourceTargetID, ctx.Err(), recordCtx.Err(), logID, span.GetTraceID()) record = &entity.EvalTargetRecord{ ID: recordID, @@ -385,12 +393,16 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ UpdatedAt: gptr.Of(time.Now().UnixMilli()), }, } - e.convEvalTargetRunErr(ctx, record) + e.convEvalTargetRunErr(recordCtx, record) - _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(ctx, record, nil) + _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(recordCtx, record, nil) if errCreate != nil { + logs.CtxError(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record failed, expt_id: %v, item_id: %v, target_record_id: %v, parent_ctx_err: %v, persist_ctx_err: %v, err: %v", + gptr.Indirect(param.ExperimentID), param.ItemID, record.ID, ctx.Err(), recordCtx.Err(), errCreate) return } + logs.CtxInfo(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record success, expt_id: %v, item_id: %v, target_record_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", + gptr.Indirect(param.ExperimentID), param.ItemID, record.ID, runStatus, ctx.Err(), recordCtx.Err()) err = nil }() From 785879ae394fd602db931b309f0356a31af2789c Mon Sep 17 00:00:00 2001 From: "caijialin.626" Date: Mon, 25 May 2026 19:34:48 +0800 Subject: [PATCH 10/10] chore: remove timeout e2e hooks --- .../service/expt_run_item_event_impl.go | 26 ------------------- .../domain/service/expt_run_item_impl.go | 6 ----- .../evaluation/domain/service/target_impl.go | 7 ----- 3 files changed, 39 deletions(-) diff --git a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go index 93e95dd38..5267538b5 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_event_impl.go @@ -269,12 +269,6 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E return err } - if timeoutMS, ok := cozeVideoTimeoutRecordReproTimeoutMS(eiec); ok { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond) - defer cancel() - } - if err := NewExptItemEvaluation(e.exptTurnResultRepo, e.exptItemResultRepo, e.configer, e.metric, e.evaTargetService, e.evaluatorRecordService, e.evaluatorService, e.benefitService, e.evalAsyncRepo, e.evaluationSetItemService). Eval(ctx, eiec); err != nil { return err @@ -287,26 +281,6 @@ func (e *ExptItemEventEvalServiceImpl) eval(ctx context.Context, event *entity.E return nil } -func cozeVideoTimeoutRecordReproTimeoutMS(eiec *entity.ExptItemEvalCtx) (int64, bool) { - if eiec == nil || eiec.Expt == nil || eiec.Expt.Target == nil || eiec.Expt.Target.EvalTargetVersion == nil { - return 0, false - } - - target := eiec.Expt.Target - customRPC := target.EvalTargetVersion.CustomRPCServer - if customRPC == nil || customRPC.Timeout == nil || *customRPC.Timeout <= 0 { - return 0, false - } - if target.SourceTargetID != "7590095306227520514" { - return 0, false - } - if gptr.Indirect(customRPC.ExecEnv) != "ppe_fornax_timeout_record_0525" { - return 0, false - } - - return *customRPC.Timeout, true -} - func (e *ExptItemEventEvalServiceImpl) WithCtx(ctx context.Context, eiec *entity.ExptItemEvalCtx) context.Context { return logs.SetLogID(ctx, eiec.GetRecordEvalLogID(ctx)) } diff --git a/backend/modules/evaluation/domain/service/expt_run_item_impl.go b/backend/modules/evaluation/domain/service/expt_run_item_impl.go index 6683c4a15..653f4b057 100644 --- a/backend/modules/evaluation/domain/service/expt_run_item_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_item_impl.go @@ -199,9 +199,6 @@ func (e *ExptItemEvalCtxExecutor) storeTurnRunResult(ctx context.Context, etec * result.SetEvalErr(evalErr) - logs.CtxInfo(persistCtx, "[CozeVideoTimeoutRecordE2E] persist turn run log with detached ctx, expt_id: %v, expt_run_id: %v, item_id: %v, turn_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", - etec.Expt.ID, etec.Event.ExptRunID, etec.EvalSetItem.ItemID, turn.ID, clone.Status, ctx.Err(), persistCtx.Err()) - if err := e.TurnResultRepo.SaveTurnRunLogs(persistCtx, []*entity.ExptTurnResultRunLog{clone}); err != nil { return err } @@ -302,9 +299,6 @@ func (e *ExptItemEvalCtxExecutor) CompleteItemRun(ctx context.Context, event *en ufields["status"] = int32(entity.ItemRunState_Success) } - logs.CtxInfo(persistCtx, "[CozeVideoTimeoutRecordE2E] persist item run log with detached ctx, expt_id: %v, expt_run_id: %v, item_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", - event.ExptID, event.ExptRunID, event.EvalSetItemID, ufields["status"], ctx.Err(), persistCtx.Err()) - if err := e.ItemResultRepo.UpdateItemRunLog(persistCtx, event.ExptID, event.ExptRunID, []int64{event.EvalSetItemID}, ufields, event.SpaceID); err != nil { return err } diff --git a/backend/modules/evaluation/domain/service/target_impl.go b/backend/modules/evaluation/domain/service/target_impl.go index 3075d80f9..29f8dcdc1 100644 --- a/backend/modules/evaluation/domain/service/target_impl.go +++ b/backend/modules/evaluation/domain/service/target_impl.go @@ -366,9 +366,6 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ } logID := logs.GetLogID(recordCtx) - logs.CtxInfo(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record with detached ctx, expt_id: %v, item_id: %v, target_id: %v, source_target_id: %v, parent_ctx_err: %v, persist_ctx_err: %v, log_id: %v, trace_id: %v", - gptr.Indirect(param.ExperimentID), param.ItemID, targetID, evalTargetDO.SourceTargetID, ctx.Err(), recordCtx.Err(), logID, span.GetTraceID()) - record = &entity.EvalTargetRecord{ ID: recordID, SpaceID: spaceID, @@ -397,12 +394,8 @@ func (e *EvalTargetServiceImpl) ExecuteTarget(ctx context.Context, spaceID, targ _, errCreate := e.evalTargetRepo.CreateEvalTargetRecord(recordCtx, record, nil) if errCreate != nil { - logs.CtxError(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record failed, expt_id: %v, item_id: %v, target_record_id: %v, parent_ctx_err: %v, persist_ctx_err: %v, err: %v", - gptr.Indirect(param.ExperimentID), param.ItemID, record.ID, ctx.Err(), recordCtx.Err(), errCreate) return } - logs.CtxInfo(recordCtx, "[CozeVideoTimeoutRecordE2E] persist eval target record success, expt_id: %v, item_id: %v, target_record_id: %v, status: %v, parent_ctx_err: %v, persist_ctx_err: %v", - gptr.Indirect(param.ExperimentID), param.ItemID, record.ID, runStatus, ctx.Err(), recordCtx.Err()) err = nil }()