Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
yaml "github.com/goccy/go-yaml"
)

var _ RepositoryConfigProvider = (*Config)(nil)

// Config is the root configuration structure.
type Config struct {
Repository []RepositoryConfig `yaml:"repository"`
Expand Down
10 changes: 10 additions & 0 deletions config/repository_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,14 @@ type RepositoryConfig struct {
BazelCommand string `yaml:"bazel_command"`
QueryTimeout int64 `yaml:"query_timeout"` // in seconds
BazelExtraArgs []string `yaml:"bazel_extra_args"`
// MaxDistance is the server-side default BFS distance cap applied when the
// client does not set output_config.compute_distances. 0 means unset (no cap).
// Positive values enable distance trimming with the given limit.
MaxDistance int32 `yaml:"max_distance"`
}

// RepositoryConfigProvider looks up per-repository configuration by remote.
// Implementations may read from a local file, a remote config service, etc.
type RepositoryConfigProvider interface {
GetRepositoryConfig(remote string) (RepositoryConfig, bool)
}
1 change: 1 addition & 0 deletions controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
],
embed = [":controller"],
deps = [
"//config",
"//core/common",
"//core/storage",
"//core/storage/storagemock",
Expand Down
23 changes: 18 additions & 5 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (
// Params are the parameters for the controller.
type Params struct {
fx.In
Logger *zap.Logger
Storage storage.Storage
Orchestrator orchestrator.Orchestrator
Scope tally.Scope `optional:"true"`
ChunkConfig config.ChunkConfig `optional:"true"`
Logger *zap.Logger
Storage storage.Storage
Orchestrator orchestrator.Orchestrator
Scope tally.Scope `optional:"true"`
ChunkConfig config.ChunkConfig `optional:"true"`
RepoConfigProvider config.RepositoryConfigProvider `optional:"true"`
}

type controller struct {
Expand All @@ -43,6 +44,7 @@ type controller struct {
targetChunkSize int
changedTargetChunkSize int
metadataMapChunkSize int
repoConfigProvider config.RepositoryConfigProvider
}

// NewController creates a new controller.
Expand Down Expand Up @@ -71,5 +73,16 @@ func NewController(p Params) pb.TangoYARPCServer {
targetChunkSize: targetChunkSize,
changedTargetChunkSize: changedTargetChunkSize,
metadataMapChunkSize: metadataMapChunkSize,
repoConfigProvider: p.RepoConfigProvider,
}
}

// getRepoConfig returns the RepositoryConfig for the given remote, or a
// zero-value config when no provider is configured or the remote is unknown.
func (c *controller) getRepoConfig(remote string) config.RepositoryConfig {
if c.repoConfigProvider == nil {
return config.RepositoryConfig{}
}
repoConfig, _ := c.repoConfigProvider.GetRepositoryConfig(remote)
return repoConfig
}
27 changes: 20 additions & 7 deletions controller/distance_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,27 @@

package controller

import pb "github.com/uber/tango/tangopb"
import (
"github.com/uber/tango/config"
pb "github.com/uber/tango/tangopb"
)

// maxDistanceFromOutputConfig returns the BFS distance cap for filtering
// changed targets, or -1 when filtering is disabled. A non-negative value
// means only targets with 0 <= distance <= max should be kept.
func maxDistanceFromOutputConfig(cfg *pb.OutputConfig) int32 {
if cfg.GetComputeDistances() {
return cfg.GetMaxDistance()
// resolveMaxDistance returns the effective BFS distance cap for filtering, or
// -1 when no filtering should be applied.
//
// Priority (highest first):
// 1. outputConfig.max_distance > 0 → use the client's explicit limit.
// 2. repoConfig.MaxDistance > 0 → server-side default.
// 3. Neither set → -1 (no distance filtering).
//
// Note: outputConfig.compute_distances controls whether the distance field is
// populated in the response and is independent of filtering.
func resolveMaxDistance(repoConfig config.RepositoryConfig, outputConfig *pb.OutputConfig) int32 {
if outputConfig.GetMaxDistance() > 0 {
return outputConfig.GetMaxDistance()
}
if repoConfig.MaxDistance > 0 {
return repoConfig.MaxDistance
}
return -1
}
Expand Down
54 changes: 44 additions & 10 deletions controller/distance_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,58 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/uber/tango/config"
pb "github.com/uber/tango/tangopb"
)

func TestMaxDistanceFromOutputConfig(t *testing.T) {
func TestResolveMaxDistance(t *testing.T) {
tests := []struct {
name string
cfg *pb.OutputConfig
want int32
name string
repoCfg config.RepositoryConfig
outputCfg *pb.OutputConfig
want int32
}{
{"nil config disables filter", nil, -1},
{"compute_distances unset disables filter", &pb.OutputConfig{}, -1},
{"compute_distances unset ignores max_distance", &pb.OutputConfig{MaxDistance: 5}, -1},
{"compute_distances true returns max", &pb.OutputConfig{ComputeDistances: true, MaxDistance: 5}, 5},
{"compute_distances true returns 0", &pb.OutputConfig{ComputeDistances: true, MaxDistance: 0}, 0},
{
name: "neither set: no filtering",
repoCfg: config.RepositoryConfig{},
want: -1,
},
{
name: "repo config default applied",
repoCfg: config.RepositoryConfig{MaxDistance: 3},
want: 3,
},
{
name: "client max_distance overrides repo config",
repoCfg: config.RepositoryConfig{MaxDistance: 3},
outputCfg: &pb.OutputConfig{MaxDistance: 5},
want: 5,
},
{
name: "client max_distance=0 treated as unset, repo config applies",
repoCfg: config.RepositoryConfig{MaxDistance: 3},
outputCfg: &pb.OutputConfig{MaxDistance: 0},
want: 3,
},
{
name: "client max_distance set, no repo config",
outputCfg: &pb.OutputConfig{MaxDistance: 2},
want: 2,
},
{
name: "repo config=0 means unset: no filtering",
repoCfg: config.RepositoryConfig{MaxDistance: 0},
want: -1,
},
{
name: "compute_distances alone does not enable filtering",
outputCfg: &pb.OutputConfig{ComputeDistances: true},
want: -1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, maxDistanceFromOutputConfig(tt.cfg))
assert.Equal(t, tt.want, resolveMaxDistance(tt.repoCfg, tt.outputCfg))
})
}
}
Expand Down
34 changes: 17 additions & 17 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str

logger.Info("GetChangedTargets: Processing request")

maxDist := resolveMaxDistance(c.getRepoConfig(request.GetFirstRevision().GetRemote()), request.GetOutputConfig())

// Try to serve from cache first using the stored treehashes for both revisions.
// readTreehash returns "" on any miss/error so we silently skip the cache when
// either treehash is not yet available.
Expand Down Expand Up @@ -103,7 +105,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
)
scope.Counter("cache_hit").Inc(1)
scope.Timer("cache_read_duration").Record(cacheReadDuration)
if sendErr := sendWithDistanceFilter(stream, cached, request.GetOutputConfig()); sendErr != nil {
if sendErr := sendWithDistanceFilter(stream, cached, maxDist); sendErr != nil {
logger.Error("GetChangedTargets: Failed to send cached response", zap.Error(sendErr))
return fmt.Errorf("failed to send cached response: %w", sendErr)
}
Expand Down Expand Up @@ -236,7 +238,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
jobs[1].graphStreamChunks = nil

compareStart := time.Now()
changedTargetsResponses, err := c.compareTargetGraphs(logger, firstGraph, secondGraph, request.GetOutputConfig())
changedTargetsResponses, err := c.compareTargetGraphs(logger, firstGraph, secondGraph, maxDist, request.GetOutputConfig().GetComputeDistances())
// Allow GC of raw graph data while the caching goroutine runs.
firstGraph = nil
secondGraph = nil
Expand Down Expand Up @@ -267,7 +269,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
}()

sendStart := time.Now()
if err := sendWithDistanceFilter(stream, changedTargetsResponses, request.GetOutputConfig()); err != nil {
if err := sendWithDistanceFilter(stream, changedTargetsResponses, maxDist); err != nil {
logger.Error("GetChangedTargets: Failed to send response", zap.Error(err))
return fmt.Errorf("failed to send response: %w", err)
}
Expand All @@ -283,7 +285,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
return nil
}

func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, outputConfig *pb.OutputConfig) ([]*pb.GetChangedTargetsResponse, error) {
func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32, outputDistances bool) ([]*pb.GetChangedTargetsResponse, error) {
start := time.Now()
scope := c.scope.SubScope("compare_target_graphs")
logger.Info("compareTargetGraphs: Computing differences between target graphs")
Expand Down Expand Up @@ -338,7 +340,7 @@ func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondG
secondMetadata.GetAttributeStringValueMapping(),
getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId,
),
Distance: getDefaultDistance(outputConfig, true),
Distance: getDefaultDistance(maxDist, outputDistances, true),
}
continue
}
Expand Down Expand Up @@ -382,7 +384,7 @@ func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondG
ChangeType: initial,
OldTarget: oldTarget,
NewTarget: newTarget,
Distance: getDefaultDistance(outputConfig, false),
Distance: getDefaultDistance(maxDist, outputDistances, false),
}
}
diffScanDuration := time.Since(diffScanStart)
Expand Down Expand Up @@ -425,11 +427,10 @@ func (c *controller) compareTargetGraphs(logger *zap.Logger, firstGraph, secondG
classifyDuration := time.Since(classifyStart)
scope.Timer("classify_duration").Record(classifyDuration)

// Compute BFS distances from CHANGE_TYPE_DIRECT targets through the dependency graph.
// max_distance is only considered when compute_distances is true.
if outputConfig.GetComputeDistances() {
// Compute BFS distances when filtering is active or the client requested distance output.
if maxDist >= 0 || outputDistances {
distancesStart := time.Now()
computeDistances(c.logger, changedByName, secondByName, secondMetadata, outputConfig.GetMaxDistance())
computeDistances(c.logger, changedByName, secondByName, secondMetadata, maxDist)
distancesDuration := time.Since(distancesStart)
scope.Timer("distances_duration").Record(distancesDuration)
}
Expand Down Expand Up @@ -749,11 +750,10 @@ func transposeOptimizedTarget(
}

// sendWithDistanceFilter streams responses to the client, filtering changed targets to those
// within outputConfig.MaxDistance from any CHANGE_TYPE_DIRECT target when compute_distances is true
// and max_distance >= 0. Metadata and other non-target responses are always forwarded.
// within maxDist from any CHANGE_TYPE_DIRECT target when maxDist >= 0.
// Metadata and other non-target responses are always forwarded.
// Filtering and sending are combined into a single pass to avoid an intermediate allocation.
func sendWithDistanceFilter(stream pb.TangoServiceGetChangedTargetsYARPCServer, responses []*pb.GetChangedTargetsResponse, outputConfig *pb.OutputConfig) error {
maxDist := maxDistanceFromOutputConfig(outputConfig)
func sendWithDistanceFilter(stream pb.TangoServiceGetChangedTargetsYARPCServer, responses []*pb.GetChangedTargetsResponse, maxDist int32) error {
for _, resp := range responses {
toSend := resp
if maxDist >= 0 {
Expand All @@ -777,7 +777,7 @@ func sendWithDistanceFilter(stream pb.TangoServiceGetChangedTargetsYARPCServer,
// target to each changed target via the reverse dependency graph using BFS.
// DIRECT targets get distance 0, their reverse dependants get 1, and so on.
// When maxDistance >= 0, the BFS is pruned: targets at distance > maxDistance are never
// enqueued, so they keep their initial distance of -1 (out-of-range). 0 means no limit.
// enqueued, so they keep their initial distance of -1 (out-of-range).
//
// Targets unreachable from any DIRECT target keep the initial distance of -1.
func computeDistances(logger *zap.Logger, changedByName map[string]*pb.ChangedTarget, targetsByName map[string]*pb.OptimizedTarget, meta *pb.Metadata, maxDistance int32) {
Expand Down Expand Up @@ -878,8 +878,8 @@ func validateGetChangedTargetsRequest(request *pb.GetChangedTargetsRequest) erro
return nil
}

func getDefaultDistance(outputConfig *pb.OutputConfig, forNewTarget bool) int32 {
if !outputConfig.GetComputeDistances() {
func getDefaultDistance(maxDist int32, outputDistances bool, forNewTarget bool) int32 {
if maxDist < 0 && !outputDistances {
return -1
}
// New targets are always CHANGE_TYPE_NEW → distance 0.
Expand Down
20 changes: 10 additions & 10 deletions controller/getchangedtargets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCompareTargetGraphs(t *testing.T) {
},
}

response, err := c.compareTargetGraphs(zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, nil)
response, err := c.compareTargetGraphs(zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1, false)
require.NoError(t, err)
require.NotNil(t, response)
}
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false)
require.NoError(t, err)
require.Len(t, res, 2)
cs := res[0].GetChangedTargets()
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestCompareTargetGraphs_SourceFileDirectAndPropagation(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestCompareTargetGraphs_IndirectWhenNoSourceDep(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -641,7 +641,7 @@ func TestCompareTargetGraphs_DirectWhenDependenciesChanged(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -722,7 +722,7 @@ func TestCompareTargetGraphs_DirectWhenAttributesChanged(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -802,7 +802,7 @@ func TestCompareTargetGraphs_DirectWhenNewAttributeAdded(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -877,7 +877,7 @@ func TestSendWithDistanceFilter_MetadataAlwaysForwarded(t *testing.T) {
}).Times(2)

// max_distance=1 filters out the distance-5 target, metadata always forwarded
require.NoError(t, sendWithDistanceFilter(stream, responses, &pb.OutputConfig{ComputeDistances: true, MaxDistance: 1}))
require.NoError(t, sendWithDistanceFilter(stream, responses, 1))

// First response: target filtered out (distance 5 > maxDist 1)
assert.Empty(t, sent[0].GetChangedTargets().GetChangedTargets())
Expand All @@ -899,7 +899,7 @@ func TestSendWithDistanceFilter_SendError(t *testing.T) {

stream.EXPECT().Send(gomock.Any()).Return(errors.New("send error"))

err := sendWithDistanceFilter(stream, responses, &pb.OutputConfig{})
err := sendWithDistanceFilter(stream, responses, -1)
assert.EqualError(t, err, "send error")
}

Expand Down Expand Up @@ -1048,7 +1048,7 @@ func TestCompareTargetGraphs_IndirectWhenOnlyHashChanged(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, nil)
res, err := c.compareTargetGraphs(zap.NewNop(), first, second, -1, false)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down
Loading
Loading