From 72e677ad664f6489987d5ae8bdffe3e2e39b9eb0 Mon Sep 17 00:00:00 2001 From: Joe Corall Date: Mon, 4 May 2026 11:15:08 +0000 Subject: [PATCH 1/3] Fix cache miss response stalls --- docs/caching.md | 13 +- internal/cache/file.go | 56 +++++++- internal/cache/file_test.go | 32 +++++ internal/storage/http.go | 20 ++- internal/storage/http_test.go | 22 ++++ internal/storage/local_url.go | 200 ++++++++++++++++++++++++++++- internal/storage/local_url_test.go | 158 +++++++++++++++++++++++ 7 files changed, 482 insertions(+), 19 deletions(-) diff --git a/docs/caching.md b/docs/caching.md index a506ad8..02e13a2 100644 --- a/docs/caching.md +++ b/docs/caching.md @@ -20,16 +20,17 @@ cache: ``` `max_bytes` is a best-effort filesystem eviction target. `max_age` is an -optional age limit for derivative entries. Failed transforms and HTTP error -responses are not stored. +optional age limit for derivative entries. Eviction after writes runs in the +background, so a cache miss response is not delayed by walking a large cache +tree. Failed transforms and HTTP error responses are not stored. `cache.max_bytes` is the approximate total retained size of derivative payload files under `cache.root`. It is different from `iiif.image.max_derivative_bytes`, which limits one generated response before it -can be returned or cached. A cache write can temporarily exceed `cache.max_bytes` -before eviction runs. When size eviction runs, Triplet removes the oldest -derivative payload files first based on payload file modification time; reads -do not refresh cache age. +can be returned or cached. A cache write can temporarily exceed +`cache.max_bytes` before asynchronous eviction catches up. When size eviction +runs, Triplet removes the oldest derivative payload files first based on payload +file modification time; reads do not refresh cache age. `cache.max_age` is based on the derivative payload file modification time, not when it was last requested. When a cached derivative is older than `max_age`, diff --git a/internal/cache/file.go b/internal/cache/file.go index db6daed..ff222c8 100644 --- a/internal/cache/file.go +++ b/internal/cache/file.go @@ -27,14 +27,18 @@ type FileStore struct { storeContentType bool // MaxBytes optionally bounds total cache size; when exceeded, the - // oldest payload files are evicted on the next Put based on mtime. + // oldest payload files are evicted asynchronously after Put based on mtime. MaxBytes int64 // MaxAge optionally bounds how long entries remain usable after Put. // Expired entries are removed on Get and opportunistically on Put. MaxAge time.Duration - mu sync.Mutex + mu sync.Mutex + evictMu sync.Mutex + evicting bool + evictAgain bool + evictDone chan struct{} } const tempFilePrefix = ".tmp-" @@ -154,7 +158,7 @@ func (s *FileStore) Put(_ context.Context, key, contentType string, value io.Rea return err } if s.MaxAge > 0 || s.MaxBytes > 0 { - s.evict() + s.scheduleEvict() } return nil } @@ -169,7 +173,7 @@ func (s *FileStore) installWithMeta(tmpName, dataPath, metaPath, contentType str } meta := fileMeta{ContentType: contentType} mb, _ := json.Marshal(meta) - if err := os.WriteFile(metaPath, mb, 0o640); err != nil { + if err := os.WriteFile(metaPath, mb, 0o600); err != nil { _ = os.Remove(dataPath) return err } @@ -195,6 +199,50 @@ func (s *FileStore) paths(key string) (data, meta string) { return base, base + ".meta" } +func (s *FileStore) scheduleEvict() { + s.evictMu.Lock() + if s.evicting { + s.evictAgain = true + s.evictMu.Unlock() + return + } + s.evicting = true + s.evictAgain = false + s.evictDone = make(chan struct{}) + s.evictMu.Unlock() + + go s.runEvict() +} + +func (s *FileStore) runEvict() { + for { + s.evict() + + s.evictMu.Lock() + if !s.evictAgain { + s.evicting = false + close(s.evictDone) + s.evictMu.Unlock() + return + } + s.evictAgain = false + s.evictMu.Unlock() + } +} + +func (s *FileStore) waitForEviction() { + for { + s.evictMu.Lock() + if !s.evicting { + s.evictMu.Unlock() + return + } + done := s.evictDone + s.evictMu.Unlock() + <-done + } +} + func (s *FileStore) evict() { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/cache/file_test.go b/internal/cache/file_test.go index 80cfd39..3ccec2c 100644 --- a/internal/cache/file_test.go +++ b/internal/cache/file_test.go @@ -146,12 +146,39 @@ func TestFileStoreEvictSkipsInFlightTempFiles(t *testing.T) { if err := store.Put(context.Background(), "b", "text/plain", strings.NewReader("b")); err != nil { t.Fatal(err) } + store.waitForEviction() if _, err := os.Stat(tmpPath); err != nil { t.Fatalf("tmp stat err = %v, want exists", err) } } +func TestPayloadFileStorePutDoesNotWaitForEviction(t *testing.T) { + store, err := NewPayloadFileStoreWithMaxAge(t.TempDir(), 1, 0) + if err != nil { + t.Fatal(err) + } + + store.mu.Lock() + done := make(chan error, 1) + go func() { + done <- store.Put(context.Background(), "a", "text/plain", strings.NewReader("payload")) + }() + + select { + case err := <-done: + if err != nil { + t.Fatal(err) + } + case <-time.After(time.Second): + store.mu.Unlock() + t.Fatal("Put waited for eviction") + } + + store.mu.Unlock() + store.waitForEviction() +} + func TestFileStoreConcurrentPutsWithEviction(t *testing.T) { t.Parallel() @@ -182,6 +209,7 @@ func TestFileStoreConcurrentPutsWithEviction(t *testing.T) { wg.Wait() close(errCh) + store.waitForEviction() for err := range errCh { t.Fatal(err) @@ -201,6 +229,7 @@ func TestFileStoreEvictsWhenOversize(t *testing.T) { if err := store.Put(context.Background(), "b", "text/plain", bytes.NewReader([]byte("5678"))); err != nil { t.Fatal(err) } + store.waitForEviction() if _, _, err := store.Get(context.Background(), "a"); !errors.Is(err, ErrMiss) { t.Fatalf("a err = %v, want miss", err) @@ -236,6 +265,7 @@ func TestFileStoreGetDoesNotRefreshEvictionOrder(t *testing.T) { if err := store.Put(context.Background(), "c", "text/plain", bytes.NewReader([]byte("90"))); err != nil { t.Fatal(err) } + store.waitForEviction() if _, _, err := store.Get(context.Background(), "a"); !errors.Is(err, ErrMiss) { t.Fatalf("a err = %v, want miss", err) @@ -279,6 +309,7 @@ func TestFileStoreEvictsByModTime(t *testing.T) { if err := store.Put(context.Background(), "c", "text/plain", bytes.NewReader([]byte("90"))); err != nil { t.Fatal(err) } + store.waitForEviction() if _, _, err := store.Get(context.Background(), "a"); !errors.Is(err, ErrMiss) { t.Fatalf("a err = %v, want miss", err) @@ -340,6 +371,7 @@ func TestFileStoreMaxAgeEvictsOnPut(t *testing.T) { if err := store.Put(context.Background(), "new", "text/plain", strings.NewReader("new")); err != nil { t.Fatal(err) } + store.waitForEviction() if _, _, err := store.Get(context.Background(), "old"); !errors.Is(err, ErrMiss) { t.Fatalf("old err = %v, want miss", err) } diff --git a/internal/storage/http.go b/internal/storage/http.go index 46c1e94..4d76090 100644 --- a/internal/storage/http.go +++ b/internal/storage/http.go @@ -11,6 +11,7 @@ import ( "os" "strconv" "strings" + "sync" "time" ) @@ -33,6 +34,10 @@ type HTTPOpener struct { // ForwardAuthHeaders forwards Cookie and Authorization headers from the // request context. Only enable this for trusted, per-request fallbacks. ForwardAuthHeaders bool + + transportMu sync.Mutex + sharedTransport http.RoundTripper + sharedTransportPrivateMode bool } const DefaultRequestTimeout = 2 * time.Minute @@ -258,17 +263,28 @@ func (h *HTTPOpener) client() *http.Client { return &http.Client{ Timeout: 30 * time.Second, CheckRedirect: h.checkRedirect, - Transport: h.transport(), + Transport: h.defaultTransport(), } } c := *h.Client c.CheckRedirect = h.checkRedirect if c.Transport == nil { - c.Transport = h.transport() + c.Transport = h.defaultTransport() } return &c } +func (h *HTTPOpener) defaultTransport() http.RoundTripper { + h.transportMu.Lock() + defer h.transportMu.Unlock() + if h.sharedTransport != nil && h.sharedTransportPrivateMode == h.AllowPrivateHosts { + return h.sharedTransport + } + h.sharedTransport = h.transport() + h.sharedTransportPrivateMode = h.AllowPrivateHosts + return h.sharedTransport +} + func (h *HTTPOpener) transport() http.RoundTripper { base := http.DefaultTransport.(*http.Transport).Clone() base.DialContext = (&net.Dialer{ diff --git a/internal/storage/http_test.go b/internal/storage/http_test.go index 624d551..2e3b5a9 100644 --- a/internal/storage/http_test.go +++ b/internal/storage/http_test.go @@ -125,6 +125,28 @@ func TestHTTPOpenerAppliesDefaultRequestTimeout(t *testing.T) { } } +func TestHTTPOpenerReusesDefaultTransport(t *testing.T) { + op := NewHTTPOpener([]string{"https://example.org"}, 5*time.Second, 0) + + first := op.client() + second := op.client() + if first.Transport == nil { + t.Fatal("missing first transport") + } + if first.Transport != second.Transport { + t.Fatal("default transport was not reused") + } + + op.AllowPrivateHosts = true + third := op.client() + if third.Transport == first.Transport { + t.Fatal("transport was not rebuilt after private-host mode changed") + } + if third.Transport != op.client().Transport { + t.Fatal("rebuilt transport was not reused") + } +} + func TestHTTPOpenerRejectsRedirectToDeniedHost(t *testing.T) { redirector := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "http://example.org/secret", http.StatusFound) diff --git a/internal/storage/local_url.go b/internal/storage/local_url.go index 74eed11..02564a7 100644 --- a/internal/storage/local_url.go +++ b/internal/storage/local_url.go @@ -40,6 +40,9 @@ type LocalURLFallback struct { authMu sync.Mutex auth map[string]authCacheEntry inflight map[string][]chan error + + authFallbackMetaInflight map[string]*authFallbackMetaCall + authFallbackOpenInflight map[string]*authFallbackOpenCall } // LocalURLMapping maps a URL identifier prefix to a local file source. @@ -59,6 +62,33 @@ type authCacheEntry struct { expiresAt time.Time } +type authFallbackMetaCall struct { + done chan struct{} + meta Meta + err error +} + +type authFallbackOpenCall struct { + done chan struct{} + meta Meta + path string + err error + refs int + doneClosed bool +} + +type sharedAuthFallbackFile struct { + *os.File + releaseOnce sync.Once + release func() +} + +func (s *sharedAuthFallbackFile) Close() error { + err := s.File.Close() + s.releaseOnce.Do(s.release) + return err +} + var errAuthProbeHeadUnsupported = errors.New("auth probe head unsupported") type authCacheTier string @@ -120,7 +150,7 @@ func (l *LocalURLFallback) openLocalMeta(ctx context.Context, identifier string) l.debug(ctx, "local url ocfl object missing", identifier, "operation", "meta", "prefix", mapping.Prefix, "path", path, "disk_path", mappingDiskPath(mapping, path)) if mapping.AuthProbe && l.AuthFallback != nil { l.debug(ctx, "local url using authenticated fallback after local miss", identifier, "operation", "meta", "prefix", mapping.Prefix) - return l.openAuthFallback(ctx, identifier, true) + return l.openAuthFallback(ctx, identifier, mapping, true) } continue } @@ -140,7 +170,7 @@ func (l *LocalURLFallback) openLocalMeta(ctx context.Context, identifier string) l.debug(ctx, "local url file missing", identifier, "operation", "meta", "prefix", mapping.Prefix, "path", path, "disk_path", mappingDiskPath(mapping, path)) if mapping.AuthProbe && l.AuthFallback != nil { l.debug(ctx, "local url using authenticated fallback after local miss", identifier, "operation", "meta", "prefix", mapping.Prefix) - return l.openAuthFallback(ctx, identifier, true) + return l.openAuthFallback(ctx, identifier, mapping, true) } continue } @@ -178,7 +208,7 @@ func (l *LocalURLFallback) openLocal(ctx context.Context, identifier string) (io l.debug(ctx, "local url ocfl object missing", identifier, "operation", "open", "prefix", mapping.Prefix, "path", path, "disk_path", mappingDiskPath(mapping, path)) if mapping.AuthProbe && l.AuthFallback != nil { l.debug(ctx, "local url using authenticated fallback after local miss", identifier, "operation", "open", "prefix", mapping.Prefix) - return l.openAuthFallback(ctx, identifier, false) + return l.openAuthFallback(ctx, identifier, mapping, false) } continue } @@ -206,7 +236,7 @@ func (l *LocalURLFallback) openLocal(ctx context.Context, identifier string) (io l.debug(ctx, "local url file missing", identifier, "operation", "open", "prefix", mapping.Prefix, "path", path, "disk_path", mappingDiskPath(mapping, path)) if mapping.AuthProbe && l.AuthFallback != nil { l.debug(ctx, "local url using authenticated fallback after local miss", identifier, "operation", "open", "prefix", mapping.Prefix) - return l.openAuthFallback(ctx, identifier, false) + return l.openAuthFallback(ctx, identifier, mapping, false) } continue } @@ -246,15 +276,94 @@ func (l *LocalURLFallback) localMappings() []LocalURLMapping { return mappings } -func (l *LocalURLFallback) openAuthFallback(ctx context.Context, identifier string, metaOnly bool) (io.ReadSeekCloser, Meta, bool, error) { +func (l *LocalURLFallback) openAuthFallback(ctx context.Context, identifier string, mapping LocalURLMapping, metaOnly bool) (io.ReadSeekCloser, Meta, bool, error) { + headers := authHeadersFromContext(ctx) if metaOnly { - meta, err := l.AuthFallback.Meta(ctx, identifier) + meta, err := l.coalescedAuthFallbackMeta(ctx, authFallbackKey("meta", mapping.Prefix, identifier, headers), identifier) return nil, meta, true, err } - rc, meta, err := l.AuthFallback.Open(ctx, identifier) + rc, meta, err := l.coalescedAuthFallbackOpen(ctx, authFallbackKey("open", mapping.Prefix, identifier, headers), identifier) return rc, meta, true, err } +func (l *LocalURLFallback) coalescedAuthFallbackMeta(ctx context.Context, key, identifier string) (Meta, error) { + call, leader := l.beginAuthFallbackMeta(key) + if leader { + meta, err := l.AuthFallback.Meta(context.WithoutCancel(ctx), identifier) + l.finishAuthFallbackMeta(key, call, meta, err) + return meta, err + } + select { + case <-call.done: + return call.meta, call.err + case <-ctx.Done(): + return Meta{}, ctx.Err() + } +} + +func (l *LocalURLFallback) coalescedAuthFallbackOpen(ctx context.Context, key, identifier string) (io.ReadSeekCloser, Meta, error) { + call, leader := l.beginAuthFallbackOpen(key) + if leader { + path, meta, err := l.fetchAuthFallbackSource(context.WithoutCancel(ctx), identifier) + l.finishAuthFallbackOpen(key, call, meta, path, err) + return l.openAuthFallbackResult(call) + } + select { + case <-call.done: + return l.openAuthFallbackResult(call) + case <-ctx.Done(): + l.releaseAuthFallbackOpen(call) + return nil, Meta{}, ctx.Err() + } +} + +func (l *LocalURLFallback) fetchAuthFallbackSource(ctx context.Context, identifier string) (string, Meta, error) { + rc, meta, err := l.AuthFallback.Open(ctx, identifier) + if err != nil { + return "", Meta{}, err + } + defer rc.Close() + + tmp, err := os.CreateTemp("", "triplet-auth-fallback-source-*") + if err != nil { + return "", Meta{}, fmt.Errorf("auth fallback source temp file: %w", err) + } + tmpName := tmp.Name() + cleanup := func() { + _ = tmp.Close() + _ = os.Remove(tmpName) + } + n, err := io.Copy(tmp, rc) + if err != nil { + cleanup() + return "", Meta{}, fmt.Errorf("auth fallback source read: %w", err) + } + if meta.Size == 0 { + meta.Size = n + } + if err := tmp.Close(); err != nil { + _ = os.Remove(tmpName) + return "", Meta{}, fmt.Errorf("auth fallback source close: %w", err) + } + return tmpName, meta, nil +} + +func (l *LocalURLFallback) openAuthFallbackResult(call *authFallbackOpenCall) (io.ReadSeekCloser, Meta, error) { + if call.err != nil { + l.releaseAuthFallbackOpen(call) + return nil, Meta{}, call.err + } + f, err := os.Open(call.path) + if err != nil { + l.releaseAuthFallbackOpen(call) + return nil, Meta{}, fmt.Errorf("auth fallback source open: %w", err) + } + return &sharedAuthFallbackFile{ + File: f, + release: func() { l.releaseAuthFallbackOpen(call) }, + }, call.meta, nil +} + func (l *LocalURLFallback) stripLocalURLPrefix(identifier, prefix string) (string, bool) { if prefix == "" { return "", false @@ -374,6 +483,16 @@ func authCacheKey(tier authCacheTier, prefix, identifier string, headers http.He return string(tier) + "|" + hex.EncodeToString(prefixSum[:]) + ":" + hex.EncodeToString(sum.Sum(nil)) } +func authFallbackKey(operation, prefix, identifier string, headers http.Header) string { + tier := authCacheTierAnonymous + var scopedHeaders http.Header + if hasAuthHeaders(headers) { + tier = authCacheTierAuthenticated + scopedHeaders = headers + } + return "auth-fallback|" + operation + "|" + authCacheKey(tier, prefix, identifier, scopedHeaders) +} + func authCacheScope(key string) string { if i := strings.IndexByte(key, ':'); i >= 0 { return key[:i] @@ -504,6 +623,73 @@ func (l *LocalURLFallback) finishAuthProbe(key string, err error) { } } +func (l *LocalURLFallback) beginAuthFallbackMeta(key string) (*authFallbackMetaCall, bool) { + l.authMu.Lock() + defer l.authMu.Unlock() + if l.authFallbackMetaInflight == nil { + l.authFallbackMetaInflight = map[string]*authFallbackMetaCall{} + } + if call, ok := l.authFallbackMetaInflight[key]; ok { + return call, false + } + call := &authFallbackMetaCall{done: make(chan struct{})} + l.authFallbackMetaInflight[key] = call + return call, true +} + +func (l *LocalURLFallback) finishAuthFallbackMeta(key string, call *authFallbackMetaCall, meta Meta, err error) { + l.authMu.Lock() + defer l.authMu.Unlock() + call.meta = meta + call.err = err + delete(l.authFallbackMetaInflight, key) + close(call.done) +} + +func (l *LocalURLFallback) beginAuthFallbackOpen(key string) (*authFallbackOpenCall, bool) { + l.authMu.Lock() + defer l.authMu.Unlock() + if l.authFallbackOpenInflight == nil { + l.authFallbackOpenInflight = map[string]*authFallbackOpenCall{} + } + if call, ok := l.authFallbackOpenInflight[key]; ok { + call.refs++ + return call, false + } + call := &authFallbackOpenCall{done: make(chan struct{}), refs: 1} + l.authFallbackOpenInflight[key] = call + return call, true +} + +func (l *LocalURLFallback) finishAuthFallbackOpen(key string, call *authFallbackOpenCall, meta Meta, path string, err error) { + l.authMu.Lock() + call.meta = meta + call.path = path + call.err = err + call.doneClosed = true + delete(l.authFallbackOpenInflight, key) + removePath := call.refs == 0 && call.path != "" + l.authMu.Unlock() + + close(call.done) + if removePath { + _ = os.Remove(path) + } +} + +func (l *LocalURLFallback) releaseAuthFallbackOpen(call *authFallbackOpenCall) { + l.authMu.Lock() + if call.refs > 0 { + call.refs-- + } + removePath := call.doneClosed && call.refs == 0 && call.path != "" + path := call.path + l.authMu.Unlock() + if removePath { + _ = os.Remove(path) + } +} + func (l *LocalURLFallback) probe(ctx context.Context, identifier string, headers http.Header) (http.Header, error) { respHeader, err := l.probeRequest(ctx, http.MethodHead, identifier, headers) if err == nil || !errors.Is(err, errAuthProbeHeadUnsupported) { diff --git a/internal/storage/local_url_test.go b/internal/storage/local_url_test.go index 6e19794..2482143 100644 --- a/internal/storage/local_url_test.go +++ b/internal/storage/local_url_test.go @@ -3,11 +3,13 @@ package storage import ( "context" "errors" + "fmt" "io" "net/http" "net/http/httptest" "os" "path/filepath" + "strconv" "strings" "sync" "sync/atomic" @@ -171,6 +173,162 @@ func TestLocalURLFallbackUsesAuthenticatedHTTPFallbackForProtectedMiss(t *testin } } +func TestLocalURLFallbackCoalescesAuthenticatedHTTPMetadataFallback(t *testing.T) { + root := t.TempDir() + var heads atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodHead { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + if r.Header.Get("Cookie") != "SESS=abc" { + w.WriteHeader(http.StatusForbidden) + return + } + heads.Add(1) + time.Sleep(50 * time.Millisecond) + w.Header().Set("Content-Length", "6") + })) + defer srv.Close() + fileOp, err := NewFileOpener(root) + if err != nil { + t.Fatal(err) + } + authHTTP := NewHTTPOpener([]string{srv.URL}, 0, 0) + authHTTP.AllowPrivateHosts = true + authHTTP.ForwardAuthHeaders = true + op := &LocalURLFallback{ + Mappings: []LocalURLMapping{{ + Prefix: srv.URL + "/system/files", + File: fileOp, + AuthProbe: true, + }}, + Fallback: errOpener{}, + AuthFallback: authHTTP, + } + ctx := ContextWithAuthHeaders(context.Background(), http.Header{ + "Cookie": []string{"SESS=abc"}, + }) + + const callers = 8 + start := make(chan struct{}) + errCh := make(chan error, callers) + var wg sync.WaitGroup + for range callers { + wg.Add(1) + go func() { + defer wg.Done() + <-start + meta, err := op.Meta(ctx, srv.URL+"/system/files/missing.jp2") + if err != nil { + errCh <- err + return + } + if meta.Size != 6 { + errCh <- fmt.Errorf("size = %d, want 6", meta.Size) + } + }() + } + close(start) + wg.Wait() + close(errCh) + for err := range errCh { + t.Fatal(err) + } + if got := heads.Load(); got != 1 { + t.Fatalf("HEAD requests = %d, want 1", got) + } +} + +func TestLocalURLFallbackCoalescesAuthenticatedHTTPOpenFallback(t *testing.T) { + root := t.TempDir() + body := []byte("remote-payload") + var rangeGets atomic.Int32 + var fullGets atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + if r.Header.Get("Cookie") != "SESS=abc" { + w.WriteHeader(http.StatusForbidden) + return + } + w.Header().Set("Content-Type", "image/jp2") + w.Header().Set("Content-Length", strconv.Itoa(len(body))) + if r.Header.Get("Range") != "" { + rangeGets.Add(1) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(body) + return + } + fullGets.Add(1) + time.Sleep(50 * time.Millisecond) + _, _ = w.Write(body) + })) + defer srv.Close() + fileOp, err := NewFileOpener(root) + if err != nil { + t.Fatal(err) + } + authHTTP := NewHTTPOpener([]string{srv.URL}, 0, 0) + authHTTP.AllowPrivateHosts = true + authHTTP.ForwardAuthHeaders = true + op := &LocalURLFallback{ + Mappings: []LocalURLMapping{{ + Prefix: srv.URL + "/system/files", + File: fileOp, + AuthProbe: true, + }}, + Fallback: errOpener{}, + AuthFallback: authHTTP, + } + ctx := ContextWithAuthHeaders(context.Background(), http.Header{ + "Cookie": []string{"SESS=abc"}, + }) + + const callers = 8 + start := make(chan struct{}) + errCh := make(chan error, callers) + var wg sync.WaitGroup + for range callers { + wg.Add(1) + go func() { + defer wg.Done() + <-start + rc, meta, err := op.Open(ctx, srv.URL+"/system/files/missing.jp2") + if err != nil { + errCh <- err + return + } + defer rc.Close() + got, err := io.ReadAll(rc) + if err != nil { + errCh <- err + return + } + if string(got) != string(body) { + errCh <- fmt.Errorf("body = %q, want %q", string(got), string(body)) + } + if meta.Size != int64(len(body)) { + errCh <- fmt.Errorf("size = %d, want %d", meta.Size, len(body)) + } + }() + } + close(start) + wg.Wait() + close(errCh) + for err := range errCh { + t.Fatal(err) + } + if got := rangeGets.Load(); got != 1 { + t.Fatalf("range probes = %d, want 1", got) + } + if got := fullGets.Load(); got != 1 { + t.Fatalf("full GET requests = %d, want 1", got) + } +} + func TestLocalURLFallbackSupportsMultipleRoots(t *testing.T) { systemRoot := t.TempDir() fedoraRoot := t.TempDir() From d0f31da77c3746725de887aa109f48ef711cdfb4 Mon Sep 17 00:00:00 2001 From: Joe Corall Date: Mon, 4 May 2026 11:19:29 +0000 Subject: [PATCH 2/3] Limit only active libvips work --- internal/iiif/image/v3/handler/handler.go | 52 ++++++++------------- internal/iiif/image/v3/pipeline/pipeline.go | 16 +++++++ internal/vips/limiter.go | 29 ++++++++++++ 3 files changed, 65 insertions(+), 32 deletions(-) create mode 100644 internal/vips/limiter.go diff --git a/internal/iiif/image/v3/handler/handler.go b/internal/iiif/image/v3/handler/handler.go index 5d060a4..b8d69f6 100644 --- a/internal/iiif/image/v3/handler/handler.go +++ b/internal/iiif/image/v3/handler/handler.go @@ -34,6 +34,7 @@ import ( "github.com/libops/triplet/internal/iiif/image/v3/types" "github.com/libops/triplet/internal/redact" "github.com/libops/triplet/internal/storage" + tvips "github.com/libops/triplet/internal/vips" ) // Handler serves the Image API 3.0 surface mounted at Prefix. @@ -53,7 +54,7 @@ type Handler struct { infoLimits types.Limits maxSourcePixels int64 maxSourceBytes int64 - vipsLimiter chan struct{} + vipsLimiter *tvips.Limiter logger *slog.Logger } @@ -76,9 +77,9 @@ func New(prefix, publicBaseURL string, src storage.Opener, pipe *pipeline.Pipeli if derivCache == nil { derivCache = cache.Noop{} } - var vipsLimiter chan struct{} - if maxConcurrentTransforms > 0 { - vipsLimiter = make(chan struct{}, maxConcurrentTransforms) + vipsLimiter := tvips.NewLimiter(maxConcurrentTransforms) + if pipe != nil { + pipe.SetLimiter(vipsLimiter) } return &Handler{ prefix: strings.TrimRight(prefix, "/"), @@ -140,6 +141,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveInfo(w http.ResponseWriter, r *http.Request, identifier string) { width, height, err := h.imageDimensions(r.Context(), identifier) if err != nil { + if errors.Is(err, pipeline.ErrBusy) { + writeError(w, http.StatusServiceUnavailable, "server busy") + return + } if errors.Is(err, storage.ErrNotFound) { writeError(w, http.StatusNotFound, "identifier not found") return @@ -238,14 +243,8 @@ func (h *Handler) serveImage(w http.ResponseWriter, r *http.Request, req parse.R w.Header().Set("Content-Type", contentType) w.Header().Set("X-Cache", "miss") - release, err := h.acquireVips(r.Context()) - if err != nil { - writeError(w, http.StatusServiceUnavailable, "server busy") - return - } tmp, err := os.CreateTemp("", "triplet-derivative-*") if err != nil { - release() h.logger.Error("create derivative temp file", "identifier", redact.Identifier(req.Identifier), "identifier_hash", redact.Hash(req.Identifier), "err", err) writeError(w, http.StatusInternalServerError, "failed to prepare response") return @@ -253,11 +252,12 @@ func (h *Handler) serveImage(w http.ResponseWriter, r *http.Request, req parse.R defer os.Remove(tmp.Name()) defer tmp.Close() - result, err := func() (pipeline.Result, error) { - defer release() - return h.pipeline.Transform(r.Context(), req, tmp) - }() + result, err := h.pipeline.Transform(r.Context(), req, tmp) if err != nil { + if errors.Is(err, pipeline.ErrBusy) { + writeError(w, http.StatusServiceUnavailable, "server busy") + return + } if errors.Is(err, storage.ErrNotFound) { writeError(w, http.StatusNotFound, "identifier not found") return @@ -313,12 +313,6 @@ func (h *Handler) serveImage(w http.ResponseWriter, r *http.Request, req parse.R } func (h *Handler) imageDimensions(ctx context.Context, identifier string) (int, int, error) { - release, err := h.acquireVips(ctx) - if err != nil { - return 0, 0, fmt.Errorf("acquire vips worker: %w", err) - } - defer release() - rc, meta, err := h.src.Open(ctx, identifier) if err != nil { return 0, 0, err @@ -364,6 +358,12 @@ func (h *Handler) imageDimensions(ctx context.Context, identifier string) (int, return 0, 0, fmt.Errorf("close source temp file: %w", err) } } + release, err := h.vipsLimiter.Acquire(ctx) + if err != nil { + return 0, 0, fmt.Errorf("%w: acquire vips worker: %w", pipeline.ErrBusy, err) + } + defer release() + params := gv.NewImportParams() params.Access.Set(gv.AccessSequential) img, err := gv.LoadImageFromFileDirect(path, params) @@ -577,18 +577,6 @@ func (h *Handler) derivativeInvalidationVersion(ctx context.Context, identifier return "" } -func (h *Handler) acquireVips(ctx context.Context) (func(), error) { - if h.vipsLimiter == nil { - return func() {}, nil - } - select { - case h.vipsLimiter <- struct{}{}: - return func() { <-h.vipsLimiter }, nil - case <-ctx.Done(): - return nil, ctx.Err() - } -} - func writeError(w http.ResponseWriter, status int, msg string) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) diff --git a/internal/iiif/image/v3/pipeline/pipeline.go b/internal/iiif/image/v3/pipeline/pipeline.go index 4ca57a6..bf11656 100644 --- a/internal/iiif/image/v3/pipeline/pipeline.go +++ b/internal/iiif/image/v3/pipeline/pipeline.go @@ -36,6 +36,10 @@ var ErrBadRequest = errors.New("pipeline: bad request") // Triplet's supported input image formats. var ErrUnsupportedSource = errors.New("pipeline: unsupported source image") +// ErrBusy marks requests whose context ended before a libvips worker was +// available. +var ErrBusy = errors.New("pipeline: server busy") + // Limits caps resource use per request. type Limits struct { // MaxOutputPixels rejects requests whose computed output exceeds this @@ -58,6 +62,7 @@ type Pipeline struct { src storage.Opener limits Limits options Options + limiter *tvips.Limiter } // Options controls optional performance/correctness tradeoffs. @@ -86,6 +91,11 @@ func New(src storage.Opener, limits Limits, opts ...Options) *Pipeline { return &Pipeline{src: src, limits: limits, options: options} } +// SetLimiter sets the shared libvips concurrency limiter used by Transform. +func (p *Pipeline) SetLimiter(limiter *tvips.Limiter) { + p.limiter = limiter +} + // Result describes a successfully encoded derivative. type Result struct { ContentType string @@ -111,6 +121,12 @@ func (p *Pipeline) Transform(ctx context.Context, req parse.Request, w io.Writer } defer source.Close() + release, err := p.limiter.Acquire(ctx) + if err != nil { + return Result{}, fmt.Errorf("%w: acquire vips worker: %w", ErrBusy, err) + } + defer release() + params := gv.NewImportParams() params.Access.Set(p.loadAccess(req)) img, err := gv.LoadImageFromFileDirect(source.Path, params) diff --git a/internal/vips/limiter.go b/internal/vips/limiter.go new file mode 100644 index 0000000..986bdd2 --- /dev/null +++ b/internal/vips/limiter.go @@ -0,0 +1,29 @@ +package vips + +import "context" + +// Limiter bounds concurrent libvips jobs. +type Limiter struct { + ch chan struct{} +} + +// NewLimiter constructs a limiter. Non-positive max values disable limiting. +func NewLimiter(max int) *Limiter { + if max <= 0 { + return nil + } + return &Limiter{ch: make(chan struct{}, max)} +} + +// Acquire waits for a libvips job slot and returns a release function. +func (l *Limiter) Acquire(ctx context.Context) (func(), error) { + if l == nil || l.ch == nil { + return func() {}, nil + } + select { + case l.ch <- struct{}{}: + return func() { <-l.ch }, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} From c6dd2a547aed0408708db5ea5038c6d58ad9be17 Mon Sep 17 00:00:00 2001 From: Joe Corall Date: Mon, 4 May 2026 11:43:07 +0000 Subject: [PATCH 3/3] Move cache cleanup to explicit command --- Dockerfile | 4 +- cmd/triplet-cache-cleanup/main.go | 110 +++++++++++++++++ cmd/triplet-cache-cleanup/main_test.go | 117 ++++++++++++++++++ config.example.yaml | 15 +-- docs/caching.md | 36 ++++-- internal/cache/file.go | 161 +++++++++---------------- internal/cache/file_test.go | 118 ++++++++++-------- internal/storage/file.go | 2 +- internal/storage/local_url.go | 4 +- scripts/build.sh | 4 + 10 files changed, 391 insertions(+), 180 deletions(-) create mode 100644 cmd/triplet-cache-cleanup/main.go create mode 100644 cmd/triplet-cache-cleanup/main_test.go diff --git a/Dockerfile b/Dockerfile index 8c0c112..65f101f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -140,7 +140,8 @@ COPY . . RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ CGO_ENABLED=1 go build -trimpath -ldflags='-s -w' -o /out/triplet ./cmd/triplet \ - && CGO_ENABLED=0 go build -trimpath -ldflags='-s -w' -o /out/triplet-healthcheck ./cmd/triplet-healthcheck + && CGO_ENABLED=0 go build -trimpath -ldflags='-s -w' -o /out/triplet-healthcheck ./cmd/triplet-healthcheck \ + && CGO_ENABLED=0 go build -trimpath -ldflags='-s -w' -o /out/triplet-cache-cleanup ./cmd/triplet-cache-cleanup FROM base AS test-runner WORKDIR /app @@ -223,6 +224,7 @@ COPY --chown=triplet:triplet deploy/compose/images/ /var/lib/triplet/testdata/im COPY --from=build /out/triplet /usr/local/bin/triplet COPY --from=build /out/triplet-healthcheck /usr/local/bin/triplet-healthcheck +COPY --from=build /out/triplet-cache-cleanup /usr/local/bin/triplet-cache-cleanup COPY config.example.yaml /etc/triplet/config.yaml RUN ldd /usr/local/bin/triplet >/dev/null diff --git a/cmd/triplet-cache-cleanup/main.go b/cmd/triplet-cache-cleanup/main.go new file mode 100644 index 0000000..0fa061f --- /dev/null +++ b/cmd/triplet-cache-cleanup/main.go @@ -0,0 +1,110 @@ +// Command triplet-cache-cleanup performs explicit filesystem cache cleanup. +package main + +import ( + "context" + "flag" + "fmt" + "io" + "os" + + "github.com/libops/triplet/internal/cache" + "github.com/libops/triplet/internal/config" +) + +type namedReport struct { + name string + maxConfig string + report cache.CleanupReport +} + +func main() { + configPath := flag.String("config", "config.yaml", "path to the YAML config file") + timeout := flag.Duration("timeout", 0, "optional cleanup timeout") + flag.Parse() + + cfg, err := config.Load(*configPath) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "config: %v\n", err) + os.Exit(2) + } + + ctx := context.Background() + if *timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, *timeout) + defer cancel() + } + + reports, err := cleanupCaches(ctx, cfg) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "cache cleanup: %v\n", err) + os.Exit(1) + } + if len(reports) == 0 { + _, _ = fmt.Fprintln(os.Stdout, "no filesystem cache roots configured") + return + } + + overMax := false + for _, r := range reports { + printReport(os.Stdout, r) + if r.report.OverMaxBytes { + overMax = true + _, _ = fmt.Fprintf(os.Stderr, "%s cache remains over %s: bytes=%d max_bytes=%d\n", r.name, r.maxConfig, r.report.Bytes, r.report.MaxBytes) + } + } + if overMax { + os.Exit(1) + } +} + +func cleanupCaches(ctx context.Context, cfg *config.Config) ([]namedReport, error) { + var reports []namedReport + if cfg.Cache.Root != "" { + store, err := cache.NewPayloadFileStoreWithMaxAge(cfg.Cache.Root, int64(cfg.Cache.MaxBytes), cfg.Cache.MaxAge) + if err != nil { + return nil, fmt.Errorf("derivative cache: %w", err) + } + report, err := store.Cleanup(ctx) + if err != nil { + return nil, fmt.Errorf("derivative cache: %w", err) + } + reports = append(reports, namedReport{ + name: "derivative", + maxConfig: "cache.max_bytes", + report: report, + }) + } + if cfg.Cache.SourceRoot != "" { + store, err := cache.NewFileStore(cfg.Cache.SourceRoot, int64(cfg.Cache.SourceMaxBytes)) + if err != nil { + return nil, fmt.Errorf("source cache: %w", err) + } + report, err := store.Cleanup(ctx) + if err != nil { + return nil, fmt.Errorf("source cache: %w", err) + } + reports = append(reports, namedReport{ + name: "source", + maxConfig: "cache.source_max_bytes", + report: report, + }) + } + return reports, nil +} + +func printReport(out io.Writer, r namedReport) { + _, _ = fmt.Fprintf(out, + "%s cache root=%s scanned=%d removed=%d expired_removed=%d removed_bytes=%d bytes=%d max_bytes=%d over_max=%t\n", + r.name, + r.report.Root, + r.report.Scanned, + r.report.Removed, + r.report.ExpiredRemoved, + r.report.RemovedBytes, + r.report.Bytes, + r.report.MaxBytes, + r.report.OverMaxBytes, + ) +} diff --git a/cmd/triplet-cache-cleanup/main_test.go b/cmd/triplet-cache-cleanup/main_test.go new file mode 100644 index 0000000..bdb88e9 --- /dev/null +++ b/cmd/triplet-cache-cleanup/main_test.go @@ -0,0 +1,117 @@ +package main + +import ( + "context" + "errors" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/libops/triplet/internal/cache" + "github.com/libops/triplet/internal/config" +) + +func TestCleanupCachesRemovesExpiredDerivativeAndReportsOversize(t *testing.T) { + derivRoot := t.TempDir() + sourceRoot := t.TempDir() + + derivStore, err := cache.NewPayloadFileStoreWithMaxAge(derivRoot, 0, time.Hour) + if err != nil { + t.Fatal(err) + } + if err := derivStore.Put(context.Background(), "old", "image/jpeg", strings.NewReader("old")); err != nil { + t.Fatal(err) + } + oldFiles := payloadFiles(t, derivRoot) + if len(oldFiles) != 1 { + t.Fatalf("payload files after old put = %d, want 1", len(oldFiles)) + } + oldTime := time.Now().Add(-2 * time.Hour) + if err := os.Chtimes(oldFiles[0], oldTime, oldTime); err != nil { + t.Fatal(err) + } + if err := derivStore.Put(context.Background(), "new", "image/jpeg", strings.NewReader("new")); err != nil { + t.Fatal(err) + } + + sourceStore, err := cache.NewFileStore(sourceRoot, 0) + if err != nil { + t.Fatal(err) + } + if err := sourceStore.Put(context.Background(), "source", "image/tiff", strings.NewReader("source")); err != nil { + t.Fatal(err) + } + + reports, err := cleanupCaches(context.Background(), &config.Config{ + Cache: config.Cache{ + Root: derivRoot, + MaxAge: time.Hour, + SourceRoot: sourceRoot, + SourceMaxBytes: 1, + }, + }) + if err != nil { + t.Fatal(err) + } + + derivReport := reportByName(t, reports, "derivative") + if derivReport.ExpiredRemoved != 1 { + t.Fatalf("expired removed = %d, want 1", derivReport.ExpiredRemoved) + } + if derivReport.Removed != 1 { + t.Fatalf("derivative removed = %d, want 1", derivReport.Removed) + } + if got := len(payloadFiles(t, derivRoot)); got != 1 { + t.Fatalf("derivative payload files = %d, want 1", got) + } + + sourceReport := reportByName(t, reports, "source") + if !sourceReport.OverMaxBytes { + t.Fatal("expected source cache to report over max bytes") + } + if sourceReport.Bytes != int64(len("source")) { + t.Fatalf("source bytes = %d, want %d", sourceReport.Bytes, len("source")) + } +} + +func TestCleanupCachesSkipsUnconfiguredRoots(t *testing.T) { + reports, err := cleanupCaches(context.Background(), &config.Config{}) + if err != nil { + t.Fatal(err) + } + if len(reports) != 0 { + t.Fatalf("reports = %d, want 0", len(reports)) + } +} + +func reportByName(t *testing.T, reports []namedReport, name string) cache.CleanupReport { + t.Helper() + for _, report := range reports { + if report.name == name { + return report.report + } + } + t.Fatalf("missing %s report", name) + return cache.CleanupReport{} +} + +func payloadFiles(t *testing.T, root string) []string { + t.Helper() + var out []string + err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() || filepath.Ext(path) == ".meta" || strings.HasPrefix(d.Name(), ".tmp-") { + return nil + } + out = append(out, path) + return nil + }) + if err != nil && !errors.Is(err, os.ErrNotExist) { + t.Fatal(err) + } + return out +} diff --git a/config.example.yaml b/config.example.yaml index 4c8661c..521a34e 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -185,18 +185,19 @@ cache: root: /var/lib/triplet/cache # Best-effort aggregate size target for all cached derivative payload files # under cache.root. This controls retained cache footprint over time, not the - # size of any single generated response. A write may temporarily exceed this - # target before eviction runs, and metadata sidecar files are not counted. - # 0 disables size-based eviction. + # size of any single generated response. The server does not prune for size + # in the request path; run triplet-cache-cleanup periodically to report when + # this target is exceeded. Metadata sidecar files are not counted. 0 disables + # size reporting. max_bytes: 500GiB - # Optional age limit for derivative entries. Expired entries are removed on - # read and opportunistically during writes. 0 disables age-based eviction. + # Optional age limit for derivative entries. Expired entries miss on read and + # are removed by triplet-cache-cleanup. 0 disables age-based cleanup. max_age: 720h # Optional filesystem source cache for fetched source bytes (primarily HTTP # identifiers). # source_root: /var/lib/triplet/source-cache - # Best-effort eviction target for the source cache. 0 disables size-based - # eviction. + # Best-effort reporting target for the source cache. The cleanup command + # reports when this target is exceeded. 0 disables size reporting. source_max_bytes: 1GiB # When non-zero, stale source-cache hits are served immediately while a # background refresh fetches a fresh copy for later requests. diff --git a/docs/caching.md b/docs/caching.md index 02e13a2..939eefc 100644 --- a/docs/caching.md +++ b/docs/caching.md @@ -19,25 +19,35 @@ cache: max_age: 720h ``` -`max_bytes` is a best-effort filesystem eviction target. `max_age` is an -optional age limit for derivative entries. Eviction after writes runs in the -background, so a cache miss response is not delayed by walking a large cache -tree. Failed transforms and HTTP error responses are not stored. +`max_bytes` is a best-effort filesystem size target. `max_age` is an optional +age limit for derivative entries. Failed transforms and HTTP error responses +are not stored. `cache.max_bytes` is the approximate total retained size of derivative payload files under `cache.root`. It is different from `iiif.image.max_derivative_bytes`, which limits one generated response before it -can be returned or cached. A cache write can temporarily exceed -`cache.max_bytes` before asynchronous eviction catches up. When size eviction -runs, Triplet removes the oldest derivative payload files first based on payload -file modification time; reads do not refresh cache age. +can be returned or cached. Cache writes do not walk or prune the cache tree in +the server request path. `cache.max_age` is based on the derivative payload file modification time, not when it was last requested. When a cached derivative is older than `max_age`, -Triplet removes it and treats the request as a cache miss. Expired entries are -also removed opportunistically when new entries are written. Set `max_age: 0` -or omit it to keep derivative files until size eviction, manual deletion, -invalidation, or cache-key changes make them unused. +Triplet treats the request as a cache miss. Set `max_age: 0` or omit it to keep +derivative files until manual deletion, invalidation, or cache-key changes make +them unused. + +Run `triplet-cache-cleanup` periodically from cron, systemd timers, Kubernetes +CronJobs, or a similar scheduler: + +```sh +triplet-cache-cleanup -config /etc/triplet/config.yaml +``` + +The cleanup command reads the same YAML configuration as the server. It removes +derivative cache entries older than `cache.max_age`, then measures the remaining +derivative cache size. It also measures the source cache when `cache.source_root` +is configured. If a cache remains above `cache.max_bytes` or +`cache.source_max_bytes`, the command reports that condition and exits non-zero; +it does not delete live entries solely to satisfy a size target. ### Derivative invalidation @@ -154,7 +164,7 @@ derivative and source caches. | Layer | Configuration | What is cached | Invalidation / freshness | |---|---|---|---| -| Derivative cache | `cache.root`; optional `cache.max_bytes`, `cache.max_age`, `iiif.image.cache_invalidation_token` | Encoded IIIF image responses, keyed by identifier, source version, invalidation marker, region, size, rotation, quality, and format. | A changed source version produces a new key. The protected invalidation route bumps the per-identifier invalidation marker. `cache.max_bytes` is a best-effort aggregate cache budget; `cache.max_age` removes derivative entries older than the configured duration. `iiif.image.max_derivative_bytes` is the per-response size limit before return/cache. Failed transforms and HTTP error responses are not stored. | +| Derivative cache | `cache.root`; optional `cache.max_bytes`, `cache.max_age`, `iiif.image.cache_invalidation_token` | Encoded IIIF image responses, keyed by identifier, source version, invalidation marker, region, size, rotation, quality, and format. | A changed source version produces a new key. The protected invalidation route bumps the per-identifier invalidation marker. `cache.max_bytes` is a best-effort aggregate cache budget reported by `triplet-cache-cleanup`; `cache.max_age` is enforced on reads and by `triplet-cache-cleanup`. `iiif.image.max_derivative_bytes` is the per-response size limit before return/cache. Failed transforms and HTTP error responses are not stored. | | HTTP source cache | `cache.source_root`; optional `cache.source_max_bytes`, `cache.source_stale_after` | Original source bytes fetched through the HTTP source backend. | Keys are source identifiers. When `source_stale_after` is set, stale hits are served immediately and refreshed in the background. Upstream 4xx/5xx responses are not stored. | | HTTP metadata cache | `sources.http.metadata_cache_ttl` | Successful remote source metadata lookups for URL identifiers. | In-memory only. While fresh, derivative cache checks can avoid upstream metadata requests. This can serve stale derivatives until the TTL expires. | | `info.json` dimension cache | `iiif.image.info_dimension_cache` | Source dimensions used to build Image API `info.json`. | In-memory only. Entries are keyed by identifier plus source size/modtime metadata, so source changes with updated metadata miss the cache. | diff --git a/internal/cache/file.go b/internal/cache/file.go index ff222c8..d442681 100644 --- a/internal/cache/file.go +++ b/internal/cache/file.go @@ -11,7 +11,6 @@ import ( "io/fs" "os" "path/filepath" - "sort" "strings" "sync" "time" @@ -26,19 +25,15 @@ type FileStore struct { storeContentType bool - // MaxBytes optionally bounds total cache size; when exceeded, the - // oldest payload files are evicted asynchronously after Put based on mtime. + // MaxBytes optionally reports when total cache size exceeds this value + // during explicit cleanup. It does not trigger request-path eviction. MaxBytes int64 // MaxAge optionally bounds how long entries remain usable after Put. - // Expired entries are removed on Get and opportunistically on Put. + // Expired entries miss on Get and are removed during explicit cleanup. MaxAge time.Duration - mu sync.Mutex - evictMu sync.Mutex - evicting bool - evictAgain bool - evictDone chan struct{} + mu sync.Mutex } const tempFilePrefix = ".tmp-" @@ -55,7 +50,7 @@ func NewFileStore(root string, maxBytes int64) (*FileStore, error) { return &FileStore{Root: abs, storeContentType: true, MaxBytes: maxBytes}, nil } -// NewFileStoreWithMaxAge constructs a FileStore with size and age eviction. +// NewFileStoreWithMaxAge constructs a FileStore with cleanup settings. func NewFileStoreWithMaxAge(root string, maxBytes int64, maxAge time.Duration) (*FileStore, error) { store, err := NewFileStore(root, maxBytes) if err != nil { @@ -82,12 +77,24 @@ type fileMeta struct { ContentType string `json:"content_type"` } +// CleanupReport summarizes one explicit cleanup pass. +type CleanupReport struct { + Root string + Scanned int + Removed int + RemovedBytes int64 + Bytes int64 + MaxBytes int64 + OverMaxBytes bool + ExpiredRemoved int +} + // Get implements Store. func (s *FileStore) Get(_ context.Context, key string) (io.ReadCloser, Entry, error) { dataPath, metaPath := s.paths(key) contentType := "" if s.storeContentType { - mb, err := os.ReadFile(metaPath) + mb, err := os.ReadFile(metaPath) // #nosec G304 -- metaPath is derived from a SHA-256 cache key under Root. if err != nil { if errors.Is(err, fs.ErrNotExist) { return nil, Entry{}, ErrMiss @@ -100,7 +107,7 @@ func (s *FileStore) Get(_ context.Context, key string) (io.ReadCloser, Entry, er } contentType = m.ContentType } - f, err := os.Open(dataPath) + f, err := os.Open(dataPath) // #nosec G304 -- dataPath is derived from a SHA-256 cache key under Root. if err != nil { if errors.Is(err, fs.ErrNotExist) { return nil, Entry{}, ErrMiss @@ -115,10 +122,6 @@ func (s *FileStore) Get(_ context.Context, key string) (io.ReadCloser, Entry, er storedAt := info.ModTime() if s.expired(storedAt, time.Now()) { _ = f.Close() - _ = os.Remove(dataPath) - if s.storeContentType { - _ = os.Remove(metaPath) - } return nil, Entry{}, ErrMiss } return f, Entry{ @@ -157,9 +160,6 @@ func (s *FileStore) Put(_ context.Context, key, contentType string, value io.Rea _ = os.Remove(tmpName) return err } - if s.MaxAge > 0 || s.MaxBytes > 0 { - s.scheduleEvict() - } return nil } @@ -199,63 +199,29 @@ func (s *FileStore) paths(key string) (data, meta string) { return base, base + ".meta" } -func (s *FileStore) scheduleEvict() { - s.evictMu.Lock() - if s.evicting { - s.evictAgain = true - s.evictMu.Unlock() - return - } - s.evicting = true - s.evictAgain = false - s.evictDone = make(chan struct{}) - s.evictMu.Unlock() - - go s.runEvict() -} - -func (s *FileStore) runEvict() { - for { - s.evict() - - s.evictMu.Lock() - if !s.evictAgain { - s.evicting = false - close(s.evictDone) - s.evictMu.Unlock() - return - } - s.evictAgain = false - s.evictMu.Unlock() - } -} - -func (s *FileStore) waitForEviction() { - for { - s.evictMu.Lock() - if !s.evicting { - s.evictMu.Unlock() - return - } - done := s.evictDone - s.evictMu.Unlock() - <-done - } -} - -func (s *FileStore) evict() { +// Cleanup removes expired cache payloads, then reports whether remaining bytes +// exceed MaxBytes. It does not delete live entries for size. +func (s *FileStore) Cleanup(ctx context.Context) (CleanupReport, error) { s.mu.Lock() defer s.mu.Unlock() - var total int64 - type entry struct { - path string - size int64 - modTime time.Time + report := CleanupReport{ + Root: s.Root, + MaxBytes: s.MaxBytes, + } + root, err := os.OpenRoot(s.Root) + if err != nil { + return report, err } - var entries []entry + defer root.Close() now := time.Now() - _ = filepath.WalkDir(s.Root, func(p string, d fs.DirEntry, err error) error { - if err != nil || d.IsDir() { + err = fs.WalkDir(root.FS(), ".", func(p string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + if d.IsDir() { return nil } if filepath.Ext(p) == ".meta" || strings.HasPrefix(d.Name(), tempFilePrefix) { @@ -265,46 +231,29 @@ func (s *FileStore) evict() { if err != nil { return nil } - metaPath := p + ".meta" - if s.storeContentType { - if _, err := os.Stat(metaPath); err != nil { - if errors.Is(err, fs.ErrNotExist) { - _ = os.Remove(p) - return nil - } - return nil - } - } - if s.expired(info.ModTime(), now) { - _ = os.Remove(p) - if s.storeContentType { - _ = os.Remove(metaPath) - } + if !info.Mode().IsRegular() { return nil } - entries = append(entries, entry{path: p, size: info.Size(), modTime: info.ModTime()}) - total += info.Size() - return nil - }) - if s.MaxBytes <= 0 || total <= s.MaxBytes { - return - } - sort.Slice(entries, func(i, j int) bool { - if entries[i].modTime.Equal(entries[j].modTime) { - return entries[i].path < entries[j].path - } - return entries[i].modTime.Before(entries[j].modTime) - }) - for _, e := range entries { - if total <= s.MaxBytes { - return + report.Scanned++ + metaPath := p + ".meta" + if !s.expired(info.ModTime(), now) { + report.Bytes += info.Size() + return nil } - _ = os.Remove(e.path) + _ = root.Remove(p) if s.storeContentType { - _ = os.Remove(e.path + ".meta") + _ = root.Remove(metaPath) } - total -= e.size + report.Removed++ + report.ExpiredRemoved++ + report.RemovedBytes += info.Size() + return nil + }) + if err != nil { + return report, err } + report.OverMaxBytes = s.MaxBytes > 0 && report.Bytes > s.MaxBytes + return report, nil } func (s *FileStore) expired(storedAt, now time.Time) bool { diff --git a/internal/cache/file_test.go b/internal/cache/file_test.go index 3ccec2c..c74d4f9 100644 --- a/internal/cache/file_test.go +++ b/internal/cache/file_test.go @@ -120,7 +120,7 @@ func TestFileStoreMissDoesNotCreateKeyDirectory(t *testing.T) { } } -func TestFileStoreEvictSkipsInFlightTempFiles(t *testing.T) { +func TestFileStoreCleanupSkipsInFlightTempFiles(t *testing.T) { store, err := NewFileStore(t.TempDir(), 1) if err != nil { t.Fatal(err) @@ -143,43 +143,48 @@ func TestFileStoreEvictSkipsInFlightTempFiles(t *testing.T) { t.Fatal(err) } - if err := store.Put(context.Background(), "b", "text/plain", strings.NewReader("b")); err != nil { + if _, err := store.Cleanup(context.Background()); err != nil { t.Fatal(err) } - store.waitForEviction() if _, err := os.Stat(tmpPath); err != nil { t.Fatalf("tmp stat err = %v, want exists", err) } } -func TestPayloadFileStorePutDoesNotWaitForEviction(t *testing.T) { - store, err := NewPayloadFileStoreWithMaxAge(t.TempDir(), 1, 0) +func TestFileStoreCleanupKeepsPayloadMissingMeta(t *testing.T) { + store, err := NewFileStore(t.TempDir(), 1) if err != nil { t.Fatal(err) } + dataPath, _ := store.paths("partial") + if err := os.MkdirAll(filepath.Dir(dataPath), 0o750); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(dataPath, []byte("partial"), 0o600); err != nil { + t.Fatal(err) + } - store.mu.Lock() - done := make(chan error, 1) - go func() { - done <- store.Put(context.Background(), "a", "text/plain", strings.NewReader("payload")) - }() - - select { - case err := <-done: - if err != nil { - t.Fatal(err) - } - case <-time.After(time.Second): - store.mu.Unlock() - t.Fatal("Put waited for eviction") + report, err := store.Cleanup(context.Background()) + if err != nil { + t.Fatal(err) } - store.mu.Unlock() - store.waitForEviction() + if report.Removed != 0 { + t.Fatalf("removed = %d, want 0", report.Removed) + } + if report.Bytes != int64(len("partial")) { + t.Fatalf("bytes = %d, want %d", report.Bytes, len("partial")) + } + if !report.OverMaxBytes { + t.Fatal("expected over max bytes") + } + if _, err := os.Stat(dataPath); err != nil { + t.Fatalf("data stat err = %v, want exists", err) + } } -func TestFileStoreConcurrentPutsWithEviction(t *testing.T) { +func TestFileStoreConcurrentPuts(t *testing.T) { t.Parallel() store, err := NewFileStore(t.TempDir(), 1) @@ -209,14 +214,13 @@ func TestFileStoreConcurrentPutsWithEviction(t *testing.T) { wg.Wait() close(errCh) - store.waitForEviction() for err := range errCh { t.Fatal(err) } } -func TestFileStoreEvictsWhenOversize(t *testing.T) { +func TestFileStorePutDoesNotEvictWhenOversize(t *testing.T) { store, err := NewFileStore(t.TempDir(), 5) if err != nil { t.Fatal(err) @@ -229,10 +233,11 @@ func TestFileStoreEvictsWhenOversize(t *testing.T) { if err := store.Put(context.Background(), "b", "text/plain", bytes.NewReader([]byte("5678"))); err != nil { t.Fatal(err) } - store.waitForEviction() - if _, _, err := store.Get(context.Background(), "a"); !errors.Is(err, ErrMiss) { - t.Fatalf("a err = %v, want miss", err) + if rc, _, err := store.Get(context.Background(), "a"); err != nil { + t.Fatalf("a err = %v", err) + } else { + _ = rc.Close() } if rc, _, err := store.Get(context.Background(), "b"); err != nil { t.Fatalf("b err = %v", err) @@ -241,7 +246,7 @@ func TestFileStoreEvictsWhenOversize(t *testing.T) { } } -func TestFileStoreGetDoesNotRefreshEvictionOrder(t *testing.T) { +func TestFileStoreCleanupReportsOversize(t *testing.T) { store, err := NewFileStore(t.TempDir(), 8) if err != nil { t.Fatal(err) @@ -265,24 +270,23 @@ func TestFileStoreGetDoesNotRefreshEvictionOrder(t *testing.T) { if err := store.Put(context.Background(), "c", "text/plain", bytes.NewReader([]byte("90"))); err != nil { t.Fatal(err) } - store.waitForEviction() + report, err := store.Cleanup(context.Background()) + if err != nil { + t.Fatal(err) + } - if _, _, err := store.Get(context.Background(), "a"); !errors.Is(err, ErrMiss) { - t.Fatalf("a err = %v, want miss", err) + if !report.OverMaxBytes { + t.Fatal("expected over max bytes") } - if rc, _, err := store.Get(context.Background(), "b"); err != nil { - t.Fatalf("b err = %v", err) - } else { - _ = rc.Close() + if report.Bytes != 10 { + t.Fatalf("bytes = %d, want 10", report.Bytes) } - if rc, _, err := store.Get(context.Background(), "c"); err != nil { - t.Fatalf("c err = %v", err) - } else { - _ = rc.Close() + if report.Removed != 0 { + t.Fatalf("removed = %d, want 0", report.Removed) } } -func TestFileStoreEvictsByModTime(t *testing.T) { +func TestFileStoreCleanupDoesNotDeleteOldestForSize(t *testing.T) { store, err := NewFileStore(t.TempDir(), 8) if err != nil { t.Fatal(err) @@ -309,10 +313,18 @@ func TestFileStoreEvictsByModTime(t *testing.T) { if err := store.Put(context.Background(), "c", "text/plain", bytes.NewReader([]byte("90"))); err != nil { t.Fatal(err) } - store.waitForEviction() + report, err := store.Cleanup(context.Background()) + if err != nil { + t.Fatal(err) + } - if _, _, err := store.Get(context.Background(), "a"); !errors.Is(err, ErrMiss) { - t.Fatalf("a err = %v, want miss", err) + if !report.OverMaxBytes { + t.Fatal("expected over max bytes") + } + if rc, _, err := store.Get(context.Background(), "a"); err != nil { + t.Fatalf("a err = %v", err) + } else { + _ = rc.Close() } if rc, _, err := store.Get(context.Background(), "b"); err != nil { t.Fatalf("b err = %v", err) @@ -326,7 +338,7 @@ func TestFileStoreEvictsByModTime(t *testing.T) { } } -func TestFileStoreMaxAgeExpiresOnGet(t *testing.T) { +func TestFileStoreMaxAgeMissesOnGet(t *testing.T) { store, err := NewFileStoreWithMaxAge(t.TempDir(), 0, time.Hour) if err != nil { t.Fatal(err) @@ -346,15 +358,15 @@ func TestFileStoreMaxAgeExpiresOnGet(t *testing.T) { t.Fatalf("err = %v, want miss", err) } - if _, err := os.Stat(dataPath); !errors.Is(err, os.ErrNotExist) { - t.Fatalf("data stat err = %v, want not exist", err) + if _, err := os.Stat(dataPath); err != nil { + t.Fatalf("data stat err = %v, want exists until cleanup", err) } - if _, err := os.Stat(metaPath); !errors.Is(err, os.ErrNotExist) { - t.Fatalf("meta stat err = %v, want not exist", err) + if _, err := os.Stat(metaPath); err != nil { + t.Fatalf("meta stat err = %v, want exists until cleanup", err) } } -func TestFileStoreMaxAgeEvictsOnPut(t *testing.T) { +func TestFileStoreMaxAgeCleanup(t *testing.T) { store, err := NewFileStoreWithMaxAge(t.TempDir(), 0, time.Hour) if err != nil { t.Fatal(err) @@ -371,7 +383,13 @@ func TestFileStoreMaxAgeEvictsOnPut(t *testing.T) { if err := store.Put(context.Background(), "new", "text/plain", strings.NewReader("new")); err != nil { t.Fatal(err) } - store.waitForEviction() + report, err := store.Cleanup(context.Background()) + if err != nil { + t.Fatal(err) + } + if report.ExpiredRemoved != 1 { + t.Fatalf("expired removed = %d, want 1", report.ExpiredRemoved) + } if _, _, err := store.Get(context.Background(), "old"); !errors.Is(err, ErrMiss) { t.Fatalf("old err = %v, want miss", err) } diff --git a/internal/storage/file.go b/internal/storage/file.go index b252d54..65df3a5 100644 --- a/internal/storage/file.go +++ b/internal/storage/file.go @@ -46,7 +46,7 @@ func (f *FileOpener) Open(_ context.Context, identifier string) (io.ReadSeekClos if err != nil { return nil, Meta{}, err } - file, err := os.Open(realPath) + file, err := os.Open(realPath) // #nosec G304 -- realPath is resolved and checked under the configured source root. if err != nil { if errors.Is(err, fs.ErrNotExist) { return nil, Meta{}, fmt.Errorf("%w: %s", ErrNotFound, identifier) diff --git a/internal/storage/local_url.go b/internal/storage/local_url.go index 02564a7..0e15c19 100644 --- a/internal/storage/local_url.go +++ b/internal/storage/local_url.go @@ -217,7 +217,7 @@ func (l *LocalURLFallback) openLocal(ctx context.Context, identifier string) (io l.debug(ctx, "local url auth denied local file", identifier, "operation", "open", "prefix", mapping.Prefix, "err", authErr) return nil, Meta{}, false, authErr } - rc, err := os.Open(diskPath) + rc, err := os.Open(diskPath) // #nosec G304 -- diskPath is returned by ocflMeta after object-root validation. if err != nil { if errors.Is(err, fs.ErrNotExist) { l.debug(ctx, "local url ocfl content missing", identifier, "operation", "open", "prefix", mapping.Prefix, "path", path, "disk_path", diskPath) @@ -786,7 +786,7 @@ func (l *LocalURLFallback) ocflDiskPath(mapping LocalURLMapping, path string) (s } ocflDir := ocflDir(mapping.File.Root, "info:fedora/"+path) inventoryPath := filepath.Join(ocflDir, "extensions", "0005-mutable-head", "head", "inventory.json") - body, err := os.ReadFile(inventoryPath) + body, err := os.ReadFile(inventoryPath) // #nosec G304 -- inventoryPath is derived from the configured OCFL root and object hash. if err != nil { if errors.Is(err, fs.ErrNotExist) { return "", fmt.Errorf("%w: ocfl inventory", ErrNotFound) diff --git a/scripts/build.sh b/scripts/build.sh index d560b15..41ae380 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -4,8 +4,10 @@ set -euo pipefail IMAGE_TAG="${TRIPLET_BUILD_IMAGE:-triplet-build:dev}" BIN_PATH="${BIN_PATH:-bin/triplet}" +CACHE_CLEANUP_BIN_PATH="${CACHE_CLEANUP_BIN_PATH:-$(dirname "$BIN_PATH")/triplet-cache-cleanup}" mkdir -p "$(dirname "$BIN_PATH")" +mkdir -p "$(dirname "$CACHE_CLEANUP_BIN_PATH")" echo "Building builder image: $IMAGE_TAG" docker build --target build -t "$IMAGE_TAG" . @@ -17,4 +19,6 @@ cleanup() { trap cleanup EXIT docker cp "$CID:/out/triplet" "$BIN_PATH" +docker cp "$CID:/out/triplet-cache-cleanup" "$CACHE_CLEANUP_BIN_PATH" echo "Wrote $BIN_PATH" +echo "Wrote $CACHE_CLEANUP_BIN_PATH"