Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 87 additions & 14 deletions backend/modules/evaluation/domain/service/expt_result_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func (e ExptResultServiceImpl) MGetExperimentResult(ctx context.Context, param *
turnID int64
}
turnLogIDByItemTurn := make(map[itemTurnKeyNoRun]string)
targetResultIDByRunItemTurn := make(map[itemTurnKey]int64)

// 将基准页内 turn_result 里的 expt_run_id 拆分出来,批量拉取对应的 run-log
runID2ItemIDSet := make(map[int64]map[int64]struct{})
Expand Down Expand Up @@ -484,32 +485,42 @@ func (e ExptResultServiceImpl) MGetExperimentResult(ctx context.Context, param *
if trl == nil {
continue
}
if trl.LogID == "" {
continue
}
turnLogIDByRunItemTurn[itemTurnKey{
k := itemTurnKey{
exptRunID: exptRunID,
itemID: trl.ItemID,
turnID: trl.TurnID,
}] = trl.LogID
// 兜底:同一分页里同一 item/turn 若出现多个 run_id,只保留第一个非空 logid 用于对外展示
k := itemTurnKeyNoRun{itemID: trl.ItemID, turnID: trl.TurnID}
if turnLogIDByItemTurn[k] == "" {
turnLogIDByItemTurn[k] = trl.LogID
}
if trl.LogID != "" {
turnLogIDByRunItemTurn[k] = trl.LogID
// 兜底:同一分页里同一 item/turn 若出现多个 run_id,只保留第一个非空 logid 用于对外展示
noRunKey := itemTurnKeyNoRun{itemID: trl.ItemID, turnID: trl.TurnID}
if turnLogIDByItemTurn[noRunKey] == "" {
turnLogIDByItemTurn[noRunKey] = trl.LogID
}
}
if trl.TargetResultID > 0 {
targetResultIDByRunItemTurn[k] = trl.TargetResultID
}
}
}

// 确保每个 item 都能回填到一个 logid:按 turn_resultDAOs 的遍历顺序取该 item 的第一个非空 logid
// 确保每个 item 都能回填到一个 logid:按 turn_resultDAOs 的遍历顺序取该 item 的第一个非空 logid。
// 异步评测对象执行中会先把 target record id 写入 run-log,turn_result 可能要等终态才回写;
// 这里提前补齐 TargetResultID,保证 processing 阶段的结果接口也能返回 target_record/logid。
for _, tr := range turnResultDAOs {
if tr == nil {
continue
}
if itemID2LogID[tr.ItemID] != "" {
continue
key := itemTurnKey{exptRunID: tr.ExptRunID, itemID: tr.ItemID, turnID: tr.TurnID}
if itemID2LogID[tr.ItemID] == "" {
if logID, ok := turnLogIDByRunItemTurn[key]; ok && logID != "" {
itemID2LogID[tr.ItemID] = logID
}
}
if logID, ok := turnLogIDByRunItemTurn[itemTurnKey{exptRunID: tr.ExptRunID, itemID: tr.ItemID, turnID: tr.TurnID}]; ok && logID != "" {
itemID2LogID[tr.ItemID] = logID
if tr.TargetResultID == 0 {
if targetResultID := targetResultIDByRunItemTurn[key]; targetResultID > 0 {
tr.TargetResultID = targetResultID
}
}
}

Expand Down Expand Up @@ -1562,6 +1573,10 @@ func (e *ExptResultBuilder) build(ctx context.Context) error {
return nil
}

if err := e.fillProcessingTargetResultID(ctx); err != nil {
return err
}

// 由于turnID可能为0,以turn_result_id为行的唯一标识聚合数据,组装payload数据时再通过turn_result_id与item_id(单轮)或turn_id(多轮)映射进行组装
e.ItemIDTurnID2TurnResultID = make(map[int64]map[int64]int64) // itemID -> turnID -> turn_result_id
for _, turnResult := range e.turnResultDO {
Expand Down Expand Up @@ -1595,6 +1610,64 @@ func (e *ExptResultBuilder) build(ctx context.Context) error {
return nil
}

func (e *ExptResultBuilder) fillProcessingTargetResultID(ctx context.Context) error {
type runItemTurnKey struct {
exptRunID int64
itemID int64
turnID int64
}

runID2ItemIDSet := make(map[int64]map[int64]struct{})
for _, turnResult := range e.turnResultDO {
if turnResult == nil || turnResult.TargetResultID > 0 || turnResult.ExptRunID == 0 {
continue
}
if runID2ItemIDSet[turnResult.ExptRunID] == nil {
runID2ItemIDSet[turnResult.ExptRunID] = make(map[int64]struct{})
}
runID2ItemIDSet[turnResult.ExptRunID][turnResult.ItemID] = struct{}{}
}
if len(runID2ItemIDSet) == 0 {
return nil
}

targetResultIDByRunItemTurn := make(map[runItemTurnKey]int64)
for exptRunID, itemIDSet := range runID2ItemIDSet {
itemIDs := make([]int64, 0, len(itemIDSet))
for itemID := range itemIDSet {
itemIDs = append(itemIDs, itemID)
}
sort.Slice(itemIDs, func(i, j int) bool { return itemIDs[i] < itemIDs[j] })

turnRunLogs, err := e.ExptTurnResultRepo.MGetItemTurnRunLogs(ctx, e.ExptID, exptRunID, itemIDs, e.SpaceID)
if err != nil {
return err
}
for _, turnRunLog := range turnRunLogs {
if turnRunLog == nil || turnRunLog.TargetResultID == 0 {
continue
}
targetResultIDByRunItemTurn[runItemTurnKey{
exptRunID: exptRunID,
itemID: turnRunLog.ItemID,
turnID: turnRunLog.TurnID,
}] = turnRunLog.TargetResultID
}
}

for _, turnResult := range e.turnResultDO {
if turnResult == nil || turnResult.TargetResultID > 0 {
continue
}
key := runItemTurnKey{exptRunID: turnResult.ExptRunID, itemID: turnResult.ItemID, turnID: turnResult.TurnID}
if targetResultID := targetResultIDByRunItemTurn[key]; targetResultID > 0 {
turnResult.TargetResultID = targetResultID
}
}

return nil
}

func (e *ExptResultBuilder) buildEvaluatorResult(ctx context.Context) error {
turnResultIDs := make([]int64, 0)
for _, turnResult := range e.turnResultDO {
Expand Down
181 changes: 181 additions & 0 deletions backend/modules/evaluation/domain/service/expt_result_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,142 @@ func TestExptResultServiceImpl_MGetExperimentResult(t *testing.T) {
}
}

func TestExptResultServiceImpl_MGetExperimentResult_FillsProcessingTargetRecordFromRunLog(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

const (
spaceID int64 = 100
exptID int64 = 1
exptRunID int64 = 2
itemID int64 = 3
turnID int64 = 0
turnResultID int64 = 4
targetRecordID int64 = 5
)

mockExptTurnResultRepo := repoMocks.NewMockIExptTurnResultRepo(ctrl)
mockExperimentRepo := repoMocks.NewMockIExperimentRepo(ctrl)
mockExptItemResultRepo := repoMocks.NewMockIExptItemResultRepo(ctrl)
mockExptAnnotateRepo := repoMocks.NewMockIExptAnnotateRepo(ctrl)
mockEvalTargetService := svcMocks.NewMockIEvalTargetService(ctrl)
mockEvaluationSetVersionService := svcMocks.NewMockEvaluationSetVersionService(ctrl)
mockEvaluationSetItemService := svcMocks.NewMockEvaluationSetItemService(ctrl)
mockEvaluatorRecordService := svcMocks.NewMockEvaluatorRecordService(ctrl)
mockAnalysisService := svcMocks.NewMockIEvaluationAnalysisService(ctrl)
mockMetric := metricsMocks.NewMockExptMetric(ctrl)
mockLWT := lwtMocks.NewMockILatestWriteTracker(ctrl)

expt := &entity.Experiment{
ID: exptID,
SpaceID: spaceID,
EvalSetID: 10,
EvalSetVersionID: 11,
ExptType: entity.ExptType_Offline,
}

mockMetric.EXPECT().EmitGetExptResult(spaceID, false).Times(1)
mockLWT.EXPECT().CheckWriteFlagByID(gomock.Any(), platestwrite.ResourceTypeExperiment, exptID).Return(false).Times(1)
mockExperimentRepo.EXPECT().MGetByID(gomock.Any(), []int64{exptID}, spaceID).Return([]*entity.Experiment{expt}, nil).Times(1)
mockExperimentRepo.EXPECT().GetEvaluatorRefByExptIDs(gomock.Any(), []int64{exptID}, spaceID).Return(nil, nil).Times(1)
mockEvaluationSetVersionService.EXPECT().GetEvaluationSetVersion(gomock.Any(), spaceID, expt.EvalSetVersionID, gptr.Of(true)).Return(&entity.EvaluationSetVersion{
EvaluationSetSchema: &entity.EvaluationSetSchema{},
}, nil, nil).Times(1)
mockExptTurnResultRepo.EXPECT().
ListTurnResult(gomock.Any(), spaceID, exptID, gomock.Nil(), entity.Page{}, false).
Return([]*entity.ExptTurnResult{{
ID: turnResultID,
SpaceID: spaceID,
ExptID: exptID,
ExptRunID: exptRunID,
ItemID: itemID,
TurnID: turnID,
Status: int32(entity.TurnRunState_Processing),
}}, int64(1), nil).
Times(1)
mockExptItemResultRepo.EXPECT().BatchGet(gomock.Any(), spaceID, exptID, []int64{itemID}).Return([]*entity.ExptItemResult{{
SpaceID: spaceID,
ExptID: exptID,
ItemID: itemID,
Status: entity.ItemRunState_Processing,
}}, nil).Times(2)
mockExptItemResultRepo.EXPECT().MGetItemRunLog(gomock.Any(), exptID, exptRunID, []int64{itemID}, spaceID).Return(nil, nil).Times(1)
mockExptTurnResultRepo.EXPECT().MGetItemTurnRunLogs(gomock.Any(), exptID, exptRunID, []int64{itemID}, spaceID).Return([]*entity.ExptTurnResultRunLog{{
SpaceID: spaceID,
ExptID: exptID,
ExptRunID: exptRunID,
ItemID: itemID,
TurnID: turnID,
Status: entity.TurnRunState_Processing,
LogID: "target-logid",
TargetResultID: targetRecordID,
}}, nil).Times(1)
mockExperimentRepo.EXPECT().GetByID(gomock.Any(), exptID, spaceID).Return(expt, nil).Times(1)
mockExptTurnResultRepo.EXPECT().BatchGetTurnEvaluatorResultRef(gomock.Any(), spaceID, []int64{turnResultID}).Return(nil, nil).Times(1)
mockEvaluatorRecordService.EXPECT().BatchGetEvaluatorRecord(gomock.Any(), []int64{}, false, false).Return(nil, nil).Times(1)
mockEvaluationSetItemService.EXPECT().BatchGetEvaluationSetItems(gomock.Any(), &entity.BatchGetEvaluationSetItemsParam{
SpaceID: spaceID,
EvaluationSetID: expt.EvalSetID,
VersionID: gptr.Of(expt.EvalSetVersionID),
ItemIDs: []int64{itemID},
}).Return([]*entity.EvaluationSetItem{{
ItemID: itemID,
Turns: []*entity.Turn{{ID: turnID, ItemID: itemID}},
}}, nil).Times(1)
targetStatus := entity.EvalTargetRunStatusAsyncInvoking
mockEvalTargetService.EXPECT().BatchGetRecordByIDs(gomock.Any(), spaceID, []int64{targetRecordID}).Return([]*entity.EvalTargetRecord{{
ID: targetRecordID,
SpaceID: spaceID,
ExperimentRunID: exptRunID,
ItemID: itemID,
TurnID: turnID,
LogID: "target-logid",
TraceID: "target-trace",
Status: &targetStatus,
EvalTargetOutputData: &entity.EvalTargetOutputData{OutputFields: map[string]*entity.Content{}},
}}, nil).Times(1)
mockExptAnnotateRepo.EXPECT().BatchGetExptTurnResultTagRefs(gomock.Any(), []int64{exptID}, spaceID).Return(nil, nil).Times(1)
mockExptAnnotateRepo.EXPECT().GetExptTurnAnnotateRecordRefsByTurnResultIDs(gomock.Any(), spaceID, []int64{turnResultID}).Return(nil, nil).Times(1)
mockExptAnnotateRepo.EXPECT().GetAnnotateRecordsByIDs(gomock.Any(), spaceID, []int64{}).Return(nil, nil).Times(1)
mockAnalysisService.EXPECT().BatchGetAnalysisRecordByUniqueKeys(gomock.Any(), []string{"100_1_3_0"}).Return(map[string]*entity.AnalysisRecord{}, nil).Times(1)

svc := ExptResultServiceImpl{
ExptTurnResultRepo: mockExptTurnResultRepo,
ExperimentRepo: mockExperimentRepo,
Metric: mockMetric,
lwt: mockLWT,
ExptItemResultRepo: mockExptItemResultRepo,
evalTargetService: mockEvalTargetService,
evaluationSetVersionService: mockEvaluationSetVersionService,
evaluationSetItemService: mockEvaluationSetItemService,
evaluatorRecordService: mockEvaluatorRecordService,
ExptAnnotateRepo: mockExptAnnotateRepo,
analysisService: mockAnalysisService,
}

got, err := svc.MGetExperimentResult(context.Background(), &entity.MGetExperimentResultParam{
SpaceID: spaceID,
ExptIDs: []int64{exptID},
BaseExptID: gptr.Of(exptID),
Page: entity.Page{},
})

require.NoError(t, err)
require.Len(t, got.ItemResults, 1)
require.Len(t, got.ItemResults[0].TurnResults, 1)
require.Len(t, got.ItemResults[0].TurnResults[0].ExperimentResults, 1)
payload := got.ItemResults[0].TurnResults[0].ExperimentResults[0].Payload
require.NotNil(t, payload)
require.NotNil(t, payload.TargetOutput)
require.NotNil(t, payload.TargetOutput.EvalTargetRecord)
assert.Equal(t, targetRecordID, payload.TargetOutput.EvalTargetRecord.ID)
assert.Equal(t, "target-logid", payload.TargetOutput.EvalTargetRecord.LogID)
assert.Equal(t, "target-trace", payload.TargetOutput.EvalTargetRecord.TraceID)
require.NotNil(t, payload.SystemInfo)
require.NotNil(t, payload.SystemInfo.LogID)
assert.Equal(t, "target-logid", *payload.SystemInfo.LogID)
}

func TestExptResultServiceImpl_RecordItemRunLogs(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -4409,6 +4545,51 @@ func TestExptResultBuilder_buildTargetOutput(t *testing.T) {
}
}

func TestExptResultBuilder_fillProcessingTargetResultID(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

const (
spaceID = int64(100)
exptID = int64(200)
exptRunID = int64(300)
itemID = int64(400)
turnID = int64(500)
targetResultID = int64(600)
)

mockTurnResultRepo := repoMocks.NewMockIExptTurnResultRepo(ctrl)
mockTurnResultRepo.EXPECT().
MGetItemTurnRunLogs(gomock.Any(), exptID, exptRunID, []int64{itemID}, spaceID).
Return([]*entity.ExptTurnResultRunLog{
{
ExptRunID: exptRunID,
ItemID: itemID,
TurnID: turnID,
TargetResultID: targetResultID,
},
}, nil)

builder := &ExptResultBuilder{
ExptID: exptID,
SpaceID: spaceID,
ExptTurnResultRepo: mockTurnResultRepo,
turnResultDO: []*entity.ExptTurnResult{
{
ID: 1,
ExptRunID: exptRunID,
ItemID: itemID,
TurnID: turnID,
},
},
}

err := builder.fillProcessingTargetResultID(context.Background())

require.NoError(t, err)
assert.Equal(t, targetResultID, builder.turnResultDO[0].TargetResultID)
}

func TestExptResultBuilder_buildEvaluatorResult(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading