Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion backend/modules/evaluation/application/eval_openapi_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,12 @@ func (e *EvalOpenAPIApplication) ReportEvalTargetInvokeResult_(ctx context.Conte
return nil, err
}

logs.CtxInfo(ctx, "report target record, record_id: %v, space_id: %v, expt_id: %v, expt_run_id: %v, item_id: %v", req.GetInvokeID(), req.GetWorkspaceID(), actx.Event.GetExptID(), actx.Event.GetExptRunID(), actx.Event.GetEvalSetItemID())
if actx == nil {
logs.CtxWarn(ctx, "report target record, actx missing, invoke_id: %v, space_id: %v", req.GetInvokeID(), req.GetWorkspaceID())
return nil, errorx.New("eval async context not found, invoke_id: %v", req.GetInvokeID())
}

logs.CtxInfo(ctx, "report target record, record_id: %v, space_id: %v, expt_id: %v, expt_run_id: %v, item_id: %v", req.GetInvokeID(), req.GetWorkspaceID(), actx.Event.GetExptID(), actx.Event.GetExptRunID(), actx.Event.GetEvalSetItemID())
outputData := target.ToInvokeOutputDataDO(req)
outputData.TimeConsumingMS = gptr.Of(time.Now().UnixMilli() - actx.AsyncUnixMS)
if err := e.targetSvc.ReportInvokeRecords(ctx, &entity.ReportTargetRecordParam{
Expand Down Expand Up @@ -2433,6 +2437,10 @@ func (e *EvalOpenAPIApplication) ReportEvaluatorInvokeResult_(ctx context.Contex
if err != nil {
return nil, err
}
if actx == nil {
logs.CtxWarn(ctx, "report evaluator record, actx missing, invoke_id: %v, space_id: %v", req.GetInvokeID(), req.GetWorkspaceID())
return nil, errorx.New("eval async context not found, invoke_id: %v", req.GetInvokeID())
}

logs.CtxInfo(ctx, "report evaluator record, invoke_id: %v, evaluator_version_id: %v, space_id: %v, expt_id: %v, expt_run_id: %v",
req.GetInvokeID(), actx.EvaluatorVersionID, req.GetWorkspaceID(), actx.Event.GetExptID(), actx.Event.GetExptRunID())
Expand Down
21 changes: 21 additions & 0 deletions backend/modules/evaluation/application/eval_openapi_app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,14 @@ func TestEvalOpenAPIApplication_ReportEvalTargetInvokeResult(t *testing.T) {
},
wantErr: true,
},
{
name: "actx is nil",
req: repoErrorReq,
setup: func(t *testing.T, asyncRepo *repomocks.MockIEvalAsyncRepo, _ *servicemocks.MockIEvalTargetService, _ *eventmocks.MockExptEventPublisher, _ *configermocks.MockIConfiger) {
asyncRepo.EXPECT().GetEvalAsyncCtx(gomock.Any(), strconv.FormatInt(repoErrorReq.GetInvokeID(), 10)).Return(nil, nil)
},
wantErr: true,
},
{
name: "report invoke records returns error",
req: reportErrorReq,
Expand Down Expand Up @@ -5586,6 +5594,19 @@ func TestEvalOpenAPIApplication_ReportEvaluatorInvokeResult(t *testing.T) {
},
wantErr: -1,
},
{
name: "actx is nil",
req: &openapi.ReportEvaluatorInvokeResultRequest{
WorkspaceID: gptr.Of(workspaceID),
InvokeID: gptr.Of(invokeID),
Status: gptr.Of(spi.InvokeEvaluatorRunStatus_SUCCESS),
},
setup: func(auth *rpcmocks.MockIAuthProvider, asyncRepo *repomocks.MockIEvalAsyncRepo, _ *servicemocks.MockEvaluatorService, _ *eventmocks.MockExptEventPublisher) {
auth.EXPECT().Authorization(gomock.Any(), gomock.Any()).Return(nil)
asyncRepo.EXPECT().GetEvalAsyncCtx(gomock.Any(), "evaluator:2002").Return(nil, nil)
},
wantErr: -1,
},
{
name: "report service failed",
req: &openapi.ReportEvaluatorInvokeResultRequest{
Expand Down
18 changes: 13 additions & 5 deletions backend/modules/evaluation/application/experiment_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1550,11 +1550,15 @@ func (e *experimentApplication) InvokeExperiment(ctx context.Context, req *expt.
if err != nil {
return nil, err
}
err = e.resultSvc.UpsertExptTurnResultFilter(ctx, req.GetWorkspaceID(), req.GetExperimentID(), maps.ToSlice(idMap, func(k, v int64) int64 {
return v
}))
if err != nil {
return nil, err

// 当没有新增 item 时,跳过全量 UpsertFilter 避免无效的 O(N) CK 扫描和逐条 BMQ 发送
if len(idMap) > 0 {
err = e.resultSvc.UpsertExptTurnResultFilter(ctx, req.GetWorkspaceID(), req.GetExperimentID(), maps.ToSlice(idMap, func(k, v int64) int64 {
return v
}))
if err != nil {
return nil, err
}
}

return &expt.InvokeExperimentResponse{
Expand All @@ -1570,6 +1574,10 @@ func (e *experimentApplication) FinishExperiment(ctx context.Context, req *expt.

got, err := e.manager.Get(ctx, req.GetExperimentID(), req.GetWorkspaceID(), session)
if err != nil {
if se, ok := errorx.FromStatusError(err); ok && se.Code() == errno.ResourceNotFoundCode {
// 实验已删除,视为已完成,幂等返回
return &expt.FinishExperimentResponse{BaseResp: base.NewBaseResp()}, nil
}
return nil, err
}

Expand Down
58 changes: 58 additions & 0 deletions backend/modules/evaluation/application/experiment_app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4338,6 +4338,45 @@ func TestExperimentApplication_InvokeExperiment(t *testing.T) {
},
wantErr: false,
},
{
name: "success - empty idMap skips UpsertFilter",
req: &exptpb.InvokeExperimentRequest{
WorkspaceID: validSpaceID,
ExperimentID: gptr.Of(validExptID),
ExperimentRunID: gptr.Of(validExptRunID),
EvaluationSetID: validEvalSetID,
Items: validItems,
Session: &common.Session{UserID: gptr.Of(validUserID)},
SkipInvalidItems: gptr.Of(true),
AllowPartialAdd: gptr.Of(true),
},
mockSetup: func() {
mockManager.EXPECT().
Get(gomock.Any(), validExptID, validSpaceID, &entity.Session{UserID: strconv.FormatInt(validUserID, 10)}).
Return(validExpt, nil)

mockAuth.EXPECT().
AuthorizationWithoutSPI(gomock.Any(), gomock.Any()).
Return(nil)

// BatchCreateEvaluationSetItems returns empty idMap (dataset full)
mockEvalSetItemService.EXPECT().
BatchCreateEvaluationSetItems(gomock.Any(), gomock.Any()).
Return(map[int64]int64{}, nil, nil, nil)

// Invoke is still called with empty Items
mockManager.EXPECT().
Invoke(gomock.Any(), gomock.Any()).
Return(nil)

// UpsertExptTurnResultFilter should NOT be called
},
wantResp: &exptpb.InvokeExperimentResponse{
AddedItems: map[int64]int64{},
BaseResp: base.NewBaseResp(),
},
wantErr: false,
},
{
name: "error - experiment status not allowed",
req: &exptpb.InvokeExperimentRequest{
Expand Down Expand Up @@ -4491,6 +4530,25 @@ func TestExperimentApplication_FinishExperiment(t *testing.T) {
},
wantErr: false,
},
{
name: "success - experiment deleted",
req: &exptpb.FinishExperimentRequest{
WorkspaceID: gptr.Of(validSpaceID),
ExperimentID: gptr.Of(validExptID),
ExperimentRunID: gptr.Of(validExptRunID),
Session: &common.Session{UserID: gptr.Of(validUserID)},
},
mockSetup: func() {
// Mock Get experiment returns ResourceNotFound (soft-deleted)
mockManager.EXPECT().
Get(gomock.Any(), validExptID, validSpaceID, &entity.Session{UserID: strconv.FormatInt(validUserID, 10)}).
Return(nil, errorx.NewByCode(errno.ResourceNotFoundCode, errorx.WithExtraMsg("experiment not found")))
},
wantResp: &exptpb.FinishExperimentResponse{
BaseResp: base.NewBaseResp(),
},
wantErr: false,
},
{
name: "error - get experiment failed",
req: &exptpb.FinishExperimentRequest{
Expand Down
Loading