Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
cacheKey := common.GetComparedTargetsCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions())
cachedReader, cacheErr := storage.NewChangedTargetsReader(ctx, c.storage, cacheKey)
if cacheErr != nil && !storage.IsNotFound(cacheErr) {
c.logger.Warn("GetChangedTargets: Failed to read from cache, proceeding to compute", zap.Error(cacheErr))
logger.Warn("GetChangedTargets: Failed to read from cache, proceeding to compute", zap.Error(cacheErr))
} else if cachedReader != nil {
// Buffer all responses before sending any. A concurrent goroutine write may have
// left a partial blob in storage; buffering lets us detect corruption and fall
Expand All @@ -95,20 +95,20 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str

if readErr != nil {
// Blob is corrupt (likely an incomplete write). Log and fall through to recompute.
c.logger.Warn("GetChangedTargets: Cached result is incomplete, recomputing", zap.Error(readErr))
logger.Warn("GetChangedTargets: Cached result is incomplete, recomputing", zap.Error(readErr))
} else {
cacheReadDuration := time.Since(cacheStart)
c.logger.Info("GetChangedTargets: Cache hit, streaming from storage",
logger.Info("GetChangedTargets: Cache hit, streaming from storage",
zap.Duration("cache_read_duration", cacheReadDuration),
)
scope.Counter("cache_hit").Inc(1)
scope.Timer("cache_read_duration").Record(cacheReadDuration)
if sendErr := sendWithDistanceFilter(stream, cached, request.GetOutputConfig()); sendErr != nil {
c.logger.Error("GetChangedTargets: Failed to send cached response", zap.Error(sendErr))
logger.Error("GetChangedTargets: Failed to send cached response", zap.Error(sendErr))
return fmt.Errorf("failed to send cached response: %w", sendErr)
}
totalDuration := time.Since(start)
c.logger.Info("GetChangedTargets: Successfully streamed from cache",
logger.Info("GetChangedTargets: Successfully streamed from cache",
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
Expand Down Expand Up @@ -202,7 +202,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
}

graphFetchDuration := time.Since(graphFetchStart)
c.logger.Info("GetChangedTargets: Both graphs fetched",
logger.Info("GetChangedTargets: Both graphs fetched",
zap.Duration("graph_fetch_duration", graphFetchDuration),
)
scope.Timer("graph_fetch_duration").Record(graphFetchDuration)
Expand Down Expand Up @@ -236,16 +236,16 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
jobs[1].graphStreamChunks = nil

compareStart := time.Now()
changedTargetsResponses, err := c.compareTargetGraphs(ctx, firstGraph, secondGraph, request.GetOutputConfig())
changedTargetsResponses, err := c.compareTargetGraphs(logger, firstGraph, secondGraph, request.GetOutputConfig())
// Allow GC of raw graph data while the caching goroutine runs.
firstGraph = nil
secondGraph = nil
if err != nil {
c.logger.Error("GetChangedTargets: Failed to compare target graphs", zap.Error(err))
logger.Error("GetChangedTargets: Failed to compare target graphs", zap.Error(err))
return fmt.Errorf("failed to compare target graphs: %w", err)
}
compareDuration := time.Since(compareStart)
c.logger.Info("GetChangedTargets: Target graphs compared",
logger.Info("GetChangedTargets: Target graphs compared",
zap.Duration("compare_duration", compareDuration),
)
scope.Timer("compare_duration").Record(compareDuration)
Expand All @@ -261,32 +261,32 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
if treehash1 != "" && treehash2 != "" {
cacheKey := common.GetComparedTargetsCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions())
if writeErr := storage.WriteChangedTargetsStream(cacheCtx, c.storage, cacheKey, changedTargetsResponses); writeErr != nil {
c.logger.Warn("GetChangedTargets: Failed to cache result", zap.Error(writeErr))
logger.Warn("GetChangedTargets: Failed to cache result", zap.Error(writeErr))
}
}
}()

sendStart := time.Now()
if err := sendWithDistanceFilter(stream, changedTargetsResponses, request.GetOutputConfig()); err != nil {
c.logger.Error("GetChangedTargets: Failed to send response", zap.Error(err))
logger.Error("GetChangedTargets: Failed to send response", zap.Error(err))
return fmt.Errorf("failed to send response: %w", err)
}
sendDuration := time.Since(sendStart)
scope.Timer("send_duration").Record(sendDuration)

totalDuration := time.Since(start)
c.logger.Info("GetChangedTargets: Successfully processed request",
logger.Info("GetChangedTargets: Successfully processed request",
zap.Duration("send_duration", sendDuration),
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
return nil
}

func (c *controller) compareTargetGraphs(ctx context.Context, firstGraph, secondGraph []*pb.GetTargetGraphResponse, outputConfig *pb.OutputConfig) ([]*pb.GetChangedTargetsResponse, error) {
func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, outputConfig *pb.OutputConfig) ([]*pb.GetChangedTargetsResponse, error) {
start := time.Now()
scope := c.scope.SubScope("compare_target_graphs")
c.logger.Info("compareTargetGraphs: Computing differences between target graphs")
logger.Info("compareTargetGraphs: Computing differences between target graphs")

// 1) Extract targets and metadata; index by canonical names
indexStart := time.Now()
Expand Down Expand Up @@ -477,7 +477,7 @@ func (c *controller) compareTargetGraphs(ctx context.Context, firstGraph, second
})
}
totalDuration := time.Since(start)
c.logger.Info("compareTargetGraphs: Done",
logger.Info("compareTargetGraphs: Done",
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
Expand Down
16 changes: 8 additions & 8 deletions controller/getchangedtargets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCompareTargetGraphs(t *testing.T) {
},
}

response, err := c.compareTargetGraphs(context.Background(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, nil)
response, err := c.compareTargetGraphs(zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, nil)
require.NoError(t, err)
require.NotNil(t, response)
}
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
require.NoError(t, err)
require.Len(t, res, 2)
cs := res[0].GetChangedTargets()
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestCompareTargetGraphs_SourceFileDirectAndPropagation(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestCompareTargetGraphs_IndirectWhenNoSourceDep(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -641,7 +641,7 @@ func TestCompareTargetGraphs_DirectWhenDependenciesChanged(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -722,7 +722,7 @@ func TestCompareTargetGraphs_DirectWhenAttributesChanged(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -802,7 +802,7 @@ func TestCompareTargetGraphs_DirectWhenNewAttributeAdded(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -1048,7 +1048,7 @@ func TestCompareTargetGraphs_IndirectWhenOnlyHashChanged(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down
32 changes: 16 additions & 16 deletions controller/getchangedtargetsandedges.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
cacheKey := common.GetChangedTargetsAndEdgesCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions())
cachedReader, cacheErr := storage.NewChangedTargetsAndEdgesReader(ctx, c.storage, cacheKey)
if cacheErr != nil && !storage.IsNotFound(cacheErr) {
c.logger.Warn("GetChangedTargetsAndEdges: Failed to read from cache, proceeding to compute", zap.Error(cacheErr))
logger.Warn("GetChangedTargetsAndEdges: Failed to read from cache, proceeding to compute", zap.Error(cacheErr))
} else if cachedReader != nil {
var cached []*pb.GetChangedTargetsAndEdgesResponse
var readErr error
Expand All @@ -89,20 +89,20 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
cachedReader.Close()

if readErr != nil {
c.logger.Warn("GetChangedTargetsAndEdges: Cached result is incomplete, recomputing", zap.Error(readErr))
logger.Warn("GetChangedTargetsAndEdges: Cached result is incomplete, recomputing", zap.Error(readErr))
} else {
cacheReadDuration := time.Since(cacheStart)
c.logger.Info("GetChangedTargetsAndEdges: Cache hit, streaming from storage",
logger.Info("GetChangedTargetsAndEdges: Cache hit, streaming from storage",
zap.Duration("cache_read_duration", cacheReadDuration),
)
scope.Counter("cache_hit").Inc(1)
scope.Timer("cache_read_duration").Record(cacheReadDuration)
if err := sendWithDistanceFilterForEdges(stream, cached, request.GetOutputConfig()); err != nil {
c.logger.Error("GetChangedTargetsAndEdges: Failed to send cached response", zap.Error(err))
logger.Error("GetChangedTargetsAndEdges: Failed to send cached response", zap.Error(err))
return err
}
totalDuration := time.Since(start)
c.logger.Info("GetChangedTargetsAndEdges: Successfully streamed from cache",
logger.Info("GetChangedTargetsAndEdges: Successfully streamed from cache",
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
Expand Down Expand Up @@ -184,7 +184,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
}

graphFetchDuration := time.Since(graphFetchStart)
c.logger.Info("GetChangedTargetsAndEdges: Both graphs fetched",
logger.Info("GetChangedTargetsAndEdges: Both graphs fetched",
zap.Duration("graph_fetch_duration", graphFetchDuration),
)
scope.Timer("graph_fetch_duration").Record(graphFetchDuration)
Expand Down Expand Up @@ -213,16 +213,16 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
jobs[1].graphStreamChunks = nil

compareStart := time.Now()
responses, err := c.compareTargetGraphsAndEdges(ctx, firstGraph, secondGraph, request.GetOutputConfig())
responses, err := c.compareTargetGraphsAndEdges(logger, firstGraph, secondGraph, request.GetOutputConfig())
// Allow GC of raw graph data while the caching goroutine runs.
firstGraph = nil
secondGraph = nil
if err != nil {
c.logger.Error("GetChangedTargetsAndEdges: Failed to compare target graphs", zap.Error(err))
logger.Error("GetChangedTargetsAndEdges: Failed to compare target graphs", zap.Error(err))
return fmt.Errorf("failed to compare target graphs: %w", err)
}
compareDuration := time.Since(compareStart)
c.logger.Info("GetChangedTargetsAndEdges: Target graphs compared",
logger.Info("GetChangedTargetsAndEdges: Target graphs compared",
zap.Duration("compare_duration", compareDuration),
)
scope.Timer("compare_duration").Record(compareDuration)
Expand All @@ -235,32 +235,32 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
if treehash1 != "" && treehash2 != "" {
cacheKey := common.GetChangedTargetsAndEdgesCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions())
if writeErr := storage.WriteChangedTargetsAndEdgesStream(cacheCtx, c.storage, cacheKey, responses); writeErr != nil {
c.logger.Warn("GetChangedTargetsAndEdges: Failed to cache result", zap.Error(writeErr))
logger.Warn("GetChangedTargetsAndEdges: Failed to cache result", zap.Error(writeErr))
}
}
}()

sendStart := time.Now()
if err := sendWithDistanceFilterForEdges(stream, responses, request.GetOutputConfig()); err != nil {
c.logger.Error("GetChangedTargetsAndEdges: Failed to send response", zap.Error(err))
logger.Error("GetChangedTargetsAndEdges: Failed to send response", zap.Error(err))
return err
}
sendDuration := time.Since(sendStart)
scope.Timer("send_duration").Record(sendDuration)

totalDuration := time.Since(start)
c.logger.Info("GetChangedTargetsAndEdges: Successfully processed request",
logger.Info("GetChangedTargetsAndEdges: Successfully processed request",
zap.Duration("send_duration", sendDuration),
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
return nil
}

func (c *controller) compareTargetGraphsAndEdges(ctx context.Context, firstGraph, secondGraph []*pb.GetTargetGraphResponse, outputConfig *pb.OutputConfig) ([]*pb.GetChangedTargetsAndEdgesResponse, error) {
func (c *controller) compareTargetGraphsAndEdges(logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, outputConfig *pb.OutputConfig) ([]*pb.GetChangedTargetsAndEdgesResponse, error) {
start := time.Now()
scope := c.scope.SubScope("compare_target_graphs_and_edges")
c.logger.Info("compareTargetGraphsAndEdges: Computing differences between target graphs")
logger.Info("compareTargetGraphsAndEdges: Computing differences between target graphs")

// 1) Extract targets and metadata; index by canonical names.
firstTargetsByID, firstMetadata := getTargetsAndMetadata(firstGraph)
Expand Down Expand Up @@ -394,7 +394,7 @@ func (c *controller) compareTargetGraphsAndEdges(ctx context.Context, firstGraph

// 5) Compute BFS distances if requested.
if outputConfig.GetComputeDistances() {
computeDistances(c.logger, changedByName, secondByName, secondMetadata, outputConfig.GetMaxDistance())
computeDistances(logger, changedByName, secondByName, secondMetadata, outputConfig.GetMaxDistance())
}

// 6) Collect changed targets.
Expand Down Expand Up @@ -459,7 +459,7 @@ func (c *controller) compareTargetGraphsAndEdges(ctx context.Context, firstGraph
// 10) Build canonical metadata.

totalDuration := time.Since(start)
c.logger.Info("compareTargetGraphsAndEdges: Done",
logger.Info("compareTargetGraphsAndEdges: Done",
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
Expand Down
18 changes: 9 additions & 9 deletions controller/getchangedtargetsandedges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestCompareTargetGraphsAndEdges_Empty(t *testing.T) {
}}
}

res, err := c.compareTargetGraphsAndEdges(context.Background(), emptyGraph(), emptyGraph(), nil)
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(), emptyGraph(), emptyGraph(), nil)
require.NoError(t, err)
require.Len(t, res, 2)
cte, _ := collectCTEResponses(res)
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestCompareTargetGraphsAndEdges_AddedTarget(t *testing.T) {
},
}

res, err := c.compareTargetGraphsAndEdges(context.Background(), first, second, nil)
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cte, meta := collectCTEResponses(res)

Expand Down Expand Up @@ -340,7 +340,7 @@ func TestCompareTargetGraphsAndEdges_RemovedTarget(t *testing.T) {
}},
}}

res, err := c.compareTargetGraphsAndEdges(context.Background(), first, second, nil)
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cte, meta := collectCTEResponses(res)

Expand Down Expand Up @@ -395,7 +395,7 @@ func TestCompareTargetGraphsAndEdges_NewEdge(t *testing.T) {
},
}

res, err := c.compareTargetGraphsAndEdges(context.Background(), first, second, nil)
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cte, meta := collectCTEResponses(res)

Expand Down Expand Up @@ -446,7 +446,7 @@ func TestCompareTargetGraphsAndEdges_RemovedEdge(t *testing.T) {
},
}

res, err := c.compareTargetGraphsAndEdges(context.Background(), first, second, nil)
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cte, meta := collectCTEResponses(res)

Expand Down Expand Up @@ -499,7 +499,7 @@ func TestCompareTargetGraphsAndEdges_ChangedTargetClassification(t *testing.T) {
},
}

res, err := c.compareTargetGraphsAndEdges(context.Background(), first, second, nil)
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cte, meta := collectCTEResponses(res)

Expand Down Expand Up @@ -546,7 +546,7 @@ func TestCompareTargetGraphsAndEdges_UnchangedTargetNotReturned(t *testing.T) {
},
}

res, err := c.compareTargetGraphsAndEdges(context.Background(), first, second, nil)
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(), first, second, nil)
require.NoError(t, err)
cte, _ := collectCTEResponses(res)
assert.Empty(t, cte.GetChangedTargets())
Expand Down Expand Up @@ -580,7 +580,7 @@ func TestCompareTargetGraphsAndEdges_EdgePreservedWhenTargetUnchanged(t *testing
}
}

res, err := c.compareTargetGraphsAndEdges(context.Background(),
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(),
mkGraph(1, 2, "hA", "hB"),
mkGraph(10, 20, "hA", "hB"),
nil,
Expand Down Expand Up @@ -632,7 +632,7 @@ func TestCompareTargetGraphsAndEdges_CanonicalIDs(t *testing.T) {
},
}

res, err := c.compareTargetGraphsAndEdges(context.Background(), first, second, nil)
res, err := c.compareTargetGraphsAndEdges(zap.NewNop(), first, second, nil)
require.NoError(t, err)

cte, meta := collectCTEResponses(res)
Expand Down
Loading
Loading