From e583e6096916154fd06cbb6dca7f7e3bc581f60f Mon Sep 17 00:00:00 2001 From: yushan Date: Thu, 16 Apr 2026 22:41:05 +0000 Subject: [PATCH 1/2] [ITG] Implement ITG cache to insert ITG graph --- core/itg/cache/cache.go | 196 ++++++++++++++++++ .../changedtargetsandedgesreader_test.go | 2 + core/storage/disk/disk.go | 27 +++ core/storage/disk/disk_test.go | 50 +++++ core/storage/memstorage.go | 13 ++ core/storage/storage.go | 2 + core/storage/storage_test.go | 41 ++++ core/storage/storagemock/storagemock.go | 15 ++ 8 files changed, 346 insertions(+) create mode 100644 core/itg/cache/cache.go diff --git a/core/itg/cache/cache.go b/core/itg/cache/cache.go new file mode 100644 index 0000000..9562461 --- /dev/null +++ b/core/itg/cache/cache.go @@ -0,0 +1,196 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "bytes" + "context" + "encoding/gob" + "fmt" + "path/filepath" + "slices" + "strconv" + "strings" + "time" + + "github.com/uber/tango/core/itg/graph" + "github.com/uber/tango/core/storage" +) + +const keyPrefix = "itg/" + +// Cache is an interface for caching optimized graphs. +type Cache interface { + // Put stores an optimized graph in the cache. + Put(ctx context.Context, optimizedGraph *graph.OptimizedGraph, key Key) error + // Get retrieves an optimized graph from the cache. + Get(ctx context.Context, key Key) (*graph.OptimizedGraph, error) + // FloorKey returns the cache key with the largest commit date that is less than or equal to targetTimeSecond, + // scoped to the given remote repo. + FloorKey(ctx context.Context, remote string, targetTimeSecond int64) (Key, error) +} + +// Key is a key for a cache entry. +type Key struct { + Remote string + BaseCommitTimeSecond int64 + BaseSha string +} + +// CompareKeyFunc compares two cache keys. +var CompareKeyFunc = func(a Key, b Key) int { + switch { + case a.BaseCommitTimeSecond < b.BaseCommitTimeSecond: + return -1 + case a.BaseCommitTimeSecond > b.BaseCommitTimeSecond: + return 1 + default: + return strings.Compare(a.BaseSha, b.BaseSha) + } +} + +// EmptyKey means no cache found. +var EmptyKey = Key{} + +// toStorageKey converts a cache key to its storage key: itg/{remote}/{date}/{committime}_{sha} +func (k *Key) toStorageKey() string { + date := time.Unix(k.BaseCommitTimeSecond, 0).UTC().Format("2006-01-02") + return filepath.Join(keyPrefix, k.Remote, date, fmt.Sprintf("%d_%s", k.BaseCommitTimeSecond, k.BaseSha)) +} + +// NewStorageCache creates a new cache backed by a storage.Storage implementation. +// Cache entries are stored under the "itg/" prefix so they can be listed and +// searched independently from other entries in the same storage. +func NewStorageCache(s storage.Storage) Cache { + return &storageCache{storage: s} +} + +type storageCache struct { + storage storage.Storage +} + +func (c *storageCache) Put(ctx context.Context, optimizedGraph *graph.OptimizedGraph, key Key) error { + exists, err := c.storage.Exists(ctx, key.toStorageKey()) + if err != nil { + return err + } + if exists { + return nil + } + + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(optimizedGraph); err != nil { + return err + } + return c.storage.Put(ctx, storage.UploadRequest{Key: key.toStorageKey(), Reader: &buf}) +} + +func (c *storageCache) Get(ctx context.Context, key Key) (*graph.OptimizedGraph, error) { + resp, err := c.storage.Get(ctx, storage.DownloadRequest{Key: key.toStorageKey()}) + if err != nil { + return nil, fmt.Errorf("download graph %s: %w", key.toStorageKey(), err) + } + defer resp.ReadCloser.Close() + + var optimizedGraph graph.OptimizedGraph + if err := gob.NewDecoder(resp.ReadCloser).Decode(&optimizedGraph); err != nil { + return nil, fmt.Errorf("decode graph %s: %w", key.toStorageKey(), err) + } + for _, t := range optimizedGraph.OptimizedTargets { + if t.Hash == nil { + t.Hash = []byte{} + } + } + return &optimizedGraph, nil +} + +func (c *storageCache) FloorKey(ctx context.Context, remote string, targetTimeSecond int64) (Key, error) { + allKeys, err := c.storage.List(ctx) + if err != nil { + return EmptyKey, err + } + + remotePrefix := keyPrefix + remote + "/" + cacheKeys := make([]Key, 0, len(allKeys)) + for _, k := range allKeys { + if !strings.HasPrefix(k, remotePrefix) { + continue + } + cacheKey, err := parseCacheFileName(strings.TrimPrefix(k, remotePrefix)) + if err != nil { + continue + } + cacheKey.Remote = remote + cacheKeys = append(cacheKeys, cacheKey) + } + + if len(cacheKeys) == 0 { + return EmptyKey, nil + } + + if !slices.IsSortedFunc(cacheKeys, CompareKeyFunc) { + slices.SortStableFunc(cacheKeys, CompareKeyFunc) + } + + if cacheKeys[0].BaseCommitTimeSecond > targetTimeSecond { + return EmptyKey, nil + } + if cacheKeys[len(cacheKeys)-1].BaseCommitTimeSecond <= targetTimeSecond { + return cacheKeys[len(cacheKeys)-1], nil + } + + idx, found := binarySearch(cacheKeys, targetTimeSecond) + if !found { + idx-- + } + return cacheKeys[idx], nil +} + +func binarySearch(cacheKeys []Key, targetTimeSecond int64) (int, bool) { + return slices.BinarySearchFunc(cacheKeys, targetTimeSecond, func(a Key, b int64) int { + switch { + case a.BaseCommitTimeSecond < b: + return -1 + case a.BaseCommitTimeSecond > b: + return 1 + default: + return 0 + } + }) +} + +func parseCacheFileName(name string) (Key, error) { + // name has the form "date/timestamp_sha" after the keyPrefix+remote are stripped. + parts := strings.SplitN(name, "/", 2) + if len(parts) != 2 { + return Key{}, fmt.Errorf("cache path should have form date/TIMESTAMP_SHA: %s", name) + } + filename := parts[1] + + split := strings.SplitN(filename, "_", 2) + if len(split) != 2 { + return Key{}, fmt.Errorf("cache file name should have form TIMESTAMP_SHA: %s", filename) + } + + ts, err := strconv.ParseInt(split[0], 10, 64) + if err != nil { + return Key{}, fmt.Errorf("parse timestamp: %w", err) + } + + return Key{ + BaseCommitTimeSecond: ts, + BaseSha: split[1], + }, nil +} diff --git a/core/storage/changedtargetsandedgesreader_test.go b/core/storage/changedtargetsandedgesreader_test.go index a6e0732..599ab08 100644 --- a/core/storage/changedtargetsandedgesreader_test.go +++ b/core/storage/changedtargetsandedgesreader_test.go @@ -128,6 +128,7 @@ func (s *errStorage) Put(_ context.Context, _ UploadRequest) error { return nil func (s *errStorage) Exists(_ context.Context, _ string) (bool, error) { return false, s.err } +func (s *errStorage) List(_ context.Context, _ string) ([]string, error) { return nil, s.err } // nilResponseStorage is a Storage stub that returns a nil DownloadResponse from Get. type nilResponseStorage struct{} @@ -139,3 +140,4 @@ func (s *nilResponseStorage) Put(_ context.Context, _ UploadRequest) error { ret func (s *nilResponseStorage) Exists(_ context.Context, _ string) (bool, error) { return false, nil } +func (s *nilResponseStorage) List(_ context.Context, _ string) ([]string, error) { return nil, nil } diff --git a/core/storage/disk/disk.go b/core/storage/disk/disk.go index 08298a2..f05715a 100644 --- a/core/storage/disk/disk.go +++ b/core/storage/disk/disk.go @@ -103,3 +103,30 @@ func (d *diskStorage) Exists(ctx context.Context, key string) (bool, error) { } return false, err } + +// List returns the relative paths of all regular files under the given directory prefix. +func (d *diskStorage) List(ctx context.Context, dir string) ([]string, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + root := filepath.Join(d.rootDir, dir) + var keys []string + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + if info.IsDir() { + return nil + } + rel, err := filepath.Rel(d.rootDir, path) + if err != nil { + return err + } + keys = append(keys, rel) + return nil + }) + return keys, err +} diff --git a/core/storage/disk/disk_test.go b/core/storage/disk/disk_test.go index 8abdc7f..2f4e677 100644 --- a/core/storage/disk/disk_test.go +++ b/core/storage/disk/disk_test.go @@ -121,6 +121,56 @@ func TestStorage_Exists(t *testing.T) { }) } +func TestStorage_List(t *testing.T) { + ctx := context.Background() + tmpDir := t.TempDir() + s, err := New(tmpDir) + require.NoError(t, err) + + put := func(key string) { + t.Helper() + require.NoError(t, s.Put(ctx, storage.UploadRequest{Key: key, Reader: bytes.NewReader([]byte("x"))})) + } + put("itg/repoA/2024-01-01/100_abc") + put("itg/repoA/2024-01-02/200_def") + put("itg/repoB/2024-01-01/300_ghi") + put("graph/treehash123") + + t.Run("lists files under subdirectory", func(t *testing.T) { + keys, err := s.List(ctx, "itg/repoA") + require.NoError(t, err) + assert.ElementsMatch(t, []string{ + "itg/repoA/2024-01-01/100_abc", + "itg/repoA/2024-01-02/200_def", + }, keys) + }) + + t.Run("different subdirectory returns different keys", func(t *testing.T) { + keys, err := s.List(ctx, "itg/repoB") + require.NoError(t, err) + assert.ElementsMatch(t, []string{"itg/repoB/2024-01-01/300_ghi"}, keys) + }) + + t.Run("non-existent directory returns empty", func(t *testing.T) { + keys, err := s.List(ctx, "nonexistent") + require.NoError(t, err) + assert.Empty(t, keys) + }) + + t.Run("empty dir lists all files", func(t *testing.T) { + keys, err := s.List(ctx, "") + require.NoError(t, err) + assert.Len(t, keys, 4) + }) + + t.Run("cancelled context returns error", func(t *testing.T) { + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := s.List(cancelledCtx, "itg") + assert.Error(t, err) + }) +} + func TestStorage_Get_NotFound(t *testing.T) { ctx := context.Background() tmpDir := t.TempDir() diff --git a/core/storage/memstorage.go b/core/storage/memstorage.go index 96094c0..04eeae6 100644 --- a/core/storage/memstorage.go +++ b/core/storage/memstorage.go @@ -19,6 +19,7 @@ import ( "context" "errors" "io" + "strings" "sync" ) @@ -65,3 +66,15 @@ func (m *memoryStorage) Exists(ctx context.Context, key string) (bool, error) { _, ok := m.data[key] return ok, nil } + +func (m *memoryStorage) List(ctx context.Context, dir string) ([]string, error) { + m.mu.RLock() + defer m.mu.RUnlock() + var keys []string + for k := range m.data { + if strings.HasPrefix(k, dir) { + keys = append(keys, k) + } + } + return keys, nil +} diff --git a/core/storage/storage.go b/core/storage/storage.go index c1df683..33ab304 100644 --- a/core/storage/storage.go +++ b/core/storage/storage.go @@ -58,4 +58,6 @@ type Storage interface { Put(ctx context.Context, req UploadRequest) error // Exists checks whether a blob exists in the storage. Exists(ctx context.Context, key string) (bool, error) + // List returns the keys of all blobs under the given directory prefix. + List(ctx context.Context, dir string) ([]string, error) } diff --git a/core/storage/storage_test.go b/core/storage/storage_test.go index fd7b565..f1a5fd0 100644 --- a/core/storage/storage_test.go +++ b/core/storage/storage_test.go @@ -24,6 +24,47 @@ import ( "github.com/stretchr/testify/require" ) +func TestMemoryStorage_List(t *testing.T) { + ctx := context.Background() + s := NewMemoryStorage() + + put := func(key string) { + t.Helper() + require.NoError(t, s.Put(ctx, UploadRequest{Key: key, Reader: bytes.NewReader([]byte("x"))})) + } + put("itg/repoA/2024-01-01/100_abc") + put("itg/repoA/2024-01-02/200_def") + put("itg/repoB/2024-01-01/300_ghi") + put("graph/treehash123") + + t.Run("lists keys under prefix", func(t *testing.T) { + keys, err := s.List(ctx, "itg/repoA/") + require.NoError(t, err) + assert.ElementsMatch(t, []string{ + "itg/repoA/2024-01-01/100_abc", + "itg/repoA/2024-01-02/200_def", + }, keys) + }) + + t.Run("different prefix returns different keys", func(t *testing.T) { + keys, err := s.List(ctx, "itg/repoB/") + require.NoError(t, err) + assert.ElementsMatch(t, []string{"itg/repoB/2024-01-01/300_ghi"}, keys) + }) + + t.Run("non-matching prefix returns empty", func(t *testing.T) { + keys, err := s.List(ctx, "nonexistent/") + require.NoError(t, err) + assert.Empty(t, keys) + }) + + t.Run("empty prefix returns all keys", func(t *testing.T) { + keys, err := s.List(ctx, "") + require.NoError(t, err) + assert.Len(t, keys, 4) + }) +} + func TestMemoryStorage_Exists(t *testing.T) { ctx := context.Background() s := NewMemoryStorage() diff --git a/core/storage/storagemock/storagemock.go b/core/storage/storagemock/storagemock.go index afb8d8b..6ae56a9 100644 --- a/core/storage/storagemock/storagemock.go +++ b/core/storage/storagemock/storagemock.go @@ -84,3 +84,18 @@ func (mr *MockStorageMockRecorder) Put(ctx, req any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStorage)(nil).Put), ctx, req) } + +// List mocks base method. +func (m *MockStorage) List(ctx context.Context, dir string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "List", ctx, dir) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// List indicates an expected call of List. +func (mr *MockStorageMockRecorder) List(ctx, dir any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStorage)(nil).List), ctx, dir) +} From 7ac00dd99ce6c3c4bf2ae85de716d6bbded2ef83 Mon Sep 17 00:00:00 2001 From: yushan Date: Thu, 16 Apr 2026 22:48:10 +0000 Subject: [PATCH 2/2] Update gazelle --- core/itg/cache/BUILD.bazel | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 core/itg/cache/BUILD.bazel diff --git a/core/itg/cache/BUILD.bazel b/core/itg/cache/BUILD.bazel new file mode 100644 index 0000000..15ee644 --- /dev/null +++ b/core/itg/cache/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "cache", + srcs = ["cache.go"], + importpath = "github.com/uber/tango/core/itg/cache", + visibility = ["//visibility:public"], + deps = ["//core/storage"], +)