diff --git a/docs/spec/v1/imagerepositories.md b/docs/spec/v1/imagerepositories.md index d520374a..9098b289 100644 --- a/docs/spec/v1/imagerepositories.md +++ b/docs/spec/v1/imagerepositories.md @@ -92,6 +92,27 @@ Events: Normal Succeeded 17s image-reflector-controller successful scan, found 211 tags ``` +## Controller storage + +The controller stores scanned tags in a rebuildable on-disk cache at +`--storage-path` (`/data` by default). BadgerDB is the default backend, and +`--storage-value-log-file-size` applies only to that backend. + +When the `FluxStorage` feature gate is enabled, the controller uses a filesystem +backend based on `github.com/fluxcd/pkg/artifact/storage`. Tags are stored per +`ImageRepository` object, keyed by `/` instead of canonical +image name, so repositories with different credentials do not share cached tag +sets. The file layout is: + +```text +imagerepository///tags.txt +imagerepository///tags.txt.gz +``` + +Each file contains one OCI tag per line. Files are compressed when the +uncompressed content is at least `--storage-compression-threshold` KiB +(default `64`). Switching the `FluxStorage` gate on or off wipes the tag cache. + ## Writing an ImageRepository spec As with all other Kubernetes config, an ImageRepository needs `apiVersion`, diff --git a/go.mod b/go.mod index 3f5351ed..4d05950f 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/fluxcd/pkg/apis/acl v0.10.0 github.com/fluxcd/pkg/apis/event v0.27.0 github.com/fluxcd/pkg/apis/meta v1.30.0 + github.com/fluxcd/pkg/artifact v0.18.0 github.com/fluxcd/pkg/auth v0.54.0 github.com/fluxcd/pkg/cache v0.14.0 github.com/fluxcd/pkg/runtime v0.110.0 @@ -27,7 +28,6 @@ require ( k8s.io/api v0.36.1 k8s.io/apimachinery v0.36.1 k8s.io/client-go v0.36.1 - k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 sigs.k8s.io/controller-runtime v0.24.1 sigs.k8s.io/yaml v1.6.0 ) @@ -66,10 +66,11 @@ require ( github.com/blang/semver/v4 v4.0.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.3 // indirect + github.com/cyphar/filepath-securejoin v0.6.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect github.com/docker/cli v29.4.3+incompatible // indirect - github.com/docker/docker-credential-helpers v0.9.3 // indirect + github.com/docker/docker-credential-helpers v0.9.5 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/emicklei/go-restful/v3 v3.13.0 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect @@ -77,6 +78,10 @@ require ( github.com/fatih/color v1.18.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fluxcd/cli-utils v1.2.1 // indirect + github.com/fluxcd/pkg/lockedfile v0.8.0 // indirect + github.com/fluxcd/pkg/oci v0.68.0 // indirect + github.com/fluxcd/pkg/sourceignore v0.18.0 // indirect + github.com/fluxcd/pkg/tar v1.2.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-errors/errors v1.5.1 // indirect @@ -100,6 +105,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.6 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/mailru/easyjson v0.9.0 // indirect @@ -110,7 +116,8 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nxadm/tail v1.4.11 // indirect - github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be // indirect + github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect @@ -119,12 +126,13 @@ require ( github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.5 // indirect - github.com/prometheus/procfs v0.19.2 // indirect + github.com/prometheus/procfs v0.20.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sirupsen/logrus v1.9.4 // indirect github.com/spf13/cobra v1.10.2 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xlab/treeprint v1.2.0 // indirect + github.com/zeebo/blake3 v0.2.3 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 // indirect go.opentelemetry.io/otel v1.43.0 // indirect @@ -156,6 +164,7 @@ require ( k8s.io/klog/v2 v2.140.0 // indirect k8s.io/kube-openapi v0.0.0-20260317180543-43fb72c5454a // indirect k8s.io/kubectl v0.36.1 // indirect + k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/kustomize/api v0.21.1 // indirect sigs.k8s.io/kustomize/kyaml v0.21.1 // indirect diff --git a/go.sum b/go.sum index b97809e0..7231099e 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= +github.com/bshuster-repo/logrus-logstash-hook v1.1.0 h1:o2FzZifLg+z/DN1OFmzTWzZZx/roaqt8IPZCIVco8r4= +github.com/bshuster-repo/logrus-logstash-hook v1.1.0/go.mod h1:Q2aXOe7rNuPgbBtPCOzYyWDvKX7+FpxE5sRdvcPoui0= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.3 h1:9liNh8t+u26xl5ddmWLmsOsdNLwkdRTg5AG+JnTiM80= @@ -76,9 +80,13 @@ github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9 github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= github.com/coreos/go-oidc/v3 v3.17.0 h1:hWBGaQfbi0iVviX4ibC7bk8OKT5qNr4klBaCHVNvehc= github.com/coreos/go-oidc/v3 v3.17.0/go.mod h1:wqPbKFrVnE90vty060SB40FCJ8fTHTxSwyXJqZH+sI8= +github.com/coreos/go-systemd/v22 v22.7.0 h1:LAEzFkke61DFROc7zNLX/WA2i5J8gYqe0rSj9KI28KA= +github.com/coreos/go-systemd/v22 v22.7.0/go.mod h1:xNUYtjHu2EDXbsxz1i41wouACIwT7Ybq9o0BQhMwD0w= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= +github.com/cyphar/filepath-securejoin v0.6.1 h1:5CeZ1jPXEiYt3+Z6zqprSAgSWiggmpVyciv8syjIpVE= +github.com/cyphar/filepath-securejoin v0.6.1/go.mod h1:A8hd4EnAeyujCJRrICiOWqjS1AX0a9kM5XL+NwKoYSc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -89,10 +97,20 @@ github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINA github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38= github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/distribution/distribution/v3 v3.1.1 h1:KUbk7C8CfaLXy8kbf/hGq9cad/wCoLB6dbWH6DMbmX0= +github.com/distribution/distribution/v3 v3.1.1/go.mod h1:d7lXwZpph0bVcOj4Aqn0nMrWHIwRQGdiV5TLeI+/w6Y= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/cli v29.4.3+incompatible h1:u+UliYm2J/rYrIh2FqHQg32neRG8GjbvNuwQRTzGspU= github.com/docker/cli v29.4.3+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= -github.com/docker/docker-credential-helpers v0.9.3 h1:gAm/VtF9wgqJMoxzT3Gj5p4AqIjCBS4wrsOh9yRqcz8= -github.com/docker/docker-credential-helpers v0.9.3/go.mod h1:x+4Gbw9aGmChi3qTLZj8Dfn0TD20M/fuWy0E5+WDeCo= +github.com/docker/docker-credential-helpers v0.9.5 h1:EFNN8DHvaiK8zVqFA2DT6BjXE0GzfLOZ38ggPTKePkY= +github.com/docker/docker-credential-helpers v0.9.5/go.mod h1:v1S+hepowrQXITkEfw6o4+BMbGot02wiKpzWhGUZK6c= +github.com/docker/go-events v0.0.0-20250808211157-605354379745 h1:yOn6Ze6IbYI/KAw2lw/83ELYvZh6hvsygTVkD0dzMC4= +github.com/docker/go-events v0.0.0-20250808211157-605354379745/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA= +github.com/docker/go-metrics v0.0.1 h1:AgB/0SvBxihN0X8OR4SjsblXkbMvalQ8cjmtKQ2rQV8= +github.com/docker/go-metrics v0.0.1/go.mod h1:cG1hvH2utMXtqgqqYE9plW6lDxS3/5ayHzueweSI3Vw= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elazarl/goproxy v1.8.4 h1:tIHKhYHXf8gQracfoHl8Zy7PG/jhvmIMUR5j8OlPUIM= @@ -117,12 +135,22 @@ github.com/fluxcd/pkg/apis/event v0.27.0 h1:fUJRyWU3sEKjV6SzBnoJT6aDP5cJdMAbspZy github.com/fluxcd/pkg/apis/event v0.27.0/go.mod h1:0zua8dB0E9SXryScEf7LDMuUYVyVKgA/Xeh2h2gtSRU= github.com/fluxcd/pkg/apis/meta v1.30.0 h1:26TOd1hbamH3c5KOb/CIMGpUDB4G4JV+WCcPyUhmuaM= github.com/fluxcd/pkg/apis/meta v1.30.0/go.mod h1:q1YjUeCmf0syhkZoMcRmP3pkBaLKFLl7g0mUvVGG4CM= +github.com/fluxcd/pkg/artifact v0.18.0 h1:TrIcQ6TaQ+787IA3YESFnihPqbfptqHHyqHZ2y0Cnsk= +github.com/fluxcd/pkg/artifact v0.18.0/go.mod h1:bNnNKUm9ExJonG3w1pD/4QmWEkbImt6ka3LrVCfcuL4= github.com/fluxcd/pkg/auth v0.54.0 h1:EiNUhksFwUULmrctsTcfjtwszmzVgpEPuP2LcgfwIbU= github.com/fluxcd/pkg/auth v0.54.0/go.mod h1:bf+0mQNaxgMLvdR3S15qz/3t0GoLTipl1xDQBguNsJI= github.com/fluxcd/pkg/cache v0.14.0 h1:wEwJA8NhYj+nH9P6ifcsglDZARWlcbxbmwngGOzfU4c= github.com/fluxcd/pkg/cache v0.14.0/go.mod h1:KwzU2gyVQ83YOHJsbBeveJ0HsXmLrH0I668zX19d/+s= +github.com/fluxcd/pkg/lockedfile v0.8.0 h1:kL6XwAEbVFyNXPbcoN/wzssTGCPnmB31I/NJue1gqmo= +github.com/fluxcd/pkg/lockedfile v0.8.0/go.mod h1:N/KfjE/aABawME9+NTjiMg4zUiFdCOiCSuzSOIA7Q2A= +github.com/fluxcd/pkg/oci v0.68.0 h1:eykB9HrMGOmCTT6MSKRIBMxTiieBigah1wFedMMyhYM= +github.com/fluxcd/pkg/oci v0.68.0/go.mod h1:lNxvxqjGSGfEKI4Lnpj8mHTVDMy4Fdiico6Oml0tafU= github.com/fluxcd/pkg/runtime v0.110.0 h1:ziGAuoQ3OVSEqmMXS6doZWi2LcF7exEKPe69dun5RNg= github.com/fluxcd/pkg/runtime v0.110.0/go.mod h1:r3X179AZAuUquYAKYFDxCJk5yYpAKTJatfNWTLlA9/s= +github.com/fluxcd/pkg/sourceignore v0.18.0 h1:WU2tPKasG9AM7/H/LlqdjULyaSknnZBTrpHsDDtOuns= +github.com/fluxcd/pkg/sourceignore v0.18.0/go.mod h1:mnH7rFFlEbMTclhz7JZP7tiHssKdXRNpCqnly2JGvaI= +github.com/fluxcd/pkg/tar v1.2.0 h1:T6WFB5M0YRHktlrgdKNskqpdp76TVDdWTOeuWz33CFs= +github.com/fluxcd/pkg/tar v1.2.0/go.mod h1:Wlalp5vIVe+BbckkKkqExKcoHAeeWJPAzwK7ONeFcS0= github.com/fluxcd/pkg/version v0.16.0 h1:VR9+143LAwbyUSAaMhiJHbfsiU+fTjA9L/3dr1ucfrI= github.com/fluxcd/pkg/version v0.16.0/go.mod h1:2M/l90CmbDaD21JTh77hjwaUbd/YM96+Fo8x4fMdxLI= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -192,12 +220,22 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.15 h1:xolVQTEXusUcAA5Ugt github.com/googleapis/enterprise-certificate-proxy v0.3.15/go.mod h1:vqVt9yG9480NtzREnTlmGSBmFrA+bzb0yl0TxoBQXOg= github.com/googleapis/gax-go/v2 v2.22.0 h1:PjIWBpgGIVKGoCXuiCoP64altEJCj3/Ei+kSU5vlZD4= github.com/googleapis/gax-go/v2 v2.22.0/go.mod h1:irWBbALSr0Sk3qlqb9SyJ1h68WjgeFuiOzI4Rqw5+aY= +github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= +github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-retryablehttp v0.7.8 h1:ylXZWnqa7Lhqpk0L1P1LzDtGcCR0rPVUrx/c8Unxc48= github.com/hashicorp/go-retryablehttp v0.7.8/go.mod h1:rjiScheydd+CxvumBsIrFKlx3iS0jrZ7LvzFGFmuKbw= +github.com/hashicorp/golang-lru/arc/v2 v2.0.5 h1:l2zaLDubNhW4XO3LnliVj0GXO3+/CGNJAg1dcN2Fpfw= +github.com/hashicorp/golang-lru/arc/v2 v2.0.5/go.mod h1:ny6zBSQZi2JxIeYcv7kt2sH2PXJtirBN7RDhRpxPkxU= +github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= +github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= @@ -209,6 +247,9 @@ github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRt github.com/keybase/go-keychain v0.0.1/go.mod h1:PdEILRW3i9D8JcdM+FmY6RwkHGnhHxXwkPPMeUgOK1k= github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao= github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -251,8 +292,10 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.41.0 h1:OwKp4pXNgVxf6sCplzYo794OFNuoL2q2SBMU5NSWOjA= github.com/onsi/gomega v1.41.0/go.mod h1:M/Uqpu/8qTjtzCLUA2zJHX9Iilrau25x1PdoSRbWh5A= -github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= -github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be h1:f2PlhC9pm5sqpBZFvnAoKj+KzXRzbjFMA+TqXfJdgho= +github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a h1:IAncDmJeD90l6+YR1Gf6r0HrmnRmOatzPfUpMS80ZTI= +github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a/go.mod h1:kqQaIc6bZstKgnGpL7GD5dWoLKbA6mH1Y9ULjGImBnM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= @@ -270,8 +313,16 @@ github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNw github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= -github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws= -github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw= +github.com/prometheus/otlptranslator v1.0.0 h1:s0LJW/iN9dkIH+EnhiD3BlkkP5QVIUVEoIwkU+A6qos= +github.com/prometheus/otlptranslator v1.0.0/go.mod h1:vRYWnXvI6aWGpsdY/mOT/cbeVRBlPWtBNDb7kGR3uKM= +github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc= +github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= +github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 h1:EaDatTxkdHG+U3Bk4EUr+DZ7fOGwTfezUiUJMaIcaho= +github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5/go.mod h1:fyalQWdtzDBECAQFBJuQe5bzQ02jGd5Qcbgb97Flm7U= +github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 h1:EfpWLLCyXw8PSM2/XNJLjI3Pb27yVE+gIAfeqp8LUCc= +github.com/redis/go-redis/extra/redisotel/v9 v9.0.5/go.mod h1:WZjPDy7VNzn77AAfnAfVjZNvfJTYfPetfZk5yoSTLaQ= +github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= +github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -299,20 +350,58 @@ github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcY github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ= github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY= +github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg= +github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ= +github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo= +github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/bridges/prometheus v0.67.0 h1:dkBzNEAIKADEaFnuESzcXvpd09vxvDZsOjx11gjUqLk= +go.opentelemetry.io/contrib/bridges/prometheus v0.67.0/go.mod h1:Z5RIwRkZgauOIfnG5IpidvLpERjhTninpP1dTG2jTl4= +go.opentelemetry.io/contrib/exporters/autoexport v0.67.0 h1:4fnRcNpc6YFtG3zsFw9achKn3XgmxPxuMuqIL5rE8e8= +go.opentelemetry.io/contrib/exporters/autoexport v0.67.0/go.mod h1:qTvIHMFKoxW7HXg02gm6/Wofhq5p3Ib/A/NNt1EoBSQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 h1:OyrsyzuttWTSur2qN/Lm0m2a8yqyIjUVBZcxFPuXq2o= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0/go.mod h1:C2NGBr+kAB4bk3xtMXfZ94gqFDtg/GkI7e9zqGh5Beg= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0 h1:deI9UQMoGFgrg5iLPgzueqFPHevDl+28YKfSpPTI6rY= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0/go.mod h1:PFx9NgpNUKXdf7J4Q3agRxMs3Y07QhTCVipKmLsMKnU= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.19.0 h1:HIBTQ3VO5aupLKjC90JgMqpezVXwFuq6Ryjn0/izoag= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.19.0/go.mod h1:ji9vId85hMxqfvICA0Jt8JqEdrXaAkcpkI9HPXya0ro= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0 h1:MdKucPl/HbzckWWEisiNqMPhRrAOQX8r4jTuGr636gk= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0/go.mod h1:RolT8tWtfHcjajEH5wFIZ4Dgh5jpPdFXYV9pTAk/qjc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 h1:w1K+pCJoPpQifuVpsKamUdn9U0zM3xUziVOqsGksUrY= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0/go.mod h1:HBy4BjzgVE8139ieRI75oXm3EcDN+6GhD88JT1Kjvxg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 h1:zWWrB1U6nqhS/k6zYB74CjRpuiitRtLLi68VcgmOEto= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0/go.mod h1:2qXPNBX1OVRC0IwOnfo1ljoid+RD0QK3443EaqVlsOU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 h1:3iZJKlCZufyRzPzlQhUIWVmfltrXuGyfjREgGP3UUjc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0/go.mod h1:/G+nUPfhq2e+qiXMGxMwumDrP5jtzU+mWN7/sjT2rak= +go.opentelemetry.io/otel/exporters/prometheus v0.64.0 h1:g0LRDXMX/G1SEZtK8zl8Chm4K6GBwRkjPKE36LxiTYs= +go.opentelemetry.io/otel/exporters/prometheus v0.64.0/go.mod h1:UrgcjnarfdlBDP3GjDIJWe6HTprwSazNjwsI+Ru6hro= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.18.0 h1:KJVjPD3rcPb98rIs3HznyJlrfx9ge5oJvxxlGR+P/7s= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.18.0/go.mod h1:K3kRa2ckmHWQaTWQdPRHc7qGXASuVuoEQXzrvlA98Ws= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.42.0 h1:lSZHgNHfbmQTPfuTmWVkEu8J8qXaQwuV30pjCcAUvP8= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.42.0/go.mod h1:so9ounLcuoRDu033MW/E0AD4hhUjVqswrMF5FoZlBcw= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0 h1:s/1iRkCKDfhlh1JF26knRneorus8aOwVIDhvYx9WoDw= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0/go.mod h1:UI3wi0FXg1Pofb8ZBiBLhtMzgoTm1TYkMvn71fAqDzs= +go.opentelemetry.io/otel/log v0.19.0 h1:KUZs/GOsw79TBBMfDWsXS+KZ4g2Ckzksd1ymzsIEbo4= +go.opentelemetry.io/otel/log v0.19.0/go.mod h1:5DQYeGmxVIr4n0/BcJvF4upsraHjg6vudJJpnkL6Ipk= go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= +go.opentelemetry.io/otel/sdk/log v0.19.0 h1:scYVLqT22D2gqXItnWiocLUKGH9yvkkeql5dBDiXyko= +go.opentelemetry.io/otel/sdk/log v0.19.0/go.mod h1:vFBowwXGLlW9AvpuF7bMgnNI95LiW10szrOdvzBHlAg= go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw= go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= +go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g= +go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -357,6 +446,7 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.43.0 h1:S4RLU2sB31O/NCl+zFN9Aru9A/Cq2aqKpTZJ6B+DwT4= @@ -384,8 +474,8 @@ google.golang.org/api v0.278.0 h1:W7jiRvRi53VYFfZ/HoZjQBtJk7gOFbHD8ot1RzVZU6E= google.golang.org/api v0.278.0/go.mod h1:B9TqLBwJqVjp1mtt7WeoQwWRwvu/400y5lETOql+giQ= google.golang.org/genproto v0.0.0-20260319201613-d00831a3d3e7 h1:XzmzkmB14QhVhgnawEVsOn6OFsnpyxNPRY9QV01dNB0= google.golang.org/genproto v0.0.0-20260319201613-d00831a3d3e7/go.mod h1:L43LFes82YgSonw6iTXTxXUX1OlULt4AQtkik4ULL/I= -google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 h1:41r6JMbpzBMen0R/4TZeeAmGXSJC7DftGINUodzTkPI= -google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7/go.mod h1:EIQZ5bFCfRQDV4MhRle7+OgjNtZ6P1PiZBgAKuxXu/Y= +google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA= +google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M= google.golang.org/genproto/googleapis/rpc v0.0.0-20260427160629-7cedc36a6bc4 h1:tEkOQcXgF6dH1G+MVKZrfpYvozGrzb91k6ha7jireSM= google.golang.org/genproto/googleapis/rpc v0.0.0-20260427160629-7cedc36a6bc4/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= @@ -411,6 +501,8 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/controller/database.go b/internal/controller/database.go deleted file mode 100644 index de74b508..00000000 --- a/internal/controller/database.go +++ /dev/null @@ -1,31 +0,0 @@ -/* -Copyright 2020 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -// DatabaseWriter implementations record the tags for an image repository. -type DatabaseWriter interface { - SetTags(repo string, tags []string) (string, error) -} - -// DatabaseReader implementations get the stored set of tags for an image -// repository. -// -// If no tags are availble for the repo, then implementations should return an -// empty set of tags. -type DatabaseReader interface { - Tags(repo string) ([]string, error) -} diff --git a/internal/controller/imagepolicy_controller.go b/internal/controller/imagepolicy_controller.go index 1b052c27..b5fd9bd6 100644 --- a/internal/controller/imagepolicy_controller.go +++ b/internal/controller/imagepolicy_controller.go @@ -56,6 +56,7 @@ import ( imagev1 "github.com/fluxcd/image-reflector-controller/api/v1" "github.com/fluxcd/image-reflector-controller/internal/policy" "github.com/fluxcd/image-reflector-controller/internal/registry" + "github.com/fluxcd/image-reflector-controller/internal/storage" ) // errAccessDenied is returned when an ImageRepository reference in ImagePolicy @@ -108,7 +109,7 @@ type ImagePolicyReconciler struct { helper.Metrics ControllerName string - Database DatabaseReader + Database storage.DatabaseReader ACLOptions acl.Options AuthOptionsGetter *registry.AuthOptionsGetter TokenCache *cache.TokenCache @@ -355,7 +356,7 @@ func (r *ImagePolicyReconciler) reconcile(ctx context.Context, sp *patch.SerialP // Construct a policer from the spec.policy. // Read the tags from database and use the policy to obtain a result for the // latest tag. - latest, err := r.applyPolicy(obj, repo) + latest, err := r.applyPolicy(ctx, obj, repo) if err != nil { // Stall if it's an invalid policy. if _, ok := err.(errInvalidPolicy); ok { @@ -509,14 +510,14 @@ func (r *ImagePolicyReconciler) getImageRepository(ctx context.Context, obj *ima // applyPolicy reads the tags of the given repository from the internal database // and applies the tag filters and constraints to return the latest image. -func (r *ImagePolicyReconciler) applyPolicy(obj *imagev1.ImagePolicy, repo *imagev1.ImageRepository) (string, error) { +func (r *ImagePolicyReconciler) applyPolicy(ctx context.Context, obj *imagev1.ImagePolicy, repo *imagev1.ImageRepository) (string, error) { policer, err := policy.PolicerFromSpec(obj.Spec.Policy) if err != nil { return "", errInvalidPolicy{err: fmt.Errorf("invalid policy: %w", err)} } // Read tags from database with a maximum of 3 retries. - tags, err := r.listTagsWithBackoff(repo.Status.CanonicalImageName) + tags, err := r.listTagsWithBackoff(ctx, storage.RepoIdentity{Namespace: repo.Namespace, Name: repo.Name, CanonicalName: repo.Status.CanonicalImageName}) if err != nil { return "", err } @@ -569,7 +570,7 @@ func (r *ImagePolicyReconciler) imagePoliciesForRepository(ctx context.Context, // listTagsWithBackoff lists the tags of the given image from the // internal database with retries if there are no tags in the database. -func (r *ImagePolicyReconciler) listTagsWithBackoff(canonicalImageName string) ([]string, error) { +func (r *ImagePolicyReconciler) listTagsWithBackoff(ctx context.Context, repo storage.RepoIdentity) ([]string, error) { var backoff = wait.Backoff{ Steps: 4, Duration: 1 * time.Second, @@ -583,7 +584,7 @@ func (r *ImagePolicyReconciler) listTagsWithBackoff(canonicalImageName string) ( return errors.Is(err, errNoTagsInDatabase) }, func() error { var err error - tags, err = r.Database.Tags(canonicalImageName) + tags, err = r.Database.Tags(ctx, repo) if err != nil { return fmt.Errorf("failed to read tags from database: %w", err) } diff --git a/internal/controller/imagepolicy_controller_test.go b/internal/controller/imagepolicy_controller_test.go index eb5d09b8..73a338ae 100644 --- a/internal/controller/imagepolicy_controller_test.go +++ b/internal/controller/imagepolicy_controller_test.go @@ -1276,7 +1276,7 @@ func TestImagePolicyReconciler_applyPolicy(t *testing.T) { repo := &imagev1.ImageRepository{} - result, err := r.applyPolicy(obj, repo) + result, err := r.applyPolicy(ctx, obj, repo) g.Expect(err != nil).To(Equal(tt.wantErr)) if err == nil { g.Expect(result).To(Equal(tt.wantResult)) diff --git a/internal/controller/imagerepository_controller.go b/internal/controller/imagerepository_controller.go index 8a2ad38d..5d4cc95f 100644 --- a/internal/controller/imagerepository_controller.go +++ b/internal/controller/imagerepository_controller.go @@ -54,6 +54,7 @@ import ( imagev1 "github.com/fluxcd/image-reflector-controller/api/v1" "github.com/fluxcd/image-reflector-controller/internal/registry" + "github.com/fluxcd/image-reflector-controller/internal/storage" ) // latestTagsCount is the number of tags to use as latest tags. @@ -107,12 +108,9 @@ type ImageRepositoryReconciler struct { kuberecorder.EventRecorder helper.Metrics - ControllerName string - TokenCache *cache.TokenCache - Database interface { - DatabaseWriter - DatabaseReader - } + ControllerName string + TokenCache *cache.TokenCache + Database storage.Database AuthOptionsGetter *registry.AuthOptionsGetter patchOptions []patch.Option @@ -173,7 +171,7 @@ func (r *ImageRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Requ // Examine if the object is under deletion. if !obj.ObjectMeta.DeletionTimestamp.IsZero() { - return r.reconcileDelete(obj) + return r.reconcileDelete(ctx, obj) } // Add finalizer first if it doesn't exist to avoid the race condition @@ -289,7 +287,7 @@ func (r *ImageRepositoryReconciler) reconcile(ctx context.Context, sp *patch.Ser } // Check if it can be scanned now. - ok, when, reasonMsg, err := r.shouldScan(*obj, startTime) + ok, when, reasonMsg, err := r.shouldScan(ctx, *obj, startTime) if err != nil { e := fmt.Errorf("failed to determine if it's scan time: %w", err) conditions.MarkFalse(obj, meta.ReadyCondition, metav1.StatusFailure, "%s", e) @@ -361,7 +359,7 @@ func (r *ImageRepositoryReconciler) reconcile(ctx context.Context, sp *patch.Ser // interval // // Else it returns with next scan time. -func (r *ImageRepositoryReconciler) shouldScan(obj imagev1.ImageRepository, now time.Time) (bool, time.Duration, string, error) { +func (r *ImageRepositoryReconciler) shouldScan(ctx context.Context, obj imagev1.ImageRepository, now time.Time) (bool, time.Duration, string, error) { scanInterval := obj.Spec.Interval.Duration // Never scanned; do it now. @@ -402,7 +400,7 @@ func (r *ImageRepositoryReconciler) shouldScan(obj imagev1.ImageRepository, now // FIXME If the repo exists, has been // scanned, and doesn't have any tags, this will mean a scan every // time the resource comes up for reconciliation. - tags, err := r.Database.Tags(obj.Status.CanonicalImageName) + tags, err := r.Database.Tags(ctx, storage.RepoIdentity{Namespace: obj.Namespace, Name: obj.Name, CanonicalName: obj.Status.CanonicalImageName}) if err != nil { return false, scanInterval, "", err } @@ -442,7 +440,7 @@ func (r *ImageRepositoryReconciler) scan(ctx context.Context, obj *imagev1.Image } canonicalName := ref.Context().String() - checksum, err := r.Database.SetTags(canonicalName, filteredTags) + checksum, err := r.Database.SetTags(ctx, storage.RepoIdentity{Namespace: obj.Namespace, Name: obj.Name, CanonicalName: canonicalName}, filteredTags) if err != nil { return fmt.Errorf("failed to set tags for %q: %w", canonicalName, err) } @@ -458,7 +456,13 @@ func (r *ImageRepositoryReconciler) scan(ctx context.Context, obj *imagev1.Image } // reconcileDelete handles the deletion of the object. -func (r *ImageRepositoryReconciler) reconcileDelete(obj *imagev1.ImageRepository) (ctrl.Result, error) { +func (r *ImageRepositoryReconciler) reconcileDelete(ctx context.Context, obj *imagev1.ImageRepository) (ctrl.Result, error) { + if r.Database != nil { + if err := r.Database.Delete(ctx, storage.RepoIdentity{Namespace: obj.Namespace, Name: obj.Name, CanonicalName: obj.Status.CanonicalImageName}); err != nil { + return ctrl.Result{}, err + } + } + // Remove our finalizer from the list. controllerutil.RemoveFinalizer(obj, imagev1.ImageFinalizer) diff --git a/internal/controller/imagerepository_controller_test.go b/internal/controller/imagerepository_controller_test.go index db22c6bb..26e470bd 100644 --- a/internal/controller/imagerepository_controller_test.go +++ b/internal/controller/imagerepository_controller_test.go @@ -47,6 +47,7 @@ import ( imagev1 "github.com/fluxcd/image-reflector-controller/api/v1" "github.com/fluxcd/image-reflector-controller/internal/registry" + "github.com/fluxcd/image-reflector-controller/internal/storage" "github.com/fluxcd/image-reflector-controller/internal/test" ) @@ -58,7 +59,7 @@ type mockDatabase struct { } // SetTags implements the DatabaseWriter interface of the Database. -func (db *mockDatabase) SetTags(repo string, tags []string) (string, error) { +func (db *mockDatabase) SetTags(ctx context.Context, repo storage.RepoIdentity, tags []string) (string, error) { if db.WriteError != nil { return "", db.WriteError } @@ -67,13 +68,18 @@ func (db *mockDatabase) SetTags(repo string, tags []string) (string, error) { } // Tags implements the DatabaseReader interface of the Database. -func (db mockDatabase) Tags(repo string) ([]string, error) { +func (db mockDatabase) Tags(ctx context.Context, repo storage.RepoIdentity) ([]string, error) { if db.ReadError != nil { return nil, db.ReadError } return db.TagData, nil } +// Delete implements the DatabaseWriter interface of the Database. +func (db *mockDatabase) Delete(ctx context.Context, repo storage.RepoIdentity) error { + return nil +} + func TestImageRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) { g := NewWithT(t) @@ -263,7 +269,7 @@ func TestImageRepositoryReconciler_shouldScan(t *testing.T) { tt.beforeFunc(obj, tt.reconcileTime) } - scan, next, scanReason, err := r.shouldScan(*obj, tt.reconcileTime) + scan, next, scanReason, err := r.shouldScan(ctx, *obj, tt.reconcileTime) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(scan).To(Equal(tt.wantScan)) g.Expect(next).To(Equal(tt.wantNextScan)) @@ -398,7 +404,9 @@ func TestImageRepositoryReconciler_scan(t *testing.T) { g.Expect(err).NotTo(HaveOccurred()) } if err == nil { - g.Expect(r.Database.Tags(imgRepo)).To(Equal(tt.wantTags)) + tags, err := r.Database.Tags(ctx, storage.RepoIdentity{CanonicalName: imgRepo}) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(tags).To(Equal(tt.wantTags)) if tt.wantChecksum != "" { g.Expect(repo.Status.LastScanResult.Revision).To(Equal(tt.wantChecksum)) } diff --git a/internal/database/badger.go b/internal/database/badger.go index eb6ab709..0c0c0ba7 100644 --- a/internal/database/badger.go +++ b/internal/database/badger.go @@ -16,11 +16,14 @@ limitations under the License. package database import ( + "context" "encoding/json" "fmt" "hash/adler32" "github.com/dgraph-io/badger/v4" + + "github.com/fluxcd/image-reflector-controller/internal/storage" ) const tagsPrefix = "tags" @@ -41,11 +44,17 @@ func NewBadgerDatabase(db *badger.DB) *BadgerDatabase { // Tags implements the DatabaseReader interface, fetching the tags for the repo. // // If the repo does not exist, an empty set of tags is returned. -func (a *BadgerDatabase) Tags(repo string) ([]string, error) { +func (a *BadgerDatabase) Tags(ctx context.Context, repo storage.RepoIdentity) ([]string, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + var tags []string err := a.db.View(func(txn *badger.Txn) error { var err error - tags, err = getOrEmpty(txn, repo) + tags, err = getOrEmpty(txn, repo.CanonicalName) return err }) return tags, err @@ -55,13 +64,19 @@ func (a *BadgerDatabase) Tags(repo string) ([]string, error) { // the repo. // // It overwrites existing tag sets for the provided repo. -func (a *BadgerDatabase) SetTags(repo string, tags []string) (string, error) { +func (a *BadgerDatabase) SetTags(ctx context.Context, repo storage.RepoIdentity, tags []string) (string, error) { + select { + case <-ctx.Done(): + return "", ctx.Err() + default: + } + b, err := marshal(tags) if err != nil { return "", err } err = a.db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry(keyForRepo(tagsPrefix, repo), b) + e := badger.NewEntry(keyForRepo(tagsPrefix, repo.CanonicalName), b) return txn.SetEntry(e) }) if err != nil { @@ -70,6 +85,18 @@ func (a *BadgerDatabase) SetTags(repo string, tags []string) (string, error) { return fmt.Sprintf("%v", adler32.Checksum(b)), nil } +// Delete implements the DatabaseWriter interface. Badger keys tags by canonical +// image name, which may be shared by multiple ImageRepository objects, so delete +// is intentionally a no-op to preserve existing behavior. +func (a *BadgerDatabase) Delete(ctx context.Context, repo storage.RepoIdentity) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil +} + func keyForRepo(prefix, repo string) []byte { return []byte(fmt.Sprintf("%s:%s", prefix, repo)) } diff --git a/internal/database/badger_gc_test.go b/internal/database/badger_gc_test.go index 4d329d74..27a6b236 100644 --- a/internal/database/badger_gc_test.go +++ b/internal/database/badger_gc_test.go @@ -22,8 +22,11 @@ import ( "time" "github.com/dgraph-io/badger/v4" + "github.com/go-logr/logr" "github.com/go-logr/logr/testr" + + tagstorage "github.com/fluxcd/image-reflector-controller/internal/storage" ) func TestBadgerGarbageCollectorDoesStop(t *testing.T) { @@ -42,9 +45,9 @@ func TestBadgerGarbageCollectorDoesStop(t *testing.T) { time.Sleep(time.Second) tags := []string{"latest", "v0.0.1", "v0.0.2"} - _, err := db.SetTags(testRepo, tags) + _, err := db.SetTags(context.Background(), tagstorage.RepoIdentity{CanonicalName: testRepo}, tags) fatalIfError(t, err) - _, err = db.Tags(testRepo) + _, err = db.Tags(context.Background(), tagstorage.RepoIdentity{CanonicalName: testRepo}) fatalIfError(t, err) t.Log("wrote tags successfully") diff --git a/internal/database/badger_test.go b/internal/database/badger_test.go index a686bad2..b68495b2 100644 --- a/internal/database/badger_test.go +++ b/internal/database/badger_test.go @@ -16,12 +16,15 @@ limitations under the License. package database import ( + "context" "fmt" "os" "reflect" "testing" "github.com/dgraph-io/badger/v4" + + tagstorage "github.com/fluxcd/image-reflector-controller/internal/storage" ) const testRepo = "testing/testing" @@ -29,7 +32,7 @@ const testRepo = "testing/testing" func TestGetWithUnknownRepo(t *testing.T) { db := createBadgerDatabase(t) - tags, err := db.Tags(testRepo) + tags, err := db.Tags(context.Background(), repoIdentity(testRepo)) fatalIfError(t, err) if !reflect.DeepEqual([]string{}, tags) { @@ -43,7 +46,7 @@ func TestSetTags(t *testing.T) { fatalIfError(t, setTags(t, db, testRepo, tags, "1943865137")) - loaded, err := db.Tags(testRepo) + loaded, err := db.Tags(context.Background(), repoIdentity(testRepo)) fatalIfError(t, err) if !reflect.DeepEqual(tags, loaded) { t.Fatalf("SetTags failed, got %#v want %#v", loaded, tags) @@ -58,7 +61,7 @@ func TestSetTagsOverwrites(t *testing.T) { fatalIfError(t, setTags(t, db, testRepo, tags2, "3168012550")) - loaded, err := db.Tags(testRepo) + loaded, err := db.Tags(context.Background(), repoIdentity(testRepo)) fatalIfError(t, err) if !reflect.DeepEqual(tags2, loaded) { t.Fatalf("failed to overwrite with SetTags: got %#v, want %#v", loaded, tags2) @@ -73,7 +76,7 @@ func TestGetOnlyFetchesForRepo(t *testing.T) { tags2 := []string{"v0.0.3", "v0.0.4"} fatalIfError(t, setTags(t, db, testRepo2, tags2, "728958008")) - loaded, err := db.Tags(testRepo) + loaded, err := db.Tags(context.Background(), repoIdentity(testRepo)) fatalIfError(t, err) if !reflect.DeepEqual(tags1, loaded) { t.Fatalf("Tags() failed got %#v, want %#v", loaded, tags2) @@ -99,7 +102,7 @@ func createBadgerDatabase(t *testing.T) *BadgerDatabase { func setTags(t *testing.T, db *BadgerDatabase, repo string, tags []string, expectedChecksum string) error { t.Helper() - checksum, err := db.SetTags(repo, tags) + checksum, err := db.SetTags(context.Background(), repoIdentity(repo), tags) if err != nil { return err } @@ -109,6 +112,10 @@ func setTags(t *testing.T, db *BadgerDatabase, repo string, tags []string, expec return nil } +func repoIdentity(repo string) tagstorage.RepoIdentity { + return tagstorage.RepoIdentity{CanonicalName: repo} +} + func fatalIfError(t *testing.T, err error) { t.Helper() if err != nil { diff --git a/internal/features/features.go b/internal/features/features.go index ba8a6146..4b9d9be0 100644 --- a/internal/features/features.go +++ b/internal/features/features.go @@ -30,12 +30,20 @@ const ( // When enabled, it will cache both object types, resulting in increased // memory usage and cluster-wide RBAC permissions (list and watch). CacheSecretsAndConfigMaps = "CacheSecretsAndConfigMaps" + + // FluxStorage controls whether tags are stored with fluxcd/pkg/artifact/storage + // instead of BadgerDB. + FluxStorage = "FluxStorage" ) var features = map[string]bool{ // CacheSecretsAndConfigMaps // opt-in from v0.24 CacheSecretsAndConfigMaps: false, + + // FluxStorage + // opt-in from v1.2 + FluxStorage: false, } func init() { diff --git a/internal/storage/filesystem.go b/internal/storage/filesystem.go new file mode 100644 index 00000000..62bddb08 --- /dev/null +++ b/internal/storage/filesystem.go @@ -0,0 +1,286 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package storage + +import ( + "bufio" + "bytes" + "compress/gzip" + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "os" + + "github.com/fluxcd/pkg/apis/meta" + artifactstorage "github.com/fluxcd/pkg/artifact/storage" + + imagev1 "github.com/fluxcd/image-reflector-controller/api/v1" +) + +const ( + tagsFilePlain = "tags.txt" + tagsFileGzip = "tags.txt.gz" +) + +// FilesystemDatabase stores image tags per ImageRepository on the local filesystem. +type FilesystemDatabase struct { + storage *artifactstorage.Storage + compressionThreshold int +} + +// NewFilesystemDatabase creates a filesystem-backed tag database. +func NewFilesystemDatabase(storage *artifactstorage.Storage, compressionThreshold int) *FilesystemDatabase { + return &FilesystemDatabase{ + storage: storage, + compressionThreshold: compressionThreshold, + } +} + +// Tags implements the DatabaseReader interface, fetching tags for the repo. +func (d *FilesystemDatabase) Tags(ctx context.Context, repo RepoIdentity) ([]string, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + if err := validateRepoIdentity(repo); err != nil { + return nil, err + } + + plain, err := d.statVariant(repo, tagsFilePlain, false) + if err != nil { + return nil, err + } + compressed, err := d.statVariant(repo, tagsFileGzip, true) + if err != nil { + return nil, err + } + + switch { + case !plain.exists && !compressed.exists: + return []string{}, nil + case plain.exists && compressed.exists: + primary, fallback := newerVariant(plain, compressed) + tags, usedFallback, err := readTagsVariant(primary, fallback) + if err != nil { + return nil, err + } + if !usedFallback { + removeStaleVariant(fallback) + } + return tags, nil + case plain.exists: + tags, _, err := readTagsVariant(plain, compressed) + return tags, err + default: + tags, _, err := readTagsVariant(compressed, plain) + return tags, err + } +} + +// SetTags implements the DatabaseWriter interface, recording tags for the repo. +func (d *FilesystemDatabase) SetTags(ctx context.Context, repo RepoIdentity, tags []string) (string, error) { + if err := ctx.Err(); err != nil { + return "", err + } + if err := validateRepoIdentity(repo); err != nil { + return "", err + } + + serialized := marshalTagsLines(tags) + sum := sha256.Sum256(serialized) + revision := fmt.Sprintf("sha256:%x", sum) + + filename := tagsFilePlain + payload := serialized + if len(serialized) >= d.compressionThreshold { + filename = tagsFileGzip + compressed, err := gzipTags(serialized) + if err != nil { + return "", err + } + payload = compressed + } + + artifact := artifactForRepo(repo, filename) + if err := d.storage.MkdirAll(artifact); err != nil { + return "", fmt.Errorf("failed to create storage directory: %w", err) + } + if err := ctx.Err(); err != nil { + return "", err + } + if err := d.storage.AtomicWriteFile(&artifact, bytes.NewReader(payload), 0o600); err != nil { + return "", fmt.Errorf("failed to write tags: %w", err) + } + + if filename == tagsFilePlain { + removeStaleVariant(tagFileVariant{path: d.storage.LocalPath(artifactForRepo(repo, tagsFileGzip))}) + } else { + removeStaleVariant(tagFileVariant{path: d.storage.LocalPath(artifactForRepo(repo, tagsFilePlain))}) + } + + return revision, nil +} + +// Delete implements the DatabaseWriter interface, deleting tags for the repo. +func (d *FilesystemDatabase) Delete(ctx context.Context, repo RepoIdentity) error { + if err := ctx.Err(); err != nil { + return err + } + if err := validateRepoIdentity(repo); err != nil { + return err + } + _, err := d.storage.RemoveAll(artifactForRepo(repo, tagsFilePlain)) + if err != nil { + return fmt.Errorf("failed to delete tags: %w", err) + } + return nil +} + +func validateRepoIdentity(repo RepoIdentity) error { + if repo.Namespace == "" || repo.Name == "" { + return fmt.Errorf("repo namespace and name are required") + } + return nil +} + +func artifactForRepo(repo RepoIdentity, filename string) meta.Artifact { + return meta.Artifact{ + Path: artifactstorage.ArtifactPath(imagev1.ImageRepositoryKind, repo.Namespace, repo.Name, filename), + } +} + +func marshalTagsLines(tags []string) []byte { + var b bytes.Buffer + for _, tag := range tags { + b.WriteString(tag) + b.WriteByte('\n') + } + return b.Bytes() +} + +func gzipTags(data []byte) ([]byte, error) { + var b bytes.Buffer + w, err := gzip.NewWriterLevel(&b, gzip.BestCompression) + if err != nil { + return nil, fmt.Errorf("failed to create gzip writer: %w", err) + } + if _, err := w.Write(data); err != nil { + _ = w.Close() + return nil, fmt.Errorf("failed to compress tags: %w", err) + } + if err := w.Close(); err != nil { + return nil, fmt.Errorf("failed to close gzip writer: %w", err) + } + return b.Bytes(), nil +} + +type tagFileVariant struct { + path string + compressed bool + exists bool + info os.FileInfo +} + +func (d *FilesystemDatabase) statVariant(repo RepoIdentity, filename string, compressed bool) (tagFileVariant, error) { + artifact := artifactForRepo(repo, filename) + variant := tagFileVariant{ + path: d.storage.LocalPath(artifact), + compressed: compressed, + } + info, err := os.Stat(variant.path) + if errors.Is(err, os.ErrNotExist) { + return variant, nil + } + if err != nil { + return variant, fmt.Errorf("failed to stat %s: %w", variant.path, err) + } + variant.exists = true + variant.info = info + return variant, nil +} + +func newerVariant(a, b tagFileVariant) (tagFileVariant, tagFileVariant) { + if a.info.ModTime().After(b.info.ModTime()) { + return a, b + } + return b, a +} + +func readTagsVariant(primary, fallback tagFileVariant) ([]string, bool, error) { + tags, err := readTagFile(primary) + if errors.Is(err, os.ErrNotExist) && fallback.path != "" { + tags, fallbackErr := readTagFile(fallback) + if fallbackErr == nil { + return tags, true, nil + } + if errors.Is(fallbackErr, os.ErrNotExist) { + return []string{}, false, nil + } + return nil, false, fallbackErr + } + if errors.Is(err, os.ErrNotExist) { + return []string{}, false, nil + } + return tags, false, err +} + +func readTagFile(variant tagFileVariant) (_ []string, retErr error) { + file, err := os.Open(variant.path) + if err != nil { + return nil, err + } + defer func() { + if err := file.Close(); err != nil && retErr == nil { + retErr = err + } + }() + + var reader io.Reader = file + var gzipReader *gzip.Reader + if variant.compressed { + gzipReader, err = gzip.NewReader(file) + if err != nil { + return nil, fmt.Errorf("failed to open gzip tags: %w", err) + } + defer func() { + if err := gzipReader.Close(); err != nil && retErr == nil { + retErr = err + } + }() + reader = gzipReader + } + + scanner := bufio.NewScanner(reader) + scanner.Buffer(make([]byte, 0, 256), 1024) + tags := []string{} + for scanner.Scan() { + tags = append(tags, scanner.Text()) + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("failed to read tags: %w", err) + } + return tags, nil +} + +func removeStaleVariant(variant tagFileVariant) { + if variant.path == "" { + return + } + if err := os.Remove(variant.path); err != nil && !errors.Is(err, os.ErrNotExist) { + return + } +} diff --git a/internal/storage/filesystem_test.go b/internal/storage/filesystem_test.go new file mode 100644 index 00000000..ecc9f392 --- /dev/null +++ b/internal/storage/filesystem_test.go @@ -0,0 +1,279 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package storage + +import ( + "bytes" + "compress/gzip" + "context" + "os" + "path/filepath" + "reflect" + "testing" + "time" + + artifactstorage "github.com/fluxcd/pkg/artifact/storage" +) + +func TestFilesystemDatabaseSetTagsPlain(t *testing.T) { + db, st := newFilesystemDatabase(t, 1024) + repo := testRepoIdentity("default", "podinfo") + tags := []string{"latest", "v1.0.0"} + + revision, err := db.SetTags(context.Background(), repo, tags) + if err != nil { + t.Fatal(err) + } + if revision == "" { + t.Fatal("SetTags returned empty revision") + } + if !pathExistsForTest(t, st.LocalPath(artifactForRepo(repo, tagsFilePlain))) { + t.Fatal("plain tags file was not written") + } + if pathExistsForTest(t, st.LocalPath(artifactForRepo(repo, tagsFileGzip))) { + t.Fatal("gzip tags file was written below threshold") + } + + loaded, err := db.Tags(context.Background(), repo) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(tags, loaded) { + t.Fatalf("Tags() got %#v, want %#v", loaded, tags) + } +} + +func TestFilesystemDatabaseSetTagsCompressed(t *testing.T) { + db, st := newFilesystemDatabase(t, 1) + repo := testRepoIdentity("default", "podinfo") + tags := []string{"latest", "v1.0.0"} + + if _, err := db.SetTags(context.Background(), repo, tags); err != nil { + t.Fatal(err) + } + if pathExistsForTest(t, st.LocalPath(artifactForRepo(repo, tagsFilePlain))) { + t.Fatal("plain tags file was written above threshold") + } + if !pathExistsForTest(t, st.LocalPath(artifactForRepo(repo, tagsFileGzip))) { + t.Fatal("gzip tags file was not written") + } + + loaded, err := db.Tags(context.Background(), repo) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(tags, loaded) { + t.Fatalf("Tags() got %#v, want %#v", loaded, tags) + } +} + +func TestFilesystemDatabaseEmptyAndMissing(t *testing.T) { + db, _ := newFilesystemDatabase(t, 1) + repo := testRepoIdentity("default", "podinfo") + + missing, err := db.Tags(context.Background(), repo) + if err != nil { + t.Fatal(err) + } + if len(missing) != 0 { + t.Fatalf("Tags() for missing repo got %#v, want empty", missing) + } + + if _, err := db.SetTags(context.Background(), repo, nil); err != nil { + t.Fatal(err) + } + loaded, err := db.Tags(context.Background(), repo) + if err != nil { + t.Fatal(err) + } + if len(loaded) != 0 { + t.Fatalf("Tags() for empty repo got %#v, want empty", loaded) + } +} + +func TestFilesystemDatabaseStaleVariantCleanup(t *testing.T) { + db, st := newFilesystemDatabase(t, 1) + repo := testRepoIdentity("default", "podinfo") + if _, err := db.SetTags(context.Background(), repo, []string{"latest", "v1.0.0"}); err != nil { + t.Fatal(err) + } + + db.compressionThreshold = 1024 + if _, err := db.SetTags(context.Background(), repo, []string{"latest"}); err != nil { + t.Fatal(err) + } + if !pathExistsForTest(t, st.LocalPath(artifactForRepo(repo, tagsFilePlain))) { + t.Fatal("plain tags file was not written") + } + if pathExistsForTest(t, st.LocalPath(artifactForRepo(repo, tagsFileGzip))) { + t.Fatal("stale gzip tags file was not removed") + } +} + +func TestFilesystemDatabaseBothVariantsPresentUsesNewest(t *testing.T) { + db, st := newFilesystemDatabase(t, 1024) + repo := testRepoIdentity("default", "podinfo") + plainTags := []string{"old"} + gzipTagsSet := []string{"new"} + + plainArtifact := artifactForRepo(repo, tagsFilePlain) + gzipArtifact := artifactForRepo(repo, tagsFileGzip) + if err := st.MkdirAll(plainArtifact); err != nil { + t.Fatal(err) + } + plainPath := st.LocalPath(plainArtifact) + gzipPath := st.LocalPath(gzipArtifact) + if err := os.WriteFile(plainPath, marshalTagsLines(plainTags), 0o600); err != nil { + t.Fatal(err) + } + compressed, err := gzipTags(marshalTagsLines(gzipTagsSet)) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(gzipPath, compressed, 0o600); err != nil { + t.Fatal(err) + } + oldTime := time.Now().Add(-time.Hour) + newTime := time.Now() + if err := os.Chtimes(plainPath, oldTime, oldTime); err != nil { + t.Fatal(err) + } + if err := os.Chtimes(gzipPath, newTime, newTime); err != nil { + t.Fatal(err) + } + + loaded, err := db.Tags(context.Background(), repo) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(gzipTagsSet, loaded) { + t.Fatalf("Tags() got %#v, want %#v", loaded, gzipTagsSet) + } + if pathExistsForTest(t, plainPath) { + t.Fatal("older stale variant was not removed") + } +} + +func TestFilesystemDatabaseRevisionStableAcrossCompression(t *testing.T) { + _, st := newFilesystemDatabase(t, 1) + repo := testRepoIdentity("default", "podinfo") + tags := []string{"latest", "v1.0.0"} + + compressedDB := NewFilesystemDatabase(st, 1) + compressedRevision, err := compressedDB.SetTags(context.Background(), repo, tags) + if err != nil { + t.Fatal(err) + } + plainDB := NewFilesystemDatabase(st, 1024) + plainRevision, err := plainDB.SetTags(context.Background(), repo, tags) + if err != nil { + t.Fatal(err) + } + if compressedRevision != plainRevision { + t.Fatalf("revision changed across compression threshold: %s != %s", compressedRevision, plainRevision) + } +} + +func TestFilesystemDatabaseNamespaceNameIsolation(t *testing.T) { + db, _ := newFilesystemDatabase(t, 1024) + repoA := RepoIdentity{Namespace: "team-a", Name: "app", CanonicalName: "example.com/app"} + repoB := RepoIdentity{Namespace: "team-b", Name: "app", CanonicalName: "example.com/app"} + + if _, err := db.SetTags(context.Background(), repoA, []string{"a"}); err != nil { + t.Fatal(err) + } + if _, err := db.SetTags(context.Background(), repoB, []string{"b"}); err != nil { + t.Fatal(err) + } + + loadedA, err := db.Tags(context.Background(), repoA) + if err != nil { + t.Fatal(err) + } + loadedB, err := db.Tags(context.Background(), repoB) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual([]string{"a"}, loadedA) { + t.Fatalf("repo A tags got %#v", loadedA) + } + if !reflect.DeepEqual([]string{"b"}, loadedB) { + t.Fatalf("repo B tags got %#v", loadedB) + } +} + +func TestFilesystemDatabaseDelete(t *testing.T) { + db, st := newFilesystemDatabase(t, 1024) + repo := testRepoIdentity("default", "podinfo") + if _, err := db.SetTags(context.Background(), repo, []string{"latest"}); err != nil { + t.Fatal(err) + } + if err := db.Delete(context.Background(), repo); err != nil { + t.Fatal(err) + } + if pathExistsForTest(t, filepath.Dir(st.LocalPath(artifactForRepo(repo, tagsFilePlain)))) { + t.Fatal("repo directory still exists after delete") + } +} + +func newFilesystemDatabase(t *testing.T, threshold int) (*FilesystemDatabase, *artifactstorage.Storage) { + t.Helper() + st := &artifactstorage.Storage{BasePath: t.TempDir()} + return NewFilesystemDatabase(st, threshold), st +} + +func testRepoIdentity(namespace, name string) RepoIdentity { + return RepoIdentity{Namespace: namespace, Name: name, CanonicalName: "example.com/" + name} +} + +func pathExistsForTest(t *testing.T, path string) bool { + t.Helper() + _, err := os.Stat(path) + if err == nil { + return true + } + if os.IsNotExist(err) { + return false + } + t.Fatal(err) + return false +} + +func TestGzipTagsDeterministic(t *testing.T) { + data := marshalTagsLines([]string{"latest", "v1.0.0"}) + first, err := gzipTags(data) + if err != nil { + t.Fatal(err) + } + second, err := gzipTags(data) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(first, second) { + t.Fatal("gzip output is not deterministic") + } + + reader, err := gzip.NewReader(bytes.NewReader(first)) + if err != nil { + t.Fatal(err) + } + if reader.Name != "" || !reader.ModTime.IsZero() { + t.Fatalf("gzip header contains name or modtime: name=%q modtime=%s", reader.Name, reader.ModTime) + } + if err := reader.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/internal/storage/format.go b/internal/storage/format.go new file mode 100644 index 00000000..aec579a3 --- /dev/null +++ b/internal/storage/format.go @@ -0,0 +1,205 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package storage + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" +) + +const ( + storageVersionFile = ".storage-version" + storageWipeInProgressFile = ".storage-wipe-in-progress" + filesystemStorageVersion = "2" +) + +// ReconcileFormat prepares the storage root for the selected storage backend. +// Switching backends wipes the rebuildable tag cache before either backend is +// initialized. +func ReconcileFormat(storagePath string, filesystemStorageEnabled bool) error { + if err := os.MkdirAll(storagePath, 0o700); err != nil { + return fmt.Errorf("failed to create storage path: %w", err) + } + + version, versionExists, err := readStorageVersion(storagePath) + if err != nil { + return err + } + + if !storageFormatMatches(version, versionExists, filesystemStorageEnabled) { + if err := ensureWipeMarker(storagePath); err != nil { + return err + } + if err := setStorageVersion(storagePath, filesystemStorageEnabled); err != nil { + return err + } + } + + wipeMarkerExists, err := pathExists(filepath.Join(storagePath, storageWipeInProgressFile)) + if err != nil { + return err + } + if !wipeMarkerExists { + return nil + } + + if err := wipeStoragePath(storagePath); err != nil { + return err + } + if err := syncDir(storagePath); err != nil { + return fmt.Errorf("failed to sync storage path after wipe: %w", err) + } + if err := os.Remove(filepath.Join(storagePath, storageWipeInProgressFile)); err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("failed to remove wipe marker: %w", err) + } + if err := syncDir(storagePath); err != nil { + return fmt.Errorf("failed to sync storage path after marker removal: %w", err) + } + + return nil +} + +func readStorageVersion(storagePath string) (string, bool, error) { + b, err := os.ReadFile(filepath.Join(storagePath, storageVersionFile)) + if errors.Is(err, os.ErrNotExist) { + return "", false, nil + } + if err != nil { + return "", false, fmt.Errorf("failed to read storage version: %w", err) + } + return strings.TrimSpace(string(b)), true, nil +} + +func storageFormatMatches(version string, versionExists, filesystemStorageEnabled bool) bool { + if filesystemStorageEnabled { + return versionExists && version == filesystemStorageVersion + } + return !versionExists +} + +func ensureWipeMarker(storagePath string) error { + path := filepath.Join(storagePath, storageWipeInProgressFile) + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("failed to create wipe marker: %w", err) + } + if err := syncFileAndClose(file); err != nil { + return fmt.Errorf("failed to sync wipe marker: %w", err) + } + if err := syncDir(storagePath); err != nil { + return fmt.Errorf("failed to sync storage path after marker creation: %w", err) + } + return nil +} + +func setStorageVersion(storagePath string, filesystemStorageEnabled bool) error { + path := filepath.Join(storagePath, storageVersionFile) + if !filesystemStorageEnabled { + if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("failed to remove storage version: %w", err) + } + if err := syncDir(storagePath); err != nil { + return fmt.Errorf("failed to sync storage path after version removal: %w", err) + } + return nil + } + + if err := writeFileSync(storagePath, storageVersionFile, []byte(filesystemStorageVersion+"\n"), 0o600); err != nil { + return fmt.Errorf("failed to write storage version: %w", err) + } + return nil +} + +func writeFileSync(dir, filename string, data []byte, mode os.FileMode) (retErr error) { + tmp, err := os.CreateTemp(dir, filename+"-") + if err != nil { + return err + } + tmpName := tmp.Name() + defer func() { + if retErr != nil { + _ = os.Remove(tmpName) + } + }() + + if _, err := tmp.Write(data); err != nil { + _ = tmp.Close() + return err + } + if err := tmp.Chmod(mode); err != nil { + _ = tmp.Close() + return err + } + if err := syncFileAndClose(tmp); err != nil { + return err + } + if err := os.Rename(tmpName, filepath.Join(dir, filename)); err != nil { + return err + } + return syncDir(dir) +} + +func wipeStoragePath(storagePath string) error { + entries, err := os.ReadDir(storagePath) + if err != nil { + return fmt.Errorf("failed to read storage path: %w", err) + } + for _, entry := range entries { + name := entry.Name() + if name == storageVersionFile || name == storageWipeInProgressFile { + continue + } + if err := os.RemoveAll(filepath.Join(storagePath, name)); err != nil { + return fmt.Errorf("failed to remove storage entry %q: %w", name, err) + } + } + return nil +} + +func syncFileAndClose(file *os.File) error { + if err := file.Sync(); err != nil { + _ = file.Close() + return err + } + return file.Close() +} + +func syncDir(path string) (retErr error) { + dir, err := os.Open(path) + if err != nil { + return err + } + defer func() { + if err := dir.Close(); err != nil && retErr == nil { + retErr = err + } + }() + return dir.Sync() +} + +func pathExists(path string) (bool, error) { + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, fmt.Errorf("failed to stat %s: %w", path, err) +} diff --git a/internal/storage/format_test.go b/internal/storage/format_test.go new file mode 100644 index 00000000..5a5739f7 --- /dev/null +++ b/internal/storage/format_test.go @@ -0,0 +1,142 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package storage + +import ( + "os" + "path/filepath" + "reflect" + "sort" + "testing" +) + +func TestReconcileFormat(t *testing.T) { + tests := []struct { + name string + filesystemEnabled bool + version *string + data bool + wantEntries []string + }{ + { + name: "badger format remains untouched", + filesystemEnabled: false, + data: true, + wantEntries: []string{"data"}, + }, + { + name: "enable filesystem storage wipes old data", + filesystemEnabled: true, + data: true, + wantEntries: []string{storageVersionFile}, + }, + { + name: "disable filesystem storage wipes old data", + filesystemEnabled: false, + version: new(filesystemStorageVersion), + data: true, + wantEntries: []string{}, + }, + { + name: "filesystem format remains untouched", + filesystemEnabled: true, + version: new(filesystemStorageVersion), + data: true, + wantEntries: []string{storageVersionFile, "data"}, + }, + { + name: "unknown version is replaced", + filesystemEnabled: true, + version: new("unknown"), + data: true, + wantEntries: []string{storageVersionFile}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dir := t.TempDir() + if tt.version != nil { + if err := os.WriteFile(filepath.Join(dir, storageVersionFile), []byte(*tt.version), 0o600); err != nil { + t.Fatal(err) + } + } + if tt.data { + if err := os.WriteFile(filepath.Join(dir, "data"), []byte("data"), 0o600); err != nil { + t.Fatal(err) + } + } + + if err := ReconcileFormat(dir, tt.filesystemEnabled); err != nil { + t.Fatal(err) + } + + entries := readEntryNames(t, dir) + if !reflect.DeepEqual(tt.wantEntries, entries) { + t.Fatalf("entries got %#v, want %#v", entries, tt.wantEntries) + } + if contains(entries, storageWipeInProgressFile) { + t.Fatal("wipe marker was not removed") + } + }) + } +} + +func TestReconcileFormatCompletesInterruptedWipe(t *testing.T) { + dir := t.TempDir() + if err := os.WriteFile(filepath.Join(dir, storageVersionFile), []byte(filesystemStorageVersion), 0o600); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(dir, storageWipeInProgressFile), nil, 0o600); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(dir, "leftover"), []byte("data"), 0o600); err != nil { + t.Fatal(err) + } + + if err := ReconcileFormat(dir, true); err != nil { + t.Fatal(err) + } + + entries := readEntryNames(t, dir) + want := []string{storageVersionFile} + if !reflect.DeepEqual(want, entries) { + t.Fatalf("entries got %#v, want %#v", entries, want) + } +} + +func readEntryNames(t *testing.T, dir string) []string { + t.Helper() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatal(err) + } + names := make([]string, 0, len(entries)) + for _, entry := range entries { + names = append(names, entry.Name()) + } + sort.Strings(names) + return names +} + +func contains(items []string, item string) bool { + for _, v := range items { + if v == item { + return true + } + } + return false +} diff --git a/internal/storage/gc.go b/internal/storage/gc.go new file mode 100644 index 00000000..c44f977b --- /dev/null +++ b/internal/storage/gc.go @@ -0,0 +1,191 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package storage + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/fluxcd/pkg/apis/meta" + artifactstorage "github.com/fluxcd/pkg/artifact/storage" + "github.com/go-logr/logr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + imagev1 "github.com/fluxcd/image-reflector-controller/api/v1" +) + +// FilesystemGarbageCollector removes tag files for ImageRepository objects no +// longer handled by this controller instance. +type FilesystemGarbageCollector struct { + Interval time.Duration + + name string + storage *artifactstorage.Storage + client client.Client + log logr.Logger +} + +// NewFilesystemGarbageCollector creates a FilesystemGarbageCollector. +func NewFilesystemGarbageCollector(name string, storage *artifactstorage.Storage, client client.Client, interval time.Duration) *FilesystemGarbageCollector { + return &FilesystemGarbageCollector{ + Interval: interval, + name: name, + storage: storage, + client: client, + } +} + +// NeedLeaderElection ensures the GC runs only on the elected leader. +func (gc *FilesystemGarbageCollector) NeedLeaderElection() bool { + return true +} + +// Start runs the filesystem storage garbage collector after each interval. +func (gc *FilesystemGarbageCollector) Start(ctx context.Context) error { + gc.log = ctrl.LoggerFrom(ctx).WithName(gc.name) + gc.log.Info("Starting filesystem storage GC") + + timer := time.NewTimer(gc.Interval) + defer timer.Stop() + for { + select { + case <-timer.C: + gc.collect(ctx) + timer.Reset(gc.Interval) + case <-ctx.Done(): + gc.log.Info("Stopped filesystem storage GC") + return nil + } + } +} + +func (gc *FilesystemGarbageCollector) collect(ctx context.Context) { + gc.log.V(1).Info("Running filesystem storage GC") + onAPI, err := gc.repositoriesOnTheAPI(ctx) + if err != nil { + gc.log.Error(err, "failed to list ImageRepositories for filesystem storage GC") + return + } + + onDisk, err := gc.repositoriesOnDisk() + if err != nil { + gc.log.Error(err, "failed to enumerate filesystem storage entries") + return + } + + deleted := 0 + for _, repo := range onDisk { + if _, ok := onAPI[repo]; ok { + continue + } + artifact := meta.Artifact{Path: artifactstorage.ArtifactPath(imagev1.ImageRepositoryKind, repo.Namespace, repo.Name, tagsFilePlain)} + deletedDir, err := gc.storage.RemoveAll(artifact) + if err != nil { + gc.log.Error(err, "failed to delete orphaned filesystem storage entry", "repository", repo.String()) + continue + } + deleted++ + gc.log.V(1).Info("deleted orphaned filesystem storage entry", "repository", repo.String(), "path", deletedDir) + } + + if err := gc.pruneEmptyNamespaceDirs(); err != nil { + gc.log.Error(err, "failed to prune empty namespace directories") + } + + gc.log.V(1).Info("Ran filesystem storage GC", "deleted_entries", deleted) +} + +// pruneEmptyNamespaceDirs removes namespace directories left empty after their +// last repository entry is deleted, so empty directories don't accumulate. +func (gc *FilesystemGarbageCollector) pruneEmptyNamespaceDirs() error { + base := filepath.Join(gc.storage.BasePath, strings.ToLower(imagev1.ImageRepositoryKind)) + namespaces, err := os.ReadDir(base) + if errors.Is(err, os.ErrNotExist) { + return nil + } + if err != nil { + return fmt.Errorf("failed to read %s: %w", base, err) + } + + for _, namespace := range namespaces { + if !namespace.IsDir() { + continue + } + nsPath := filepath.Join(base, namespace.Name()) + entries, err := os.ReadDir(nsPath) + if err != nil { + return fmt.Errorf("failed to read %s: %w", nsPath, err) + } + if len(entries) > 0 { + continue + } + if err := os.Remove(nsPath); err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("failed to remove empty namespace directory %s: %w", nsPath, err) + } + gc.log.V(1).Info("removed empty namespace directory", "path", nsPath) + } + return nil +} + +func (gc *FilesystemGarbageCollector) repositoriesOnTheAPI(ctx context.Context) (map[client.ObjectKey]struct{}, error) { + var repos imagev1.ImageRepositoryList + if err := gc.client.List(ctx, &repos); err != nil { + return nil, err + } + + result := make(map[client.ObjectKey]struct{}, len(repos.Items)) + for i := range repos.Items { + repo := client.ObjectKey{Namespace: repos.Items[i].Namespace, Name: repos.Items[i].Name} + result[repo] = struct{}{} + } + return result, nil +} + +func (gc *FilesystemGarbageCollector) repositoriesOnDisk() ([]client.ObjectKey, error) { + base := filepath.Join(gc.storage.BasePath, strings.ToLower(imagev1.ImageRepositoryKind)) + namespaces, err := os.ReadDir(base) + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to read %s: %w", base, err) + } + + var result []client.ObjectKey + for _, namespace := range namespaces { + if !namespace.IsDir() { + continue + } + nsPath := filepath.Join(base, namespace.Name()) + repos, err := os.ReadDir(nsPath) + if err != nil { + return nil, fmt.Errorf("failed to read %s: %w", nsPath, err) + } + for _, repo := range repos { + if !repo.IsDir() { + continue + } + result = append(result, client.ObjectKey{Namespace: namespace.Name(), Name: repo.Name()}) + } + } + return result, nil +} diff --git a/internal/storage/gc_test.go b/internal/storage/gc_test.go new file mode 100644 index 00000000..a542170e --- /dev/null +++ b/internal/storage/gc_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package storage + +import ( + "context" + "errors" + "os" + "path/filepath" + "testing" + "time" + + artifactstorage "github.com/fluxcd/pkg/artifact/storage" + "github.com/go-logr/logr/testr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + imagev1 "github.com/fluxcd/image-reflector-controller/api/v1" +) + +func TestFilesystemGarbageCollectorNeedLeaderElection(t *testing.T) { + gc := &FilesystemGarbageCollector{} + if !gc.NeedLeaderElection() { + t.Fatal("NeedLeaderElection() returned false") + } +} + +func TestFilesystemGarbageCollectorDeletesOrphans(t *testing.T) { + st, cl := newGCTestStorage(t, &imagev1.ImageRepository{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "keep"}}) + createRepoDir(t, st, "default", "keep") + createRepoDir(t, st, "default", "orphan") + + gc := NewFilesystemGarbageCollector("test-gc", st, cl, time.Minute) + gc.log = testr.New(t) + gc.collect(context.Background()) + + if !repoDirExists(t, st, "default", "keep") { + t.Fatal("responsible repo was deleted") + } + if repoDirExists(t, st, "default", "orphan") { + t.Fatal("orphan repo was not deleted") + } +} + +func TestFilesystemGarbageCollectorSkipsDeleteOnListError(t *testing.T) { + st, cl := newGCTestStorage(t) + createRepoDir(t, st, "default", "orphan") + + gc := NewFilesystemGarbageCollector("test-gc", st, listErrorClient{Client: cl}, time.Minute) + gc.log = testr.New(t) + gc.collect(context.Background()) + + if !repoDirExists(t, st, "default", "orphan") { + t.Fatal("orphan repo was deleted after list error") + } +} + +func TestFilesystemGarbageCollectorEmptyResponsibleSetDeletesAll(t *testing.T) { + st, cl := newGCTestStorage(t) + createRepoDir(t, st, "default", "one") + createRepoDir(t, st, "default", "two") + + gc := NewFilesystemGarbageCollector("test-gc", st, cl, time.Minute) + gc.log = testr.New(t) + gc.collect(context.Background()) + + if repoDirExists(t, st, "default", "one") || repoDirExists(t, st, "default", "two") { + t.Fatal("repos were not deleted for empty responsible set") + } +} + +func TestFilesystemGarbageCollectorPrunesEmptyNamespaceDirs(t *testing.T) { + st, cl := newGCTestStorage(t, &imagev1.ImageRepository{ObjectMeta: metav1.ObjectMeta{Namespace: "keep-ns", Name: "keep"}}) + createRepoDir(t, st, "keep-ns", "keep") + // orphan repo whose namespace becomes empty once it's pruned + createRepoDir(t, st, "orphan-ns", "orphan") + + gc := NewFilesystemGarbageCollector("test-gc", st, cl, time.Minute) + gc.log = testr.New(t) + gc.collect(context.Background()) + + if namespaceDirExists(t, st, "orphan-ns") { + t.Fatal("empty namespace directory was not pruned") + } + if !namespaceDirExists(t, st, "keep-ns") { + t.Fatal("namespace directory with a live repo was pruned") + } +} + +type listErrorClient struct { + client.Client +} + +func (c listErrorClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + return errors.New("list failed") +} + +func newGCTestStorage(t *testing.T, objs ...client.Object) (*artifactstorage.Storage, client.Client) { + t.Helper() + scheme := runtime.NewScheme() + if err := imagev1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + return &artifactstorage.Storage{BasePath: t.TempDir()}, fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() +} + +func createRepoDir(t *testing.T, st *artifactstorage.Storage, namespace, name string) { + t.Helper() + path := filepath.Join(st.BasePath, "imagerepository", namespace, name) + if err := os.MkdirAll(path, 0o700); err != nil { + t.Fatal(err) + } +} + +func namespaceDirExists(t *testing.T, st *artifactstorage.Storage, namespace string) bool { + t.Helper() + _, err := os.Stat(filepath.Join(st.BasePath, "imagerepository", namespace)) + if err == nil { + return true + } + if os.IsNotExist(err) { + return false + } + t.Fatal(err) + return false +} + +func repoDirExists(t *testing.T, st *artifactstorage.Storage, namespace, name string) bool { + t.Helper() + _, err := os.Stat(filepath.Join(st.BasePath, "imagerepository", namespace, name)) + if err == nil { + return true + } + if os.IsNotExist(err) { + return false + } + t.Fatal(err) + return false +} diff --git a/internal/storage/repository.go b/internal/storage/repository.go new file mode 100644 index 00000000..b4bd5879 --- /dev/null +++ b/internal/storage/repository.go @@ -0,0 +1,50 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import "context" + +// RepoIdentity identifies an ImageRepository for tag storage. Implementations +// choose which fields form their storage key: +// - BadgerDatabase keys by CanonicalName. +// - FilesystemDatabase keys by Namespace and Name. +type RepoIdentity struct { + Namespace string + Name string + CanonicalName string +} + +// Database combines tag read and write operations. +type Database interface { + DatabaseWriter + DatabaseReader +} + +// DatabaseWriter implementations record the tags for an image repository. +type DatabaseWriter interface { + SetTags(ctx context.Context, repo RepoIdentity, tags []string) (revision string, err error) + Delete(ctx context.Context, repo RepoIdentity) error +} + +// DatabaseReader implementations get the stored set of tags for an image +// repository. +// +// If no tags are available for the repo, then implementations should return an +// empty set of tags. +type DatabaseReader interface { + Tags(ctx context.Context, repo RepoIdentity) (tags []string, err error) +} diff --git a/main.go b/main.go index f02887e2..a054c679 100644 --- a/main.go +++ b/main.go @@ -28,7 +28,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -36,6 +35,7 @@ import ( ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + artifactstorage "github.com/fluxcd/pkg/artifact/storage" "github.com/fluxcd/pkg/auth" pkgcache "github.com/fluxcd/pkg/cache" "github.com/fluxcd/pkg/runtime/acl" @@ -56,6 +56,7 @@ import ( "github.com/fluxcd/image-reflector-controller/internal/database" "github.com/fluxcd/image-reflector-controller/internal/features" "github.com/fluxcd/image-reflector-controller/internal/registry" + tagstorage "github.com/fluxcd/image-reflector-controller/internal/storage" ) const ( @@ -81,30 +82,32 @@ func main() { ) var ( - metricsAddr string - eventsAddr string - healthAddr string - clientOptions client.Options - logOptions logger.Options - leaderElectionOptions leaderelection.Options - watchOptions helper.WatchOptions - storagePath string - storageValueLogFileSize int64 - gcInterval uint16 // max value is 65535 minutes (~ 45 days) which is well under the maximum time.Duration - concurrent int - aclOptions acl.Options - rateLimiterOptions helper.RateLimiterOptions - featureGates feathelper.FeatureGates - tokenCacheOptions pkgcache.TokenFlags - defaultServiceAccount string - requeueDependency time.Duration + metricsAddr string + eventsAddr string + healthAddr string + clientOptions client.Options + logOptions logger.Options + leaderElectionOptions leaderelection.Options + watchOptions helper.WatchOptions + storagePath string + storageValueLogFileSize int64 + storageCompressionThresholdKiB int + gcInterval uint16 // max value is 65535 minutes (~ 45 days) which is well under the maximum time.Duration + concurrent int + aclOptions acl.Options + rateLimiterOptions helper.RateLimiterOptions + featureGates feathelper.FeatureGates + tokenCacheOptions pkgcache.TokenFlags + defaultServiceAccount string + requeueDependency time.Duration ) flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.") flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.") flag.StringVar(&storagePath, "storage-path", "/data", "Where to store the persistent database of image metadata") - flag.Int64Var(&storageValueLogFileSize, "storage-value-log-file-size", 1<<28, "Set the database's memory mapped value log file size in bytes. Effective memory usage is about two times this size.") + flag.Int64Var(&storageValueLogFileSize, "storage-value-log-file-size", 1<<28, "Set the Badger database's memory mapped value log file size in bytes. Effective memory usage is about two times this size.") + flag.IntVar(&storageCompressionThresholdKiB, "storage-compression-threshold", 64, "Minimum uncompressed tag data size in KiB before filesystem storage compresses it.") flag.Uint16Var(&gcInterval, "gc-interval", 10, "The number of minutes to wait between garbage collections. 0 disables the garbage collector.") flag.IntVar(&concurrent, "concurrent", 4, "The number of concurrent resource reconciles.") flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.") @@ -146,21 +149,37 @@ func main() { os.Exit(1) } - badgerOpts := badger.DefaultOptions(storagePath) - badgerOpts.ValueLogFileSize = storageValueLogFileSize - badgerDB, err := badger.Open(badgerOpts) + useFilesystemStorage, err := features.Enabled(features.FluxStorage) if err != nil { - setupLog.Error(err, "unable to open the Badger database") + setupLog.Error(err, "unable to check feature gate "+features.FluxStorage) + os.Exit(1) + } + if useFilesystemStorage && storageCompressionThresholdKiB <= 0 { + setupLog.Error(fmt.Errorf("value must be greater than zero"), "invalid --storage-compression-threshold") os.Exit(1) } - defer badgerDB.Close() - db := database.NewBadgerDatabase(badgerDB) - var badgerGC *database.BadgerGarbageCollector - if gcInterval > 0 { - badgerGC = database.NewBadgerGarbageCollector("badger-gc", badgerDB, time.Duration(gcInterval)*time.Minute, discardRatio) + if err := tagstorage.ReconcileFormat(storagePath, useFilesystemStorage); err != nil { + setupLog.Error(err, "unable to reconcile storage format") + os.Exit(1) + } + + var db tagstorage.Database + var artifactStorage *artifactstorage.Storage + var badgerDB *badger.DB + if useFilesystemStorage { + artifactStorage = &artifactstorage.Storage{BasePath: storagePath} + db = tagstorage.NewFilesystemDatabase(artifactStorage, storageCompressionThresholdKiB*1024) } else { - setupLog.V(1).Info("Badger garbage collector is disabled") + badgerOpts := badger.DefaultOptions(storagePath) + badgerOpts.ValueLogFileSize = storageValueLogFileSize + badgerDB, err = badger.Open(badgerOpts) + if err != nil { + setupLog.Error(err, "unable to open the Badger database") + os.Exit(1) + } + defer badgerDB.Close() + db = database.NewBadgerDatabase(badgerDB) } watchNamespace := "" @@ -217,14 +236,14 @@ func main() { ExtraHandlers: pprof.GetHandlers(), }, Controller: config.Controller{ - RecoverPanic: pointer.Bool(true), + RecoverPanic: new(true), MaxConcurrentReconciles: concurrent, }, } if watchNamespace != "" { mgrConfig.Cache.DefaultNamespaces = map[string]ctrlcache.Config{ - watchNamespace: ctrlcache.Config{}, + watchNamespace: {}, } } @@ -234,12 +253,23 @@ func main() { os.Exit(1) } - if badgerGC != nil { - err := mgr.Add(badgerGC) - if err != nil { - setupLog.Error(err, "unable to add GC to manager") - os.Exit(1) + if gcInterval > 0 { + gcIntervalDuration := time.Duration(gcInterval) * time.Minute + if useFilesystemStorage { + if err := mgr.Add(tagstorage.NewFilesystemGarbageCollector("filesystem-storage-gc", artifactStorage, mgr.GetClient(), gcIntervalDuration)); err != nil { + setupLog.Error(err, "unable to add filesystem storage GC to manager") + os.Exit(1) + } + } else { + if err := mgr.Add(database.NewBadgerGarbageCollector("badger-gc", badgerDB, gcIntervalDuration, discardRatio)); err != nil { + setupLog.Error(err, "unable to add Badger GC to manager") + os.Exit(1) + } } + } else if useFilesystemStorage { + setupLog.V(1).Info("filesystem storage garbage collector is disabled") + } else { + setupLog.V(1).Info("Badger garbage collector is disabled") } probes.SetupChecks(mgr, setupLog)