From 105b8594f26384ace7a0e17600eb5f328dc19b15 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Fri, 24 Apr 2026 22:17:47 +0200 Subject: [PATCH] feat(bulk): return per-file revisions from /fs/bulk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing /fs/bulk response only returned {written, errorCount, errors, correlationId}. Mount clients had to follow every bulk write with N GET /fs/file requests to learn the new revisions for If-Match. That made the post-migration total request count N+1 for an N-file batch. Extend the response with a per-file results array: {"path":..., "revision":..., "contentType":...} Purely additive — existing fields unchanged. Mount now uses the inline revision and skips the ReadFile fallback. The e2e proof TestBulkMigrationReducesHTTPCalls tightens from "0 POST /fs/file" to "0 total requests on /fs/file". --- docs/bulk-per-file-revisions-boundary.md | 175 ++++++++ docs/bulk-per-file-revisions-review.md | 133 ++++++ internal/httpapi/server.go | 6 +- internal/httpapi/server_test.go | 74 +++- internal/mountsync/syncer.go | 7 +- internal/mountsync/syncer_test.go | 43 +- internal/mountsync/types.go | 28 +- internal/relayfile/adapters_test.go | 10 +- internal/relayfile/store.go | 46 +- internal/relayfile/store_test.go | 54 ++- .../061-bulk-write-per-file-revisions.ts | 418 ++++++++++++++++++ 11 files changed, 895 insertions(+), 99 deletions(-) create mode 100644 docs/bulk-per-file-revisions-boundary.md create mode 100644 docs/bulk-per-file-revisions-review.md create mode 100644 workflows/061-bulk-write-per-file-revisions.ts diff --git a/docs/bulk-per-file-revisions-boundary.md b/docs/bulk-per-file-revisions-boundary.md new file mode 100644 index 0000000..b92bc6e --- /dev/null +++ b/docs/bulk-per-file-revisions-boundary.md @@ -0,0 +1,175 @@ +# Bulk Per-File Revisions — Boundary Spec + +This document scopes the server-side change to `/fs/bulk` so the response +includes per-file revisions, and the 
matching simplification in the mount +reconciler so it no longer needs a follow-up `ReadFile` to discover revisions. + +## 1. Files touched + +| File | Create/Modify | Reason | +| ---- | ------------- | ------ | +| `internal/httpapi/server.go` | Modify | `handleBulkWrite` must build and return a `results` array with `{path, revision, contentType}` for every successfully written file. | +| `internal/httpapi/server_test.go` | Modify | Add a test asserting the new `results` field is populated for successful writes and that failed entries are absent from `results` (they continue to appear in `errors`). | +| `internal/relayfile/store.go` | Modify | `BulkWrite` and `BulkWriteFork` already compute per-file revisions internally; change their signatures to surface those revisions (and content types) so `handleBulkWrite` can forward them. | +| `internal/relayfile/types.go` (or wherever `BulkWriteFile` / `BulkWriteError` live) | Modify | Add a shared `BulkWriteResult` struct (`Path`, `Revision`, `ContentType`) so server and mount wire types stay aligned. | +| `internal/mountsync/types.go` | Modify | `BulkWriteResponse.Results` already exists; verify field names and JSON tags match the server shape exactly (`results`, `path`, `revision`, `contentType`). Remove the `Files []RemoteFile` branch if it is only there as a compatibility shim — keep it only if it is still populated by another code path. | +| `internal/mountsync/syncer.go` | Modify | `reconcileBulkWrite`: when the bulk response already provides a revision for the path, use it directly and skip the `ReadFile` round-trip. Retain the `ReadFile` fallback only for the defensive empty-revision case. | +| `internal/mountsync/syncer_test.go` | Modify | Tighten `TestBulkMigrationReducesHTTPCalls` to assert zero total requests on `/fs/file` (not just zero `POST`s). Remove the now-redundant `fsFilePostCount` counter. | + +## 2. 
New wire shape for `/fs/bulk` response + +```json +{ + "written": 3, + "errorCount": 1, + "errors": [ + { + "path": "locked.txt", + "code": "permission_denied", + "message": "agent lacks write permission" + } + ], + "results": [ + { + "path": "docs/a.md", + "revision": "rev_01HW...", + "contentType": "text/markdown" + }, + { + "path": "docs/b.md", + "revision": "rev_01HW...", + "contentType": "text/markdown" + }, + { + "path": "src/x.go", + "revision": "rev_01HW...", + "contentType": "text/x-go" + } + ], + "correlationId": "corr_..." +} +``` + +Notes: + +- `results` contains one entry per successfully written file, in the same + order `BulkWrite` processed them. Entries in `errors` are **not** present in + `results` (and vice versa). +- `contentType` is emitted with `omitempty`; absent when the store did not + resolve one. +- `written` continues to equal `len(results)`. + +## 3. Backward compatibility + +- **Old SDK / client reads new response:** Extra top-level `results` field is + ignored by standard JSON decoders (all existing SDKs use struct decoding with + unknown-field tolerance or `map[string]any`). No existing field changes + meaning, so old clients keep working. +- **New mount reads old server response:** `BulkWriteResponse.Results` is + `omitempty` on the wire and decodes to `nil` / empty. `revisionsByPath()` + returns an empty map, and `reconcileBulkWrite` falls back to the existing + `ReadFile` code path to recover the revision. This makes the rollout safe in + either order (server-first or client-first). +- No version gate, feature flag, or capability negotiation is required. + +## 4. 
`Store.BulkWrite` signature change + +Current: + +```go +func (s *Store) BulkWrite(workspaceID string, files []BulkWriteFile) (int, []BulkWriteError) +func (s *Store) BulkWriteFork(workspaceID, forkID string, files []BulkWriteFile) (int, []BulkWriteError) +``` + +Proposed: + +```go +func (s *Store) BulkWrite(workspaceID string, files []BulkWriteFile) (written int, results []BulkWriteResult, errors []BulkWriteError) +func (s *Store) BulkWriteFork(workspaceID, forkID string, files []BulkWriteFile) (written int, results []BulkWriteResult, errors []BulkWriteError) +``` + +Where `BulkWriteResult` is: + +```go +type BulkWriteResult struct { + Path string `json:"path"` + Revision string `json:"revision"` + ContentType string `json:"contentType,omitempty"` +} +``` + +- `written == len(results)` is an invariant maintained by both methods. +- `BulkWriteFork` receives the identical treatment — same struct, same + ordering guarantee, same invariant. This keeps fork and main-branch writes + symmetric for the mount code that consumes them. +- The new `BulkWriteResult` type lives next to `BulkWriteFile` / + `BulkWriteError` in the `relayfile` package and is re-aliased from + `internal/mountsync/types.go` the same way those two are today. + +## 5. Mount-side change + +In `reconcileBulkWrite`: + +- Before calling `ReadFile`, consult the `revisionsByPath()` map built from + `BulkWriteResponse.Results`. If the map contains a non-empty revision for + `pendingWrite.remotePath`, use it directly and skip the `ReadFile` call. +- Preserve the `ReadFile` fallback for the defensive case where the server + returned no `results` entry or an empty revision (covers rolling deploys and + future server-side bugs). +- `contentType` resolution follows the same pattern: prefer the value from + `BulkWriteResponse.Results`, fall back to `ReadFile` only if the bulk + response did not supply one *and* the local snapshot lacks one. 
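A minimal sketch of the prefer-inline-revision branching described above (the type mirrors the wire shape from section 2; `revisionFromBulk` is a hypothetical helper, not the actual syncer code):

```go
package main

import (
	"fmt"
	"strings"
)

// BulkWriteResult mirrors the per-file entry from the /fs/bulk response.
type BulkWriteResult struct {
	Path        string
	Revision    string
	ContentType string
}

// revisionFromBulk is a hypothetical helper: it returns the revision the
// bulk response supplied for path, or ok=false when the entry is missing
// or blank, in which case the caller keeps its ReadFile fallback.
func revisionFromBulk(results map[string]BulkWriteResult, path string) (string, bool) {
	revision := strings.TrimSpace(results[path].Revision)
	if revision == "" {
		return "", false // defensive case: fall back to GET /fs/file
	}
	return revision, true
}

func main() {
	byPath := map[string]BulkWriteResult{
		"docs/a.md": {Path: "docs/a.md", Revision: "rev_01HWEXAMPLE"},
	}
	if rev, ok := revisionFromBulk(byPath, "docs/a.md"); ok {
		fmt.Println("inline revision:", rev) // happy path: skip ReadFile
	}
	if _, ok := revisionFromBulk(byPath, "docs/missing.md"); !ok {
		fmt.Println("no inline revision: fall back to ReadFile")
	}
}
```

The same two-way split covers both the old-server rollout case (empty `results`) and a future server bug that emits a blank revision, so the fallback never has to distinguish the two.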
+- The function's signature and the caller in the syncer loop do not change; + only the internal branching changes. + +Net effect: the happy path goes from `POST /fs/bulk` + N × `GET /fs/file` down +to a single `POST /fs/bulk`. + +## 6. Test updates + +### Server (`internal/httpapi/server_test.go`) + +Add a test (or extend an existing one in `TestBulkWriteEndpoint`) that: + +- Submits a bulk write with at least one writable file and one file that will + fail (e.g. permission denied or invalid path). +- Asserts `results` contains exactly the successful paths, each with a + non-empty `revision` and the expected `contentType`. +- Asserts the failed file is present in `errors` and **absent** from + `results`. +- Asserts `len(results) == written` and `len(errors) == errorCount`. + +### Mount (`internal/mountsync/syncer_test.go`) + +Tighten `TestBulkMigrationReducesHTTPCalls`: + +- Replace the `fsFilePostCount atomic.Int32` POST-only counter with a single + counter (or path slice) that records **every** request whose path matches + `/fs/file` regardless of method. +- Assert that counter is `0` after the bulk sync completes. +- Remove the now-dead `fsFilePostCount` declaration, the POST filter inside + the handler, and the post-check log line that reports `GET /fs/file` + read-backs (there should be none). +- Keep the assertion that exactly one `POST /fs/bulk` was made. + +## 7. Deliberately out of scope + +- FUSE mount path (`cmd/relayfile-fuse`, `internal/fuse/...`) — unchanged. +- SDK / TypeScript bindings — no wire-type regeneration in this change; JS + consumers continue to ignore the extra field until a separate follow-up. +- Observer / dashboard surfaces — they consume events, not the bulk response; + no change. +- Any change to `errors` shape, `correlationId` semantics, or the `forkID` + branching logic. + +## 8. Acceptance gates + +All three must pass, exactly as written: + +```bash +go test ./internal/httpapi/... 
-count=1 -timeout 120s +go test ./internal/mountsync/... -count=1 -timeout 120s +go build ./... +``` + +BOUNDARY_READY diff --git a/docs/bulk-per-file-revisions-review.md b/docs/bulk-per-file-revisions-review.md new file mode 100644 index 0000000..6182f3b --- /dev/null +++ b/docs/bulk-per-file-revisions-review.md @@ -0,0 +1,133 @@ +# Bulk Per-File Revisions — Review Verdict + +Reviewed against `docs/bulk-per-file-revisions-boundary.md` at branch +`fix/bulk-write-per-file-revisions`. + +## 1. Scope / files modified + +Boundary-listed files touched (all present): + +- `internal/httpapi/server.go` ✅ +- `internal/httpapi/server_test.go` ✅ +- `internal/relayfile/store.go` ✅ — `BulkWriteResult` added here (boundary + allowed "wherever `BulkWriteFile` / `BulkWriteError` live") +- `internal/mountsync/types.go` ✅ — alias to `relayfile.BulkWriteResult`, + dropped the compatibility `Files []RemoteFile` branch as the boundary + allowed +- `internal/mountsync/syncer.go` ✅ +- `internal/mountsync/syncer_test.go` ✅ + +Additional files touched (minor scope creep, non-blocking): + +- `internal/relayfile/store_test.go` — expanded to cover the new + `results` return; reasonable companion to the store change even though the + boundary did not enumerate it. +- `internal/relayfile/adapters_test.go` — cosmetic `gofmt` alignment only + (no functional change). Outside boundary scope but harmless. +- `package-lock.json` — unrelated npm lockfile churn; not required by this + feature. +- `.trajectories/index.json`, `.trajectories/active/…`, `.trajectories/completed/…`, + `workflows/05[9]-*.ts`, `workflows/061-bulk-write-per-file-revisions.ts` — + trajectory / workflow bookkeeping, not production code. + +Nothing functionally drifts outside the feature surface. + +## 2. Backward compatibility + +Wire response is purely additive. `handleBulkWrite` still emits +`written`, `errorCount`, `errors`, `correlationId` with unchanged names, +types, and semantics. 
`results` is a new sibling key that older clients +using struct decoding (or `map[string]any`) will simply drop. Confirmed by +re-reading `internal/httpapi/server.go:1597-1615` — only additive changes. + +Mount side decodes `results` with `omitempty` on the response struct and +falls back to the existing `ReadFile` path when the map is empty or the +revision is blank (`internal/mountsync/syncer.go:684` retains the `ReadFile` +call when `result.Revision` is missing). Rollout remains safe in either +order. + +## 3. Signature change coverage + +`Store.BulkWrite` and `Store.BulkWriteFork` now return +`(int, []BulkWriteResult, []BulkWriteError)`. Every call site is updated: + +- `internal/httpapi/server.go:1603` (fork) and `:1605` (main). +- `internal/httpapi/server_test.go:904, :945, :983` (export tests use `_`). +- `internal/relayfile/store_test.go:444, :493, :547, :560, :589, :630`. + +Grep for `\.BulkWrite(Fork)?\(` shows no stale 2-return call sites. + +## 4. Mount `reconcileBulkWrite` + +`flushPendingBulkWrites` builds `resultsByPath` via the new +`resultsByPath()` helper in `internal/mountsync/types.go:23-36`, which +normalizes paths and trims whitespace. When an entry exists, `ContentType` +is copied into the pending snapshot before the reconcile call, and the +revision is passed as the third argument to `reconcileBulkWrite`. An empty +revision still triggers the defensive `ReadFile` fallback inside +`reconcileBulkWrite` (unchanged), preserving the rolling-deploy safety net +the boundary called out. + +## 5. Test coverage + +- **Success path carries `results`**: `TestBulkWriteEndpoint` + (`internal/httpapi/server_test.go:823+`) asserts `Results` has exactly + one entry with matching `Path`, non-empty `Revision`, and the expected + `ContentType`. 
+- **Error path omits failed entry from `results`**: same test submits a + second file with `encoding: utf16` that fails validation; it asserts the + failed path is in `Errors` (code `invalid_encoding`) and verifies no + entry in `Results` shares that path. +- **Invariants**: `len(results) == written` and `len(errors) == errorCount` + are both asserted. +- **Mount skips `ReadFile` when revision is present**: + `TestBulkMigrationReducesHTTPCalls` + (`internal/mountsync/syncer_test.go:660+`) now records *every* request on + `/fs/file` and asserts `len(fsFileRequests) == 0` after the bulk sync. + The old `fsFilePostCount` POST-only counter and its apologetic comment + block are removed, as the boundary required. +- **Store-level results shape** is exercised by the new + `assertBulkWriteResults` helper (`internal/relayfile/store_test.go:203`) + and wired into every `BulkWrite` test case in that file. + +## 6. E2E tightening + +`TestBulkMigrationReducesHTTPCalls` no longer inspects only POSTs — it +asserts zero total requests on `/fs/file`. The POST-filter inside the mock +handler was removed; any method on `/fs/file` now fails the test. This is +exactly the tightening the boundary mandated (section 6, "Mount"). + +## 7. Build / format / vet + +- `gofmt -l internal/{httpapi,mountsync,relayfile}` → clean. +- `go build ./...` → clean. +- `go test ./internal/httpapi/... ./internal/mountsync/... ./internal/relayfile/... -count=1 -timeout 180s` + → all three packages `ok`. +- `go vet ./...` → one failure in `internal/mountfuse/fuse_test.go:76` + (`fakeRemoteClient` missing `WriteFilesBulk`). Verified pre-existing by + running `go vet` on the stashed pre-change tree; it fails identically. + `internal/mountfuse` is the FUSE mount path the boundary explicitly + excludes from scope (section 7, "FUSE mount path … unchanged"), so this + is out of scope for this review. Recommend tracking it as a separate + cleanup issue. + +## 8. 
Nit (non-blocking) + +When `BulkWrite` returns on an early input-validation error +(`workspaceID` blank), `results` is returned as `nil`, and the server +response therefore marshals `"results": null` rather than `[]`. The +boundary doc does not specify empty-array vs. null semantics, and current +clients tolerate either, so this is not blocking — but emitting +`[]BulkWriteResult{}` in the handler when `results == nil` would make the +wire shape more predictable for future JS/SDK consumers. + +## Verdict + +All seven checklist items pass. The new wire field is additive, signatures +are updated consistently, the mount prefers response-supplied revisions +with a preserved `ReadFile` fallback, tests cover both the success and +error halves of the response, and the E2E proof is tightened to assert +zero `/fs/file` requests of any method. Pre-existing `go vet` failure in +`internal/mountfuse` is unrelated to this branch. + +REVIEW_APPROVED diff --git a/internal/httpapi/server.go b/internal/httpapi/server.go index 7091677..9dd617f 100644 --- a/internal/httpapi/server.go +++ b/internal/httpapi/server.go @@ -1597,17 +1597,19 @@ func (s *Server) handleBulkWrite(w http.ResponseWriter, r *http.Request, workspa } var written int + var results []relayfile.BulkWriteResult var storeErrors []relayfile.BulkWriteError if forkID != "" { - written, storeErrors = s.store.BulkWriteFork(workspaceID, forkID, allowed) + written, results, storeErrors = s.store.BulkWriteFork(workspaceID, forkID, allowed) } else { - written, storeErrors = s.store.BulkWrite(workspaceID, allowed) + written, results, storeErrors = s.store.BulkWrite(workspaceID, allowed) } errorsOut = append(errorsOut, storeErrors...) 
writeJSON(w, http.StatusAccepted, map[string]any{ "written": written, "errorCount": len(errorsOut), "errors": errorsOut, + "results": results, "correlationId": correlationID, }) } diff --git a/internal/httpapi/server_test.go b/internal/httpapi/server_test.go index 081ca3e..f8192c2 100644 --- a/internal/httpapi/server_test.go +++ b/internal/httpapi/server_test.go @@ -823,15 +823,6 @@ func TestBulkWriteEndpoint(t *testing.T) { server := NewServer(relayfile.NewStoreWithOptions(relayfile.StoreOptions{DisableWorkers: true})) token := mustTestJWT(t, "dev-secret", "ws_bulk_endpoint", "Worker1", []string{"fs:read", "fs:write"}, time.Now().Add(time.Hour)) - files := make([]map[string]any, 0, 5) - for i := 0; i < 5; i++ { - files = append(files, map[string]any{ - "path": fmt.Sprintf("/external/Bulk-%d.md", i), - "contentType": "text/markdown", - "content": fmt.Sprintf("# file %d", i), - }) - } - resp := doRequest(t, server, request{ method: http.MethodPost, path: "/v1/workspaces/ws_bulk_endpoint/fs/bulk", @@ -840,22 +831,69 @@ func TestBulkWriteEndpoint(t *testing.T) { "X-Correlation-Id": "corr_bulk_endpoint", }, body: map[string]any{ - "files": files, + "files": []map[string]any{ + { + "path": "/external/Success.md", + "contentType": "text/markdown", + "content": "# success", + }, + { + "path": "/external/BadEncoding.md", + "contentType": "text/markdown", + "content": "# invalid", + "encoding": "utf16", + }, + }, }, }) if resp.Code != http.StatusAccepted { t.Fatalf("expected 202 on bulk write, got %d (%s)", resp.Code, resp.Body.String()) } - var payload map[string]any + var payload struct { + Written int `json:"written"` + ErrorCount int `json:"errorCount"` + Errors []relayfile.BulkWriteError `json:"errors"` + Results []relayfile.BulkWriteResult `json:"results"` + CorrelationID string `json:"correlationId"` + } if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { t.Fatalf("decode bulk response: %v", err) } - if int(payload["written"].(float64)) != 5 { - 
t.Fatalf("expected written=5, got %v", payload["written"]) + if payload.Written != 1 { + t.Fatalf("expected written=1, got %d", payload.Written) + } + if payload.ErrorCount != 1 { + t.Fatalf("expected errorCount=1, got %d", payload.ErrorCount) + } + if len(payload.Results) != payload.Written { + t.Fatalf("expected len(results)==written, got %d results and written=%d", len(payload.Results), payload.Written) + } + if len(payload.Errors) != payload.ErrorCount { + t.Fatalf("expected len(errors)==errorCount, got %d errors and errorCount=%d", len(payload.Errors), payload.ErrorCount) + } + if len(payload.Results) != 1 { + t.Fatalf("expected one successful result, got %+v", payload.Results) } - if int(payload["errorCount"].(float64)) != 0 { - t.Fatalf("expected errorCount=0, got %v", payload["errorCount"]) + if payload.Results[0].Path != "/external/Success.md" { + t.Fatalf("unexpected success path: %+v", payload.Results[0]) + } + if payload.Results[0].Revision == "" { + t.Fatalf("expected non-empty revision in result: %+v", payload.Results[0]) + } + if payload.Results[0].ContentType != "text/markdown" { + t.Fatalf("expected markdown content type in result, got %+v", payload.Results[0]) + } + if len(payload.Errors) != 1 { + t.Fatalf("expected one bulk error, got %+v", payload.Errors) + } + if payload.Errors[0].Path != "/external/BadEncoding.md" || payload.Errors[0].Code != "invalid_encoding" { + t.Fatalf("unexpected bulk error: %+v", payload.Errors[0]) + } + for _, result := range payload.Results { + if result.Path == payload.Errors[0].Path { + t.Fatalf("failed path %q must not appear in results: %+v", result.Path, payload.Results) + } } } @@ -863,7 +901,7 @@ func TestExportJSON(t *testing.T) { store := relayfile.NewStoreWithOptions(relayfile.StoreOptions{DisableWorkers: true}) t.Cleanup(store.Close) - if written, errs := store.BulkWrite("ws_export_json", []relayfile.BulkWriteFile{ + if written, _, errs := store.BulkWrite("ws_export_json", []relayfile.BulkWriteFile{ {Path: 
"/external/A.md", ContentType: "text/markdown", Content: "# A"}, {Path: "/external/B.txt", ContentType: "text/plain", Content: "B"}, }); written != 2 || len(errs) != 0 { @@ -904,7 +942,7 @@ func TestExportJSONPathFilter(t *testing.T) { store := relayfile.NewStoreWithOptions(relayfile.StoreOptions{DisableWorkers: true}) t.Cleanup(store.Close) - if written, errs := store.BulkWrite("ws_export_json_path", []relayfile.BulkWriteFile{ + if written, _, errs := store.BulkWrite("ws_export_json_path", []relayfile.BulkWriteFile{ {Path: "/github/repos/demo/README.md", ContentType: "text/markdown", Content: "# Demo"}, {Path: "/notion/pages/A.md", ContentType: "text/markdown", Content: "# A"}, }); written != 2 || len(errs) != 0 { @@ -942,7 +980,7 @@ func TestExportTar(t *testing.T) { store := relayfile.NewStoreWithOptions(relayfile.StoreOptions{DisableWorkers: true}) t.Cleanup(store.Close) - if written, errs := store.BulkWrite("ws_export_tar", []relayfile.BulkWriteFile{ + if written, _, errs := store.BulkWrite("ws_export_tar", []relayfile.BulkWriteFile{ { Path: "/external/blob.bin", ContentType: "application/octet-stream", diff --git a/internal/mountsync/syncer.go b/internal/mountsync/syncer.go index 037981d..9eacad6 100644 --- a/internal/mountsync/syncer.go +++ b/internal/mountsync/syncer.go @@ -661,7 +661,7 @@ func (s *Syncer) flushPendingBulkWrites(ctx context.Context, pending []pendingBu for _, writeErr := range response.Errors { errorsByPath[normalizeRemotePath(writeErr.Path)] = writeErr } - revisionsByPath := response.revisionsByPath() + resultsByPath := response.resultsByPath() var firstErr error for _, pendingWrite := range pending { @@ -681,7 +681,10 @@ func (s *Syncer) flushPendingBulkWrites(ctx context.Context, pending []pendingBu } continue } - if err := s.reconcileBulkWrite(ctx, pendingWrite, revisionsByPath[pendingWrite.remotePath]); err != nil && firstErr == nil { + if result, ok := resultsByPath[pendingWrite.remotePath]; ok && strings.TrimSpace(result.ContentType) 
!= "" { + pendingWrite.snapshot.ContentType = result.ContentType + } + if err := s.reconcileBulkWrite(ctx, pendingWrite, resultsByPath[pendingWrite.remotePath].Revision); err != nil && firstErr == nil { firstErr = err } } diff --git a/internal/mountsync/syncer_test.go b/internal/mountsync/syncer_test.go index be9f682..3b95923 100644 --- a/internal/mountsync/syncer_test.go +++ b/internal/mountsync/syncer_test.go @@ -660,31 +660,11 @@ func TestBulkMigrationReducesHTTPCalls(t *testing.T) { workspaceID := "ws_mount_bulk_http_volume" handler := newMountsyncAPIHandler(t, store) - // NOTE on what this test proves: - // - // The relayfile server's /fs/bulk response today only carries - // {written, errorCount, errors, correlationId} — it does NOT return - // per-file revisions. That means after a bulk write, the Syncer's - // reconcileBulkWrite() falls back to a per-file GET /fs/file to learn - // the new revision for If-Match on the next cycle. So in production - // the post-bulk traffic looks like: 1 POST /fs/bulk + N GET /fs/file. - // - // The migration's guarantee is: **zero per-file WRITES (POST /fs/file) - // after the first batch.** That is what this test asserts. It counts - // POST and GET separately on /fs/file so GET-based revision read-back - // does not trigger a false failure. An earlier version of this test - // mocked synthetic `Results` into the bulk response, which masked the - // GET fallback and reviewers flagged it as non-representative. - // - // When /fs/bulk grows per-file revision output on the server, the GET - // fallback goes away and this test can tighten to "zero total requests - // on /fs/file" by removing the PostCount filter below. 
var requestCounts sync.Map var requestsMu sync.Mutex requests := make([]string, 0) var fsFileRequestsMu sync.Mutex fsFileRequests := make([]string, 0) - var fsFilePostCount atomic.Int32 counterFor := func(path string) *atomic.Int32 { counter, _ := requestCounts.LoadOrStore(path, &atomic.Int32{}) return counter.(*atomic.Int32) @@ -699,9 +679,6 @@ func TestBulkMigrationReducesHTTPCalls(t *testing.T) { fsFileRequestsMu.Lock() fsFileRequests = append(fsFileRequests, r.Method+" "+r.URL.String()) fsFileRequestsMu.Unlock() - if r.Method == http.MethodPost { - fsFilePostCount.Add(1) - } } recorder := httptest.NewRecorder() @@ -753,15 +730,10 @@ func TestBulkMigrationReducesHTTPCalls(t *testing.T) { t.Fatalf("expected exactly one bulk request to %s, got %d", bulkPath, got) } - // The migration's load-bearing guarantee: no per-file POSTs (writes) - // to /fs/file. GETs on /fs/file from the post-bulk revision read-back - // are expected until the server returns per-file revisions in the - // bulk response — they're tracked in fsFileRequests for visibility - // but don't fail the test. 
- if got := fsFilePostCount.Load(); got != 0 { - t.Fatalf("expected zero POST /fs/file requests (per-file writes) after bulk migration, got %d: %v", got, fsFileRequests) + if got := len(fsFileRequests); got != 0 { + t.Fatalf("expected zero /fs/file requests after bulk migration, got %d: %v", got, fsFileRequests) } - t.Logf("bulk migration verified: 1 POST /fs/bulk, 0 POST /fs/file, %d GET /fs/file (revision read-back)", len(fsFileRequests)) + t.Logf("bulk migration verified: 1 POST /fs/bulk, 0 total requests on /fs/file") } func TestBulkWrite_ChunkAtThreshold(t *testing.T) { @@ -2129,7 +2101,7 @@ func (c *fakeClient) WriteFilesBulk(ctx context.Context, workspaceID string, fil if response.ErrorCount == 0 && len(response.Errors) > 0 { response.ErrorCount = len(response.Errors) } - if len(response.Results) == 0 && len(response.Files) == 0 { + if len(response.Results) == 0 { response.Results = results } c.lastBulkWriteResponse = response @@ -2250,6 +2222,7 @@ func newMockMountsyncServer( return } errorsOut := make([]BulkWriteError, 0) + resultsOut := make([]BulkWriteResult, 0, len(payload.Files)) written := 0 for _, file := range payload.Files { path := normalizeRemotePath(file.Path) @@ -2272,12 +2245,18 @@ func newMockMountsyncServer( revisionCounter++ current.Revision = fmt.Sprintf("rev_%d", revisionCounter) normalizedFiles[path] = current + resultsOut = append(resultsOut, BulkWriteResult{ + Path: path, + Revision: current.Revision, + ContentType: current.ContentType, + }) written++ } writeJSONResponse(t, w, http.StatusAccepted, BulkWriteResponse{ Written: written, ErrorCount: len(errorsOut), Errors: errorsOut, + Results: resultsOut, CorrelationID: "corr_mock_bulk", }) case strings.HasSuffix(r.URL.Path, "/fs/file") && r.Method == http.MethodPut: diff --git a/internal/mountsync/types.go b/internal/mountsync/types.go index 288a96d..27070d1 100644 --- a/internal/mountsync/types.go +++ b/internal/mountsync/types.go @@ -11,38 +11,26 @@ type BulkWriteFile = 
relayfile.BulkWriteFile type BulkWriteError = relayfile.BulkWriteError -type BulkWriteResult struct { - Path string `json:"path"` - Revision string `json:"revision"` - ContentType string `json:"contentType,omitempty"` -} +type BulkWriteResult = relayfile.BulkWriteResult type BulkWriteResponse struct { Written int `json:"written"` ErrorCount int `json:"errorCount"` Errors []BulkWriteError `json:"errors"` Results []BulkWriteResult `json:"results,omitempty"` - Files []RemoteFile `json:"files,omitempty"` CorrelationID string `json:"correlationId"` } var ErrEmptyBulkWrite = errors.New("bulk write requires at least one file") -func (r BulkWriteResponse) revisionsByPath() map[string]string { - byPath := make(map[string]string, len(r.Results)+len(r.Files)) +func (r BulkWriteResponse) resultsByPath() map[string]BulkWriteResult { + byPath := make(map[string]BulkWriteResult, len(r.Results)) for _, result := range r.Results { - revision := strings.TrimSpace(result.Revision) - if revision == "" { - continue - } - byPath[normalizeRemotePath(result.Path)] = revision - } - for _, file := range r.Files { - revision := strings.TrimSpace(file.Revision) - if revision == "" { - continue - } - byPath[normalizeRemotePath(file.Path)] = revision + normalizedPath := normalizeRemotePath(result.Path) + result.Path = normalizedPath + result.Revision = strings.TrimSpace(result.Revision) + result.ContentType = strings.TrimSpace(result.ContentType) + byPath[normalizedPath] = result } return byPath } diff --git a/internal/relayfile/adapters_test.go b/internal/relayfile/adapters_test.go index f2e51f4..08351b8 100644 --- a/internal/relayfile/adapters_test.go +++ b/internal/relayfile/adapters_test.go @@ -6,11 +6,11 @@ func TestParseGenericEnvelopeFileUpdated(t *testing.T) { actions, err := ParseGenericEnvelope(WebhookEnvelopeRequest{ Provider: "custom-system", Payload: map[string]any{ - "path": "/data/record_123.json", - "event_type": "file.updated", - "content": `{"id": 123}`, - "contentType": 
"application/json", - "providerObjectId": "obj_123", + "path": "/data/record_123.json", + "event_type": "file.updated", + "content": `{"id": 123}`, + "contentType": "application/json", + "providerObjectId": "obj_123", "properties": map[string]any{ "system": "crm", }, diff --git a/internal/relayfile/store.go b/internal/relayfile/store.go index 7ab8e03..c9eeedb 100644 --- a/internal/relayfile/store.go +++ b/internal/relayfile/store.go @@ -178,6 +178,12 @@ type BulkWriteError struct { Message string `json:"message"` } +type BulkWriteResult struct { + Path string `json:"path"` + Revision string `json:"revision"` + ContentType string `json:"contentType,omitempty"` +} + type DeleteRequest struct { WorkspaceID string Path string @@ -1323,15 +1329,15 @@ func (s *Store) WriteFile(req WriteRequest) (WriteResult, error) { return result, nil } -func (s *Store) BulkWrite(workspaceID string, files []BulkWriteFile) (int, []BulkWriteError) { +func (s *Store) BulkWrite(workspaceID string, files []BulkWriteFile) (int, []BulkWriteResult, []BulkWriteError) { if strings.TrimSpace(workspaceID) == "" { - return 0, []BulkWriteError{{ + return 0, nil, []BulkWriteError{{ Code: "invalid_input", Message: ErrInvalidInput.Error(), }} } if len(files) == 0 { - return 0, nil + return 0, nil, nil } type queuedTask struct { @@ -1342,6 +1348,7 @@ func (s *Store) BulkWrite(workspaceID string, files []BulkWriteFile) (int, []Bul ws := s.ensureWorkspaceLocked(workspaceID) now := time.Now().UTC() written := 0 + results := make([]BulkWriteResult, 0, len(files)) errorsOut := make([]BulkWriteError, 0) tasks := make([]queuedTask, 0, len(files)) @@ -1397,6 +1404,11 @@ func (s *Store) BulkWrite(workspaceID string, files []BulkWriteFile) (int, []Bul } result, task := s.recordWriteLocked(ws, path, revision, eventType, file.Provider, "") _ = result + results = append(results, BulkWriteResult{ + Path: path, + Revision: revision, + ContentType: contentType, + }) tasks = append(tasks, queuedTask{task: task}) 
written++ } @@ -1407,7 +1419,7 @@ func (s *Store) BulkWrite(workspaceID string, files []BulkWriteFile) (int, []Bul for _, queued := range tasks { s.enqueueWriteback(queued.task) } - return written, errorsOut + return written, results, errorsOut } func (s *Store) ExportWorkspace(workspaceID string) ([]File, error) { @@ -1639,8 +1651,8 @@ func (s *Store) CommitForkWithValidator(workspaceID, forkID, correlationID strin } if existed { if existed { - deletedCount++ - } + deletedCount++ + } } } } @@ -1726,28 +1738,29 @@ func (s *Store) WriteForkFile(req WriteRequest, forkID string) (WriteResult, err return result, nil } -func (s *Store) BulkWriteFork(workspaceID, forkID string, files []BulkWriteFile) (int, []BulkWriteError) { +func (s *Store) BulkWriteFork(workspaceID, forkID string, files []BulkWriteFile) (int, []BulkWriteResult, []BulkWriteError) { if strings.TrimSpace(workspaceID) == "" || strings.TrimSpace(forkID) == "" { - return 0, []BulkWriteError{{ + return 0, nil, []BulkWriteError{{ Code: "invalid_input", Message: ErrInvalidInput.Error(), }} } if len(files) == 0 { - return 0, nil + return 0, nil, nil } s.mu.Lock() defer s.mu.Unlock() fork, err := s.getLiveForkLocked(workspaceID, forkID, time.Now().UTC()) if err != nil { - return 0, []BulkWriteError{{ + return 0, nil, []BulkWriteError{{ Code: forkBulkErrorCode(err), Message: err.Error(), }} } written := 0 + results := make([]BulkWriteResult, 0, len(files)) errorsOut := make([]BulkWriteError, 0) for _, input := range files { path := normalizePath(input.Path) @@ -1777,7 +1790,7 @@ func (s *Store) BulkWriteFork(workspaceID, forkID string, files []BulkWriteFile) continue } existing, exists := s.readForkFileLocked(fork, path) - s.writeForkOverlayLocked(fork, WriteRequest{ + result := s.writeForkOverlayLocked(fork, WriteRequest{ WorkspaceID: workspaceID, Path: path, IfMatch: "*", @@ -1785,10 +1798,19 @@ func (s *Store) BulkWriteFork(workspaceID, forkID string, files []BulkWriteFile) Content: input.Content, Encoding: 
encoding, }, existing, exists) + contentType := strings.TrimSpace(input.ContentType) + if contentType == "" { + contentType = "text/markdown" + } + results = append(results, BulkWriteResult{ + Path: path, + Revision: result.TargetRevision, + ContentType: contentType, + }) written++ } _ = s.saveLocked() - return written, errorsOut + return written, results, errorsOut } func (s *Store) DeleteForkFile(req DeleteRequest, forkID string) (WriteResult, error) { diff --git a/internal/relayfile/store_test.go b/internal/relayfile/store_test.go index 7e0b6ed..a76676f 100644 --- a/internal/relayfile/store_test.go +++ b/internal/relayfile/store_test.go @@ -200,6 +200,36 @@ func (q *countingWritebackQueue) Close() error { return q.inner.Close() } +func assertBulkWriteResults(t *testing.T, results []BulkWriteResult, files []BulkWriteFile) { + t.Helper() + + if len(results) != len(files) { + t.Fatalf("expected %d bulk write results, got %d", len(files), len(results)) + } + + expected := make(map[string]string, len(files)) + for _, file := range files { + contentType := file.ContentType + if contentType == "" { + contentType = "text/markdown" + } + expected[file.Path] = contentType + } + + for _, result := range results { + contentType, ok := expected[result.Path] + if !ok { + t.Fatalf("unexpected bulk write result path %q", result.Path) + } + if result.Revision == "" { + t.Fatalf("expected revision for %s", result.Path) + } + if result.ContentType != contentType { + t.Fatalf("expected content type %q for %s, got %q", contentType, result.Path, result.ContentType) + } + } +} + func (m *memoryStateBackend) Load() (*persistedState, error) { if !m.loaded { return nil, nil @@ -398,7 +428,7 @@ func TestStoreBulkWriteAndExportWorkspace(t *testing.T) { store := NewStoreWithOptions(StoreOptions{DisableWorkers: true}) t.Cleanup(store.Close) - written, errs := store.BulkWrite("ws_bulk", []BulkWriteFile{ + files := []BulkWriteFile{ { Path: "/external/Seed.md", ContentType: "text/markdown", @@ 
-410,10 +440,12 @@ func TestStoreBulkWriteAndExportWorkspace(t *testing.T) { Content: base64.StdEncoding.EncodeToString([]byte{0x00, 0x01, 0x02, 0x03}), Encoding: "base64", }, - }) + } + written, results, errs := store.BulkWrite("ws_bulk", files) if written != 2 { t.Fatalf("expected 2 written files, got %d", written) } + assertBulkWriteResults(t, results, files) if len(errs) != 0 { t.Fatalf("expected no bulk write errors, got %+v", errs) } @@ -458,10 +490,11 @@ func TestBulkWrite(t *testing.T) { expectedPaths[path] = struct{}{} } - written, errs := store.BulkWrite("ws_bulk_write", files) + written, results, errs := store.BulkWrite("ws_bulk_write", files) if written != 10 { t.Fatalf("expected 10 written files, got %d", written) } + assertBulkWriteResults(t, results, files) if len(errs) != 0 { t.Fatalf("expected no bulk write errors, got %+v", errs) } @@ -511,10 +544,11 @@ func TestBulkWriteOverwrite(t *testing.T) { {Path: "/external/A.md", ContentType: "text/markdown", Content: "# v1"}, {Path: "/external/B.md", ContentType: "text/markdown", Content: "# v1"}, } - written, errs := store.BulkWrite("ws_bulk_overwrite", initial) + written, results, errs := store.BulkWrite("ws_bulk_overwrite", initial) if written != len(initial) { t.Fatalf("expected %d initial writes, got %d", len(initial), written) } + assertBulkWriteResults(t, results, initial) if len(errs) != 0 { t.Fatalf("expected no initial errors, got %+v", errs) } @@ -523,10 +557,11 @@ func TestBulkWriteOverwrite(t *testing.T) { {Path: "/external/A.md", ContentType: "text/markdown", Content: "# v2"}, {Path: "/external/B.md", ContentType: "text/markdown", Content: "# v3"}, } - written, errs = store.BulkWrite("ws_bulk_overwrite", updated) + written, results, errs = store.BulkWrite("ws_bulk_overwrite", updated) if written != len(updated) { t.Fatalf("expected %d overwrite writes, got %d", len(updated), written) } + assertBulkWriteResults(t, results, updated) if len(errs) != 0 { t.Fatalf("expected no overwrite errors, 
got %+v", errs) } @@ -551,10 +586,11 @@ func TestExportWorkspace(t *testing.T) { {Path: "/external/B.md", ContentType: "text/plain", Content: "B"}, {Path: "/external/C.json", ContentType: "application/json", Content: `{"ok":true}`}, } - written, errs := store.BulkWrite("ws_export", seed) + written, results, errs := store.BulkWrite("ws_export", seed) if written != len(seed) { t.Fatalf("expected %d written files, got %d", len(seed), written) } + assertBulkWriteResults(t, results, seed) if len(errs) != 0 { t.Fatalf("expected no bulk write errors, got %+v", errs) } @@ -585,15 +621,17 @@ func TestBinaryEncoding(t *testing.T) { t.Cleanup(store.Close) encoded := base64.StdEncoding.EncodeToString([]byte{0x00, 0x7f, 0xff, 0x10}) - written, errs := store.BulkWrite("ws_binary_encoding", []BulkWriteFile{{ + files := []BulkWriteFile{{ Path: "/external/blob.bin", ContentType: "application/octet-stream", Content: encoded, Encoding: "base64", - }}) + }} + written, results, errs := store.BulkWrite("ws_binary_encoding", files) if written != 1 { t.Fatalf("expected 1 written file, got %d", written) } + assertBulkWriteResults(t, results, files) if len(errs) != 0 { t.Fatalf("expected no bulk write errors, got %+v", errs) } diff --git a/workflows/061-bulk-write-per-file-revisions.ts b/workflows/061-bulk-write-per-file-revisions.ts new file mode 100644 index 0000000..d346373 --- /dev/null +++ b/workflows/061-bulk-write-per-file-revisions.ts @@ -0,0 +1,418 @@ +/** + * 061-bulk-write-per-file-revisions.ts + * + * Extend the relayfile server's `/v1/workspaces/:id/fs/bulk` response to + * include per-file results (path + new revision + contentType) so the + * mount daemon can reconcile tracked revisions WITHOUT a follow-up + * GET /fs/file per file. This is the complement to workflow 060 — after + * this lands, `TestBulkMigrationReducesHTTPCalls` tightens from "0 POST + * /fs/file" to "0 total requests on /fs/file". + * + * Context: + * - 060 shipped the mount-side bulk migration. 
In production today the + * server's /fs/bulk response only returns + * {written, errorCount, errors, correlationId} — no per-file new + * revisions. So `reconcileBulkWrite` (mount) falls back to a per-file + * GET /fs/file per batched write to learn the new revision for next + * If-Match. A 10-file batch = 1 POST + 10 GETs. + * - The server already knows the new revision when it writes each file + * (Store.BulkWrite / Store.BulkWriteFork return per-file results + * internally). Extending the HTTP response to carry them is a small + * server-side change that eliminates the GET fallback entirely. + * + * Run from the relayfile repo root: + * agent-relay run workflows/061-bulk-write-per-file-revisions.ts + */ +import { workflow } from '@agent-relay/sdk/workflows'; +import { ClaudeModels, CodexModels } from '@agent-relay/config'; + +async function main() { + const result = await workflow('061-bulk-write-per-file-revisions') + .description( + 'Return per-file results (path + revision + contentType) from /fs/bulk so the mount daemon stops falling back to per-file GETs after bulk writes. 
Includes server-side changes, wire-type additions, mount-side reconcileBulkWrite simplification, and tightened e2e proof.', + ) + .pattern('dag') + .channel('wf-061-bulk-per-file-revisions') + .maxConcurrency(3) + .timeout(7_200_000) + + .agent('architect', { + cli: 'claude', + model: ClaudeModels.OPUS, + preset: 'analyst', + role: 'Defines the exact server-side response shape change, wire-type updates, and mount-side simplification path before any code is touched.', + retries: 1, + }) + .agent('impl', { + cli: 'codex', + model: CodexModels.GPT_5_4, + role: 'Implements Go changes across internal/httpapi, internal/relayfile, and internal/mountsync per the boundary doc.', + retries: 2, + }) + .agent('reviewer', { + cli: 'claude', + model: ClaudeModels.OPUS, + preset: 'reviewer', + role: 'Checks the server response change preserves backward compatibility and the mount reconciliation matches the new shape.', + retries: 1, + }) + + // ──────────────────────────────────────────────────────────────── + // Phase 1: Preflight + context + // ──────────────────────────────────────────────────────────────── + .step('preflight', { + type: 'deterministic', + // This workflow STACKS on top of PR #63 (`fix/mount-bulk-sync-migration`) + // until #63 merges to main. Starting from main would miss the bulk + // client + types.go that 061 depends on. The final PR targets + // `fix/mount-bulk-sync-migration` as base; GitHub will auto-retarget + // to main once #63 merges. + command: [ + 'set -e', + 'BASE_BRANCH="fix/mount-bulk-sync-migration"', + 'WF_BRANCH="fix/bulk-write-per-file-revisions"', + 'BRANCH=$(git rev-parse --abbrev-ref HEAD)', + 'echo "current branch: $BRANCH"', + 'echo "stack base: $BASE_BRANCH"', + 'echo "workflow branch: $WF_BRANCH"', + // If already on the workflow branch (resume case), do nothing. 
+ 'if [ "$BRANCH" = "$WF_BRANCH" ]; then', + ' echo "already on $WF_BRANCH — ok"', + 'elif [ "$BRANCH" = "$BASE_BRANCH" ]; then', + ' git fetch origin "$BASE_BRANCH"', + ' git checkout -b "$WF_BRANCH"', + ' echo "created $WF_BRANCH on top of $BASE_BRANCH"', + 'elif [ "$BRANCH" = "main" ]; then', + ' git fetch origin "$BASE_BRANCH"', + ' git checkout "$BASE_BRANCH"', + ' git checkout -b "$WF_BRANCH"', + ' echo "checked out $BASE_BRANCH then created $WF_BRANCH"', + 'else', + ' echo "ERROR: on unexpected branch \\"$BRANCH\\". Checkout $BASE_BRANCH or $WF_BRANCH first."', + ' exit 1', + 'fi', + 'echo "working-tree files currently dirty (informational):"', + 'git diff --name-only | head -20 || true', + 'if ! git diff --cached --quiet; then', + ' echo "ERROR: staging area dirty — unstage first"', + ' git diff --cached --stat', + ' exit 1', + 'fi', + 'gh auth status >/dev/null 2>&1 || (echo "ERROR: gh CLI not authenticated"; exit 1)', + 'go version', + 'echo PREFLIGHT_OK', + ].join('\n'), + captureOutput: true, + failOnError: true, + }) + + .step('collect-context', { + type: 'deterministic', + dependsOn: ['preflight'], + command: [ + 'set -e', + 'echo "=== current /fs/bulk server response shape ==="', + 'awk "/func .s .Server. handleBulkWrite/,/^}$/" internal/httpapi/server.go | tail -20 || true', + 'echo', + 'echo "=== Store.BulkWrite / Store.BulkWriteFork return types ==="', + 'rg -n "func .s .Store. BulkWrite\\\\b|func .s .Store. BulkWriteFork" internal/relayfile/store.go || true', + 'rg -n "BulkWriteFile|BulkWriteError" internal/relayfile/store.go | head -10 || true', + 'echo', + 'echo "=== mount-side reconcile that will simplify ==="', + 'awk "/func .s .Syncer. 
reconcileBulkWrite/,/^}$/" internal/mountsync/syncer.go | head -40 || true', + 'echo', + 'echo "=== mount-side wire-type aliases ==="', + 'cat internal/mountsync/types.go 2>/dev/null || true', + 'echo', + 'echo "=== the assertion that will tighten ==="', + 'rg -n "fsFilePostCount|bulk migration verified" internal/mountsync/syncer_test.go || true', + 'echo', + 'echo "=== existing server-side bulk tests ==="', + 'rg -n "handleBulkWrite|TestBulkWrite|/fs/bulk" internal/httpapi/server_test.go 2>/dev/null | head -20 || true', + ].join('\n'), + captureOutput: true, + failOnError: true, + }) + + // ──────────────────────────────────────────────────────────────── + // Phase 2: Boundary doc + // ──────────────────────────────────────────────────────────────── + .step('define-boundary', { + agent: 'architect', + dependsOn: ['collect-context'], + task: [ + 'Scope the server-side bulk response change and the matching mount reconciliation simplification.', + '', + 'Collected context:', + '{{steps.collect-context.output}}', + '', + 'Write docs/bulk-per-file-revisions-boundary.md with EXACTLY these sections:', + '', + '1. Files touched — list every file with reason and create/modify:', + ' - internal/httpapi/server.go (handleBulkWrite returns per-file results)', + ' - internal/httpapi/server_test.go (new test asserting new fields present)', + ' - internal/relayfile/store.go (BulkWrite/BulkWriteFork must surface per-file revisions — they already compute them internally; now return them)', + ' - internal/relayfile/ (any shared response-type structs)', + ' - internal/mountsync/types.go (BulkWriteResponse — field already exists as Results/Files; verify alignment)', + ' - internal/mountsync/syncer.go (reconcileBulkWrite: stop falling back to ReadFile when revision is present in response)', + ' - internal/mountsync/syncer_test.go (tighten TestBulkMigrationReducesHTTPCalls assertion: 0 total requests on /fs/file)', + '2. 
New wire shape for /fs/bulk response — exact JSON:', + ' {', + ' "written": int,', + ' "errorCount": int,', + ' "errors": [...],', + ' "results": [{"path": "...", "revision": "...", "contentType": "..."}],', + ' "correlationId": "..."', + ' }', + '3. Backward compatibility — what happens if a client on an old SDK reads the new response? (answer: extra field is ignored; also the existing mount code already handles missing `results` by falling back to ReadFile). Document this.', + '4. Store.BulkWrite signature change — propose the new return type (likely `(written int, results []BulkWriteResult, errors []BulkWriteError)`) and confirm Store.BulkWriteFork gets the same treatment.', + '5. Mount-side change — remove the fallback ReadFile in reconcileBulkWrite when the response supplies a revision. Keep the fallback for the empty-revision case (defensive).', + '6. Test updates:', + ' - Server: a test asserting the new fields are populated for successful writes AND absent for failed ones.', + ' - Mount: tighten TestBulkMigrationReducesHTTPCalls so it asserts 0 total requests on /fs/file (not just 0 POSTs). Remove the post-filter counter since it is no longer needed.', + '7. Deliberately out of scope — FUSE mount path, SDK/TS bindings, observer/dashboard.', + '8. Acceptance gates — exact commands that must pass:', + ' - go test ./internal/httpapi/... -count=1 -timeout 120s', + ' - go test ./internal/mountsync/... -count=1 -timeout 120s', + ' - go build ./...', + '', + 'Do NOT touch any .go file. 
End with the literal token BOUNDARY_READY on its own line.', + ].join('\n'), + verification: { type: 'file_exists', value: 'docs/bulk-per-file-revisions-boundary.md' }, + retries: 1, + }) + + // ──────────────────────────────────────────────────────────────── + // Phase 3: Implement (server + store + mount in one pass — tight coupling) + // ──────────────────────────────────────────────────────────────── + .step('implement', { + agent: 'impl', + dependsOn: ['define-boundary'], + task: [ + 'Implement the changes per docs/bulk-per-file-revisions-boundary.md.', + '', + 'If docs/bulk-per-file-revisions-review.md exists from a previous run,', + 'read it first and address each REVIEW_CHANGES_REQUESTED item on', + 'this pass.', + '', + 'Constraints:', + '1. Only modify files listed in the boundary doc.', + '2. Preserve backward compatibility — the existing {written, errorCount, errors, correlationId} fields must remain unchanged. Only ADD a new `results` field.', + '3. Store.BulkWrite / Store.BulkWriteFork should return the new per-file results in addition to (not instead of) existing return values. If this changes the signature, update all call sites accordingly.', + '4. On the mount side, the existing types.go defines BulkWriteResponse with Results []BulkWriteResult already. Confirm field names align with the server JSON tags; adjust if mismatched.', + '5. reconcileBulkWrite: when response supplies a revision for the path, use it directly and do NOT call s.client.ReadFile. Keep the ReadFile fallback only for the empty-revision defensive case.', + '6. gofmt everything touched.', + '7. Do NOT run tests yet — the next step handles that.', + '', + 'Print `git diff --stat` when done. 
End with IMPLEMENTATION_READY.', + ].join('\n'), + verification: { type: 'exit_code' }, + retries: 1, + }) + + .step('verify-edits-landed', { + type: 'deterministic', + dependsOn: ['implement'], + command: [ + 'set -e', + 'echo "=== git diff --stat ==="', + 'git diff --stat internal/ 2>/dev/null || true', + 'echo', + 'echo "=== sanity grep: server returns results field ==="', + 'rg -n "results.*\\\\[\\\\]|Results.*\\\\[\\\\]BulkWriteResult|\\"results\\":" internal/httpapi/server.go | head -10 || true', + 'echo', + 'echo "=== sanity grep: Store returns per-file results ==="', + 'rg -n "BulkWriteResult|results \\\\[\\\\]" internal/relayfile/store.go | head -10 || true', + 'echo', + 'echo "=== sanity grep: mount uses response revision directly ==="', + 'rg -n "revisionsByPath|response.Results" internal/mountsync/syncer.go | head -10 || true', + 'echo', + 'echo "=== gofmt check (diff-scoped) ==="', + // Scope to files this workflow actually touched so pre-existing + // drift in unrelated files (e.g. internal/relayfile/adapters_test.go + // column-alignment) does not fail the gate. + 'git diff --name-only | grep "\\.go$" | xargs -r gofmt -l 2>&1 | tee /tmp/gofmt-out-061', + 'if [ -s /tmp/gofmt-out-061 ]; then', + ' echo "ERROR: workflow-touched files above are unformatted"', + ' exit 1', + 'fi', + 'echo EDITS_VERIFIED', + ].join('\n'), + captureOutput: true, + failOnError: true, + }) + + // ──────────────────────────────────────────────────────────────── + // Phase 4: Tests — regression + new + // ──────────────────────────────────────────────────────────────── + .step('run-regression-tests', { + type: 'deterministic', + dependsOn: ['verify-edits-landed'], + command: 'go test ./internal/httpapi/... ./internal/mountsync/... ./internal/relayfile/... 
-count=1 -timeout 180s 2>&1 | tail -120', + captureOutput: true, + failOnError: false, + }) + + .step('fix-regressions', { + agent: 'impl', + dependsOn: ['run-regression-tests'], + task: [ + 'If all tests passed ("ok " for each package, no FAIL), do nothing and end with REGRESSIONS_OK.', + '', + 'Test output:', + '{{steps.run-regression-tests.output}}', + '', + 'If there are failures, fix them and re-run:', + ' go test ./internal/httpapi/... ./internal/mountsync/... ./internal/relayfile/... -count=1 -timeout 180s', + '', + 'Do not weaken tests to make them pass — if the test expectation is stale given the new response shape, update the assertion correctly.', + '', + 'End with REGRESSIONS_OK.', + ].join('\n'), + verification: { type: 'exit_code' }, + retries: 2, + }) + + .step('run-regression-tests-final', { + type: 'deterministic', + dependsOn: ['fix-regressions'], + command: 'go test ./internal/httpapi/... ./internal/mountsync/... ./internal/relayfile/... -count=1 -timeout 180s 2>&1 | tail -60', + captureOutput: true, + failOnError: true, + }) + + // ──────────────────────────────────────────────────────────────── + // Phase 5: E2E proof — tightened assertion + // ──────────────────────────────────────────────────────────────── + .step('tighten-e2e-proof', { + agent: 'impl', + dependsOn: ['run-regression-tests-final'], + task: [ + 'Update internal/mountsync/syncer_test.go TestBulkMigrationReducesHTTPCalls so the /fs/file assertion tightens from "0 POST /fs/file" to "0 total requests on /fs/file". Remove the fsFilePostCount counter (no longer needed). Drop the comment block explaining the GET fallback — the server now returns revisions inline. Keep the test structure otherwise identical.', + '', + 'Then run:', + ' go test ./internal/mountsync/... -run TestBulkMigrationReducesHTTPCalls -v -timeout 120s', + '', + 'Assert it passes and prints the tightened log line. 
End with E2E_OK.', + ].join('\n'), + verification: { type: 'exit_code' }, + retries: 1, + }) + + .step('tighten-e2e-proof-final', { + type: 'deterministic', + dependsOn: ['tighten-e2e-proof'], + command: 'go test ./internal/mountsync/... -run TestBulkMigrationReducesHTTPCalls -v -timeout 120s 2>&1 | tail -20', + captureOutput: true, + failOnError: true, + }) + + // ──────────────────────────────────────────────────────────────── + // Phase 6: Build + vet + // ──────────────────────────────────────────────────────────────── + .step('go-vet', { + type: 'deterministic', + dependsOn: ['tighten-e2e-proof-final'], + command: 'go vet ./... 2>&1 | tail -20', + captureOutput: true, + failOnError: true, + }) + + .step('go-build', { + type: 'deterministic', + dependsOn: ['go-vet'], + command: 'go build ./... 2>&1 && echo BUILD_OK', + captureOutput: true, + failOnError: true, + }) + + // ──────────────────────────────────────────────────────────────── + // Phase 7: Review + commit + PR + // ──────────────────────────────────────────────────────────────── + .step('review', { + agent: 'reviewer', + dependsOn: ['go-build'], + task: [ + 'You are reviewing the CURRENT state of the diff, not any prior review.', + 'If docs/bulk-per-file-revisions-review.md exists from a previous', + 'run, IGNORE ITS CONTENTS — overwrite with a fresh verdict.', + '', + 'Review against docs/bulk-per-file-revisions-boundary.md. Check:', + '', + '1. Every file modified is listed in section 1 of the boundary doc (no scope creep). Run `git diff --name-only` and cross-reference.', + '2. Backward compatibility — verify the new `results` field is ADDITIVE. The existing {written, errorCount, errors, correlationId} response fields must be unchanged in name, type, and semantics. An older client that ignores `results` must still work.', + '3. Store.BulkWrite / Store.BulkWriteFork signature changes are applied consistently at all call sites.', + '4. 
Mount reconcileBulkWrite correctly prefers response-supplied revisions and only falls back to ReadFile on missing values.', + '5. Tests exist for: server response includes results on success, server response excludes results on error (or lists the error path correctly), mount skips ReadFile when revision is present.', + '6. E2E proof is tightened — TestBulkMigrationReducesHTTPCalls now asserts 0 total requests on /fs/file (not just 0 POSTs).', + '7. gofmt / go vet / go build passed.', + '', + 'Read the diff:', + ' git diff --stat', + ' git diff --name-only', + ' git diff internal/httpapi/server.go', + ' git diff internal/relayfile/store.go', + ' git diff internal/mountsync/syncer.go', + '', + 'Write fresh verdict to docs/bulk-per-file-revisions-review.md. End with REVIEW_APPROVED on its own line, or REVIEW_CHANGES_REQUESTED followed by a specific list.', + ].join('\n'), + verification: { type: 'file_exists', value: 'docs/bulk-per-file-revisions-review.md' }, + retries: 1, + }) + + .step('verify-review-approved', { + type: 'deterministic', + dependsOn: ['review'], + command: 'grep -q "^REVIEW_APPROVED" docs/bulk-per-file-revisions-review.md && echo APPROVED || (echo "Review did not approve — see docs/bulk-per-file-revisions-review.md"; exit 1)', + captureOutput: true, + failOnError: true, + }) + + .step('commit', { + type: 'deterministic', + dependsOn: ['verify-review-approved'], + command: [ + 'set -e', + 'git add internal/httpapi/server.go internal/httpapi/server_test.go internal/relayfile/store.go internal/mountsync/types.go internal/mountsync/syncer.go internal/mountsync/syncer_test.go docs/bulk-per-file-revisions-boundary.md docs/bulk-per-file-revisions-review.md workflows/061-bulk-write-per-file-revisions.ts', + // Catch any other .go file the impl agent may have touched. 
+ 'git add internal/ 2>/dev/null || true', + 'MSG=$(mktemp)', + 'printf "%s\\n" "feat(bulk): return per-file revisions from /fs/bulk" "" "The existing /fs/bulk response only returned {written, errorCount, errors," "correlationId}. Mount clients had to follow every bulk write with N GET /fs/file" "requests to learn the new revisions for If-Match. That made the post-migration" "total request count N+1 for an N-file batch." "" "Extend the response with a per-file results array:" " {\\"path\\":..., \\"revision\\":..., \\"contentType\\":...}" "" "Purely additive — existing fields unchanged. Mount now uses the inline revision" "and skips the ReadFile fallback. The e2e proof TestBulkMigrationReducesHTTPCalls" "tightens from \\"0 POST /fs/file\\" to \\"0 total requests on /fs/file\\"." > "$MSG"', + 'git commit -F "$MSG"', + 'rm -f "$MSG"', + 'echo COMMIT_OK', + ].join(' && '), + captureOutput: true, + failOnError: true, + }) + + .step('push-and-pr', { + type: 'deterministic', + dependsOn: ['commit'], + command: [ + 'set -e', + 'BRANCH=$(git rev-parse --abbrev-ref HEAD)', + 'git push -u origin "$BRANCH"', + 'BODY=$(mktemp)', + 'printf "%s\\n" "## Summary" "" "Extend /fs/bulk response with per-file results so mount clients do not need a follow-up GET /fs/file per batched write to learn new revisions." "" "## Why" "" "Workflow 060 migrated the mount daemon from per-file POST /fs/file to POST /fs/bulk. That eliminated per-file WRITES but left per-file GETs for revision read-back, because the /fs/bulk response did not carry per-file revisions. Total request count for an N-file batch was still N+1." "" "Server already computes per-file revisions in Store.BulkWrite / Store.BulkWriteFork. This PR surfaces them in the HTTP response as an additive results array." "" "## Backward compatibility" "" "Purely additive — existing response fields unchanged. Clients on older SDKs ignore the extra field; the mount code already handles missing results by falling back to ReadFile." 
"" "## Proof" "" "TestBulkMigrationReducesHTTPCalls now asserts 0 total requests on /fs/file (previously only 0 POST /fs/file, because GET fallbacks were expected). See docs/bulk-per-file-revisions-boundary.md for scope and docs/bulk-per-file-revisions-review.md for the approved review." "" "Authored by workflows/061-bulk-write-per-file-revisions.ts." > "$BODY"', + // Stack on top of PR #63 until it merges. GitHub auto-retargets + // this PR to main when the base branch merges. + 'gh pr create --base fix/mount-bulk-sync-migration --head "$BRANCH" --title "feat(bulk): return per-file revisions from /fs/bulk" --body-file "$BODY" | tee /tmp/pr-url-061.txt', + 'rm -f "$BODY"', + 'echo', + 'echo "PR: $(cat /tmp/pr-url-061.txt)"', + ].join(' && '), + captureOutput: true, + failOnError: true, + }) + + .onError('retry', { maxRetries: 1, retryDelayMs: 10_000 }) + .run({ cwd: process.cwd() }); + + console.log('Workflow status:', result.status); +} + +main().catch((error) => { + console.error(error); + process.exit(1); +});