Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/itg/cache/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "cache",
srcs = ["cache.go"],
importpath = "github.com/uber/tango/core/itg/cache",
visibility = ["//visibility:public"],
deps = ["//core/storage"],
)
196 changes: 196 additions & 0 deletions core/itg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"bytes"
"context"
"encoding/gob"
"fmt"
"path/filepath"
"slices"
"strconv"
"strings"
"time"

"github.com/uber/tango/core/itg/graph"
"github.com/uber/tango/core/storage"
)

const keyPrefix = "itg/"

// Cache is an interface for caching optimized graphs.
type Cache interface {
// Put stores an optimized graph in the cache.
Put(ctx context.Context, optimizedGraph *graph.OptimizedGraph, key Key) error
// Get retrieves an optimized graph from the cache.
Get(ctx context.Context, key Key) (*graph.OptimizedGraph, error)
// FloorKey returns the cache key with the largest commit date that is less than or equal to targetTimeSecond,
// scoped to the given remote repo.
FloorKey(ctx context.Context, remote string, targetTimeSecond int64) (Key, error)
}

// Key is a key for a cache entry.
type Key struct {
Remote string
BaseCommitTimeSecond int64
BaseSha string
}

// CompareKeyFunc compares two cache keys.
var CompareKeyFunc = func(a Key, b Key) int {
switch {
case a.BaseCommitTimeSecond < b.BaseCommitTimeSecond:
return -1
case a.BaseCommitTimeSecond > b.BaseCommitTimeSecond:
return 1
default:
return strings.Compare(a.BaseSha, b.BaseSha)
}
}

// EmptyKey means no cache found.
var EmptyKey = Key{}

// toStorageKey converts a cache key to its storage key: itg/{remote}/{date}/{committime}_{sha}
func (k *Key) toStorageKey() string {
date := time.Unix(k.BaseCommitTimeSecond, 0).UTC().Format("2006-01-02")
return filepath.Join(keyPrefix, k.Remote, date, fmt.Sprintf("%d_%s", k.BaseCommitTimeSecond, k.BaseSha))
}

// NewStorageCache creates a new cache backed by a storage.Storage implementation.
// Cache entries are stored under the "itg/" prefix so they can be listed and
// searched independently from other entries in the same storage.
func NewStorageCache(s storage.Storage) Cache {
return &storageCache{storage: s}
}

type storageCache struct {
storage storage.Storage
}

func (c *storageCache) Put(ctx context.Context, optimizedGraph *graph.OptimizedGraph, key Key) error {
exists, err := c.storage.Exists(ctx, key.toStorageKey())
if err != nil {
return err
}
if exists {
return nil
}

var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(optimizedGraph); err != nil {
return err
}
return c.storage.Put(ctx, storage.UploadRequest{Key: key.toStorageKey(), Reader: &buf})
}

func (c *storageCache) Get(ctx context.Context, key Key) (*graph.OptimizedGraph, error) {
resp, err := c.storage.Get(ctx, storage.DownloadRequest{Key: key.toStorageKey()})
if err != nil {
return nil, fmt.Errorf("download graph %s: %w", key.toStorageKey(), err)
}
defer resp.ReadCloser.Close()

var optimizedGraph graph.OptimizedGraph
if err := gob.NewDecoder(resp.ReadCloser).Decode(&optimizedGraph); err != nil {
return nil, fmt.Errorf("decode graph %s: %w", key.toStorageKey(), err)
}
for _, t := range optimizedGraph.OptimizedTargets {
if t.Hash == nil {
t.Hash = []byte{}
}
}
return &optimizedGraph, nil
}

func (c *storageCache) FloorKey(ctx context.Context, remote string, targetTimeSecond int64) (Key, error) {
allKeys, err := c.storage.List(ctx)
if err != nil {
return EmptyKey, err
}

remotePrefix := keyPrefix + remote + "/"
cacheKeys := make([]Key, 0, len(allKeys))
for _, k := range allKeys {
if !strings.HasPrefix(k, remotePrefix) {
continue
}
cacheKey, err := parseCacheFileName(strings.TrimPrefix(k, remotePrefix))
if err != nil {
continue
}
cacheKey.Remote = remote
cacheKeys = append(cacheKeys, cacheKey)
}

if len(cacheKeys) == 0 {
return EmptyKey, nil
}

if !slices.IsSortedFunc(cacheKeys, CompareKeyFunc) {
slices.SortStableFunc(cacheKeys, CompareKeyFunc)
}

if cacheKeys[0].BaseCommitTimeSecond > targetTimeSecond {
return EmptyKey, nil
}
if cacheKeys[len(cacheKeys)-1].BaseCommitTimeSecond <= targetTimeSecond {
return cacheKeys[len(cacheKeys)-1], nil
}

idx, found := binarySearch(cacheKeys, targetTimeSecond)
if !found {
idx--
}
return cacheKeys[idx], nil
}

func binarySearch(cacheKeys []Key, targetTimeSecond int64) (int, bool) {
return slices.BinarySearchFunc(cacheKeys, targetTimeSecond, func(a Key, b int64) int {
switch {
case a.BaseCommitTimeSecond < b:
return -1
case a.BaseCommitTimeSecond > b:
return 1
default:
return 0
}
})
}

func parseCacheFileName(name string) (Key, error) {
// name has the form "date/timestamp_sha" after the keyPrefix+remote are stripped.
parts := strings.SplitN(name, "/", 2)
if len(parts) != 2 {
return Key{}, fmt.Errorf("cache path should have form date/TIMESTAMP_SHA: %s", name)
}
filename := parts[1]

split := strings.SplitN(filename, "_", 2)
if len(split) != 2 {
return Key{}, fmt.Errorf("cache file name should have form TIMESTAMP_SHA: %s", filename)
}

ts, err := strconv.ParseInt(split[0], 10, 64)
if err != nil {
return Key{}, fmt.Errorf("parse timestamp: %w", err)
}

return Key{
BaseCommitTimeSecond: ts,
BaseSha: split[1],
}, nil
}
2 changes: 2 additions & 0 deletions core/storage/changedtargetsandedgesreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *errStorage) Put(_ context.Context, _ UploadRequest) error { return nil
func (s *errStorage) Exists(_ context.Context, _ string) (bool, error) {
return false, s.err
}
func (s *errStorage) List(_ context.Context, _ string) ([]string, error) { return nil, s.err }

// nilResponseStorage is a Storage stub that returns a nil DownloadResponse from Get.
type nilResponseStorage struct{}
Expand All @@ -139,3 +140,4 @@ func (s *nilResponseStorage) Put(_ context.Context, _ UploadRequest) error { ret
func (s *nilResponseStorage) Exists(_ context.Context, _ string) (bool, error) {
return false, nil
}
func (s *nilResponseStorage) List(_ context.Context, _ string) ([]string, error) { return nil, nil }
27 changes: 27 additions & 0 deletions core/storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,30 @@ func (d *diskStorage) Exists(ctx context.Context, key string) (bool, error) {
}
return false, err
}

// List returns the relative paths of all regular files under the given directory prefix.
func (d *diskStorage) List(ctx context.Context, dir string) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
root := filepath.Join(d.rootDir, dir)
var keys []string
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
if info.IsDir() {
return nil
}
rel, err := filepath.Rel(d.rootDir, path)
if err != nil {
return err
}
keys = append(keys, rel)
return nil
})
return keys, err
}
50 changes: 50 additions & 0 deletions core/storage/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,56 @@ func TestStorage_Exists(t *testing.T) {
})
}

func TestStorage_List(t *testing.T) {
ctx := context.Background()
tmpDir := t.TempDir()
s, err := New(tmpDir)
require.NoError(t, err)

put := func(key string) {
t.Helper()
require.NoError(t, s.Put(ctx, storage.UploadRequest{Key: key, Reader: bytes.NewReader([]byte("x"))}))
}
put("itg/repoA/2024-01-01/100_abc")
put("itg/repoA/2024-01-02/200_def")
put("itg/repoB/2024-01-01/300_ghi")
put("graph/treehash123")

t.Run("lists files under subdirectory", func(t *testing.T) {
keys, err := s.List(ctx, "itg/repoA")
require.NoError(t, err)
assert.ElementsMatch(t, []string{
"itg/repoA/2024-01-01/100_abc",
"itg/repoA/2024-01-02/200_def",
}, keys)
})

t.Run("different subdirectory returns different keys", func(t *testing.T) {
keys, err := s.List(ctx, "itg/repoB")
require.NoError(t, err)
assert.ElementsMatch(t, []string{"itg/repoB/2024-01-01/300_ghi"}, keys)
})

t.Run("non-existent directory returns empty", func(t *testing.T) {
keys, err := s.List(ctx, "nonexistent")
require.NoError(t, err)
assert.Empty(t, keys)
})

t.Run("empty dir lists all files", func(t *testing.T) {
keys, err := s.List(ctx, "")
require.NoError(t, err)
assert.Len(t, keys, 4)
})

t.Run("cancelled context returns error", func(t *testing.T) {
cancelledCtx, cancel := context.WithCancel(context.Background())
cancel()
_, err := s.List(cancelledCtx, "itg")
assert.Error(t, err)
})
}

func TestStorage_Get_NotFound(t *testing.T) {
ctx := context.Background()
tmpDir := t.TempDir()
Expand Down
13 changes: 13 additions & 0 deletions core/storage/memstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"errors"
"io"
"strings"
"sync"
)

Expand Down Expand Up @@ -65,3 +66,15 @@ func (m *memoryStorage) Exists(ctx context.Context, key string) (bool, error) {
_, ok := m.data[key]
return ok, nil
}

func (m *memoryStorage) List(ctx context.Context, dir string) ([]string, error) {
m.mu.RLock()
defer m.mu.RUnlock()
var keys []string
for k := range m.data {
if strings.HasPrefix(k, dir) {
keys = append(keys, k)
}
}
return keys, nil
}
2 changes: 2 additions & 0 deletions core/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ type Storage interface {
Put(ctx context.Context, req UploadRequest) error
// Exists checks whether a blob exists in the storage.
Exists(ctx context.Context, key string) (bool, error)
// List returns the keys of all blobs under the given directory prefix.
List(ctx context.Context, dir string) ([]string, error)
}
41 changes: 41 additions & 0 deletions core/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,47 @@ import (
"github.com/stretchr/testify/require"
)

func TestMemoryStorage_List(t *testing.T) {
ctx := context.Background()
s := NewMemoryStorage()

put := func(key string) {
t.Helper()
require.NoError(t, s.Put(ctx, UploadRequest{Key: key, Reader: bytes.NewReader([]byte("x"))}))
}
put("itg/repoA/2024-01-01/100_abc")
put("itg/repoA/2024-01-02/200_def")
put("itg/repoB/2024-01-01/300_ghi")
put("graph/treehash123")

t.Run("lists keys under prefix", func(t *testing.T) {
keys, err := s.List(ctx, "itg/repoA/")
require.NoError(t, err)
assert.ElementsMatch(t, []string{
"itg/repoA/2024-01-01/100_abc",
"itg/repoA/2024-01-02/200_def",
}, keys)
})

t.Run("different prefix returns different keys", func(t *testing.T) {
keys, err := s.List(ctx, "itg/repoB/")
require.NoError(t, err)
assert.ElementsMatch(t, []string{"itg/repoB/2024-01-01/300_ghi"}, keys)
})

t.Run("non-matching prefix returns empty", func(t *testing.T) {
keys, err := s.List(ctx, "nonexistent/")
require.NoError(t, err)
assert.Empty(t, keys)
})

t.Run("empty prefix returns all keys", func(t *testing.T) {
keys, err := s.List(ctx, "")
require.NoError(t, err)
assert.Len(t, keys, 4)
})
}

func TestMemoryStorage_Exists(t *testing.T) {
ctx := context.Background()
s := NewMemoryStorage()
Expand Down
Loading
Loading