diff --git a/.gitignore b/.gitignore index e74e03b..2a9ab97 100644 --- a/.gitignore +++ b/.gitignore @@ -20,15 +20,15 @@ go.work # ========================= # coldkeep specific binaries # ========================= -coldkeep app/coldkeep +coldkeep + # binaries generated during development *.exe *.out # ========================= # Local storage directories # ========================= -storage/ storage/containers/ storage/output/ pgdata/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c38e87..b697332 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,13 +2,58 @@ All notable changes to this project will be documented in this file. -This project follows a lightweight, research-friendly versioning -approach. Version numbers indicate conceptual milestones rather than +This project follows a lightweight, prototype-friendly versioning +approach. + +Version numbers indicate conceptual milestones rather than production stability. ------------------------------------------------------------------------ -## \[0.1.0\] - 2026-02-24 +## [0.2.0]- 2026-03-11 + +Crash-consistency foundation for the storage engine + +### Added + +- Logical file lifecycle management +- Chunk lifecycle management +- Retry handling for interrupted operations +- Startup recovery system +- Container quarantine mechanism +- Extended storage statistics +- Smoke test improvements +- Durable container writes with fsync to guarantee on-disk persistence + +### Improved + +- Concurrent file ingestion +- Garbage collection safety +- Operational observability + +### Notes + +This version introduces the core reliability model for the storage +engine. + +The on-disk format and APIs may still change in future releases. + +### Known Limitations + +- Basic crash recovery exists, but full end-to-end crash consistency across + filesystem and database layers is still evolving. +- No encryption at rest or in transit. +- No authentication or authorization model. 
+- Whole-container compression is not suitable for efficient random-access + restores. +- Concurrency behavior has not been heavily stress-tested under high parallel + workloads. +- No background integrity verification or automatic container scrubbing. +- On-disk storage format may change before v1.0. + +------------------------------------------------------------------------ + +## [0.1.0] - 2026-02-24 Initial public research prototype (POC). diff --git a/app/Dockerfile b/Dockerfile similarity index 77% rename from app/Dockerfile rename to Dockerfile index 1974af8..7ea94f7 100644 --- a/app/Dockerfile +++ b/Dockerfile @@ -6,16 +6,21 @@ WORKDIR /app # Install git (required for some Go modules) RUN apk add --no-cache git +# Copy module files first (better layer caching) COPY go.mod go.sum ./ RUN go mod download -COPY app/ . -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o coldkeep . +# Copy source +COPY cmd/ ./cmd/ +COPY internal/ ./internal/ + +# Build binary +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o coldkeep ./cmd/coldkeep + # ---------- Runtime ---------- FROM alpine:3.19 -# Install runtime dependencies RUN apk add --no-cache \ ca-certificates \ bash @@ -25,9 +30,8 @@ WORKDIR /app # Copy binary COPY --from=builder /app/coldkeep /usr/local/bin/coldkeep -# Copy scripts for smoke/testing +# Copy scripts COPY scripts/ ./scripts/ -# Default command ENTRYPOINT ["coldkeep"] CMD ["stats"] \ No newline at end of file diff --git a/README.md b/README.md index 2302fa6..8250be5 100644 --- a/README.md +++ b/README.md @@ -1,54 +1,106 @@ -# coldkeep (POC) +# coldkeep ![CI](https://github.com/franchoy/coldkeep/actions/workflows/ci.yml/badge.svg) ![Go Version](https://img.shields.io/badge/go-1.23+-blue) ![License](https://img.shields.io/badge/license-Apache%202.0-blue) -![Status](https://img.shields.io/badge/status-research%20prototype-orange) +![Status](https://img.shields.io/badge/status-research%20experimental-orange) -> **Status:** Research prototype / 
proof‑of‑concept.\ -> **Not production‑ready. Do not use for real or sensitive data.** +> **Status:** Experimental research projec.\ +> **Not production-ready. Do not use for real or sensitive data.** -coldkeep is an experimental local-first content‑addressed file storage -prototype written in Go.\ -It stores files as **chunks** inside **container** files on disk, and -tracks metadata in Postgres. +coldkeep is an experimental **local-first content-addressed file storage engine** +written in Go. -This repo is meant for learning, experimentation, and discussion --- not -for operational backup/storage use. +Files are split into **content-addressed chunks**, packed into +**container files on disk**, and tracked through **PostgreSQL +metadata**. + +This repository exists primarily for **learning, experimentation, and +design exploration** --- not for operational backup or production +storage. ------------------------------------------------------------------------ -## What it does (today) +# What it does (today) - Store a file (or folder) by splitting it into chunks. -- Deduplicate chunks using SHA‑256. -- Pack chunks into container files up to a max size. -- Restore a file by reconstructing it from chunks. -- Remove a logical file (decrements chunk ref counts). -- Run GC to delete unreferenced chunks/containers. -- Basic stats. +- Deduplicate chunks using SHA-256. +- Pack chunks into container files up to a maximum size. +- Restore a file by reconstructing it from stored chunks. +- Remove logical files (decrements chunk reference counts). +- Run garbage collection to remove unreferenced chunks. +- Recover safely from interrupted operations on startup. +- Display storage statistics and container health information. ------------------------------------------------------------------------ -## Design sketch +# Design sketch + +Core tables: + +- **logical_file**\ + User-visible file entry (name, size, file_hash). 
+ +- **chunk**\ + Content-addressed chunk (chunk_hash, size, ref_count, container_id, + offset). + +- **file_chunk**\ + Ordered mapping between logical files and chunks. -- **logical_file**: user-facing file entry (name, size, file_hash) -- **chunk**: content-addressed chunk (hash, size, ref_count, - container_id, offset) -- **file_chunk**: mapping from logical_file → ordered chunk list -- **container**: physical file on disk that stores chunk data +- **container**\ + Physical container file storing chunk data. -Containers are plain files under: +Containers are stored on disk under: storage/containers/ +Lifecycle states: + +- logical_file: PROCESSING → COMPLETED → ABORTED +- chunk: PROCESSING → COMPLETED → ABORTED + +These states allow coldkeep to detect interrupted operations and +recover safely on startup. + +------------------------------------------------------------------------ + +# Project structure + + coldkeep/ + │ + ├─ cmd/ + │ └─ coldkeep/ # CLI entrypoint + │ + ├─ internal/ + │ ├─ container/ # container format + container management + │ ├─ chunk/ # chunking and compression logic + │ ├─ db/ # database connection helpers + │ ├─ storage/ # store / restore / remove pipeline + │ ├─ maintenance/ # gc and stats + │ ├─ listing/ # file listing operations + │ └─ utils/ # small helper utilities + │ + ├─ tests/ # integration tests + ├─ scripts/ # smoke / development scripts + ├─ db/ # database schema + │ + ├─ docker-compose.yml + ├─ go.mod + └─ README.md + +`internal/` packages implement the storage engine.\ +`cmd/` contains the CLI entrypoint. + ------------------------------------------------------------------------ -## Quickstart (Using the included `samples/` folder) +# Quickstart + +A small `samples/` folder is included for testing. -This repository includes a small `samples/` directory for testing. 
+------------------------------------------------------------------------ -### 🐳 With Docker +# 🐳 With Docker Start services: @@ -74,7 +126,7 @@ List stored files: docker compose run --rm app list ``` -Restore a file (replace ID as needed): +Restore a file: ``` bash docker compose run --rm app restore 1 _out.bin @@ -94,20 +146,18 @@ docker compose run --rm app gc ------------------------------------------------------------------------ -### 💻 Local (without Docker) +# 💻 Local development (without Docker) -Start Postgres (example via Docker): +Start Postgres (example): ``` bash docker compose up -d postgres ``` -Build: +Build the CLI: ``` bash -cd app -go build -o ../coldkeep . -cd .. +go build -o coldkeep ./cmd/coldkeep ``` Store the sample folder: @@ -116,7 +166,7 @@ Store the sample folder: ./coldkeep store-folder samples ``` -List files: +List stored files: ``` bash ./coldkeep list @@ -125,7 +175,7 @@ List files: Restore a file: ``` bash -./coldkeep restore 1 ./restored.bin +./coldkeep restore 1 restored.bin ``` Show stats: @@ -142,110 +192,152 @@ Run GC: ------------------------------------------------------------------------ -## Configuration +# Configuration -The app reads DB connection info from environment variables\ +Database configuration is read from environment variables\ (see `docker-compose.yml` for defaults). -Storage goes to: +Storage is written to: ./storage/ -You can safely delete the entire `storage/` directory during testing. +During development you can safely delete this directory. + +Additional environment variables used in development: + +- `COLDKEEP_STORAGE_DIR` +- `COLDKEEP_SAMPLES_DIR` ------------------------------------------------------------------------ -## Known limitations (important) +# Known limitations + +## Crash recovery -### Crash consistency +coldkeep now includes a basic crash recovery model. -coldkeep is **not crash-consistent**. 
Some operations combine filesystem -writes with DB transactions, but filesystem changes cannot be rolled -back if a DB transaction fails. +Operations use lifecycle states (`PROCESSING`, `COMPLETED`, `ABORTED`) +to detect interrupted operations. -Possible effects: +On startup the system: -- Orphan container files on disk -- Temporary disagreement between DB and disk -- Partially applied container sealing/compression +- marks stale `PROCESSING` rows as `ABORTED` +- prevents incomplete chunks from being reused +- allows safe retries of interrupted operations -Use only with disposable test data. +However, the system is still experimental and full transactional +guarantees across filesystem and database layers are not yet complete. + +Use only with **disposable test data**. ------------------------------------------------------------------------ -### Whole-container compression +## Container compression + +Container-level compression currently compresses the whole container. -If container compression is enabled (gzip/zstd), restores may become -very slow because compressed streams are not seekable. +This makes random access difficult because compressed streams are not +seekable. -Default for this POC is **no container compression**. +Default for this prototype: **no compression**. ------------------------------------------------------------------------ -### Concurrency & integrity +## Concurrency & integrity -- Concurrency guarantees are minimal. -- Heavy concurrent store/remove/gc is not a goal for v0. -- Concurrent operations may leave orphan bytes inside containers. +Concurrency support is still evolving. + +Basic protections exist to avoid duplicate chunk ingestion and to +coordinate concurrent writers, but the system has not yet been +stress-tested for heavy parallel workloads. + +- Concurrent store/remove/gc operations are not a focus for v0. +- Concurrent operations may leave unused bytes in containers. 
+ +Future versions will improve: + +- crash recovery +- concurrency coordination +- container lifecycle safety ------------------------------------------------------------------------ -## Security +# Security See `SECURITY.md`. This project is a prototype and should not be used to protect sensitive -information. +data. ------------------------------------------------------------------------ -## Development +# Development + +## Build -### Tests + go build ./cmd/coldkeep -- Unit tests: `go test ./...` -- Integration tests require Postgres (see tests for details). +## Tests -### Smoke script +Run all tests: -`scripts/smoke.sh` provides a quick end‑to‑end test using the `samples/` -folder. + go test ./... + +Integration tests live under: + + tests/ + +and require a running PostgreSQL instance. + +------------------------------------------------------------------------ -#### Local (without Docker) +## Smoke test +`scripts/smoke.sh` runs a full end-to-end workflow : using the `samples/` directory. + +store → stats → list → restore → dedup check. + +### Local + +``` bash docker compose up -d postgres -go build -o coldkeep ./app +go build -o coldkeep ./cmd/coldkeep bash scripts/smoke.sh +``` -#### Docker +### Docker +``` bash docker compose up -d postgres -docker compose run --rm + +docker compose run --rm \ -e COLDKEEP_SAMPLES_DIR=/samples \ -e COLDKEEP_STORAGE_DIR=/tmp/coldkeep-storage \ -v "$PWD/samples:/samples" \ --entrypoint bash \ app scripts/smoke.sh +``` ------------------------------------------------------------------------ -## Roadmap ideas +# Roadmap ideas -- Framed container format for random access with compression. -- Stronger crash consistency. -- Safer concurrent operations. -- Cloud backends experiments. -- CLI improvements and richer stats. 
+- framed container format with random-access compression +- stronger crash consistency guarantees +- improved concurrent ingestion +- experimental cloud storage backends +- richer CLI and observability ------------------------------------------------------------------------ -## Contributing +# Contributing + +Contributions and discussion are welcome. -Contributions and discussion are welcome.\ See `CONTRIBUTING.md`. ------------------------------------------------------------------------ -## License +# License Apache-2.0. See `LICENSE`. \ No newline at end of file diff --git a/app/integration_test.go b/app/integration_test.go deleted file mode 100644 index f724408..0000000 --- a/app/integration_test.go +++ /dev/null @@ -1,507 +0,0 @@ -package main - -import ( - "bytes" - "crypto/sha256" - "database/sql" - "encoding/hex" - "os" - "path/filepath" - "testing" - "time" -) - -// NOTE: -// These tests are integration-style (DB + filesystem). -// They are skipped unless COLDKEEP_TEST_DB=1 is set. 
-// -// Run (example): -// COLDKEEP_TEST_DB=1 DB_HOST=localhost DB_PORT=5432 DB_USER=coldkeep DB_PASSWORD=coldkeep DB_NAME=coldkeep go test ./app -v - -func requireDB(t *testing.T) { - t.Helper() - if os.Getenv("COLDKEEP_TEST_DB") == "" { - t.Skip("Set COLDKEEP_TEST_DB=1 to run integration tests") - } -} - -func applySchema(t *testing.T, db *sql.DB) { - t.Helper() - - // 1) Allow explicit override (best for Docker / CI) - if p := os.Getenv("COLDKEEP_SCHEMA_PATH"); p != "" { - b, err := os.ReadFile(p) - if err != nil { - t.Fatalf("read schema %s: %v", p, err) - } - if _, err := db.Exec(string(b)); err != nil { - t.Fatalf("apply schema: %v", err) - } - return - } - - // 2) Walk upwards from cwd to find db/init.sql - cwd, err := os.Getwd() - if err == nil { - dir := cwd - for i := 0; i < 6; i++ { // climb up a few levels - candidate := filepath.Join(dir, "db", "init.sql") - if _, statErr := os.Stat(candidate); statErr == nil { - b, err := os.ReadFile(candidate) - if err != nil { - t.Fatalf("read schema %s: %v", candidate, err) - } - if _, err := db.Exec(string(b)); err != nil { - t.Fatalf("apply schema: %v", err) - } - return - } - parent := filepath.Dir(dir) - if parent == dir { - break - } - dir = parent - } - } - - // 3) Common fallbacks for containers / mounts - candidates := []string{ - "../db/init.sql", - "../../db/init.sql", - "/repo/db/init.sql", - "/work/db/init.sql", - "/db/init.sql", - "/app/db/init.sql", - } - for _, p := range candidates { - if _, statErr := os.Stat(p); statErr == nil { - b, err := os.ReadFile(p) - if err != nil { - t.Fatalf("read schema %s: %v", p, err) - } - if _, err := db.Exec(string(b)); err != nil { - t.Fatalf("apply schema: %v", err) - } - return - } - } - - t.Fatalf("could not find db/init.sql; set COLDKEEP_SCHEMA_PATH to an absolute path") -} - -func resetDB(t *testing.T, db *sql.DB) { - t.Helper() - // Keep schema_version; clear the data tables and reset sequences. 
- _, err := db.Exec(` - TRUNCATE TABLE - file_chunk, - chunk, - logical_file, - container - RESTART IDENTITY CASCADE - `) - if err != nil { - t.Fatalf("truncate tables: %v", err) - } -} - -func resetStorage(t *testing.T) { - t.Helper() - if storageDir == "" { - t.Fatalf("storageDir is empty") - } - _ = os.RemoveAll(storageDir) - if err := os.MkdirAll(storageDir, 0o755); err != nil { - t.Fatalf("mkdir storageDir: %v", err) - } -} - -func sha256File(t *testing.T, path string) string { - t.Helper() - b, err := os.ReadFile(path) - if err != nil { - t.Fatalf("read %s: %v", path, err) - } - sum := sha256.Sum256(b) - return hex.EncodeToString(sum[:]) -} - -func createTempFile(t *testing.T, dir, name string, size int) string { - t.Helper() - p := filepath.Join(dir, name) - data := make([]byte, size) - - // Deterministic content (repeatable). - for i := 0; i < size; i++ { - data[i] = byte((i*31 + 7) % 251) - } - - if err := os.WriteFile(p, data, 0o644); err != nil { - t.Fatalf("write temp file: %v", err) - } - return p -} - -func fetchFileIDByHash(t *testing.T, db *sql.DB, fileHash string) int64 { - t.Helper() - var id int64 - err := db.QueryRow(`SELECT id FROM logical_file WHERE file_hash = $1`, fileHash).Scan(&id) - if err != nil { - t.Fatalf("query logical_file by hash: %v", err) - } - return id -} - -func TestRoundTripStoreRestore(t *testing.T) { - requireDB(t) - - // Use temp dirs per test - tmp := t.TempDir() - storageDir = filepath.Join(tmp, "containers") - _ = os.Setenv("COLDKEEP_STORAGE_DIR", storageDir) - resetStorage(t) - - db, err := connectDB() - if err != nil { - t.Fatalf("connectDB: %v", err) - } - defer db.Close() - - applySchema(t, db) - resetDB(t, db) - - // Ensure we don't exercise heavy compression here. - defaultCompression = CompressionNone - - inputDir := filepath.Join(tmp, "input") - _ = os.MkdirAll(inputDir, 0o755) - - // 512KB file should create multiple chunks depending on CDC params. 
- inPath := createTempFile(t, inputDir, "roundtrip.bin", 512*1024) - want := mustRead(t, inPath) - wantHash := sha256File(t, inPath) - - if err := storeFileWithDB(db, inPath); err != nil { - t.Fatalf("storeFileWithDB: %v", err) - } - - fileID := fetchFileIDByHash(t, db, wantHash) - - outDir := filepath.Join(tmp, "out") - _ = os.MkdirAll(outDir, 0o755) - outPath := filepath.Join(outDir, "roundtrip.restored.bin") - - if err := restoreFileWithDB(db, fileID, outPath); err != nil { - t.Fatalf("restoreFileWithDB: %v", err) - } - - got := mustRead(t, outPath) - if !bytes.Equal(want, got) { - t.Fatalf("restored bytes differ from original") - } - - gotHash := sha256File(t, outPath) - if gotHash != wantHash { - t.Fatalf("hash mismatch: want %s got %s", wantHash, gotHash) - } -} - -func TestDedupSameFile(t *testing.T) { - requireDB(t) - - tmp := t.TempDir() - storageDir = filepath.Join(tmp, "containers") - _ = os.Setenv("COLDKEEP_STORAGE_DIR", storageDir) - resetStorage(t) - - db, err := connectDB() - if err != nil { - t.Fatalf("connectDB: %v", err) - } - defer db.Close() - - applySchema(t, db) - resetDB(t, db) - - defaultCompression = CompressionNone - - inputDir := filepath.Join(tmp, "input") - _ = os.MkdirAll(inputDir, 0o755) - inPath := createTempFile(t, inputDir, "dup.bin", 256*1024) - fileHash := sha256File(t, inPath) - - if err := storeFileWithDB(db, inPath); err != nil { - t.Fatalf("first store: %v", err) - } - if err := storeFileWithDB(db, inPath); err != nil { - t.Fatalf("second store: %v", err) - } - - // Should still be 1 logical file for this hash - var n int - if err := db.QueryRow(`SELECT COUNT(*) FROM logical_file WHERE file_hash = $1`, fileHash).Scan(&n); err != nil { - t.Fatalf("count logical_file: %v", err) - } - if n != 1 { - t.Fatalf("expected 1 logical_file row, got %d", n) - } -} - -func TestStoreFolderParallelSmoke(t *testing.T) { - requireDB(t) - - tmp := t.TempDir() - storageDir = filepath.Join(tmp, "containers") - _ = 
os.Setenv("COLDKEEP_STORAGE_DIR", storageDir) - resetStorage(t) - - db, err := connectDB() - if err != nil { - t.Fatalf("connectDB: %v", err) - } - defer db.Close() - applySchema(t, db) - resetDB(t, db) - - defaultCompression = CompressionNone - - // Build folder with duplicates + shared-chunk variants - inputDir := filepath.Join(tmp, "folder") - _ = os.MkdirAll(inputDir, 0o755) - - paths := make([]string, 0, 32) - - // 1) Base unique-ish files - for i := 0; i < 10; i++ { - paths = append(paths, createTempFile(t, inputDir, "file_"+itoa(i)+".bin", 64*1024)) - } - - // 2) Exact duplicates (file-level dedupe) - for i := 0; i < 3; i++ { - src := paths[i] - dst := filepath.Join(inputDir, "dup_"+itoa(i)+".bin") - b := mustRead(t, src) - if err := os.WriteFile(dst, b, 0o644); err != nil { - t.Fatalf("write dup: %v", err) - } - paths = append(paths, dst) - } - - // 3) Shared-chunk-but-different files (chunk-level dedupe) - // Create a shared prefix and combine with per-file unique suffix. - // This should cause some shared chunks even when full file hashes differ. - sharedPrefix := make([]byte, 32*1024) - for i := range sharedPrefix { - sharedPrefix[i] = byte((i*17 + 3) % 251) - } - - for i := 0; i < 3; i++ { - suffix := make([]byte, 32*1024) - for j := range suffix { - suffix[j] = byte((j*31 + 7 + i) % 251) - } - - hybrid := append(append([]byte{}, sharedPrefix...), suffix...) - dst := filepath.Join(inputDir, "hybrid_"+itoa(i)+".bin") - if err := os.WriteFile(dst, hybrid, 0o644); err != nil { - t.Fatalf("write hybrid: %v", err) - } - paths = append(paths, dst) - } - - // Run storeFolder with timeout to catch deadlocks/hangs. 
- done := make(chan error, 1) - go func() { done <- storeFolder(inputDir) }() - - select { - case err := <-done: - if err != nil { - t.Fatalf("storeFolder: %v", err) - } - case <-time.After(30 * time.Second): - t.Fatalf("storeFolder timed out (possible deadlock or blocked workers)") - } - - // Spot-check: restore a few logical files and compare hashes. - rows, err := db.Query(`SELECT id, file_hash, original_name FROM logical_file ORDER BY id ASC LIMIT 5`) - if err != nil { - t.Fatalf("query logical_file: %v", err) - } - defer rows.Close() - - outDir := filepath.Join(tmp, "out") - _ = os.MkdirAll(outDir, 0o755) - - for rows.Next() { - var id int64 - var expectHash, name string - if err := rows.Scan(&id, &expectHash, &name); err != nil { - t.Fatalf("scan: %v", err) - } - outPath := filepath.Join(outDir, name) - if err := restoreFileWithDB(db, id, outPath); err != nil { - t.Fatalf("restore %d: %v", id, err) - } - gotHash := sha256File(t, outPath) - if gotHash != expectHash { - t.Fatalf("hash mismatch for restored id=%d want=%s got=%s", id, expectHash, gotHash) - } - } - if err := rows.Err(); err != nil { - t.Fatalf("rows: %v", err) - } -} - -// Helpers (small, local) -func mustRead(t *testing.T, p string) []byte { - t.Helper() - b, err := os.ReadFile(p) - if err != nil { - t.Fatalf("read %s: %v", p, err) - } - return b -} - -func itoa(i int) string { - // small int to string without fmt to keep output clean - if i == 0 { - return "0" - } - neg := i < 0 - if neg { - i = -i - } - var buf [32]byte - pos := len(buf) - for i > 0 { - pos-- - buf[pos] = byte('0' + (i % 10)) - i /= 10 - } - if neg { - pos-- - buf[pos] = '-' - } - return string(buf[pos:]) -} - -func TestGCRemovesUnusedContainers(t *testing.T) { - requireDB(t) - - tmp := t.TempDir() - storageDir = filepath.Join(tmp, "containers") - _ = os.Setenv("COLDKEEP_STORAGE_DIR", storageDir) - resetStorage(t) - - db, err := connectDB() - if err != nil { - t.Fatalf("connectDB: %v", err) - } - defer db.Close() - 
applySchema(t, db) - resetDB(t, db) - - defaultCompression = CompressionNone - - inputDir := filepath.Join(tmp, "input") - _ = os.MkdirAll(inputDir, 0o755) - - // Create two different files large enough to likely create separate containers - fileA := createTempFile(t, inputDir, "fileA.bin", 512*1024) - - // Create fileB with slightly different deterministic pattern - fileBPath := filepath.Join(inputDir, "fileB.bin") - b := make([]byte, 512*1024) - for i := range b { - b[i] = byte((i*37 + 11) % 251) // different formula - } - if err := os.WriteFile(fileBPath, b, 0o644); err != nil { - t.Fatalf("write fileB: %v", err) - } - fileB := fileBPath - - // Store both - if err := storeFileWithDB(db, fileA); err != nil { - t.Fatalf("store fileA: %v", err) - } - if err := storeFileWithDB(db, fileB); err != nil { - t.Fatalf("store fileB: %v", err) - } - - // Count containers before removal - var containersBefore int - if err := db.QueryRow(`SELECT COUNT(*) FROM container`).Scan(&containersBefore); err != nil { - t.Fatalf("count container: %v", err) - } - if containersBefore == 0 { - t.Fatalf("expected at least 1 container") - } - - // Fetch fileA ID - var fileAID int64 - hashA := sha256File(t, fileA) - if err := db.QueryRow( - `SELECT id FROM logical_file WHERE file_hash = $1`, - hashA, - ).Scan(&fileAID); err != nil { - t.Fatalf("fetch fileA id: %v", err) - } - - // Remove fileA - if err := removeFileWithDB(db, fileAID); err != nil { - t.Fatalf("removeFileWithDB: %v", err) - } - - // Run GC - if err := runGC(); err != nil { - t.Fatalf("runGC: %v", err) - } - - // Count containers after GC - var containersAfter int - if err := db.QueryRow(`SELECT COUNT(*) FROM container`).Scan(&containersAfter); err != nil { - t.Fatalf("count container after: %v", err) - } - - // GC may delete 0 containers if remaining live chunks share the same container. - // What we must guarantee is that GC does not break restore and ref_counts remain valid. 
- var negatives int - if err := db.QueryRow(`SELECT COUNT(*) FROM chunk WHERE ref_count < 0`).Scan(&negatives); err != nil { - t.Fatalf("check negative ref_count: %v", err) - } - if negatives != 0 { - t.Fatalf("found %d chunks with negative ref_count", negatives) - } - - // Ensure fileB still restores correctly - var fileBID int64 - hashB := sha256File(t, fileB) - if err := db.QueryRow( - `SELECT id FROM logical_file WHERE file_hash = $1`, - hashB, - ).Scan(&fileBID); err != nil { - t.Fatalf("fetch fileB id: %v", err) - } - - outDir := filepath.Join(tmp, "out") - _ = os.MkdirAll(outDir, 0o755) - outPath := filepath.Join(outDir, "fileB.restored.bin") - - if err := restoreFileWithDB(db, fileBID, outPath); err != nil { - t.Fatalf("restore fileB after GC: %v", err) - } - - gotHash := sha256File(t, outPath) - if gotHash != hashB { - t.Fatalf("hash mismatch after GC: want %s got %s", hashB, gotHash) - } - - // Ensure no chunk has negative ref_count - //var negatives int - if err := db.QueryRow(`SELECT COUNT(*) FROM chunk WHERE ref_count < 0`).Scan(&negatives); err != nil { - t.Fatalf("check negative ref_count: %v", err) - } - if negatives != 0 { - t.Fatalf("found chunks with negative ref_count") - } -} diff --git a/app/stats.go b/app/stats.go deleted file mode 100644 index fada9a3..0000000 --- a/app/stats.go +++ /dev/null @@ -1,113 +0,0 @@ -package main - -import ( - "database/sql" - "fmt" -) - -func runStats() error { - db, err := connectDB() - if err != nil { - return fmt.Errorf("Failed to connect to DB: %w", err) - } - defer db.Close() - - var totalFiles int64 - var totalLogicalSize sql.NullInt64 - var totalContainers int64 - var totalContainerSize sql.NullInt64 - var totalCompressedSize sql.NullInt64 - var liveBytes sql.NullInt64 - var deadBytes sql.NullInt64 - - // Logical file stats - db.QueryRow(`SELECT COUNT(*), COALESCE(SUM(total_size),0) FROM logical_file`). 
- Scan(&totalFiles, &totalLogicalSize) - - // Container stats - db.QueryRow(` - SELECT COUNT(*), - COALESCE(SUM(current_size),0), - COALESCE(SUM(compressed_size),0) - FROM container - `).Scan(&totalContainers, &totalContainerSize, &totalCompressedSize) - - // Chunk live/dead stats - db.QueryRow(` - SELECT - COALESCE(SUM(CASE WHEN ref_count > 0 THEN size ELSE 0 END),0), - COALESCE(SUM(CASE WHEN ref_count = 0 THEN size ELSE 0 END),0) - FROM chunk - `).Scan(&liveBytes, &deadBytes) - - fmt.Println("\n====== coldkeep Stats ======") - - fmt.Printf("Logical files: %d\n", totalFiles) - fmt.Printf("Logical stored size: %.2f MB\n", float64(totalLogicalSize.Int64)/(1024*1024)) - - fmt.Printf("Containers: %d\n", totalContainers) - fmt.Printf("Raw container bytes: %.2f MB\n", float64(totalContainerSize.Int64)/(1024*1024)) - fmt.Printf("Compressed bytes: %.2f MB\n", float64(totalCompressedSize.Int64)/(1024*1024)) - - fmt.Printf("Live chunk bytes: %.2f MB\n", float64(liveBytes.Int64)/(1024*1024)) - fmt.Printf("Dead chunk bytes: %.2f MB\n", float64(deadBytes.Int64)/(1024*1024)) - - if totalLogicalSize.Int64 > 0 { - dedupRatio := 1.0 - (float64(liveBytes.Int64) / float64(totalLogicalSize.Int64)) - fmt.Printf("Global dedup ratio: %.2f%%\n", dedupRatio*100) - } - - if totalContainerSize.Int64 > 0 { - deadRatio := float64(deadBytes.Int64) / float64(totalContainerSize.Int64) - fmt.Printf("Fragmentation ratio: %.2f%%\n", deadRatio*100) - } - - fmt.Println("============================") - - // ---- Per container breakdown ---- - fmt.Println("\nPer-container breakdown:") - - rows, err := db.Query(` - SELECT - c.id, - c.filename, - c.current_size, - COALESCE(SUM(CASE WHEN ch.ref_count > 0 THEN ch.size ELSE 0 END),0) AS live, - COALESCE(SUM(CASE WHEN ch.ref_count = 0 THEN ch.size ELSE 0 END),0) AS dead - FROM container c - LEFT JOIN chunk ch ON ch.container_id = c.id - GROUP BY c.id - ORDER BY c.id - `) - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - var id 
int64 - var filename string - var totalSize int64 - var live int64 - var dead int64 - - if err := rows.Scan(&id, &filename, &totalSize, &live, &dead); err != nil { - return err - } - - liveRatio := 0.0 - if totalSize > 0 { - liveRatio = float64(live) / float64(totalSize) * 100 - } - - fmt.Printf("Container %d (%s): total=%.2fMB live=%.2fMB dead=%.2fMB live_ratio=%.2f%%\n", - id, - filename, - float64(totalSize)/(1024*1024), - float64(live)/(1024*1024), - float64(dead)/(1024*1024), - liveRatio, - ) - } - return nil -} diff --git a/app/store.go b/app/store.go deleted file mode 100644 index 154d97f..0000000 --- a/app/store.go +++ /dev/null @@ -1,268 +0,0 @@ -package main - -import ( - "crypto/sha256" - "database/sql" - "encoding/hex" - "fmt" - "io" - "os" - "path/filepath" - "runtime" - "time" -) - -func storeFile(path string) error { - db, err := connectDB() - if err != nil { - return fmt.Errorf("Failed to connect to DB: %w", err) - } - defer db.Close() - - if err := storeFileWithDB(db, path); err != nil { - return err - } - return nil -} - -func storeFileWithDB(db *sql.DB, path string) (err error) { - start := time.Now() - - file, err := os.Open(path) - if err != nil { - return err - } - defer file.Close() - - info, err := file.Stat() - if err != nil { - return err - } - totalSize := info.Size() - - // Compute full file hash - hasher := sha256.New() - if _, err := io.Copy(hasher, file); err != nil { - return err - } - fileHash := hex.EncodeToString(hasher.Sum(nil)) - - tx, err := db.Begin() - if err != nil { - return err - } - defer func() { - if err != nil { - _ = tx.Rollback() - } - }() - - // Insert logical file (concurrency-safe) - // If another goroutine inserts the same hash at the same time, we won't error. 
- var fileID int64 - insErr := tx.QueryRow( - `INSERT INTO logical_file (original_name, total_size, file_hash) - VALUES ($1, $2, $3) - ON CONFLICT (file_hash) DO NOTHING - RETURNING id`, - info.Name(), - totalSize, - fileHash, - ).Scan(&fileID) - - if insErr == sql.ErrNoRows { - // Conflict happened: someone else already stored this file hash - var existingID int64 - if err := tx.QueryRow( - `SELECT id FROM logical_file WHERE file_hash = $1`, - fileHash, - ).Scan(&existingID); err != nil { - return err - } - - // Commit and exit early (don't store chunks again) - if err := tx.Commit(); err != nil { - return err - } - - fmt.Printf("File '%s' already stored\n", path) - fmt.Printf(" SHA256: %s\n", fileHash) - return nil - } - if insErr != nil { - return insErr - } - - chunks, err := chunkFile(path) - if err != nil { - return err - } - - chunkOrder := 0 - - for _, chunkData := range chunks { - sum := sha256.Sum256(chunkData) - hash := hex.EncodeToString(sum[:]) - - var chunkID int64 - cerr := tx.QueryRow( - "SELECT id FROM chunk WHERE sha256=$1", - hash, - ).Scan(&chunkID) - - if cerr == nil { - // Chunk exists -> bump refcount - _, err = tx.Exec( - "UPDATE chunk SET ref_count = ref_count + 1 WHERE id = $1", - chunkID, - ) - if err != nil { - return err - } - } else if cerr == sql.ErrNoRows { - // New chunk -> append physically and insert chunk row - containerID, filename, currentSize, err2 := getOrCreateOpenContainer(tx) - if err2 != nil { - err = err2 - return err - } - - offset, newSize, err2 := appendChunkPhysical(filename, currentSize, chunkData) - if err2 != nil { - err = err2 - return err - } - - if err2 := updateContainerSize(tx, containerID, newSize); err2 != nil { - err = err2 - return err - } - - // Seal if reached max size - var maxSize int64 - if err2 := tx.QueryRow(`SELECT max_size FROM container WHERE id = $1`, containerID).Scan(&maxSize); err2 != nil { - err = err2 - return err - } - - if newSize >= maxSize { - if err2 := sealContainer(tx, containerID, 
filename); err2 != nil { - err = err2 - return err - } - } - - // Try insert chunk (concurrency-safe) - var insertedChunkID int64 - insErr := tx.QueryRow( - `INSERT INTO chunk (sha256, size, container_id, chunk_offset, ref_count) - VALUES ($1, $2, $3, $4, 1) - ON CONFLICT (sha256) DO NOTHING - RETURNING id`, - hash, - len(chunkData), - containerID, - offset, - ).Scan(&insertedChunkID) - - if insErr == nil { - // We won: this chunk is new - chunkID = insertedChunkID - } else if insErr == sql.ErrNoRows { - // Someone else inserted it first; reuse it and bump refcount - if err := tx.QueryRow(`SELECT id FROM chunk WHERE sha256 = $1`, hash).Scan(&chunkID); err != nil { - return err - } - if _, err := tx.Exec(`UPDATE chunk SET ref_count = ref_count + 1 WHERE id = $1`, chunkID); err != nil { - return err - } - } else { - return insErr - } - } else { - err = cerr - return err - } - - // Link file ↔ chunk - _, err = tx.Exec( - `INSERT INTO file_chunk (logical_file_id, chunk_id, chunk_order) - VALUES ($1, $2, $3)`, - fileID, - chunkID, - chunkOrder, - ) - if err != nil { - return err - } - - chunkOrder++ - } - - if err = tx.Commit(); err != nil { - return err - } - - printSuccess("File stored successfully") - fmt.Printf(" FileID: %d\n", fileID) - fmt.Printf(" Path: %s\n", path) - fmt.Printf(" SHA256: %s\n", fileHash) - printDuration(start) - - return nil -} - -func storeFolder(root string) error { - start := time.Now() - - db, err := connectDB() - if err != nil { - return fmt.Errorf("Failed to connect to DB: %w", err) - } - defer db.Close() - - workerCount := runtime.NumCPU() - fileChan := make(chan string, 128) - - errChan := make(chan error, workerCount) - - // Workers - for i := 0; i < workerCount; i++ { - go func() { - for p := range fileChan { - if err := storeFileWithDB(db, p); err != nil { - errChan <- err - return - } - } - errChan <- nil - }() - } - - // Producer - walkErr := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error { - if walkErr 
!= nil { - return walkErr - } - if !d.IsDir() { - fileChan <- path - } - return nil - }) - close(fileChan) - - if walkErr != nil { - return walkErr - } - - // Wait workers - for i := 0; i < workerCount; i++ { - if werr := <-errChan; werr != nil { - return werr - } - } - - printSuccess("Folder stored successfully") - printDuration(start) - return nil -} diff --git a/app/main.go b/cmd/coldkeep/main.go similarity index 79% rename from app/main.go rename to cmd/coldkeep/main.go index 7e522ff..f033e15 100644 --- a/app/main.go +++ b/cmd/coldkeep/main.go @@ -5,33 +5,41 @@ import ( "log" "os" "strconv" - "time" -) -const version = "0.1.0" + "github.com/franchoy/coldkeep/internal/listing" + "github.com/franchoy/coldkeep/internal/maintenance" + "github.com/franchoy/coldkeep/internal/recovery" + "github.com/franchoy/coldkeep/internal/storage" +) -var defaultCompression = CompressionNone //CompressionZstd +const version = "0.2.0" func main() { + + //system recovery on startup + err := recovery.SystemRecovery() + if err != nil { + log.Printf("System recovery failed: %v\n", err) + os.Exit(1) + } + if len(os.Args) < 2 { printHelp() return } - var err error - switch os.Args[1] { case "store": if len(os.Args) < 3 { log.Fatal("Usage: coldkeep store ") } - err = storeFile(os.Args[2]) + err = storage.StoreFile(os.Args[2]) case "store-folder": if len(os.Args) < 3 { log.Fatal("Usage: coldkeep store-folder ") } - err = storeFolder(os.Args[2]) + err = storage.StoreFolder(os.Args[2]) case "restore": if len(os.Args) < 4 { @@ -41,7 +49,7 @@ func main() { if err != nil { log.Fatal("Invalid fileID: ", err) } - err = restoreFile(fileID, os.Args[3]) + err = storage.RestoreFile(fileID, os.Args[3]) case "remove": if len(os.Args) < 3 { @@ -51,13 +59,13 @@ func main() { if err != nil { log.Fatal("Invalid fileID: ", err) } - err = removeFile(fileID) + err = storage.RemoveFile(fileID) case "gc": - err = runGC() + err = maintenance.RunGC() case "stats": - err = runStats() + err = maintenance.RunStats() 
case "help", "-h", "--help": printHelp() @@ -66,10 +74,10 @@ func main() { fmt.Println("coldkeep version", version) case "list": - err = listFiles() + err = listing.ListFiles() case "search": - err = searchFiles(os.Args[2:]) + err = listing.SearchFiles(os.Args[2:]) default: fmt.Println("Unknown command:", os.Args[1]) @@ -86,7 +94,7 @@ func main() { } func printHelp() { - fmt.Println("coldkeep POC (V0)") + fmt.Println("coldkeep (V0.2.0)") fmt.Println() fmt.Println("Usage:") fmt.Println(" coldkeep [arguments]") @@ -121,11 +129,3 @@ func printHelp() { fmt.Println(" coldkeep store myfile.bin") fmt.Println(" coldkeep restore 12 ./restored") } - -func printSuccess(title string) { - fmt.Println(title) -} - -func printDuration(start time.Time) { - fmt.Printf(" Time: %v\n", time.Since(start)) -} diff --git a/db/init.sql b/db/init.sql index 8cda4f7..4196e06 100644 --- a/db/init.sql +++ b/db/init.sql @@ -5,11 +5,10 @@ BEGIN; -- ========================= CREATE TABLE IF NOT EXISTS schema_version ( - version INTEGER NOT NULL + version INTEGER PRIMARY KEY ); - INSERT INTO schema_version(version) -SELECT 1 +SELECT 2 WHERE NOT EXISTS (SELECT 1 FROM schema_version); -- ========================= @@ -19,16 +18,19 @@ WHERE NOT EXISTS (SELECT 1 FROM schema_version); CREATE TABLE IF NOT EXISTS container ( id BIGSERIAL PRIMARY KEY, filename TEXT NOT NULL UNIQUE, + sealed BOOLEAN NOT NULL DEFAULT FALSE, + quarantine BOOLEAN NOT NULL DEFAULT FALSE, current_size BIGINT NOT NULL DEFAULT 0 CHECK (current_size >= 0), max_size BIGINT NOT NULL CHECK (max_size > 0), - sealed BOOLEAN NOT NULL DEFAULT FALSE, compression_algorithm TEXT NOT NULL DEFAULT 'none', compressed_size BIGINT NOT NULL DEFAULT 0 CHECK (compressed_size >= 0), - created_at TIMESTAMP NOT NULL DEFAULT NOW() + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -CREATE INDEX IF NOT EXISTS idx_container_sealed -ON container(sealed); +CREATE INDEX IF NOT EXISTS idx_container_sealed ON 
container(sealed); +CREATE INDEX IF NOT EXISTS idx_container_quarantine ON container(quarantine); +CREATE INDEX IF NOT EXISTS idx_container_sealed_quarantine ON container(sealed, quarantine); -- ========================= -- Chunk table @@ -36,20 +38,22 @@ ON container(sealed); CREATE TABLE IF NOT EXISTS chunk ( id BIGSERIAL PRIMARY KEY, - sha256 TEXT NOT NULL UNIQUE, + chunk_hash TEXT NOT NULL, size BIGINT NOT NULL CHECK (size > 0), - container_id BIGINT NOT NULL + status TEXT NOT NULL CHECK (status IN ('PROCESSING','COMPLETED','ABORTED')), + container_id BIGINT REFERENCES container(id) ON DELETE RESTRICT, - chunk_offset BIGINT NOT NULL CHECK (chunk_offset >= 0), - ref_count BIGINT NOT NULL DEFAULT 1 CHECK (ref_count >= 0), - created_at TIMESTAMP NOT NULL DEFAULT NOW() + chunk_offset BIGINT CHECK (chunk_offset >= 0), + ref_count BIGINT NOT NULL DEFAULT 0 CHECK (ref_count >= 0), + retry_count INTEGER NOT NULL DEFAULT 0 CHECK (retry_count >= 0), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -CREATE INDEX IF NOT EXISTS idx_chunk_container -ON chunk(container_id); - -CREATE INDEX IF NOT EXISTS idx_chunk_ref_count -ON chunk(ref_count); +CREATE UNIQUE INDEX IF NOT EXISTS idx_chunk_hash_size ON chunk(chunk_hash, size); +CREATE INDEX IF NOT EXISTS idx_chunk_container ON chunk(container_id); +CREATE INDEX IF NOT EXISTS idx_chunk_ref_count ON chunk(ref_count); +CREATE INDEX IF NOT EXISTS idx_chunk_status ON chunk(status); -- ========================= -- Logical file table @@ -59,12 +63,17 @@ CREATE TABLE IF NOT EXISTS logical_file ( id BIGSERIAL PRIMARY KEY, original_name TEXT NOT NULL, total_size BIGINT NOT NULL CHECK (total_size >= 0), - file_hash TEXT NOT NULL UNIQUE, - created_at TIMESTAMP NOT NULL DEFAULT NOW() + file_hash TEXT NOT NULL, + status TEXT NOT NULL CHECK (status IN ('PROCESSING','COMPLETED','ABORTED')), + retry_count INTEGER NOT NULL DEFAULT 0 CHECK (retry_count >= 0), + created_at TIMESTAMPTZ NOT NULL 
DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (file_hash, total_size) ); -CREATE INDEX IF NOT EXISTS idx_logical_file_hash -ON logical_file(file_hash); +CREATE INDEX IF NOT EXISTS idx_logical_file_hash ON logical_file(file_hash); + +CREATE INDEX IF NOT EXISTS idx_logical_file_status ON logical_file(status); -- ========================= -- File ↔ Chunk mapping @@ -76,10 +85,39 @@ CREATE TABLE IF NOT EXISTS file_chunk ( chunk_id BIGINT NOT NULL REFERENCES chunk(id) ON DELETE RESTRICT, chunk_order INTEGER NOT NULL CHECK (chunk_order >= 0), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (logical_file_id, chunk_order) ); CREATE INDEX IF NOT EXISTS idx_file_chunk_logical_file_id ON file_chunk(logical_file_id); CREATE INDEX IF NOT EXISTS idx_file_chunk_chunk_id ON file_chunk(chunk_id); + +-- ========================= +-- updated_at trigger +-- ========================= + +CREATE OR REPLACE FUNCTION set_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trg_container_updated_at +BEFORE UPDATE ON container +FOR EACH ROW +EXECUTE FUNCTION set_updated_at(); + +CREATE TRIGGER trg_chunk_updated_at +BEFORE UPDATE ON chunk +FOR EACH ROW +EXECUTE FUNCTION set_updated_at(); + +CREATE TRIGGER trg_logical_file_updated_at +BEFORE UPDATE ON logical_file +FOR EACH ROW +EXECUTE FUNCTION set_updated_at(); + COMMIT; \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 9a41280..8f13a10 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,7 +15,7 @@ services: app: build: context: . 
- dockerfile: app/Dockerfile + dockerfile: Dockerfile container_name: coldkeep_app depends_on: - postgres diff --git a/app/cdc.go b/internal/chunk/cdc.go similarity index 93% rename from app/cdc.go rename to internal/chunk/cdc.go index ee5b9ee..ccb4301 100644 --- a/app/cdc.go +++ b/internal/chunk/cdc.go @@ -1,4 +1,4 @@ -package main +package chunk import ( "io" @@ -11,7 +11,7 @@ const ( mask = 0x3FFFF ) -func chunkFile(filePath string) ([][]byte, error) { +func ChunkFile(filePath string) ([][]byte, error) { file, err := os.Open(filePath) if err != nil { return nil, err diff --git a/internal/container/constants.go b/internal/container/constants.go new file mode 100644 index 0000000..76e052f --- /dev/null +++ b/internal/container/constants.go @@ -0,0 +1,19 @@ +package container + +import ( + "github.com/franchoy/coldkeep/internal/utils" +) + +var ContainersDir = utils.GetenvOrDefault("COLDKEEP_STORAGE_DIR", "./storage/containers") + +var containerMaxSize = utils.GetenvOrDefaultInt64("COLDKEEP_CONTAINER_MAX_SIZE_MB", 64) * 1024 * 1024 //MB + +// GetContainerMaxSize returns the current container max size +func GetContainerMaxSize() int64 { + return containerMaxSize +} + +// SetContainerMaxSize sets the container max size (for testing) +func SetContainerMaxSize(size int64) { + containerMaxSize = size +} diff --git a/app/container.go b/internal/container/container.go similarity index 73% rename from app/container.go rename to internal/container/container.go index 855742b..29f127f 100644 --- a/app/container.go +++ b/internal/container/container.go @@ -1,4 +1,4 @@ -package main +package container import ( "crypto/sha256" @@ -8,13 +8,12 @@ import ( "os" "path/filepath" "time" -) - -var storageDir = envOrDefault("COLDKEEP_STORAGE_DIR", "./storage/containers") -var containerMaxSize = envOrDefaultInt64("COLDKEEP_CONTAINER_MAX_SIZE_MB", 64) * 1024 * 1024 //MB + "github.com/franchoy/coldkeep/internal/db" + "github.com/franchoy/coldkeep/internal/utils" +) -func 
getOrCreateOpenContainer(db DBTX) (int64, string, int64, error) { +func GetOrCreateOpenContainer(db db.DBTX) (int64, string, int64, error) { var id int64 var filename string var currentSize int64 @@ -23,7 +22,7 @@ func getOrCreateOpenContainer(db DBTX) (int64, string, int64, error) { err := db.QueryRow(` SELECT id, filename, current_size FROM container - WHERE sealed = FALSE + WHERE sealed = FALSE and quarantine = FALSE ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED @@ -55,11 +54,11 @@ func getOrCreateOpenContainer(db DBTX) (int64, string, int64, error) { // 3️⃣ Create physical file - if err := os.MkdirAll(storageDir, 0755); err != nil { + if err := os.MkdirAll(ContainersDir, 0755); err != nil { return 0, "", 0, err } - fullPath := filepath.Join(storageDir, filename) + fullPath := filepath.Join(ContainersDir, filename) f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { @@ -82,9 +81,9 @@ func getOrCreateOpenContainer(db DBTX) (int64, string, int64, error) { return id, filename, currentSize, nil } -func appendChunkPhysical(filename string, currentSize int64, chunk []byte) (int64, int64, error) { - containerDir := storageDir - containerPath := filepath.Join(containerDir, filename) +func AppendChunkPhysical(filename string, currentSize int64, chunk []byte) (int64, int64, error) { + + containerPath := filepath.Join(ContainersDir, filename) f, err := os.OpenFile(containerPath, os.O_APPEND|os.O_WRONLY, 0644) if err != nil { @@ -116,13 +115,17 @@ func appendChunkPhysical(filename string, currentSize int64, chunk []byte) (int6 if _, err := f.Write(chunk); err != nil { return 0, 0, err } + // Ensure data is flushed to disk + if err := f.Sync(); err != nil { + return 0, 0, err + } newSize := currentSize + int64(32+4+len(chunk)) return offset, newSize, nil } -func updateContainerSize(tx DBTX, containerID int64, newSize int64) error { +func UpdateContainerSize(tx db.DBTX, containerID int64, newSize int64) error { _, err := tx.Exec( `UPDATE 
container SET current_size = $1 WHERE id = $2`, newSize, @@ -131,12 +134,12 @@ func updateContainerSize(tx DBTX, containerID int64, newSize int64) error { return err } -func sealContainer(tx DBTX, containerID int64, filename string) error { - containerDir := storageDir - originalPath := filepath.Join(containerDir, filename) +func SealContainer(tx db.DBTX, containerID int64, filename string) error { + + originalPath := filepath.Join(ContainersDir, filename) // Compress file - compressedPath, compressed_size, err := CompressFile(originalPath, defaultCompression) + compressedPath, compressed_size, err := utils.CompressFile(originalPath, utils.DefaultCompression) if err != nil { return err } @@ -148,12 +151,12 @@ func sealContainer(tx DBTX, containerID int64, filename string) error { compression_algorithm = $1, compressed_size = $2 WHERE id = $3 - `, string(defaultCompression), compressed_size, containerID) + `, string(utils.DefaultCompression), compressed_size, containerID) if err != nil { return fmt.Errorf("update/seal container failed: %w", err) } - fmt.Printf("Container %d sealed and compressed with type %s : %s\n", containerID, defaultCompression, compressedPath) + fmt.Printf("Container %d sealed and compressed with type %s : %s\n", containerID, utils.DefaultCompression, compressedPath) return nil } diff --git a/app/container_format.go b/internal/container/format.go similarity index 85% rename from app/container_format.go rename to internal/container/format.go index eecaa57..676e194 100644 --- a/app/container_format.go +++ b/internal/container/format.go @@ -1,4 +1,4 @@ -package main +package container import ( "crypto/rand" @@ -10,7 +10,7 @@ import ( const ( ContainerMagic = "coldkeep0" // must be exactly 8 bytes - ContainerHdrLenV0 = 64 // fixed header size in bytes + ContainerHdrLenV0 = 64 // fixed header size in bytes ) // Reserved future flags (not used yet) @@ -37,8 +37,8 @@ func writeNewContainerHeaderV0(f *os.File, maxSize int64) error { // flags (none for 
V0) binary.LittleEndian.PutUint32(h[16:20], 0) - // created timestamp - binary.LittleEndian.PutUint64(h[20:28], uint64(time.Now().UnixNano())) + // created timestampZ UTC + binary.LittleEndian.PutUint64(h[20:28], uint64(time.Now().UTC().UnixNano())) // max container size policy binary.LittleEndian.PutUint64(h[28:36], uint64(maxSize)) diff --git a/app/db.go b/internal/db/db.go similarity index 84% rename from app/db.go rename to internal/db/db.go index c20750b..1274065 100644 --- a/app/db.go +++ b/internal/db/db.go @@ -1,4 +1,4 @@ -package main +package db import ( "database/sql" @@ -6,16 +6,17 @@ import ( "os" "strings" + "github.com/franchoy/coldkeep/internal/utils" _ "github.com/lib/pq" ) -func connectDB() (*sql.DB, error) { +func ConnectDB() (*sql.DB, error) { connStr := "host=" + os.Getenv("DB_HOST") + " port=" + os.Getenv("DB_PORT") + " user=" + os.Getenv("DB_USER") + " password=" + os.Getenv("DB_PASSWORD") + " dbname=" + os.Getenv("DB_NAME") + - " sslmode=" + envOrDefault("DB_SSLMODE", "disable") + " sslmode=" + utils.GetenvOrDefault("DB_SSLMODE", "disable") safeConnStr := strings.ReplaceAll(connStr, "password="+os.Getenv("DB_PASSWORD"), "password=***") log.Printf("Connecting to DB with: %s", safeConnStr) // Log the connection string (without password) diff --git a/app/list.go b/internal/listing/list.go similarity index 81% rename from app/list.go rename to internal/listing/list.go index 1655e23..640075d 100644 --- a/app/list.go +++ b/internal/listing/list.go @@ -1,18 +1,20 @@ -package main +package listing import ( "fmt" "time" + + "github.com/franchoy/coldkeep/internal/db" ) -func listFiles() error { - db, err := connectDB() +func ListFiles() error { + dbconn, err := db.ConnectDB() if err != nil { return fmt.Errorf("Failed to connect to DB: %w", err) } - defer db.Close() + defer dbconn.Close() - rows, err := db.Query(` + rows, err := dbconn.Query(` SELECT id, original_name, total_size, created_at FROM logical_file ORDER BY created_at DESC diff --git 
a/app/search.go b/internal/listing/search.go similarity index 88% rename from app/search.go rename to internal/listing/search.go index f0d97de..50cc511 100644 --- a/app/search.go +++ b/internal/listing/search.go @@ -1,16 +1,18 @@ -package main +package listing import ( "fmt" "time" + + "github.com/franchoy/coldkeep/internal/db" ) -func searchFiles(args []string) error { - db, err := connectDB() +func SearchFiles(args []string) error { + dbconn, err := db.ConnectDB() if err != nil { return fmt.Errorf("Failed to connect to DB: %w", err) } - defer db.Close() + defer dbconn.Close() query := ` SELECT id, original_name, total_size, created_at @@ -53,7 +55,7 @@ func searchFiles(args []string) error { query += " ORDER BY created_at DESC" - rows, err := db.Query(query, params...) + rows, err := dbconn.Query(query, params...) if err != nil { return err } diff --git a/app/gc.go b/internal/maintenance/gc.go similarity index 70% rename from app/gc.go rename to internal/maintenance/gc.go index 6470b75..ac96d95 100644 --- a/app/gc.go +++ b/internal/maintenance/gc.go @@ -1,22 +1,26 @@ -package main +package maintenance import ( "fmt" "log" "os" "path/filepath" + + "github.com/franchoy/coldkeep/internal/container" + "github.com/franchoy/coldkeep/internal/db" + "github.com/franchoy/coldkeep/internal/utils" ) -func runGC() error { - db, err := connectDB() +func RunGC() error { + dbconn, err := db.ConnectDB() if err != nil { return fmt.Errorf("failed to connect to DB: %w", err) } - defer db.Close() + defer dbconn.Close() - rows, err := db.Query(` + rows, err := dbconn.Query(` SELECT id, filename, compression_algorithm - FROM container + FROM container WHERE quarantine = FALSE `) if err != nil { return err @@ -34,7 +38,7 @@ func runGC() error { return err } - tx, err := db.Begin() + tx, err := dbconn.Begin() if err != nil { return err } @@ -48,8 +52,8 @@ func runGC() error { WHERE container_id = $1 AND ref_count > 0 ) - FROM container - WHERE id = $1 + FROM container where quarantine = 
FALSE + and id = $1 `, containerID).Scan(&stillEmpty) if err != nil { _ = tx.Rollback() @@ -62,7 +66,7 @@ func runGC() error { } // Delete chunks - _, err = tx.Exec(`DELETE FROM chunk WHERE container_id = $1`, containerID) + _, err = tx.Exec(`DELETE FROM chunk WHERE container_id = $1 and status = 'COMPLETED'`, containerID) if err != nil { _ = tx.Rollback() return err @@ -80,8 +84,8 @@ func runGC() error { } // After commit, delete file from disk - containerPath := filepath.Join(storageDir, filename) - if algo != "" && algo != string(CompressionNone) { + containerPath := filepath.Join(container.ContainersDir, filename) + if algo != "" && algo != string(utils.CompressionNone) { containerPath += "." + algo } diff --git a/internal/maintenance/stats.go b/internal/maintenance/stats.go new file mode 100644 index 0000000..65765e2 --- /dev/null +++ b/internal/maintenance/stats.go @@ -0,0 +1,270 @@ +package maintenance + +import ( + "database/sql" + "fmt" + + "github.com/franchoy/coldkeep/internal/db" +) + +func bytesToMB(bytes int64) float64 { + return float64(bytes) / (1024 * 1024) +} + +func RunStats() error { + dbconn, err := db.ConnectDB() + if err != nil { + return fmt.Errorf("Failed to connect to DB: %w", err) + } + defer dbconn.Close() + + var totalFiles int64 + var totalLogicalSize sql.NullInt64 + var completedFiles int64 + var completedLogicalSize sql.NullInt64 + var processingFiles int64 + var processingLogicalSize sql.NullInt64 + var abortedFiles int64 + var abortedLogicalSize sql.NullInt64 + var healthyContainers int64 + var quarantinedContainers int64 + var totalContainers int64 + var healthyContainerSize sql.NullInt64 + var healthyCompressedSize sql.NullInt64 + var quarantinedContainerSize sql.NullInt64 + var quarantinedCompressedSize sql.NullInt64 + + var totalContainerSize sql.NullInt64 + var totalCompressedSize sql.NullInt64 + var liveBytes sql.NullInt64 + var deadBytes sql.NullInt64 + + // Retry stats + var totalFileRetries sql.NullInt64 + var 
avgFileRetries sql.NullFloat64 + var maxFileRetries sql.NullInt64 + var totalChunkRetries sql.NullInt64 + var avgChunkRetries sql.NullFloat64 + var maxChunkRetries sql.NullInt64 + + // Logical file stats - total + err = dbconn.QueryRow(`SELECT COUNT(*), COALESCE(SUM(total_size),0) FROM logical_file`). + Scan(&totalFiles, &totalLogicalSize) + if err != nil { + return fmt.Errorf("Failed to query total logical files: %w", err) + } + + // Logical file stats - completed + err = dbconn.QueryRow(`SELECT COUNT(*), COALESCE(SUM(total_size),0) FROM logical_file WHERE status = 'COMPLETED'`). + Scan(&completedFiles, &completedLogicalSize) + if err != nil { + return fmt.Errorf("Failed to query completed logical files: %w", err) + } + + // Logical file stats - processing + err = dbconn.QueryRow(`SELECT COUNT(*), COALESCE(SUM(total_size),0) FROM logical_file WHERE status = 'PROCESSING'`). + Scan(&processingFiles, &processingLogicalSize) + if err != nil { + return fmt.Errorf("Failed to query processing logical files: %w", err) + } + + // Logical file stats - aborted + err = dbconn.QueryRow(`SELECT COUNT(*), COALESCE(SUM(total_size),0) FROM logical_file WHERE status = 'ABORTED'`). 
+ Scan(&abortedFiles, &abortedLogicalSize) + if err != nil { + return fmt.Errorf("Failed to query aborted logical files: %w", err) + } + + // healthy Container stats + err = dbconn.QueryRow(` + SELECT COUNT(*), + COALESCE(SUM(current_size),0), + COALESCE(SUM(compressed_size),0) + FROM container + WHERE quarantine = FALSE + `).Scan(&healthyContainers, &healthyContainerSize, &healthyCompressedSize) + if err != nil { + return fmt.Errorf("Failed to query healthy containers: %w", err) + } + + // quarantined Container stats + err = dbconn.QueryRow(` + SELECT COUNT(*), + COALESCE(SUM(current_size),0), + COALESCE(SUM(compressed_size),0) + FROM container + WHERE quarantine = TRUE + `).Scan(&quarantinedContainers, &quarantinedContainerSize, &quarantinedCompressedSize) + if err != nil { + return fmt.Errorf("Failed to query quarantined containers: %w", err) + } + + // Total container stats + totalContainers = healthyContainers + quarantinedContainers + totalContainerSize = sql.NullInt64{ + Int64: healthyContainerSize.Int64 + quarantinedContainerSize.Int64, + Valid: healthyContainerSize.Valid || quarantinedContainerSize.Valid, + } + totalCompressedSize = sql.NullInt64{ + Int64: healthyCompressedSize.Int64 + quarantinedCompressedSize.Int64, + Valid: healthyCompressedSize.Valid || quarantinedCompressedSize.Valid, + } + + // Chunk live/dead stats + err = dbconn.QueryRow(` + SELECT + COALESCE(SUM(CASE WHEN ref_count > 0 THEN size ELSE 0 END),0), + COALESCE(SUM(CASE WHEN ref_count = 0 THEN size ELSE 0 END),0) + FROM chunk + `).Scan(&liveBytes, &deadBytes) + if err != nil { + return fmt.Errorf("Failed to query chunk live/dead stats: %w", err) + } + + // Retry stats for logical files + err = dbconn.QueryRow(` + SELECT COALESCE(SUM(retry_count),0), COALESCE(AVG(retry_count),0), COALESCE(MAX(retry_count),0) + FROM logical_file + `).Scan(&totalFileRetries, &avgFileRetries, &maxFileRetries) + if err != nil { + return fmt.Errorf("Failed to query logical file retry stats: %w", err) + } + + 
// Retry stats for chunks + err = dbconn.QueryRow(` + SELECT COALESCE(SUM(retry_count),0), COALESCE(AVG(retry_count),0), COALESCE(MAX(retry_count),0) + FROM chunk + `).Scan(&totalChunkRetries, &avgChunkRetries, &maxChunkRetries) + if err != nil { + return fmt.Errorf("Failed to query chunk retry stats: %w", err) + } + + fmt.Println("\n====== coldkeep Stats ======") + + fmt.Printf("Logical files (total): %d\n", totalFiles) + fmt.Printf("Logical stored size (total): %.2f MB\n", bytesToMB(totalLogicalSize.Int64)) + fmt.Printf(" Completed files: %d (%.2f MB)\n", completedFiles, bytesToMB(completedLogicalSize.Int64)) + fmt.Printf(" Processing files: %d (%.2f MB)\n", processingFiles, bytesToMB(processingLogicalSize.Int64)) + fmt.Printf(" Aborted files: %d (%.2f MB)\n", abortedFiles, bytesToMB(abortedLogicalSize.Int64)) + fmt.Printf("Healthy containers: %d\n", healthyContainers) + fmt.Printf("Healthy container bytes: %.2f MB\n", bytesToMB(healthyContainerSize.Int64)) + fmt.Printf("Healthy compressed bytes: %.2f MB\n", bytesToMB(healthyCompressedSize.Int64)) + fmt.Printf("Quarantined containers: %d\n", quarantinedContainers) + fmt.Printf("Quarantined container bytes: %.2f MB\n", bytesToMB(quarantinedContainerSize.Int64)) + fmt.Printf("Quarantined compressed bytes: %.2f MB\n", bytesToMB(quarantinedCompressedSize.Int64)) + fmt.Printf("Total containers: %d\n", totalContainers) + fmt.Printf("Total container bytes: %.2f MB\n", bytesToMB(totalContainerSize.Int64)) + fmt.Printf("Total compressed bytes: %.2f MB\n", bytesToMB(totalCompressedSize.Int64)) + + fmt.Printf("Live chunk bytes: %.2f MB\n", bytesToMB(liveBytes.Int64)) + fmt.Printf("Dead chunk bytes: %.2f MB\n", bytesToMB(deadBytes.Int64)) + + if completedLogicalSize.Int64 > 0 { + dedupRatio := 1.0 - (float64(liveBytes.Int64) / float64(completedLogicalSize.Int64)) + fmt.Printf("Global dedup ratio: %.2f%%\n", dedupRatio*100) + } + + if healthyContainerSize.Int64 > 0 { + deadRatio := float64(deadBytes.Int64) / 
float64(healthyContainerSize.Int64) + fmt.Printf("Fragmentation ratio: %.2f%%\n", deadRatio*100) + } + + if totalLogicalSize.Int64 > 0 && totalCompressedSize.Int64 > 0 { + compressionRatio := float64(totalLogicalSize.Int64) / float64(totalCompressedSize.Int64) + fmt.Printf("Compression ratio: %.2f\n", compressionRatio) + } + + fmt.Printf("File retry stats: total=%d, avg=%.2f, max=%d\n", totalFileRetries.Int64, avgFileRetries.Float64, maxFileRetries.Int64) + fmt.Printf("Chunk retry stats: total=%d, avg=%.2f, max=%d\n", totalChunkRetries.Int64, avgChunkRetries.Float64, maxChunkRetries.Int64) + + fmt.Println("============================") + + rows, err := dbconn.Query(` + SELECT status, COUNT(*), COALESCE(SUM(size),0) + FROM chunk + GROUP BY status`) + if err != nil { + return err + } + defer rows.Close() + + var completedCount, processingCount, abortedCount int64 + var completedBytes int64 + + for rows.Next() { + var status string + var count int64 + var bytes int64 + + if err := rows.Scan(&status, &count, &bytes); err != nil { + return err + } + + switch status { + case "COMPLETED": + completedCount = count + completedBytes = bytes + case "PROCESSING": + processingCount = count + case "ABORTED": + abortedCount = count + } + } + + fmt.Printf("Chunks (total): %d\n", completedCount+processingCount+abortedCount) + fmt.Printf(" Completed chunks: %d (%.2f MB)\n", completedCount, bytesToMB(completedBytes)) + fmt.Printf(" Processing chunks: %d\n", processingCount) + fmt.Printf(" Aborted chunks: %d\n", abortedCount) + + fmt.Println("============================") + + // ---- Per container breakdown ---- + fmt.Println("\nPer-container breakdown:") + + rows, err = dbconn.Query(` + SELECT + c.id, + c.filename, + c.current_size, + COALESCE(SUM(CASE WHEN ch.ref_count > 0 THEN ch.size ELSE 0 END),0) AS live, + COALESCE(SUM(CASE WHEN ch.ref_count = 0 THEN ch.size ELSE 0 END),0) AS dead, + c.quarantine + FROM container c + LEFT JOIN chunk ch ON ch.container_id = c.id + GROUP BY 
c.id + ORDER BY c.id + `) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var id int64 + var filename string + var totalSize int64 + var live int64 + var dead int64 + var quarantine bool + + if err := rows.Scan(&id, &filename, &totalSize, &live, &dead, &quarantine); err != nil { + return err + } + + liveRatio := 0.0 + if totalSize > 0 { + liveRatio = float64(live) / float64(totalSize) * 100 + } + + fmt.Printf("Container %d (%s): quarantined=%t : total=%.2fMB live=%.2fMB dead=%.2fMB live_ratio=%.2f%%\n", + id, + filename, + quarantine, + float64(totalSize)/(1024*1024), + float64(live)/(1024*1024), + float64(dead)/(1024*1024), + liveRatio, + ) + } + return nil +} diff --git a/internal/recovery/system_recovery.go b/internal/recovery/system_recovery.go new file mode 100644 index 0000000..8eefff6 --- /dev/null +++ b/internal/recovery/system_recovery.go @@ -0,0 +1,135 @@ +package recovery + +import ( + "database/sql" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/franchoy/coldkeep/internal/container" + "github.com/franchoy/coldkeep/internal/db" +) + +func SystemRecovery() error { + log.Println("Starting system recovery process") + dbconn, err := db.ConnectDB() + if err != nil { + return fmt.Errorf("Failed to connect to DB: %w", err) + } + defer dbconn.Close() + + err = abortProcessingLogicalFiles(dbconn) + if err != nil { + return err + } + err = abortProcessingChunks(dbconn) + if err != nil { + return err + } + err = quarantineMissingContainers(dbconn) + if err != nil { + return err + } + err = quarantineOrphanContainers(dbconn) + if err != nil { + return err + } + + return nil +} + +func abortProcessingLogicalFiles(dbconn *sql.DB) error { + log.Println("Aborting logical files stuck in PROCESSING state for more than 10 minutes") + _, err := dbconn.Exec(`UPDATE logical_file SET status = 'ABORTED' WHERE status = 'PROCESSING' AND updated_at < NOW() - INTERVAL '10 minutes'`) + if err != nil { + return fmt.Errorf("query update 
logical_file to ABORTED: %w", err) + } + log.Println("Aborting logical files stuck in PROCESSING state for more than 10 minutes - done") + return nil +} + +func abortProcessingChunks(dbconn *sql.DB) error { + log.Println("Aborting chunks stuck in PROCESSING state for more than 10 minutes") + _, err := dbconn.Exec(`UPDATE chunk SET status = 'ABORTED' WHERE status = 'PROCESSING' AND updated_at < NOW() - INTERVAL '10 minutes'`) + if err != nil { + return fmt.Errorf("query update chunk to ABORTED: %w", err) + } + return nil +} + +func quarantineMissingContainers(dbconn *sql.DB) error { + log.Println("Quarantining container records with missing files") + rows, err := dbconn.Query(`SELECT id, filename FROM container WHERE quarantine = FALSE`) + if err != nil { + return fmt.Errorf("query retrieve container list: %w", err) + } + defer rows.Close() + + for rows.Next() { + + var id int64 + var filename string + + if err := rows.Scan(&id, &filename); err != nil { + return err + } + + path := filepath.Join(container.ContainersDir, filename) + + _, err := os.Stat(path) + + if os.IsNotExist(err) { + + _, err := dbconn.Exec(`UPDATE container SET quarantine = TRUE WHERE id = $1`, id) + if err != nil { + return fmt.Errorf("query update container to quarantine due to missing file: %w", err) + } + log.Printf("Quarantined container record with missing file: %s", filename) + + } else if err != nil { + return fmt.Errorf("stat container file: %w", err) + } + + } + + return rows.Err() +} + +func quarantineOrphanContainers(dbconn *sql.DB) error { + log.Println("Checking for orphan container files in the containers directory") + // recover files in container folder + entries, err := os.ReadDir(container.ContainersDir) + if os.IsNotExist(err) { + return nil + } + if err != nil { + return fmt.Errorf("read containers dir: %w", err) + } + + for _, file := range entries { + if file.IsDir() { + continue + } + fileinfo, err := file.Info() + if err != nil { + return fmt.Errorf("get info for file 
%s: %w", file.Name(), err) + } + name := file.Name() + // check if a container record exists for this filename + var exists bool + err = dbconn.QueryRow(`SELECT EXISTS(SELECT 1 FROM container WHERE filename = $1)`, name).Scan(&exists) + if err != nil { + return fmt.Errorf("query check container existence: %w", err) + } + if !exists { + _, err := dbconn.Exec(`INSERT INTO container (filename, quarantine, current_size, max_size) VALUES ($1, TRUE, $2, $3) ON CONFLICT (filename) DO NOTHING`, name, fileinfo.Size(), fileinfo.Size()) + if err != nil { + return fmt.Errorf("insert orphan container record: %w", err) + } + log.Printf("Quarantined orphan container file: %s", name) + } + } + + return nil +} diff --git a/internal/storage/constants.go b/internal/storage/constants.go new file mode 100644 index 0000000..2866ae7 --- /dev/null +++ b/internal/storage/constants.go @@ -0,0 +1,7 @@ +package storage + +import "time" + +var logicalFileWaitingtime = 100 * time.Millisecond + +var chunkWaitingtime = 100 * time.Millisecond diff --git a/app/remove.go b/internal/storage/remove.go similarity index 85% rename from app/remove.go rename to internal/storage/remove.go index 6fab5e9..39ebc4e 100644 --- a/app/remove.go +++ b/internal/storage/remove.go @@ -1,26 +1,28 @@ -package main +package storage import ( "database/sql" "fmt" + + "github.com/franchoy/coldkeep/internal/db" ) -func removeFile(fileID int64) error { - db, err := connectDB() +func RemoveFile(fileID int64) error { + dbconn, err := db.ConnectDB() if err != nil { return fmt.Errorf("Failed to connect to DB: %w", err) } - defer db.Close() + defer dbconn.Close() - if err := removeFileWithDB(db, fileID); err != nil { + if err := RemoveFileWithDB(dbconn, fileID); err != nil { return err } return nil } -func removeFileWithDB(db *sql.DB, fileID int64) error { +func RemoveFileWithDB(dbconn *sql.DB, fileID int64) error { - tx, err := db.Begin() + tx, err := dbconn.Begin() if err != nil { return err } diff --git a/app/restore.go 
b/internal/storage/restore.go similarity index 86% rename from app/restore.go rename to internal/storage/restore.go index 060a202..629c082 100644 --- a/app/restore.go +++ b/internal/storage/restore.go @@ -1,4 +1,4 @@ -package main +package storage import ( "bytes" @@ -12,22 +12,26 @@ import ( "path/filepath" "strings" "time" + + "github.com/franchoy/coldkeep/internal/container" + "github.com/franchoy/coldkeep/internal/db" + "github.com/franchoy/coldkeep/internal/utils" ) -func restoreFile(id int64, outputPath string) error { - db, err := connectDB() +func RestoreFile(id int64, outputPath string) error { + dbconn, err := db.ConnectDB() if err != nil { return fmt.Errorf("Failed to connect to DB: %w", err) } - defer db.Close() + defer dbconn.Close() - if err := restoreFileWithDB(db, id, outputPath); err != nil { + if err := RestoreFileWithDB(dbconn, id, outputPath); err != nil { return err } return nil } -func restoreFileWithDB(db *sql.DB, fileID int64, outputPath string) error { +func RestoreFileWithDB(dbconn *sql.DB, fileID int64, outputPath string) error { start := time.Now() // ------------------------------------------------------------ // Fetch logical file metadata @@ -35,8 +39,8 @@ func restoreFileWithDB(db *sql.DB, fileID int64, outputPath string) error { var expectedFileHash string var originalName string - err := db.QueryRow( - "SELECT original_name, file_hash FROM logical_file WHERE id = $1", + err := dbconn.QueryRow( + "SELECT original_name, file_hash FROM logical_file WHERE status = 'COMPLETED' AND id = $1", fileID, ).Scan(&originalName, &expectedFileHash) @@ -52,17 +56,17 @@ func restoreFileWithDB(db *sql.DB, fileID int64, outputPath string) error { // NOTE: chunk_offset points to the *start of the record* inside the container: // [32 bytes sha256][4 bytes little-endian uint32 size][ bytes data] // ------------------------------------------------------------ - rows, err := db.Query(` + rows, err := dbconn.Query(` SELECT c.chunk_offset, c.size, - 
c.sha256, + c.chunk_hash, ct.filename, ct.compression_algorithm FROM file_chunk fc JOIN chunk c ON fc.chunk_id = c.id JOIN container ct ON c.container_id = ct.id - WHERE fc.logical_file_id = $1 + WHERE fc.logical_file_id = $1 AND c.status = 'COMPLETED' ORDER BY fc.chunk_order ASC `, fileID) @@ -107,21 +111,21 @@ func restoreFileWithDB(db *sql.DB, fileID int64, outputPath string) error { return fmt.Errorf("scan chunk row: %w", err) } - algo := CompressionType(algoStr) + algo := utils.CompressionType(algoStr) // Container filename changes when compressed (CompressFile removes the original and writes filename.) containerFilename := filename - if algo != CompressionNone { + if algo != utils.CompressionNone { containerFilename = filename + "." + algoStr } - containerPath := filepath.Join(storageDir, containerFilename) + containerPath := filepath.Join(container.ContainersDir, containerFilename) // Open as plain file (seek) or as decompressed stream (skip bytes) var r io.ReadCloser var f *os.File - if algo == CompressionNone { + if algo == utils.CompressionNone { f, err = os.Open(containerPath) if err != nil { return fmt.Errorf("open container %q: %w", containerFilename, err) @@ -134,7 +138,7 @@ func restoreFileWithDB(db *sql.DB, fileID int64, outputPath string) error { // Use file as reader; close via f.Close() below r = f } else { - r, err = OpenDecompressionReader(containerPath, algo) + r, err = utils.OpenDecompressionReader(containerPath, algo) if err != nil { return fmt.Errorf("open compressed container %q: %w", containerFilename, err) } @@ -217,7 +221,7 @@ func restoreFileWithDB(db *sql.DB, fileID int64, outputPath string) error { fmt.Printf("File %s restored successfully\n", originalName) fmt.Printf(" Output: %s\n", outputPath) fmt.Printf(" SHA256: %s\n", restoredHash) - printDuration(start) + utils.PrintDuration(start) return nil } diff --git a/internal/storage/store.go b/internal/storage/store.go new file mode 100644 index 0000000..49cfd6e --- /dev/null +++ 
b/internal/storage/store.go @@ -0,0 +1,483 @@ +package storage + +import ( + "crypto/sha256" + "database/sql" + "encoding/hex" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "time" + + "github.com/franchoy/coldkeep/internal/chunk" + "github.com/franchoy/coldkeep/internal/container" + "github.com/franchoy/coldkeep/internal/db" + "github.com/franchoy/coldkeep/internal/utils" +) + +func StoreFile(path string) error { + dbconn, err := db.ConnectDB() + if err != nil { + return fmt.Errorf("Failed to connect to DB: %w", err) + } + defer dbconn.Close() + + if err := StoreFileWithDB(dbconn, path); err != nil { + return err + } + return nil + +} + +func claimLogicalFile(dbconn *sql.DB, fileinfo os.FileInfo, fileHash string) (fileID int64, filestatus string, err error) { + + tx, err := dbconn.Begin() + if err != nil { + return 0, "", err + } + txclosed := false + defer func() { + if err != nil && !txclosed { + _ = tx.Rollback() + } + }() + + // Insert logical file (concurrency-safe) + // If another goroutine inserts the same hash at the same time, we won't error. 
+ + insErr := tx.QueryRow( + `INSERT INTO logical_file (original_name, total_size, file_hash, status) + VALUES ($1, $2, $3, 'PROCESSING') + ON CONFLICT (file_hash, total_size) DO NOTHING + RETURNING id`, + fileinfo.Name(), + fileinfo.Size(), + fileHash, + ).Scan(&fileID) + + if insErr == sql.ErrNoRows { + // Conflict happened: someone else already stored this file hash + var existingID int64 + if err := tx.QueryRow( + `SELECT id, status FROM logical_file WHERE file_hash = $1 and total_size = $2`, + fileHash, + fileinfo.Size(), + ).Scan(&existingID, &filestatus); err != nil { + return 0, "", err + } + + switch filestatus { + case "COMPLETED": + // File already stored and ready: we can reuse it + _ = tx.Rollback() // Don't hold locks while waiting + txclosed = true + fmt.Printf("File '%s' already stored\n", fileinfo.Name()) + fmt.Printf(" FileID: %d\n", existingID) + fmt.Printf(" SHA256: %s\n", fileHash) + return existingID, filestatus, nil + case "PROCESSING": + // Another process is currently storing this file: we can wait and reuse it once done + _ = tx.Rollback() // Don't hold locks while waiting + txclosed = true + fmt.Printf("File '%s' is currently being stored by another process. 
Waiting...\n", fileinfo.Name()) + + for keep_waiting := true; keep_waiting; { + + // Poll every logicalFileWaitingtime milliseconds until the other process finishes + time.Sleep(logicalFileWaitingtime) + + var finalStatus string + if err := dbconn.QueryRow( + `SELECT status FROM logical_file WHERE id = $1`, + existingID, + ).Scan(&finalStatus); err != nil { + return 0, "", err + } + + switch finalStatus { + case "COMPLETED": + fmt.Printf("File '%s' already stored\n", fileinfo.Name()) + fmt.Printf(" FileID: %d\n", existingID) + fmt.Printf(" SHA256: %s\n", fileHash) + return existingID, finalStatus, nil + case "ABORTED": + // Previous attempt was aborted while we were waiting: we can try to store again + filestatus = finalStatus // Update status to break the loop and retry storing + keep_waiting = false + } + } + } + + // If we reach here, it means the previous attempt was aborted while we were waiting: we can try to store again + if filestatus == "ABORTED" { + // Previous attempt was aborted while we were waiting: we can try to store again + // We can reuse the same logical_file row since it has the same file_hash + tx2, err := dbconn.Begin() + if err != nil { + return 0, "", err + } + if _, err := tx2.Exec( + `UPDATE logical_file SET status = 'PROCESSING', retry_count = retry_count + 1 WHERE id = $1`, + existingID, + ); err != nil { + _ = tx2.Rollback() + return 0, "", err + } + if err := tx2.Commit(); err != nil { + _ = tx2.Rollback() + return 0, "", err + } + fileID = existingID + filestatus = "PROCESSING" + } + } else if insErr == nil { + // We won: this file is new and we should store it + filestatus = "PROCESSING" + } else { + return 0, "", insErr + } + if !txclosed { + if err := tx.Commit(); err != nil { + return 0, "", err + } + } + + return fileID, filestatus, nil +} + +func claimChunk(dbconn *sql.DB, chunkHash string, chunksize int64) (chunkID int64, chunkstatus string, err error) { + + tx, err := dbconn.Begin() + if err != nil { + return 0, "", err + } + 
txclosed := false + defer func() { + if err != nil && !txclosed { + _ = tx.Rollback() + } + }() + + // Insert chunk (concurrency-safe) + // If another goroutine inserts the same hash at the same time, we won't error. + insErr := tx.QueryRow( + `INSERT INTO chunk (chunk_hash, size, status, ref_count) + VALUES ($1, $2, $3, 1) + ON CONFLICT (chunk_hash, size) DO NOTHING + RETURNING id`, + chunkHash, + chunksize, + "PROCESSING", + ).Scan(&chunkID) + + if insErr == nil { + // We won: this chunk is new + chunkstatus = "PROCESSING" + } else if insErr == sql.ErrNoRows { + // Someone else inserted it first + if err := tx.QueryRow(`SELECT id, status FROM chunk WHERE chunk_hash = $1 AND size = $2`, chunkHash, chunksize).Scan(&chunkID, &chunkstatus); err != nil { + return 0, "", err + } + switch chunkstatus { + case "COMPLETED": + // Chunk already stored and ready: we can reuse it + _ = tx.Rollback() // Don't hold locks while waiting + txclosed = true + return chunkID, chunkstatus, nil + case "PROCESSING": + // Another process is currently storing this chunk: we can wait and reuse it once done + _ = tx.Rollback() // Don't hold locks while waiting + txclosed = true + fmt.Printf("Chunk '%s' is currently being stored by another process. 
Waiting...\n", chunkHash) + + for keep_waiting := true; keep_waiting; { + + // Poll every chunkWaitingtime milliseconds until the other process finishes + time.Sleep(chunkWaitingtime) + + var finalStatus string + if err := dbconn.QueryRow( + `SELECT status FROM chunk WHERE id = $1`, + chunkID, + ).Scan(&finalStatus); err != nil { + return 0, "", err + } + switch finalStatus { + case "COMPLETED": + fmt.Printf("Chunk '%s' already stored\n", chunkHash) + return chunkID, finalStatus, nil + case "ABORTED": + // Previous attempt was aborted while we were waiting: we can try to store again + chunkstatus = finalStatus // Update status to break the loop and retry storing + keep_waiting = false + } + } + } + + // If we reach here, it means the previous attempt was aborted while we were waiting: we can try to store again + if chunkstatus == "ABORTED" { + // Previous attempt was aborted while we were waiting: we can try to store again + // We can reuse the same chunk row since it has the same chunk_hash and size + tx2, err := dbconn.Begin() + if err != nil { + return 0, "", err + } + if _, err := tx2.Exec( + `UPDATE chunk SET status = 'PROCESSING', retry_count = retry_count + 1 WHERE id = $1`, + chunkID, + ); err != nil { + _ = tx2.Rollback() + return 0, chunkstatus, err + } + if err := tx2.Commit(); err != nil { + _ = tx2.Rollback() + return 0, chunkstatus, err + } + chunkstatus = "PROCESSING" + } + } else { + return 0, "", insErr + } + + if !txclosed { + if err := tx.Commit(); err != nil { + return 0, chunkstatus, err + } + } + return chunkID, chunkstatus, nil +} + +func StoreFileWithDB(dbconn *sql.DB, path string) (err error) { + start := time.Now() + + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + + fileinfo, err := file.Stat() + if err != nil { + return err + } + + // Compute full file hash + hasher := sha256.New() + if _, err := io.Copy(hasher, file); err != nil { + return err + } + fileHash := hex.EncodeToString(hasher.Sum(nil)) + 
+ if _, err := file.Seek(0, 0); err != nil { + return err + } + + // Try to claim logical file for this hash (concurrency-safe) + fileID, filestatus, err := claimLogicalFile(dbconn, fileinfo, fileHash) + if err != nil { + return err + } + + if filestatus == "COMPLETED" { + // File already stored and ready: we can reuse it + return nil + } + + completed := false + defer func() { + if !completed { + dbconn.Exec(`UPDATE logical_file SET status='ABORTED' WHERE id=$1`, fileID) + } + }() + + // At this point, we have a logical_file row in "PROCESSING" status for this file hash, either created by us or by another process. + chunks, err := chunk.ChunkFile(path) + if err != nil { + return err + } + + chunkOrder := 0 + + for _, chunkData := range chunks { + sum := sha256.Sum256(chunkData) + hash := hex.EncodeToString(sum[:]) + // Try to claim chunk for this hash (concurrency-safe) + claimedChunkID, chunkStatus, err := claimChunk(dbconn, hash, int64(len(chunkData))) + if err != nil { + return err + } + + if chunkStatus == "COMPLETED" { + // Chunk already stored and ready: we can reuse it, just need to link it to the logical file + _, err = dbconn.Exec( + `INSERT INTO file_chunk (logical_file_id, chunk_id, chunk_order) + VALUES ($1, $2, $3)`, + fileID, + claimedChunkID, + chunkOrder, + ) + if err != nil { + return err + } + fmt.Printf("Reusing existing chunk %s for file '%s'\n", hash, path) + chunkOrder++ + continue // Move to next chunk + } + + tx, err := dbconn.Begin() + if err != nil { + return err + } + + // At this point, we have a chunk row in "PROCESSING" status for this chunk hash, either created by us or by another process. 
+ containerID, containerfilename, containercurrentSize, err2 := container.GetOrCreateOpenContainer(tx) + if err2 != nil { + _ = tx.Rollback() + return err2 + } + // Append chunk data to container file + offset, newSize, err2 := container.AppendChunkPhysical(containerfilename, containercurrentSize, chunkData) + if err2 != nil { + _ = tx.Rollback() + + if _, err3 := dbconn.Exec( + `UPDATE chunk SET status = 'ABORTED' WHERE id = $1`, + claimedChunkID, + ); err3 != nil { + return err3 + } + return err2 + } + + // Update chunk row with container_id and chunk_offset, and mark it as "COMPLETED" + if _, err2 := tx.Exec( + `UPDATE chunk SET container_id = $1, chunk_offset = $2, status = 'COMPLETED' WHERE id = $3`, + containerID, + offset, + claimedChunkID, + ); err2 != nil { + _ = tx.Rollback() + return err2 + } + + // Update container current size + if err2 := container.UpdateContainerSize(tx, containerID, newSize); err2 != nil { + _ = tx.Rollback() + return err2 + } + + // Seal if reached max size + var maxSize int64 + if err2 := tx.QueryRow(`SELECT max_size FROM container WHERE id = $1`, containerID).Scan(&maxSize); err2 != nil { + _ = tx.Rollback() + return err2 + } + + if newSize >= maxSize { + if err2 := container.SealContainer(tx, containerID, containerfilename); err2 != nil { + _ = tx.Rollback() + return err2 + } + } + + // Link file ↔ chunk + _, err = tx.Exec( + `INSERT INTO file_chunk (logical_file_id, chunk_id, chunk_order) + VALUES ($1, $2, $3)`, + fileID, + claimedChunkID, + chunkOrder, + ) + if err != nil { + _ = tx.Rollback() + return err + } + + if err = tx.Commit(); err != nil { + return err + } + + fmt.Printf("Stored new chunk %s for file '%s'\n", hash, path) + + chunkOrder++ + } + + // After all chunks are stored and linked, mark logical file as "COMPLETED" + _, err = dbconn.Exec( + `UPDATE logical_file SET status='COMPLETED' WHERE id=$1`, + fileID, + ) + if err != nil { + return err + } + // Mark the operation as completed to avoid aborting it in the 
deferred function + completed = true + + utils.PrintSuccess("File stored successfully") + fmt.Printf(" FileID: %d\n", fileID) + fmt.Printf(" Path: %s\n", path) + fmt.Printf(" SHA256: %s\n", fileHash) + utils.PrintDuration(start) + + return nil +} +func StoreFolder(root string) error { + start := time.Now() + + dbconn, err := db.ConnectDB() + if err != nil { + return fmt.Errorf("failed to connect DB: %w", err) + } + defer dbconn.Close() + + workerCount := runtime.NumCPU() + + fileChan := make(chan string, 256) + errChan := make(chan error, workerCount) + + // Workers + for i := 0; i < workerCount; i++ { + go func() { + for p := range fileChan { + if err := StoreFileWithDB(dbconn, p); err != nil { + errChan <- err + return + } + } + errChan <- nil + }() + } + + // Producer + walkErr := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error { + if walkErr != nil { + return walkErr + } + + if !d.IsDir() { + fileChan <- path + } + + return nil + }) + + close(fileChan) + + if walkErr != nil { + return walkErr + } + + // Wait workers + for i := 0; i < workerCount; i++ { + if werr := <-errChan; werr != nil { + return werr + } + } + + utils.PrintSuccess("Folder stored successfully") + utils.PrintDuration(start) + + return nil +} diff --git a/app/compression.go b/internal/utils/compression.go similarity index 97% rename from app/compression.go rename to internal/utils/compression.go index dcaeef5..71fc2e4 100644 --- a/app/compression.go +++ b/internal/utils/compression.go @@ -1,4 +1,4 @@ -package main +package utils import ( "compress/gzip" @@ -17,6 +17,8 @@ const ( CompressionZstd CompressionType = "zstd" ) +var DefaultCompression = CompressionNone //CompressionZstd + func CompressFile(path string, algo CompressionType) (string, int64, error) { // No compression if algo == CompressionNone { diff --git a/app/env.go b/internal/utils/env.go similarity index 67% rename from app/env.go rename to internal/utils/env.go index 40e9643..8703da2 100644 --- 
a/app/env.go +++ b/internal/utils/env.go @@ -1,18 +1,18 @@ -package main +package utils import ( "fmt" "os" ) -func envOrDefault(key, fallback string) string { +func GetenvOrDefault(key, fallback string) string { if val, ok := os.LookupEnv(key); ok { return val } return fallback } -func envOrDefaultInt64(key string, fallback int64) int64 { +func GetenvOrDefaultInt64(key string, fallback int64) int64 { if val, ok := os.LookupEnv(key); ok { var result int64 if _, err := fmt.Sscanf(val, "%d", &result); err == nil { diff --git a/internal/utils/print.go b/internal/utils/print.go new file mode 100644 index 0000000..5907bc0 --- /dev/null +++ b/internal/utils/print.go @@ -0,0 +1,14 @@ +package utils + +import ( + "fmt" + "time" +) + +func PrintSuccess(title string) { + fmt.Println(title) +} + +func PrintDuration(start time.Time) { + fmt.Printf(" Time: %v\n", time.Since(start)) +} diff --git a/samples/hello.txt b/samples/hello.txt index 78f8f8e..c780366 100644 --- a/samples/hello.txt +++ b/samples/hello.txt @@ -1 +1,100 @@ hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 
+hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 \ No newline at end of file diff --git a/samples/hello_dup.txt b/samples/hello_dup.txt index dc61771..c780366 100644 --- a/samples/hello_dup.txt +++ b/samples/hello_dup.txt @@ -1 +1,100 @@ +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 
+hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 +hello coldkeep v0 hello coldkeep v0 \ No newline at end of file diff --git a/scripts/smoke.sh b/scripts/smoke.sh index db051f6..ea4b2fe 100755 --- a/scripts/smoke.sh +++ b/scripts/smoke.sh @@ -10,6 +10,12 @@ set -euo pipefail # docker compose up -d postgres # docker compose run --rm -e COLDKEEP_SAMPLES_DIR=/samples -v ./samples:/samples --entrypoint bash app scripts/smoke.sh +cleanup() { + rm -rf ./_smoke_out +} + +trap cleanup EXIT + echo "[smoke] starting" : "${COLDKEEP_STORAGE_DIR:=./storage/containers}" @@ -38,4 +44,19 @@ if [[ -n "${FIRST_ID}" ]]; then coldkeep store "./_smoke_out/${RESTORED}" || true fi +echo "[smoke] search test" +coldkeep search hello || true + +echo "[smoke] remove test" +if [[ -n "${FIRST_ID}" ]]; then + echo "[smoke] removing first file id=${FIRST_ID}" + coldkeep remove "${FIRST_ID}" +fi + +echo "[smoke] gc test" +coldkeep gc + +echo "[smoke] stats (after gc)" +coldkeep stats + echo "[smoke] done" diff --git a/tests/integration_test.go b/tests/integration_test.go new 
file mode 100644 index 0000000..cbc5811 --- /dev/null +++ b/tests/integration_test.go @@ -0,0 +1,969 @@ +package main + +import ( + "bytes" + "crypto/sha256" + "database/sql" + "encoding/hex" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/franchoy/coldkeep/internal/container" + "github.com/franchoy/coldkeep/internal/db" + "github.com/franchoy/coldkeep/internal/maintenance" + "github.com/franchoy/coldkeep/internal/recovery" + "github.com/franchoy/coldkeep/internal/storage" + "github.com/franchoy/coldkeep/internal/utils" +) + +// NOTE: +// These tests are integration-style (DB + filesystem). +// They are skipped unless COLDKEEP_TEST_DB=1 is set. +// +// Run (example): +// COLDKEEP_TEST_DB=1 DB_HOST=localhost DB_PORT=5432 DB_USER=coldkeep DB_PASSWORD=coldkeep DB_NAME=coldkeep go test ./app -v + +func requireDB(t *testing.T) { + t.Helper() + if os.Getenv("COLDKEEP_TEST_DB") == "" { + t.Skip("Set COLDKEEP_TEST_DB=1 to run integration tests") + } +} + +func applySchema(t *testing.T, dbconn *sql.DB) { + t.Helper() + + // 1) Allow explicit override (best for Docker / CI) + if p := os.Getenv("COLDKEEP_SCHEMA_PATH"); p != "" { + b, err := os.ReadFile(p) + if err != nil { + t.Fatalf("read schema %s: %v", p, err) + } + if _, err := dbconn.Exec(string(b)); err != nil { + t.Fatalf("apply schema: %v", err) + } + return + } + + // 2) Walk upwards from cwd to find db/init.sql + cwd, err := os.Getwd() + if err == nil { + dir := cwd + for i := 0; i < 6; i++ { // climb up a few levels + candidate := filepath.Join(dir, "db", "init.sql") + if _, statErr := os.Stat(candidate); statErr == nil { + b, err := os.ReadFile(candidate) + if err != nil { + t.Fatalf("read schema %s: %v", candidate, err) + } + if _, err := dbconn.Exec(string(b)); err != nil { + t.Fatalf("apply schema: %v", err) + } + return + } + parent := filepath.Dir(dir) + if parent == dir { + break + } + dir = parent + } + } + + // 3) Common fallbacks for containers / mounts + candidates := []string{ + 
"../db/init.sql", + "../../db/init.sql", + "/repo/db/init.sql", + "/work/db/init.sql", + "/db/init.sql", + "/app/db/init.sql", + } + for _, p := range candidates { + if _, statErr := os.Stat(p); statErr == nil { + b, err := os.ReadFile(p) + if err != nil { + t.Fatalf("read schema %s: %v", p, err) + } + if _, err := dbconn.Exec(string(b)); err != nil { + t.Fatalf("apply schema: %v", err) + } + return + } + } + + t.Fatalf("could not find db/init.sql; set COLDKEEP_SCHEMA_PATH to an absolute path") +} + +func resetDB(t *testing.T, dbconn *sql.DB) { + t.Helper() + // Keep schema_version; clear the data tables and reset sequences. + _, err := dbconn.Exec(` + TRUNCATE TABLE + file_chunk, + chunk, + logical_file, + container + RESTART IDENTITY CASCADE + `) + if err != nil { + t.Fatalf("truncate tables: %v", err) + } +} + +func resetStorage(t *testing.T) { + t.Helper() + if container.ContainersDir == "" { + t.Fatalf("ContainersDir is empty") + } + _ = os.RemoveAll(container.ContainersDir) + if err := os.MkdirAll(container.ContainersDir, 0o755); err != nil { + t.Fatalf("mkdir ContainersDir: %v", err) + } +} + +func sha256File(t *testing.T, path string) string { + t.Helper() + b, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:]) +} + +func createTempFile(t *testing.T, dir, name string, size int) string { + t.Helper() + p := filepath.Join(dir, name) + data := make([]byte, size) + + // Deterministic content (repeatable). 
+ for i := 0; i < size; i++ { + data[i] = byte((i*31 + 7) % 251) + } + + if err := os.WriteFile(p, data, 0o644); err != nil { + t.Fatalf("write temp file: %v", err) + } + return p +} + +func fetchFileIDByHash(t *testing.T, dbconn *sql.DB, fileHash string) int64 { + t.Helper() + var id int64 + err := dbconn.QueryRow(`SELECT id FROM logical_file WHERE file_hash = $1`, fileHash).Scan(&id) + if err != nil { + t.Fatalf("query logical_file by hash: %v", err) + } + return id +} + +func TestRoundTripStoreRestore(t *testing.T) { + requireDB(t) + + // Use temp dirs per test + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + + applySchema(t, dbconn) + resetDB(t, dbconn) + + // Ensure we don't exercise heavy compression here. + utils.DefaultCompression = utils.CompressionNone + + inputDir := filepath.Join(tmp, "input") + _ = os.MkdirAll(inputDir, 0o755) + + // 512KB file should create multiple chunks depending on CDC params. 
+ inPath := createTempFile(t, inputDir, "roundtrip.bin", 512*1024) + want := mustRead(t, inPath) + wantHash := sha256File(t, inPath) + + if err := storage.StoreFileWithDB(dbconn, inPath); err != nil { + t.Fatalf("storeFileWithDB: %v", err) + } + + fileID := fetchFileIDByHash(t, dbconn, wantHash) + + outDir := filepath.Join(tmp, "out") + _ = os.MkdirAll(outDir, 0o755) + outPath := filepath.Join(outDir, "roundtrip.restored.bin") + + if err := storage.RestoreFileWithDB(dbconn, fileID, outPath); err != nil { + t.Fatalf("restoreFileWithDB: %v", err) + } + + got := mustRead(t, outPath) + if !bytes.Equal(want, got) { + t.Fatalf("restored bytes differ from original") + } + + gotHash := sha256File(t, outPath) + if gotHash != wantHash { + t.Fatalf("hash mismatch: want %s got %s", wantHash, gotHash) + } +} + +func TestDedupSameFile(t *testing.T) { + requireDB(t) + + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + + applySchema(t, dbconn) + resetDB(t, dbconn) + + utils.DefaultCompression = utils.CompressionNone + + inputDir := filepath.Join(tmp, "input") + _ = os.MkdirAll(inputDir, 0o755) + inPath := createTempFile(t, inputDir, "dup.bin", 256*1024) + fileHash := sha256File(t, inPath) + + if err := storage.StoreFileWithDB(dbconn, inPath); err != nil { + t.Fatalf("first store: %v", err) + } + if err := storage.StoreFileWithDB(dbconn, inPath); err != nil { + t.Fatalf("second store: %v", err) + } + + // Should still be 1 logical file for this hash + var n int + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM logical_file WHERE file_hash = $1`, fileHash).Scan(&n); err != nil { + t.Fatalf("count logical_file: %v", err) + } + if n != 1 { + t.Fatalf("expected 1 logical_file row, got %d", n) + } +} + +func TestStoreFolderParallelSmoke(t *testing.T) { + 
requireDB(t) + + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + applySchema(t, dbconn) + resetDB(t, dbconn) + + utils.DefaultCompression = utils.CompressionNone + + // Build folder with duplicates + shared-chunk variants + inputDir := filepath.Join(tmp, "folder") + _ = os.MkdirAll(inputDir, 0o755) + + paths := make([]string, 0, 32) + + // 1) Base unique-ish files + for i := 0; i < 10; i++ { + paths = append(paths, createTempFile(t, inputDir, "file_"+itoa(i)+".bin", 64*1024)) + } + + // 2) Exact duplicates (file-level dedupe) + for i := 0; i < 3; i++ { + src := paths[i] + dst := filepath.Join(inputDir, "dup_"+itoa(i)+".bin") + b := mustRead(t, src) + if err := os.WriteFile(dst, b, 0o644); err != nil { + t.Fatalf("write dup: %v", err) + } + paths = append(paths, dst) + } + + // 3) Shared-chunk-but-different files (chunk-level dedupe) + // Create a shared prefix and combine with per-file unique suffix. + // This should cause some shared chunks even when full file hashes differ. + sharedPrefix := make([]byte, 32*1024) + for i := range sharedPrefix { + sharedPrefix[i] = byte((i*17 + 3) % 251) + } + + for i := 0; i < 3; i++ { + suffix := make([]byte, 32*1024) + for j := range suffix { + suffix[j] = byte((j*31 + 7 + i) % 251) + } + + hybrid := append(append([]byte{}, sharedPrefix...), suffix...) + dst := filepath.Join(inputDir, "hybrid_"+itoa(i)+".bin") + if err := os.WriteFile(dst, hybrid, 0o644); err != nil { + t.Fatalf("write hybrid: %v", err) + } + paths = append(paths, dst) + } + + // Run storeFolder with timeout to catch deadlocks/hangs. 
+ done := make(chan error, 1) + go func() { done <- storage.StoreFolder(inputDir) }() + + select { + case err := <-done: + if err != nil { + t.Fatalf("storeFolder: %v", err) + } + case <-time.After(30 * time.Second): + t.Fatalf("storeFolder timed out (possible deadlock or blocked workers)") + } + + // Spot-check: restore a few logical files and compare hashes. + rows, err := dbconn.Query(`SELECT id, file_hash, original_name FROM logical_file ORDER BY id ASC LIMIT 5`) + if err != nil { + t.Fatalf("query logical_file: %v", err) + } + defer rows.Close() + + outDir := filepath.Join(tmp, "out") + _ = os.MkdirAll(outDir, 0o755) + + for rows.Next() { + var id int64 + var expectHash, name string + if err := rows.Scan(&id, &expectHash, &name); err != nil { + t.Fatalf("scan: %v", err) + } + outPath := filepath.Join(outDir, name) + if err := storage.RestoreFileWithDB(dbconn, id, outPath); err != nil { + t.Fatalf("restore %d: %v", id, err) + } + gotHash := sha256File(t, outPath) + if gotHash != expectHash { + t.Fatalf("hash mismatch for restored id=%d want=%s got=%s", id, expectHash, gotHash) + } + } + if err := rows.Err(); err != nil { + t.Fatalf("rows: %v", err) + } +} + +// Helpers (small, local) +func mustRead(t *testing.T, p string) []byte { + t.Helper() + b, err := os.ReadFile(p) + if err != nil { + t.Fatalf("read %s: %v", p, err) + } + return b +} + +func itoa(i int) string { + // small int to string without fmt to keep output clean + if i == 0 { + return "0" + } + neg := i < 0 + if neg { + i = -i + } + var buf [32]byte + pos := len(buf) + for i > 0 { + pos-- + buf[pos] = byte('0' + (i % 10)) + i /= 10 + } + if neg { + pos-- + buf[pos] = '-' + } + return string(buf[pos:]) +} + +func TestGCRemovesUnusedContainers(t *testing.T) { + requireDB(t) + + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + 
t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + applySchema(t, dbconn) + resetDB(t, dbconn) + + utils.DefaultCompression = utils.CompressionNone + + inputDir := filepath.Join(tmp, "input") + _ = os.MkdirAll(inputDir, 0o755) + + // Create two different files large enough to likely create separate containers + fileA := createTempFile(t, inputDir, "fileA.bin", 512*1024) + + // Create fileB with slightly different deterministic pattern + fileBPath := filepath.Join(inputDir, "fileB.bin") + b := make([]byte, 512*1024) + for i := range b { + b[i] = byte((i*37 + 11) % 251) // different formula + } + if err := os.WriteFile(fileBPath, b, 0o644); err != nil { + t.Fatalf("write fileB: %v", err) + } + fileB := fileBPath + + // Store both + if err := storage.StoreFileWithDB(dbconn, fileA); err != nil { + t.Fatalf("store fileA: %v", err) + } + if err := storage.StoreFileWithDB(dbconn, fileB); err != nil { + t.Fatalf("store fileB: %v", err) + } + + // Count containers before removal + var containersBefore int + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM container`).Scan(&containersBefore); err != nil { + t.Fatalf("count container: %v", err) + } + if containersBefore == 0 { + t.Fatalf("expected at least 1 container") + } + + // Fetch fileA ID + var fileAID int64 + hashA := sha256File(t, fileA) + if err := dbconn.QueryRow( + `SELECT id FROM logical_file WHERE file_hash = $1`, + hashA, + ).Scan(&fileAID); err != nil { + t.Fatalf("fetch fileA id: %v", err) + } + + // Remove fileA + if err := storage.RemoveFileWithDB(dbconn, fileAID); err != nil { + t.Fatalf("removeFileWithDB: %v", err) + } + + // Run GC + if err := maintenance.RunGC(); err != nil { + t.Fatalf("runGC: %v", err) + } + + // Count containers after GC + var containersAfter int + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM container`).Scan(&containersAfter); err != nil { + t.Fatalf("count container after: %v", err) + } + + // GC may delete 0 containers if remaining live chunks share the same container. 
+ // What we must guarantee is that GC does not break restore and ref_counts remain valid.
+ var negatives int
+ if err := dbconn.QueryRow(`SELECT COUNT(*) FROM chunk WHERE ref_count < 0`).Scan(&negatives); err != nil {
+ t.Fatalf("check negative ref_count: %v", err)
+ }
+ if negatives != 0 {
+ t.Fatalf("found %d chunks with negative ref_count", negatives)
+ }
+
+ // Ensure fileB still restores correctly
+ var fileBID int64
+ hashB := sha256File(t, fileB)
+ if err := dbconn.QueryRow(
+ `SELECT id FROM logical_file WHERE file_hash = $1`,
+ hashB,
+ ).Scan(&fileBID); err != nil {
+ t.Fatalf("fetch fileB id: %v", err)
+ }
+
+ outDir := filepath.Join(tmp, "out")
+ _ = os.MkdirAll(outDir, 0o755)
+ outPath := filepath.Join(outDir, "fileB.restored.bin")
+
+ if err := storage.RestoreFileWithDB(dbconn, fileBID, outPath); err != nil {
+ t.Fatalf("restore fileB after GC: %v", err)
+ }
+
+ gotHash := sha256File(t, outPath)
+ if gotHash != hashB {
+ t.Fatalf("hash mismatch after GC: want %s got %s", hashB, gotHash)
+ }
+
+ // Re-check after the restore round-trip: restoring fileB must not have driven
+ // any chunk's ref_count negative (reuses `negatives` declared above; NOTE(review): duplicate of earlier check — consider dropping one).
+ if err := dbconn.QueryRow(`SELECT COUNT(*) FROM chunk WHERE ref_count < 0`).Scan(&negatives); err != nil {
+ t.Fatalf("check negative ref_count: %v", err)
+ }
+ if negatives != 0 {
+ t.Fatalf("found chunks with negative ref_count")
+ }
+}
+
+func TestConcurrentStoreSameFile(t *testing.T) {
+ requireDB(t)
+
+ tmp := t.TempDir()
+ container.ContainersDir = filepath.Join(tmp, "containers")
+ _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir)
+ resetStorage(t)
+
+ dbconn, err := db.ConnectDB()
+ if err != nil {
+ t.Fatalf("connectDB: %v", err)
+ }
+ defer dbconn.Close()
+
+ applySchema(t, dbconn)
+ resetDB(t, dbconn)
+
+ utils.DefaultCompression = utils.CompressionNone
+
+ inputDir := filepath.Join(tmp, "input")
+ _ = os.MkdirAll(inputDir, 0o755)
+ inPath := createTempFile(t, inputDir, "concurrent.bin", 256*1024)
+ fileHash := sha256File(t, inPath)
+
+ // Start two goroutines trying to 
store the same file + done := make(chan error, 2) + go func() { done <- storage.StoreFileWithDB(dbconn, inPath) }() + go func() { done <- storage.StoreFileWithDB(dbconn, inPath) }() + + // Wait for both to complete + err1 := <-done + err2 := <-done + + if err1 != nil { + t.Fatalf("first store failed: %v", err1) + } + if err2 != nil { + t.Fatalf("second store failed: %v", err2) + } + + // Should still be 1 logical file for this hash + var n int + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM logical_file WHERE file_hash = $1`, fileHash).Scan(&n); err != nil { + t.Fatalf("count logical_file: %v", err) + } + if n != 1 { + t.Fatalf("expected 1 logical_file row, got %d", n) + } +} + +func TestConcurrentStoreSameChunk(t *testing.T) { + requireDB(t) + + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + + applySchema(t, dbconn) + resetDB(t, dbconn) + + utils.DefaultCompression = utils.CompressionNone + + inputDir := filepath.Join(tmp, "input") + _ = os.MkdirAll(inputDir, 0o755) + + // Create two files that share a common chunk + // Use a large shared prefix that will be chunked the same way + sharedPrefix := make([]byte, 128*1024) // Large enough to be multiple chunks + for i := range sharedPrefix { + sharedPrefix[i] = byte((i*31 + 7) % 251) + } + + // File A: shared prefix + unique suffix + fileAData := append(append([]byte{}, sharedPrefix...), []byte("uniqueA")...) + fileAPath := filepath.Join(inputDir, "fileA.bin") + if err := os.WriteFile(fileAPath, fileAData, 0o644); err != nil { + t.Fatalf("write fileA: %v", err) + } + + // File B: same shared prefix + different unique suffix + fileBData := append(append([]byte{}, sharedPrefix...), []byte("uniqueB")...) 
+ fileBPath := filepath.Join(inputDir, "fileB.bin") + if err := os.WriteFile(fileBPath, fileBData, 0o644); err != nil { + t.Fatalf("write fileB: %v", err) + } + + // Start two goroutines storing the files concurrently + done := make(chan error, 2) + go func() { done <- storage.StoreFileWithDB(dbconn, fileAPath) }() + go func() { done <- storage.StoreFileWithDB(dbconn, fileBPath) }() + + // Wait for both to complete + err1 := <-done + err2 := <-done + + if err1 != nil { + t.Fatalf("store fileA failed: %v", err1) + } + if err2 != nil { + t.Fatalf("store fileB failed: %v", err2) + } + + // Verify both files are stored + hashA := sha256File(t, fileAPath) + hashB := sha256File(t, fileBPath) + + var countA, countB int + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM logical_file WHERE file_hash = $1`, hashA).Scan(&countA); err != nil { + t.Fatalf("count fileA: %v", err) + } + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM logical_file WHERE file_hash = $1`, hashB).Scan(&countB); err != nil { + t.Fatalf("count fileB: %v", err) + } + + if countA != 1 || countB != 1 { + t.Fatalf("expected 1 entry each for fileA and fileB, got %d and %d", countA, countB) + } +} + +func TestRetryAfterAbortedFile(t *testing.T) { + requireDB(t) + + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + + applySchema(t, dbconn) + resetDB(t, dbconn) + + utils.DefaultCompression = utils.CompressionNone + + inputDir := filepath.Join(tmp, "input") + _ = os.MkdirAll(inputDir, 0o755) + inPath := createTempFile(t, inputDir, "retry_file.bin", 256*1024) + fileHash := sha256File(t, inPath) + + // Store the file initially + if err := storage.StoreFileWithDB(dbconn, inPath); err != nil { + t.Fatalf("initial store: %v", err) + } + + // Manually set the file status to ABORTED to simulate a 
failed store + fileID := fetchFileIDByHash(t, dbconn, fileHash) + if _, err := dbconn.Exec(`UPDATE logical_file SET status = 'ABORTED' WHERE id = $1`, fileID); err != nil { + t.Fatalf("set status to ABORTED: %v", err) + } + + // Now try to store the same file again - it should retry and succeed + if err := storage.StoreFileWithDB(dbconn, inPath); err != nil { + t.Fatalf("retry store after abort: %v", err) + } + + // Verify the file is now marked as COMPLETED + var status string + if err := dbconn.QueryRow(`SELECT status FROM logical_file WHERE id = $1`, fileID).Scan(&status); err != nil { + t.Fatalf("check status: %v", err) + } + if status != "COMPLETED" { + t.Fatalf("expected status COMPLETED, got %s", status) + } +} + +func TestRetryAfterAbortedChunk(t *testing.T) { + requireDB(t) + + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + + applySchema(t, dbconn) + resetDB(t, dbconn) + + utils.DefaultCompression = utils.CompressionNone + + inputDir := filepath.Join(tmp, "input") + _ = os.MkdirAll(inputDir, 0o755) + inPath := createTempFile(t, inputDir, "retry_chunk.bin", 256*1024) + + // Store the file initially to create chunks + if err := storage.StoreFileWithDB(dbconn, inPath); err != nil { + t.Fatalf("initial store: %v", err) + } + + // Find a chunk from this file and set it to ABORTED + var chunkID int64 + var chunkHash string + if err := dbconn.QueryRow(` + SELECT c.id, c.chunk_hash + FROM chunk c + JOIN file_chunk fc ON c.id = fc.chunk_id + JOIN logical_file lf ON fc.logical_file_id = lf.id + WHERE lf.file_hash = $1 + LIMIT 1 + `, sha256File(t, inPath)).Scan(&chunkID, &chunkHash); err != nil { + t.Fatalf("find chunk: %v", err) + } + + // Set the chunk status to ABORTED + if _, err := dbconn.Exec(`UPDATE chunk SET status = 'ABORTED' WHERE 
id = $1`, chunkID); err != nil { + t.Fatalf("set chunk status to ABORTED: %v", err) + } + + // Now try to store the same file again - it should retry the aborted chunk and succeed + if err := storage.StoreFileWithDB(dbconn, inPath); err != nil { + t.Fatalf("retry store after chunk abort: %v", err) + } + + // Verify the chunk is now marked as COMPLETED + var status string + if err := dbconn.QueryRow(`SELECT status FROM chunk WHERE id = $1`, chunkID).Scan(&status); err != nil { + t.Fatalf("check chunk status: %v", err) + } + if status != "COMPLETED" { + t.Fatalf("expected chunk status COMPLETED, got %s", status) + } +} +func TestContainerRollover(t *testing.T) { + requireDB(t) + + // Use temp dirs per test + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + + applySchema(t, dbconn) + resetDB(t, dbconn) + + // Set small container size for testing rollover + originalMaxSize := container.GetContainerMaxSize() + container.SetContainerMaxSize(1 * 1024 * 1024) // 1MB for quick test + defer container.SetContainerMaxSize(originalMaxSize) // restore + + utils.DefaultCompression = utils.CompressionNone + + inputDir := filepath.Join(tmp, "input") + _ = os.MkdirAll(inputDir, 0o755) + + // Create files that will exceed container size + files := []string{ + createTempFile(t, inputDir, "file1.bin", 600*1024), // 600KB + createTempFile(t, inputDir, "file2.bin", 600*1024), // 600KB, should trigger rollover + } + + // Store first file + if err := storage.StoreFileWithDB(dbconn, files[0]); err != nil { + t.Fatalf("store first file: %v", err) + } + + // Check that one container exists and is not sealed + var containerCount int + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM container WHERE sealed = FALSE`).Scan(&containerCount); err != nil { + t.Fatalf("count 
unsealed containers: %v", err) + } + if containerCount != 1 { + t.Fatalf("expected 1 unsealed container, got %d", containerCount) + } + + // Store second file - should trigger rollover + if err := storage.StoreFileWithDB(dbconn, files[1]); err != nil { + t.Fatalf("store second file: %v", err) + } + + // Check that the first container is now sealed + var sealedCount int + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM container WHERE sealed = TRUE`).Scan(&sealedCount); err != nil { + t.Fatalf("count sealed containers: %v", err) + } + if sealedCount != 1 { + t.Fatalf("expected 1 sealed container, got %d", sealedCount) + } + + // Check that a new unsealed container exists + if err := dbconn.QueryRow(`SELECT COUNT(*) FROM container WHERE sealed = FALSE`).Scan(&containerCount); err != nil { + t.Fatalf("count unsealed containers after rollover: %v", err) + } + if containerCount != 1 { + t.Fatalf("expected 1 unsealed container after rollover, got %d", containerCount) + } + + // Verify both files can be restored + for i, file := range files { + hash := sha256File(t, file) + fileID := fetchFileIDByHash(t, dbconn, hash) + + outDir := filepath.Join(tmp, "out") + _ = os.MkdirAll(outDir, 0o755) + outPath := filepath.Join(outDir, fmt.Sprintf("restored%d.bin", i)) + + if err := storage.RestoreFileWithDB(dbconn, fileID, outPath); err != nil { + t.Fatalf("restore file %d: %v", i, err) + } + + // Verify content + original := mustRead(t, file) + restored := mustRead(t, outPath) + if !bytes.Equal(original, restored) { + t.Fatalf("file %d content mismatch", i) + } + } +} + +func TestStartupRecoverySimulation(t *testing.T) { + requireDB(t) + + // Use temp dirs per test + tmp := t.TempDir() + container.ContainersDir = filepath.Join(tmp, "containers") + _ = os.Setenv("COLDKEEP_STORAGE_DIR", container.ContainersDir) + resetStorage(t) + + dbconn, err := db.ConnectDB() + if err != nil { + t.Fatalf("connectDB: %v", err) + } + defer dbconn.Close() + + applySchema(t, dbconn) + resetDB(t, 
dbconn) + + utils.DefaultCompression = utils.CompressionNone + + inputDir := filepath.Join(tmp, "input") + _ = os.MkdirAll(inputDir, 0o755) + + // Create and start storing a file, but simulate it getting stuck + inPath := createTempFile(t, inputDir, "stuck_file.bin", 256*1024) + + // Manually insert a processing logical file (simulating stuck store) + hash := sha256File(t, inPath) + size := int64(256 * 1024) + + _, err = dbconn.Exec(` + INSERT INTO logical_file (original_name, total_size, file_hash, status, retry_count) + VALUES ($1, $2, $3, 'PROCESSING', 0) + `, filepath.Base(inPath), size, hash) + if err != nil { + t.Fatalf("insert processing logical file: %v", err) + } + + // Set updated_at to old time to simulate stuck processing + _, err = dbconn.Exec(` + UPDATE logical_file + SET updated_at = NOW() - INTERVAL '15 minutes' + WHERE file_hash = $1 + `, hash) + if err != nil { + t.Fatalf("update logical file timestamp: %v", err) + } + + // Also create a processing chunk + _, err = dbconn.Exec(` + INSERT INTO chunk (chunk_hash, size, status, container_id, chunk_offset, ref_count, retry_count) + VALUES ($1, $2, 'PROCESSING', NULL, NULL, 0, 0) + `, "dummy_chunk_hash", int64(128*1024)) + if err != nil { + t.Fatalf("insert processing chunk: %v", err) + } + + // Get chunk ID + var chunkID int64 + if err := dbconn.QueryRow(`SELECT id FROM chunk WHERE chunk_hash = $1`, "dummy_chunk_hash").Scan(&chunkID); err != nil { + t.Fatalf("get chunk ID: %v", err) + } + + // Set chunk updated_at to old time + _, err = dbconn.Exec(` + UPDATE chunk + SET updated_at = NOW() - INTERVAL '15 minutes' + WHERE id = $1 + `, chunkID) + if err != nil { + t.Fatalf("update chunk timestamp: %v", err) + } + + // Create a container file and then delete it to simulate missing container + containerPath := filepath.Join(container.ContainersDir, "missing_container.bin") + if err := os.WriteFile(containerPath, []byte("dummy"), 0o644); err != nil { + t.Fatalf("create dummy container file: %v", err) + } + 
+ _, err = dbconn.Exec(` + INSERT INTO container (filename, current_size, max_size, sealed, quarantine) + VALUES ($1, $2, $3, FALSE, FALSE) + `, "missing_container.bin", int64(1024), int64(64*1024*1024)) + if err != nil { + t.Fatalf("insert container: %v", err) + } + + // Delete the file to simulate missing container + if err := os.Remove(containerPath); err != nil { + t.Fatalf("remove container file: %v", err) + } + + // Now run system recovery + if err := recovery.SystemRecovery(); err != nil { + t.Fatalf("system recovery: %v", err) + } + + // Verify that processing logical file was aborted + var fileStatus string + if err := dbconn.QueryRow(`SELECT status FROM logical_file WHERE file_hash = $1`, hash).Scan(&fileStatus); err != nil { + t.Fatalf("check logical file status: %v", err) + } + if fileStatus != "ABORTED" { + t.Fatalf("expected logical file status ABORTED, got %s", fileStatus) + } + + // Verify that processing chunk was aborted + var chunkStatus string + if err := dbconn.QueryRow(`SELECT status FROM chunk WHERE id = $1`, chunkID).Scan(&chunkStatus); err != nil { + t.Fatalf("check chunk status: %v", err) + } + if chunkStatus != "ABORTED" { + t.Fatalf("expected chunk status ABORTED, got %s", chunkStatus) + } + + // Verify that missing container was quarantined + var quarantine bool + if err := dbconn.QueryRow(`SELECT quarantine FROM container WHERE filename = $1`, "missing_container.bin").Scan(&quarantine); err != nil { + t.Fatalf("check container quarantine: %v", err) + } + if !quarantine { + t.Fatalf("expected missing container to be quarantined") + } +}