From 7c842e43b135eed6f0589c2dd6f2a3f5b9d893f2 Mon Sep 17 00:00:00 2001 From: githoboman Date: Mon, 29 Jun 2026 20:49:11 +0100 Subject: [PATCH 1/3] feat: implement redis response caching middleware with configurable environment settings --- .env.example | 12 ++ package-lock.json | 130 +++++++++++++++- package.json | 1 + src/api.ts | 10 +- src/cache/__tests__/redis.test.ts | 246 ++++++++++++++++++++++++++++++ src/cache/redis.ts | 195 +++++++++++++++++++++++ 6 files changed, 586 insertions(+), 8 deletions(-) create mode 100644 src/cache/__tests__/redis.test.ts create mode 100644 src/cache/redis.ts diff --git a/.env.example b/.env.example index 17f0f7f4..b9ff57df 100644 --- a/.env.example +++ b/.env.example @@ -84,3 +84,15 @@ RETENTION_DAYS=30 # ─── API ───────────────────────────────────────────────────── PORT=3000 + +# ─── Response cache (Redis) ────────────────────────────────── +# Opt-in caching for the hot read endpoints (/assets/popular, /search). +# Off by default; set CACHE_ENABLED=true to turn it on. +# Clients can force a fresh response with the `X-No-Cache` request header. +CACHE_ENABLED=false +REDIS_URL="redis://localhost:6379" +# Key namespace applied to every cached entry. +CACHE_KEY_PREFIX="wraith:cache:" +# Per-route TTLs in milliseconds. +CACHE_TTL_POPULAR_MS=60000 +CACHE_TTL_SEARCH_MS=15000 diff --git a/package-lock.json b/package-lock.json index ba275487..5dcd827c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "express": "^4.18.3", "express-rate-limit": "^8.3.2", "graphql": "^16.11.0", + "ioredis": "^5.11.1", "parquetjs-lite": "^0.8.7", "ws": "^8.20.0", "zod": "^4.4.3" @@ -555,6 +556,7 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -1094,6 +1096,17 @@ "node": ">=12" } }, + "node_modules/@emnapi/wasi-threads": { + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.2.2.tgz", + "integrity": "sha512-c95qOXkHdydNKhscBTebqEC1CVAZpyqOfVfBzQ1qgzyl3gfeldUjIggDbIZgDKsHLgnsM+igH7TJ/eAasaVuMA==", + "dev": true, + "license": "MIT", + "optional": true, + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/@fast-csv/format": { "version": "5.0.7", "resolved": "https://registry.npmjs.org/@fast-csv/format/-/format-5.0.7.tgz", @@ -1177,6 +1190,12 @@ "graphql": "^0.8.0 || ^0.9.0 || ^0.10.0 || ^0.11.0 || ^0.12.0 || ^0.13.0 || ^14.0.0 || ^15.0.0 || ^16.0.0 || ^17.0.0" } }, + "node_modules/@ioredis/commands": { + "version": "1.10.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.10.0.tgz", + "integrity": "sha512-UmeW7z4LfctwoQ5wkhVzgq8tXkreED2xZGpX+Bg+zA+WJFZCT6c062AfCK/Dfk81xZnnwdhJCUMkitihRaoC2Q==", + "license": "MIT" + }, "node_modules/@isaacs/cliui": { "version": "8.0.2", "resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz", @@ -1903,14 +1922,14 @@ "version": "5.22.0", "resolved": "https://registry.npmjs.org/@prisma/debug/-/debug-5.22.0.tgz", "integrity": "sha512-AUt44v3YJeggO2ZU5BkXI7M4hu9BF2zzH2iF2V5pyXT/lRTyWiElZ7It+bRH1EshoMRxHgpYg4VB6rCM+mG5jQ==", - "dev": true, + "devOptional": true, "license": "Apache-2.0" }, "node_modules/@prisma/engines": { "version": "5.22.0", "resolved": "https://registry.npmjs.org/@prisma/engines/-/engines-5.22.0.tgz", "integrity": "sha512-UNjfslWhAt06kVL3CjkuYpHAWSO6L4kDCVPegV6itt7nD1kSJavd3vhgAEhjglLJJKEdJ7oIqDJ+yHk6qO8gPA==", - "dev": true, + "devOptional": true, "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { @@ -1924,14 +1943,14 @@ "version": "5.22.0-44.605197351a3c8bdd595af2d2a9bc3025bca48ea2", "resolved": "https://registry.npmjs.org/@prisma/engines-version/-/engines-version-5.22.0-44.605197351a3c8bdd595af2d2a9bc3025bca48ea2.tgz", "integrity": "sha512-2PTmxFR2yHW/eB3uqWtcgRcgAbG1rwG9ZriSvQw+nnb7c4uCr3RAcGMb6/zfE88SKlC1Nj2ziUvc96Z379mHgQ==", - "dev": true, + "devOptional": true, "license": "Apache-2.0" }, "node_modules/@prisma/fetch-engine": { "version": "5.22.0", "resolved": "https://registry.npmjs.org/@prisma/fetch-engine/-/fetch-engine-5.22.0.tgz", "integrity": "sha512-bkrD/Mc2fSvkQBV5EpoFcZ87AvOgDxbG99488a5cexp5Ccny+UM6MAe/UFkUC0wLYD9+9befNOqGiIJhhq+HbA==", - "dev": true, + "devOptional": true, "license": "Apache-2.0", "dependencies": { "@prisma/debug": "5.22.0", @@ -1943,7 +1962,7 @@ "version": "5.22.0", "resolved": "https://registry.npmjs.org/@prisma/get-platform/-/get-platform-5.22.0.tgz", "integrity": "sha512-pHhpQdr1UPFpt+zFfnPazhulaZYCUqeIcPpJViYoq9R+D/yw4fjE+CtnsnKzPYm0ddUbeXUzjGVGIRVgPDCk4Q==", - "dev": true, + "devOptional": true, "license": "Apache-2.0", "dependencies": { "@prisma/debug": "5.22.0" @@ -2377,6 +2396,7 @@ "integrity": "sha512-8kzdPJ3FsNsVIurqBs7oodNnCEVbni9yUEkaHbgptDACOPW04jimGagZ51E6+lXUwJjgnBw+hyko/lkFWCldqw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -3277,6 +3297,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "baseline-browser-mapping": "^2.10.12", "caniuse-lite": "^1.0.30001782", @@ -3532,6 +3553,15 @@ "node": ">=12" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.1.tgz", + "integrity": "sha512-rwHwUfXL40Chm1r08yrhU3qpUvdVlgkKNeyeGPOxnW8/SyVDvgRaed/Uz54AqWNaTCAThlj6QAs3TZcKI0xDEw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -3769,6 +3799,15 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -4087,6 +4126,7 @@ "resolved": "https://registry.npmjs.org/express/-/express-4.22.1.tgz", "integrity": "sha512-F2X8g9P1X7uCPZMA3MVf9wcTqlyNp7IhH5qPCI0izhaOIYXaW9L535tGA3qmjRzpH+bZczqq7hVKxTR4NWnu+g==", "license": "MIT", + "peer": true, "dependencies": { "accepts": "~1.3.8", "array-flatten": "1.1.1", @@ -4551,6 +4591,7 @@ "resolved": "https://registry.npmjs.org/graphql/-/graphql-16.14.2.tgz", "integrity": "sha512-Chq1s4CY7jmh8gO2qvLIJyfCDIN+EHLFW/9iShnp1z8FjBQMoodWP1kDC36VAMXXIvAjj4ARa7ntfAV2BrjsbA==", "license": "MIT", + "peer": true, "engines": { "node": "^12.22.0 || ^14.16.0 || ^16.0.0 || >=17.0.0" } @@ -4761,6 +4802,51 @@ "integrity": "sha512-a5jlKftS7HUOhkUyYD7j2sJ/ZnvWiNlZS1ldR+g1ifQ+/UuZXIE+YTc/lK1qGj/GwAU5F8Z0e1eVq2t1J5Ob2g==", "license": "BSD-3-Clause" }, + "node_modules/ioredis": { + "version": "5.11.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.11.1.tgz", + "integrity": "sha512-ehuGcf94bQXhfagULNXrJdfnWO38v070jxSx/qE87Kjzmu2fU7ro5EFAb+OPituLqgfyuQaym5DlrNydW2sJ9A==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "1.10.0", + "cluster-key-slot": "1.1.1", + "debug": "4.4.3", + "denque": "2.1.0", + "redis-errors": "1.2.0", + "redis-parser": "3.0.0", + "standard-as-callback": "2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, + "node_modules/ioredis/node_modules/debug": { + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", + "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/ioredis/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "license": "MIT" + }, "node_modules/ip-address": { "version": "10.1.0", "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-10.1.0.tgz", @@ -5075,6 +5161,7 @@ "integrity": "sha512-Yi1jqNC/Oq0N4hBgNH/YvBpP1P57QqundgytzYqy3yqAa7NZPNjSoi4SGbRAXDMdBzNE6xBCi5U7RgfrvMEUVQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@jest/core": "30.4.2", "@jest/types": "30.4.1", @@ -6623,9 +6710,10 @@ "version": "5.22.0", "resolved": "https://registry.npmjs.org/prisma/-/prisma-5.22.0.tgz", "integrity": "sha512-vtpjW3XuYCSnMsNVBjLMNkTj6OZbudcPPTPYHqX0CJfpcdWciI1dM8uHETwmDxxiqEwCIE6WvXucWUetJgfu/A==", - "dev": true, + "devOptional": true, "hasInstallScript": true, "license": "Apache-2.0", + "peer": true, "dependencies": { "@prisma/engines": "5.22.0" }, @@ -6765,6 +6853,27 @@ "node": ">=8.10.0" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", @@ -7125,6 +7234,12 @@ "node": ">=10" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/statuses": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.2.tgz", @@ -7531,6 +7646,7 @@ "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@cspotcode/source-map-support": "^0.8.0", "@tsconfig/node10": "^1.0.7", @@ -7742,6 +7858,7 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -8173,6 +8290,7 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-4.4.3.tgz", "integrity": "sha512-ytENFjIJFl2UwYglde2jchW2Hwm4GJFLDiSXWdTrJQBIN9Fcyp7n4DhxJEiWNAJMV1/BqWfW/kkg71UDcHJyTQ==", "license": "MIT", + "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/package.json b/package.json index 50e8ba45..5ca781a8 100644 --- a/package.json +++ b/package.json @@ -68,6 +68,7 @@ "express": "^4.18.3", "express-rate-limit": "^8.3.2", "graphql": "^16.11.0", + "ioredis": "^5.11.1", "parquetjs-lite": "^0.8.7", "ws": "^8.20.0", "zod": "^4.4.3" diff --git a/src/api.ts b/src/api.ts index d226af4b..9a87bff6 100644 --- a/src/api.ts +++ b/src/api.ts @@ -11,6 +11,7 @@ import { createGraphQLMiddleware } from "./graphql/server"; import { createPopularAssetsRouter } from "./routes/assets/popular"; import { createExportsRouter } from "./routes/exports"; import { createSearchRouter } from "./routes/search"; +import { cacheMiddleware } from "./cache/redis"; import { hostFnQuerySchema, nftOwnerParamsSchema, @@ -31,6 +32,11 @@ const limiter = rateLimit({ message: { error: "Too many requests, please try again later." }, }); +// ── Response cache (opt-in via CACHE_ENABLED) ─────────────────────────────────── +// Per-route TTLs: popular-asset rollups tolerate more staleness than search. +const POPULAR_CACHE_TTL_MS = parseInt(process.env.CACHE_TTL_POPULAR_MS ?? "60000", 10); +const SEARCH_CACHE_TTL_MS = parseInt(process.env.CACHE_TTL_SEARCH_MS ?? "15000", 10); + // ── Amount formatting ───────────────────────────────────────────────────────── const STROOPS = 10_000_000n; @@ -96,13 +102,13 @@ export function createApp(): express.Application { app.use("/graphql", createGraphQLMiddleware()); // ── Assets routes ─────────────────────────────────────────────────────────── - app.use("/assets", createPopularAssetsRouter()); + app.use("/assets", cacheMiddleware({ ttlMs: POPULAR_CACHE_TTL_MS }), createPopularAssetsRouter()); // ── Export routes ───────────────────────────────────────────────────────────── app.use("/", createExportsRouter()); // ── Fuzzy search across accounts, assets, and contracts ────────────────────── - app.use("/search", createSearchRouter()); + app.use("/search", cacheMiddleware({ ttlMs: SEARCH_CACHE_TTL_MS }), createSearchRouter()); // ── Helpers ────────────────────────────────────────────────────────────────── const parseIntParam = (val: unknown, fallback: number): number => { diff --git a/src/cache/__tests__/redis.test.ts b/src/cache/__tests__/redis.test.ts new file mode 100644 index 00000000..55137e40 --- /dev/null +++ b/src/cache/__tests__/redis.test.ts @@ -0,0 +1,246 @@ +import request from "supertest"; +import express, { Request, Response } from "express"; +import { + cacheMiddleware, + cacheConfigFromEnv, + defaultKeyFn, + RedisCache, + type CacheClient, + type CacheConfig, +} from "../redis"; + +// ── In-memory fake of the narrow CacheClient surface ──────────────────────── +class FakeRedis implements CacheClient { + store = new Map(); + getCalls = 0; + setCalls = 0; + + async get(key: string): Promise { + this.getCalls += 1; + return this.store.has(key) ? (this.store.get(key) as string) : null; + } + + async set(key: string, value: string, _mode: "PX", _ttlMs: number): Promise { + this.setCalls += 1; + this.store.set(key, value); + return "OK"; + } + + async del(...keys: string[]): Promise { + for (const k of keys) this.store.delete(k); + return keys.length; + } +} + +const enabledConfig: CacheConfig = { + enabled: true, + redisUrl: "redis://unused", + keyPrefix: "test:", +}; + +/** + * Builds an app whose handler records how many times it actually ran, so a + * cache hit is observable as "origin not invoked". + */ +function buildApp(client: CacheClient | null, config: CacheConfig = enabledConfig) { + const app = express(); + let originHits = 0; + + app.get( + "/widgets", + cacheMiddleware({ ttlMs: 1000, client, config }), + (_req: Request, res: Response) => { + originHits += 1; + res.json({ value: "fresh", originHits }); + }, + ); + + app.get( + "/maybe-error", + cacheMiddleware({ ttlMs: 1000, client, config }), + (req: Request, res: Response) => { + originHits += 1; + res.status(500).json({ error: "boom" }); + }, + ); + + return { app, getOriginHits: () => originHits }; +} + +describe("cacheMiddleware", () => { + it("misses on the first request and runs the origin handler", async () => { + const redis = new FakeRedis(); + const { app, getOriginHits } = buildApp(redis); + + const res = await request(app).get("/widgets").query({ a: "1" }); + + expect(res.status).toBe(200); + expect(res.headers["x-cache"]).toBe("MISS"); + expect(res.body.value).toBe("fresh"); + expect(getOriginHits()).toBe(1); + expect(redis.setCalls).toBe(1); // response written back + }); + + it("serves a hit from cache without re-running the origin handler", async () => { + const redis = new FakeRedis(); + const { app, getOriginHits } = buildApp(redis); + + const first = await request(app).get("/widgets").query({ a: "1" }); + const second = await request(app).get("/widgets").query({ a: "1" }); + + expect(first.headers["x-cache"]).toBe("MISS"); + expect(second.headers["x-cache"]).toBe("HIT"); + // Origin ran exactly once; the second response came from Redis. + expect(getOriginHits()).toBe(1); + expect(second.body).toEqual(first.body); + }); + + it("treats different query strings as different keys", async () => { + const redis = new FakeRedis(); + const { app, getOriginHits } = buildApp(redis); + + await request(app).get("/widgets").query({ a: "1" }); + const other = await request(app).get("/widgets").query({ a: "2" }); + + expect(other.headers["x-cache"]).toBe("MISS"); + expect(getOriginHits()).toBe(2); + }); + + describe("X-No-Cache header", () => { + it("bypasses the read but still refreshes the stored value", async () => { + const redis = new FakeRedis(); + const { app, getOriginHits } = buildApp(redis); + + // Prime the cache. + await request(app).get("/widgets").query({ a: "1" }); + expect(getOriginHits()).toBe(1); + + // Bypass: origin must run again despite the entry existing. + const bypass = await request(app) + .get("/widgets") + .query({ a: "1" }) + .set("X-No-Cache", "1"); + + expect(bypass.headers["x-cache"]).toBe("BYPASS"); + expect(getOriginHits()).toBe(2); + + // The bypass refreshed the entry, so the next plain request is a HIT + // that does not run the origin. + const after = await request(app).get("/widgets").query({ a: "1" }); + expect(after.headers["x-cache"]).toBe("HIT"); + expect(getOriginHits()).toBe(2); + }); + }); + + it("does not cache non-2xx responses", async () => { + const redis = new FakeRedis(); + const { app, getOriginHits } = buildApp(redis); + + const first = await request(app).get("/maybe-error"); + const second = await request(app).get("/maybe-error"); + + expect(first.status).toBe(500); + expect(second.status).toBe(500); + expect(redis.setCalls).toBe(0); + expect(getOriginHits()).toBe(2); // never served from cache + }); + + it("passes through transparently when caching is disabled", async () => { + const redis = new FakeRedis(); + const { app, getOriginHits } = buildApp(null, { ...enabledConfig, enabled: false }); + + const res = await request(app).get("/widgets").query({ a: "1" }); + + expect(res.status).toBe(200); + expect(res.headers["x-cache"]).toBeUndefined(); + expect(getOriginHits()).toBe(1); + expect(redis.setCalls).toBe(0); + }); + + it("falls through to the origin when a cache read throws", async () => { + const flaky: CacheClient = { + get: jest.fn().mockRejectedValue(new Error("connection reset")), + set: jest.fn().mockResolvedValue("OK"), + del: jest.fn().mockResolvedValue(0), + }; + const { app, getOriginHits } = buildApp(flaky); + + const res = await request(app).get("/widgets"); + + expect(res.status).toBe(200); + expect(res.body.value).toBe("fresh"); + expect(getOriginHits()).toBe(1); + }); +}); + +describe("defaultKeyFn", () => { + const baseReq = (query: Record): Request => + ({ method: "GET", baseUrl: "/search", path: "/", query } as unknown as Request); + + it("is stable regardless of query param order", () => { + expect(defaultKeyFn(baseReq({ a: "1", b: "2" }))).toBe( + defaultKeyFn(baseReq({ b: "2", a: "1" })), + ); + }); + + it("encodes method, path and sorted query", () => { + expect(defaultKeyFn(baseReq({ b: "2", a: "1" }))).toBe("GET:/search/?a=1&b=2"); + }); + + it("flattens array query values", () => { + expect(defaultKeyFn(baseReq({ tag: ["x", "y"] }))).toBe("GET:/search/?tag=x,y"); + }); +}); + +describe("RedisCache", () => { + it("round-trips JSON values through get/set", async () => { + const redis = new FakeRedis(); + const cache = new RedisCache(redis, "p:"); + + await cache.set("k", { hello: "world" }, 500); + expect(redis.store.has("p:k")).toBe(true); + expect(await cache.get<{ hello: string }>("k")).toEqual({ hello: "world" }); + }); + + it("returns null for a missing key", async () => { + const cache = new RedisCache(new FakeRedis()); + expect(await cache.get("nope")).toBeNull(); + }); + + it("returns null (a miss) for a corrupt stored value", async () => { + const redis = new FakeRedis(); + redis.store.set("p:bad", "{not json"); + const cache = new RedisCache(redis, "p:"); + expect(await cache.get("bad")).toBeNull(); + }); + + it("deletes a key", async () => { + const redis = new FakeRedis(); + const cache = new RedisCache(redis, "p:"); + await cache.set("k", 1, 500); + await cache.del("k"); + expect(redis.store.has("p:k")).toBe(false); + }); +}); + +describe("cacheConfigFromEnv", () => { + it("is disabled by default", () => { + expect(cacheConfigFromEnv({}).enabled).toBe(false); + }); + + it("enables on CACHE_ENABLED=true or 1", () => { + expect(cacheConfigFromEnv({ CACHE_ENABLED: "true" }).enabled).toBe(true); + expect(cacheConfigFromEnv({ CACHE_ENABLED: "1" }).enabled).toBe(true); + expect(cacheConfigFromEnv({ CACHE_ENABLED: "yes" }).enabled).toBe(false); + }); + + it("reads REDIS_URL and CACHE_KEY_PREFIX", () => { + const cfg = cacheConfigFromEnv({ + CACHE_ENABLED: "true", + REDIS_URL: "redis://example:6379", + CACHE_KEY_PREFIX: "x:", + }); + expect(cfg.redisUrl).toBe("redis://example:6379"); + expect(cfg.keyPrefix).toBe("x:"); + }); +}); diff --git a/src/cache/redis.ts b/src/cache/redis.ts new file mode 100644 index 00000000..41e67dfe --- /dev/null +++ b/src/cache/redis.ts @@ -0,0 +1,195 @@ +import type { Request, Response, NextFunction, RequestHandler } from "express"; +import type { Redis } from "ioredis"; + +/** + * Opt-in Redis response cache. + * + * The hot read endpoints (`/assets/popular`, `/search`) re-run the same handful + * of queries thousands of times an hour for identical query strings. A small + * Redis layer with a per-route TTL collapses those into a single cache hit. + * + * Caching is **off by default**. Set `CACHE_ENABLED=true` (and optionally + * `REDIS_URL`) to turn it on. When disabled — or when Redis is unreachable — the + * middleware degrades to a transparent pass-through so the API never depends on + * the cache being up. + * + * Clients can force a fresh response with the `X-No-Cache` header: the cached + * value is ignored on read but the fresh response is still written back, so the + * next caller benefits. + */ + +// ── Minimal client surface ──────────────────────────────────────────────────── +// We only use these three commands, so the cache (and its tests) depend on this +// narrow interface rather than the full ioredis type. +export interface CacheClient { + get(key: string): Promise; + set(key: string, value: string, mode: "PX", ttlMs: number): Promise; + del(...keys: string[]): Promise; +} + +// ── Config ──────────────────────────────────────────────────────────────────── + +export interface CacheConfig { + enabled: boolean; + redisUrl: string; + /** Prefix applied to every key, namespacing this app's entries. */ + keyPrefix: string; +} + +export function cacheConfigFromEnv(env: NodeJS.ProcessEnv = process.env): CacheConfig { + return { + enabled: env.CACHE_ENABLED === "true" || env.CACHE_ENABLED === "1", + redisUrl: env.REDIS_URL ?? "redis://localhost:6379", + keyPrefix: env.CACHE_KEY_PREFIX ?? "wraith:cache:", + }; +} + +// ── Lazy singleton client ───────────────────────────────────────────────────── + +let sharedClient: CacheClient | null = null; + +/** + * Returns the process-wide Redis client, constructing it on first use. Returns + * `null` when caching is disabled so callers can cheaply skip all cache work. + * + * `ioredis` is imported lazily so environments that never enable caching don't + * pay the connection/setup cost (and tests can run without it installed). + */ +export function getCacheClient(config: CacheConfig = cacheConfigFromEnv()): CacheClient | null { + if (!config.enabled) return null; + if (sharedClient) return sharedClient; + + // eslint-disable-next-line @typescript-eslint/no-var-requires + const IORedis = require("ioredis") as { default?: new (url: string) => Redis } & (new (url: string) => Redis); + const Ctor = IORedis.default ?? IORedis; + const redis = new Ctor(config.redisUrl); + // A failed connection must never crash the process — log once and let the + // middleware's try/catch fall through to the origin handler. + redis.on("error", (err: Error) => { + console.error("[cache] redis error:", err.message); + }); + sharedClient = redis as unknown as CacheClient; + return sharedClient; +} + +/** Test seam: inject a fake client (or reset with `null`). */ +export function setCacheClient(client: CacheClient | null): void { + sharedClient = client; +} + +// ── Cache primitive ─────────────────────────────────────────────────────────── + +export class RedisCache { + constructor( + private readonly client: CacheClient, + private readonly keyPrefix = "wraith:cache:", + ) {} + + private prefixed(key: string): string { + return `${this.keyPrefix}${key}`; + } + + async get(key: string): Promise { + const raw = await this.client.get(this.prefixed(key)); + if (raw === null) return null; + try { + return JSON.parse(raw) as T; + } catch { + // A corrupt/foreign value behaves like a miss rather than throwing. + return null; + } + } + + async set(key: string, value: unknown, ttlMs: number): Promise { + await this.client.set(this.prefixed(key), JSON.stringify(value), "PX", ttlMs); + } + + async del(key: string): Promise { + await this.client.del(this.prefixed(key)); + } +} + +// ── Middleware ──────────────────────────────────────────────────────────────── + +export interface CacheMiddlewareOptions { + /** Time-to-live for this route's entries, in milliseconds. */ + ttlMs: number; + /** + * Derives a cache key from the request. Defaults to method + path + a stable, + * sorted serialization of the query string. + */ + keyFn?: (req: Request) => string; + /** Inject a client/config (primarily for tests). */ + client?: CacheClient | null; + config?: CacheConfig; +} + +const HEADER_NO_CACHE = "x-no-cache"; +const HEADER_CACHE_STATUS = "X-Cache"; + +/** Stable key: sorts query params so `?a=1&b=2` and `?b=2&a=1` collide. */ +export function defaultKeyFn(req: Request): string { + const entries = Object.entries(req.query as Record) + .map(([k, v]) => [k, Array.isArray(v) ? v.join(",") : String(v)] as const) + .sort(([a], [b]) => a.localeCompare(b)); + const qs = entries.map(([k, v]) => `${k}=${v}`).join("&"); + return `${req.method}:${req.baseUrl}${req.path}?${qs}`; +} + +/** + * Returns an Express middleware that caches JSON responses for a single route. + * + * - On a hit, replies from Redis and sets `X-Cache: HIT`. + * - On a miss, runs the handler, captures the `res.json(...)` body, writes it + * back with the route's TTL, and sets `X-Cache: MISS`. + * - With the `X-No-Cache` request header, skips the read but still refreshes the + * stored value (`X-Cache: BYPASS`). + * - When caching is disabled or Redis errors, passes straight through. + * + * Only successful (2xx) JSON responses are stored. + */ +export function cacheMiddleware(options: CacheMiddlewareOptions): RequestHandler { + const config = options.config ?? cacheConfigFromEnv(); + const keyFn = options.keyFn ?? defaultKeyFn; + + return async function cache(req: Request, res: Response, next: NextFunction): Promise { + const client = options.client !== undefined ? options.client : getCacheClient(config); + if (!client) { + next(); + return; + } + + const store = new RedisCache(client, config.keyPrefix); + const key = keyFn(req); + const bypassRead = req.header(HEADER_NO_CACHE) !== undefined; + + if (!bypassRead) { + try { + const hit = await store.get(key); + if (hit !== null) { + res.setHeader(HEADER_CACHE_STATUS, "HIT"); + res.json(hit); + return; + } + } catch (err) { + // Read failure → treat as a miss and fall through to the origin. + console.error("[cache] read failed:", (err as Error).message); + } + } + + // Wrap res.json so we can capture the payload on the way out. + const originalJson = res.json.bind(res); + res.json = (body: unknown): Response => { + if (res.statusCode >= 200 && res.statusCode < 300) { + // Fire-and-forget: a write failure must not delay or break the response. + store.set(key, body, options.ttlMs).catch((err: Error) => { + console.error("[cache] write failed:", err.message); + }); + } + return originalJson(body); + }; + + res.setHeader(HEADER_CACHE_STATUS, bypassRead ? "BYPASS" : "MISS"); + next(); + }; +} From 1ae82ff28b5967e68f395ae5f192ca35286533fa Mon Sep 17 00:00:00 2001 From: githoboman Date: Mon, 29 Jun 2026 21:32:45 +0100 Subject: [PATCH 2/3] feat: implement contract liveness tracking with tombstone database schema and indexer utilities --- .../migration.sql | 17 ++ prisma/schema.prisma | 25 +++ src/__tests__/tombstones.test.ts | 177 ++++++++++++++++++ src/indexer/tombstones.ts | 177 ++++++++++++++++++ 4 files changed, 396 insertions(+) create mode 100644 prisma/migrations/20260629120000_add_contract_tombstones/migration.sql create mode 100644 src/__tests__/tombstones.test.ts create mode 100644 src/indexer/tombstones.ts diff --git a/prisma/migrations/20260629120000_add_contract_tombstones/migration.sql b/prisma/migrations/20260629120000_add_contract_tombstones/migration.sql new file mode 100644 index 00000000..3c726cd3 --- /dev/null +++ b/prisma/migrations/20260629120000_add_contract_tombstones/migration.sql @@ -0,0 +1,17 @@ +-- Track contract liveness: one tombstone row per contract whose persistent +-- storage instance entry has expired (liveUntilLedger fell behind the current +-- ledger). Downstream consumers watch this table for a "contract gone" signal. +CREATE TABLE "wraith"."ContractTombstone" ( + "id" SERIAL NOT NULL, + "contractId" TEXT NOT NULL, + "liveUntilLedger" INTEGER NOT NULL, + "detectedLedger" INTEGER NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "ContractTombstone_pkey" PRIMARY KEY ("id") +); + +-- One tombstone per contract; first expiry detection wins, re-detection is a no-op. +CREATE UNIQUE INDEX "ContractTombstone_contractId_key" ON "wraith"."ContractTombstone"("contractId"); + +CREATE INDEX "ContractTombstone_detectedLedger_idx" ON "wraith"."ContractTombstone"("detectedLedger"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 9b50798c..089ee611 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -214,6 +214,31 @@ model RetentionJobRun { @@schema("wraith") } +// ─── Contract Tombstones ──────────────────────────────────────────────────── +// A tombstone marks a contract whose persistent storage has expired (its +// instance entry's liveUntilLedger fell behind the current ledger). Downstream +// consumers watch this table for a "the contract is gone" signal. One row per +// contract — the first expiry detection wins; re-detection is idempotent. +model ContractTombstone { + id Int @id @default(autoincrement()) + + // The contract whose storage expired (C...) + contractId String @unique + + // The contract instance's liveUntilLedger at the moment of expiry — the last + // ledger for which the entry was still live. + liveUntilLedger Int + + // The ledger at which we observed the entry had expired + // (detectedLedger > liveUntilLedger). + detectedLedger Int + + createdAt DateTime @default(now()) + + @@index([detectedLedger]) + @@schema("wraith") +} + // ─── Backfill Cursor ─────────────────────────────────────────────────────────── // Durable cursor so the backfill job can resume mid-range after a crash. // Always row ID 1 — singleton, one backfill at a time. diff --git a/src/__tests__/tombstones.test.ts b/src/__tests__/tombstones.test.ts new file mode 100644 index 00000000..55fd2146 --- /dev/null +++ b/src/__tests__/tombstones.test.ts @@ -0,0 +1,177 @@ +/** + * Tests for contract tombstones — liveness tracking + expiry detection. + * + * Covers the pure expiry rule, the injectable TTL fetcher / detection helpers, + * and the idempotent insert path. No network is used — a fake TTL fetcher is + * injected throughout — and the Prisma client is mocked for the persistence + * tests, mirroring the sac-detect test style. + */ + +import { describe, it, expect, jest, beforeEach } from "@jest/globals"; + +// Mock the Prisma client before importing the module under test so the +// `insertTombstones` path exercises a fake `createMany`. +const createMany = jest.fn< + (args: { data: unknown[]; skipDuplicates: boolean }) => Promise<{ count: number }> +>(); +jest.mock("../db", () => ({ + prisma: { contractTombstone: { createMany } }, +})); + +import { + isExpired, + tombstoneFor, + fetchLiveness, + detectExpiredContracts, + insertTombstones, + tombstoneExpiredContracts, + type TtlFetcher, + type ContractLiveness, + type TombstoneRecord, +} from "../indexer/tombstones"; + +// ─── Fixture ────────────────────────────────────────────────────────────────── +// A small fleet of contracts with known TTLs, and the "current" ledger we +// evaluate them against. CALIVE is comfortably live; CEXPIRED expired one ledger +// ago; CBORDER's liveUntil equals the current ledger (still live, edge case); +// CUNKNOWN has no resolvable TTL. +const CURRENT_LEDGER = 1_000; + +const TTL_FIXTURE: Record = { + CALIVE: 5_000, + CEXPIRED: 999, + CBORDER: 1_000, + CUNKNOWN: null, +}; + +const fixtureFetcher: TtlFetcher = async (contractId) => + contractId in TTL_FIXTURE ? TTL_FIXTURE[contractId] : null; + +beforeEach(() => { + createMany.mockReset(); +}); + +describe("isExpired", () => { + it("is false while the current ledger is before liveUntil", () => { + expect(isExpired(5_000, 1_000)).toBe(false); + }); + + it("is false on the exact liveUntil ledger (live through that ledger)", () => { + expect(isExpired(1_000, 1_000)).toBe(false); + }); + + it("is true once the current ledger passes liveUntil", () => { + expect(isExpired(999, 1_000)).toBe(true); + }); + + it("never treats an unknown TTL as expired", () => { + expect(isExpired(null, 1_000)).toBe(false); + }); +}); + +describe("tombstoneFor", () => { + it("returns a tombstone for an expired contract", () => { + const liveness: ContractLiveness = { contractId: "CEXPIRED", liveUntilLedger: 999 }; + expect(tombstoneFor(liveness, CURRENT_LEDGER)).toEqual({ + contractId: "CEXPIRED", + liveUntilLedger: 999, + detectedLedger: CURRENT_LEDGER, + }); + }); + + it("returns null for a live contract", () => { + expect(tombstoneFor({ contractId: "CALIVE", liveUntilLedger: 5_000 }, CURRENT_LEDGER)).toBeNull(); + }); + + it("returns null when the TTL is unknown", () => { + expect(tombstoneFor({ contractId: "CUNKNOWN", liveUntilLedger: null }, CURRENT_LEDGER)).toBeNull(); + }); +}); + +describe("fetchLiveness", () => { + it("resolves liveness via the injected fetcher", async () => { + const liveness = await fetchLiveness(["CALIVE", "CEXPIRED"], fixtureFetcher); + expect(liveness).toEqual([ + { contractId: "CALIVE", liveUntilLedger: 5_000 }, + { contractId: "CEXPIRED", liveUntilLedger: 999 }, + ]); + }); + + it("de-duplicates contract IDs so each is fetched once", async () => { + const fetcher = jest.fn(async () => 5_000); + await fetchLiveness(["CA", "CA", "CB"], fetcher); + expect(fetcher).toHaveBeenCalledTimes(2); + }); +}); + +describe("detectExpiredContracts", () => { + it("emits tombstones only for contracts past their liveUntil", async () => { + const tombstones = await detectExpiredContracts( + ["CALIVE", "CEXPIRED", "CBORDER", "CUNKNOWN"], + CURRENT_LEDGER, + fixtureFetcher, + ); + + expect(tombstones).toEqual([ + { contractId: "CEXPIRED", liveUntilLedger: 999, detectedLedger: CURRENT_LEDGER }, + ]); + }); + + it("returns an empty array when every contract is still live", async () => { + const tombstones = await detectExpiredContracts(["CALIVE", "CBORDER"], CURRENT_LEDGER, fixtureFetcher); + expect(tombstones).toEqual([]); + }); +}); + +describe("insertTombstones", () => { + it("inserts on expiry detection, idempotently by contractId", async () => { + createMany.mockResolvedValue({ count: 1 }); + + const records: TombstoneRecord[] = [ + { contractId: "CEXPIRED", liveUntilLedger: 999, detectedLedger: CURRENT_LEDGER }, + ]; + const inserted = await insertTombstones(records); + + expect(inserted).toBe(1); + expect(createMany).toHaveBeenCalledWith({ data: records, skipDuplicates: true }); + }); + + it("skips the DB entirely for an empty batch", async () => { + const inserted = await insertTombstones([]); + expect(inserted).toBe(0); + expect(createMany).not.toHaveBeenCalled(); + }); +}); + +describe("tombstoneExpiredContracts", () => { + it("detects expiry and persists exactly the expired contracts", async () => { + createMany.mockResolvedValue({ count: 1 }); + + const { tombstones, inserted } = await tombstoneExpiredContracts( + ["CALIVE", "CEXPIRED", "CUNKNOWN"], + CURRENT_LEDGER, + fixtureFetcher, + ); + + expect(tombstones).toEqual([ + { contractId: "CEXPIRED", liveUntilLedger: 999, detectedLedger: CURRENT_LEDGER }, + ]); + expect(inserted).toBe(1); + expect(createMany).toHaveBeenCalledWith({ + data: [{ contractId: "CEXPIRED", liveUntilLedger: 999, detectedLedger: CURRENT_LEDGER }], + skipDuplicates: true, + }); + }); + + it("writes nothing when no contract has expired", async () => { + const { tombstones, inserted } = await tombstoneExpiredContracts( + ["CALIVE", "CBORDER"], + CURRENT_LEDGER, + fixtureFetcher, + ); + + expect(tombstones).toEqual([]); + expect(inserted).toBe(0); + expect(createMany).not.toHaveBeenCalled(); + }); +}); diff --git a/src/indexer/tombstones.ts b/src/indexer/tombstones.ts new file mode 100644 index 00000000..680bc322 --- /dev/null +++ b/src/indexer/tombstones.ts @@ -0,0 +1,177 @@ +/** + * Contract tombstones — liveness tracking for expired contract storage. + * + * Soroban persistent state is not permanent: every contract's instance ledger + * entry carries a `liveUntilLedger`. Once the network's current ledger passes + * that value the entry is archived and the contract effectively disappears — + * its storage can no longer be read without a restore. Downstream consumers + * that cached a contract's events need a signal that this has happened, so when + * we observe an expiry we emit a *tombstone* row. + * + * Detection strategy (mirrors sac-detect's instance lookup, #136): + * 1. Read the contract instance ledger entry (ContractData keyed by + * `ScVal::LedgerKeyContractInstance`) and pull its `liveUntilLedgerSeq`. + * 2. Compare against the current ledger: expired when current > liveUntil. + * 3. Insert one tombstone per contract, idempotently — the first detection + * wins; re-observing the same expiry is a no-op. + * + * The TTL fetcher is injectable so the detection logic can be exercised without + * a network round-trip. + */ + +import { xdr } from "@stellar/stellar-sdk"; +import { getRpc } from "../rpc"; +import { prisma } from "../db"; + +// ─── Types ──────────────────────────────────────────────────────────────────── + +/** + * Liveness of a single contract's instance entry at a point in time. + * `liveUntilLedger` is null when the entry has no TTL / could not be resolved. + */ +export interface ContractLiveness { + contractId: string; + liveUntilLedger: number | null; +} + +/** A tombstone ready to be persisted. */ +export interface TombstoneRecord { + contractId: string; + liveUntilLedger: number; + detectedLedger: number; +} + +// ─── Pure helpers ───────────────────────────────────────────────────────────── + +/** + * True when a contract instance has expired relative to `currentLedger`. + * + * An entry is live *through* its `liveUntilLedger`, so it is only expired once + * the current ledger has moved strictly past it. A null TTL (unknown / no entry) + * is never treated as expired — we don't tombstone on missing information. + */ +export function isExpired( + liveUntilLedger: number | null, + currentLedger: number, +): boolean { + if (liveUntilLedger === null) return false; + return currentLedger > liveUntilLedger; +} + +/** + * Build a tombstone for an expired contract, or null if it is still live (or its + * TTL is unknown). Keeping this pure makes the expiry rule trivially testable. + */ +export function tombstoneFor( + liveness: ContractLiveness, + currentLedger: number, +): TombstoneRecord | null { + if (!isExpired(liveness.liveUntilLedger, currentLedger)) return null; + return { + contractId: liveness.contractId, + liveUntilLedger: liveness.liveUntilLedger as number, + detectedLedger: currentLedger, + }; +} + +// ─── TTL fetch ──────────────────────────────────────────────────────────────── + +/** + * Resolve the `liveUntilLedger` of a contract's instance entry, or null if the + * contract has no instance entry / the lookup fails. Injectable for testing. + */ +export type TtlFetcher = (contractId: string) => Promise; + +async function fetchLiveUntilLedger(contractId: string): Promise { + try { + const entry = await getRpc().getContractData( + contractId, + xdr.ScVal.scvLedgerKeyContractInstance(), + ); + return entry.liveUntilLedgerSeq ?? null; + } catch { + // Missing entry, already-evicted state, or RPC error — not determinable. + return null; + } +} + +/** + * Read liveness for a set of contracts, de-duplicating IDs so each unique + * contract is looked up at most once. One RPC call per unique contract. + */ +export async function fetchLiveness( + contractIds: Iterable, + fetchTtl: TtlFetcher = fetchLiveUntilLedger, +): Promise { + const unique = [...new Set(contractIds)]; + return Promise.all( + unique.map(async (contractId) => ({ + contractId, + liveUntilLedger: await fetchTtl(contractId), + })), + ); +} + +// ─── Detection ────────────────────────────────────────────────────────────── + +/** + * Compute the tombstones owed for a batch of contracts at `currentLedger`, + * fetching each one's TTL. Pure-ish: does no DB writes, so callers can inspect + * or test the result before persisting. + */ +export async function detectExpiredContracts( + contractIds: Iterable, + currentLedger: number, + fetchTtl: TtlFetcher = fetchLiveUntilLedger, +): Promise { + const liveness = await fetchLiveness(contractIds, fetchTtl); + return liveness + .map((l) => tombstoneFor(l, currentLedger)) + .filter((t): t is TombstoneRecord => t !== null); +} + +// ─── Persistence ────────────────────────────────────────────────────────────── + +/** + * Idempotently insert tombstone rows. Conflicts on `contractId` are ignored — + * the first expiry detection wins, so replaying a ledger range never duplicates + * or overwrites a contract's tombstone. Returns the number of rows inserted. + */ +export async function insertTombstones( + records: TombstoneRecord[], +): Promise { + if (records.length === 0) return 0; + + const result = await prisma.contractTombstone.createMany({ + data: records, + skipDuplicates: true, + }); + + return result.count; +} + +/** + * Detect expired contracts at `currentLedger` and persist a tombstone for each. + * Combines {@link detectExpiredContracts} and {@link insertTombstones}; returns + * the records and how many were newly inserted (vs. already tombstoned). + */ +export async function tombstoneExpiredContracts( + contractIds: Iterable, + currentLedger: number, + fetchTtl: TtlFetcher = fetchLiveUntilLedger, +): Promise<{ tombstones: TombstoneRecord[]; inserted: number }> { + const tombstones = await detectExpiredContracts( + contractIds, + currentLedger, + fetchTtl, + ); + const inserted = await insertTombstones(tombstones); + + if (inserted > 0) { + console.log( + `[tombstone] ${inserted} contract(s) tombstoned at ledger ${currentLedger}`, + ); + } + + return { tombstones, inserted }; +} From 039008cd0e4e512667e53de9e40da78083afcee6 Mon Sep 17 00:00:00 2001 From: githoboman Date: Tue, 30 Jun 2026 05:13:42 +0100 Subject: [PATCH 3/3] feat: implement LP-share transfer indexer and schema to track deposit/withdraw events --- .../migration.sql | 29 ++ prisma/schema.prisma | 30 ++ src/__tests__/lpShares.test.ts | 283 ++++++++++++++++++ src/db.ts | 5 +- src/indexer.ts | 15 +- src/indexer/lp-shares.ts | 225 ++++++++++++++ 6 files changed, 584 insertions(+), 3 deletions(-) create mode 100644 prisma/migrations/20260629130000_add_lp_share_transfers/migration.sql create mode 100644 src/__tests__/lpShares.test.ts create mode 100644 src/indexer/lp-shares.ts diff --git a/prisma/migrations/20260629130000_add_lp_share_transfers/migration.sql b/prisma/migrations/20260629130000_add_lp_share_transfers/migration.sql new file mode 100644 index 00000000..f985f3ee --- /dev/null +++ b/prisma/migrations/20260629130000_add_lp_share_transfers/migration.sql @@ -0,0 +1,29 @@ +-- Surface liquidity-pool deposits/withdrawals as LP-share transfers. The shares +-- have no symbol of their own, so each row carries the pool whose shares moved +-- (poolId = the emitting pool contract). A deposit mints shares to "toAddress" +-- (no from); a withdrawal burns shares from "fromAddress" (no to). +CREATE TABLE "wraith"."LpShareTransfer" ( + "id" SERIAL NOT NULL, + "poolId" TEXT NOT NULL, + "action" TEXT NOT NULL, + "fromAddress" TEXT, + "toAddress" TEXT, + "shares" TEXT NOT NULL, + "ledger" INTEGER NOT NULL, + "ledgerClosedAt" TIMESTAMP(3) NOT NULL, + "txHash" TEXT NOT NULL, + "eventId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "LpShareTransfer_pkey" PRIMARY KEY ("id") +); + +-- One row per event; replaying an overlapping ledger range never duplicates. +CREATE UNIQUE INDEX "LpShareTransfer_eventId_key" ON "wraith"."LpShareTransfer"("eventId"); + +CREATE INDEX "LpShareTransfer_poolId_idx" ON "wraith"."LpShareTransfer"("poolId"); +CREATE INDEX "LpShareTransfer_toAddress_idx" ON "wraith"."LpShareTransfer"("toAddress"); +CREATE INDEX "LpShareTransfer_fromAddress_idx" ON "wraith"."LpShareTransfer"("fromAddress"); +CREATE INDEX "LpShareTransfer_ledger_idx" ON "wraith"."LpShareTransfer"("ledger"); +CREATE INDEX "LpShareTransfer_txHash_idx" ON "wraith"."LpShareTransfer"("txHash"); +CREATE INDEX "LpShareTransfer_poolId_action_idx" ON "wraith"."LpShareTransfer"("poolId", "action"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 089ee611..ca600783 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -119,6 +119,36 @@ model NftMetadata { @@schema("wraith") } +// ─── LP-Share Transfers ────────────────────────────────────────────────────── +// Liquidity-pool deposits/withdrawals surfaced as LP-share movements. The shares +// have no symbol of their own, so each row carries the pool whose shares moved +// (poolId = the emitting pool contract). A deposit mints shares to toAddress +// (no from); a withdrawal burns shares from fromAddress (no to). +model LpShareTransfer { + id Int @id @default(autoincrement()) + // The liquidity-pool contract whose shares moved (C...) + poolId String + // "deposit" (shares minted to provider) | "withdraw" (shares burned from provider) + action String + fromAddress String? + toAddress String? + // i128 share amount as a decimal string + shares String + ledger Int + ledgerClosedAt DateTime + txHash String + eventId String @unique + createdAt DateTime @default(now()) + + @@index([poolId]) + @@index([toAddress]) + @@index([fromAddress]) + @@index([ledger]) + @@index([txHash]) + @@index([poolId, action]) + @@schema("wraith") +} + // ─── Account Summaries ──────────────────────────────────────────────────────── model AccountSummary { id Int @id @default(autoincrement()) diff --git a/src/__tests__/lpShares.test.ts b/src/__tests__/lpShares.test.ts new file mode 100644 index 00000000..6f13ece9 --- /dev/null +++ b/src/__tests__/lpShares.test.ts @@ -0,0 +1,283 @@ +/** + * Tests for the LP-share transfer indexer. + * + * Covers the two event dialects (explicit deposit/withdraw and bare SEP-41 + * mint/burn of the pool's own share token), share extraction from both bare + * i128 and map-wrapped values, the pure decoder, batch filtering, and the + * idempotent insert path. No network is used; the Prisma client is mocked for + * the persistence test, mirroring the tombstones test style. + */ + +import { describe, it, expect, jest, beforeEach } from "@jest/globals"; +import { xdr, Address, nativeToScVal } from "@stellar/stellar-sdk"; + +// Mock the Prisma client before importing the module so `upsertLpShareTransfers` +// exercises a fake `createMany`. +const createMany = jest.fn< + (args: { data: unknown[]; skipDuplicates: boolean }) => Promise<{ count: number }> +>(); +jest.mock("../db", () => ({ + prisma: { lpShareTransfer: { createMany } }, +})); + +import { + isLpShareEvent, + extractShares, + parseLpShareEvent, + parseLpShareEvents, + upsertLpShareTransfers, + type LpShareTransferRecord, +} from "../indexer/lp-shares"; +import type { RawEvent } from "../rpc"; + +// ─── Fixtures ─────────────────────────────────────────────────────────────── +const ALICE = "GDWCO35QUYQLGO6P7OLW4BZWNMMGGUWNPLRVPLCBVG7YNVDZKUDIW4KN"; +const BOB = "GCXOO7OIJZ2HEOZODLOEISNVO6CBPK4PISRJCZYRFT37H7XGHDLB3C7O"; +const POOL = "CBC42KFZO33TYVFDOUXFRWXYYXHFGH7W5GM4IJQSXKGFINKL2XPP4XTE"; +// A second valid contract address to stand in for a pool's admin in mint events. +const POOL_ADMIN = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"; + +const COMMON: Omit = { + id: "0000000000000000001-00001", + type: "contract", + ledger: 100, + ledgerClosedAt: "2024-01-01T00:00:00Z", + contractId: POOL, + txHash: "abc123txhash", +}; + +/** Explicit AMM deposit: topics=[Symbol("deposit"), Address(provider)], value=i128(shares). */ +function makeDepositEvent(shares: bigint = 1_000n, id = COMMON.id): RawEvent { + return { + ...COMMON, + id, + topic: [nativeToScVal("deposit", { type: "symbol" }), Address.fromString(ALICE).toScVal()], + value: nativeToScVal(shares, { type: "i128" }), + }; +} + +/** Explicit AMM withdraw: topics=[Symbol("withdraw"), Address(provider)], value=i128(shares). */ +function makeWithdrawEvent(shares: bigint = 500n, id = "0000000000000000001-00002"): RawEvent { + return { + ...COMMON, + id, + topic: [nativeToScVal("withdraw", { type: "symbol" }), Address.fromString(BOB).toScVal()], + value: nativeToScVal(shares, { type: "i128" }), + }; +} + +/** SEP-41 share mint: topics=[Symbol("mint"), Address(admin), Address(to)], value=i128. */ +function makeShareMintEvent(shares: bigint = 750n, id = "0000000000000000001-00003"): RawEvent { + return { + ...COMMON, + id, + topic: [ + nativeToScVal("mint", { type: "symbol" }), + Address.fromString(POOL_ADMIN).toScVal(), + Address.fromString(ALICE).toScVal(), + ], + value: nativeToScVal(shares, { type: "i128" }), + }; +} + +/** SEP-41 share burn: topics=[Symbol("burn"), Address(from)], value=i128. */ +function makeShareBurnEvent(shares: bigint = 250n, id = "0000000000000000001-00004"): RawEvent { + return { + ...COMMON, + id, + topic: [nativeToScVal("burn", { type: "symbol" }), Address.fromString(ALICE).toScVal()], + value: nativeToScVal(shares, { type: "i128" }), + }; +} + +/** A non-LP event (plain swap) that must be ignored. */ +function makeSwapEvent(): RawEvent { + return { + ...COMMON, + id: "0000000000000000001-00099", + topic: [nativeToScVal("swap", { type: "symbol" }), Address.fromString(ALICE).toScVal()], + value: nativeToScVal(1n, { type: "i128" }), + }; +} + +beforeEach(() => { + createMany.mockReset(); +}); + +// ─── extractShares ──────────────────────────────────────────────────────────── +describe("extractShares", () => { + it("reads a bare i128 value", () => { + expect(extractShares(nativeToScVal(1_234n, { type: "i128" }))).toBe("1234"); + }); + + it("returns the absolute value (shares are never negative)", () => { + expect(extractShares(nativeToScVal(-99n, { type: "i128" }))).toBe("99"); + }); + + it("preserves precision for very large amounts", () => { + const big = 2n ** 100n + 7n; + expect(extractShares(nativeToScVal(big, { type: "i128" }))).toBe(big.toString()); + }); + + it("digs the amount out of a map-wrapped value by share_amount", () => { + const val = nativeToScVal( + { share_amount: 4_200n, amount_a: 1n, amount_b: 2n }, + { type: { share_amount: ["symbol", "i128"], amount_a: ["symbol", "i128"], amount_b: ["symbol", "i128"] } } + ); + expect(extractShares(val)).toBe("4200"); + }); + + it("falls back to the `amount` key when no share-specific key exists", () => { + const val = nativeToScVal({ amount: 88n }, { type: { amount: ["symbol", "i128"] } }); + expect(extractShares(val)).toBe("88"); + }); + + it("returns null when no amount can be found", () => { + expect(extractShares(xdr.ScVal.scvVoid())).toBeNull(); + }); +}); + +// ─── isLpShareEvent ──────────────────────────────────────────────────────────── +describe("isLpShareEvent", () => { + it("accepts an explicit deposit", () => { + expect(isLpShareEvent(makeDepositEvent())).toBe(true); + }); + + it("accepts an explicit withdraw", () => { + expect(isLpShareEvent(makeWithdrawEvent())).toBe(true); + }); + + it("accepts a SEP-41 share mint and burn", () => { + expect(isLpShareEvent(makeShareMintEvent())).toBe(true); + expect(isLpShareEvent(makeShareBurnEvent())).toBe(true); + }); + + it("rejects an unrelated event symbol", () => { + expect(isLpShareEvent(makeSwapEvent())).toBe(false); + }); + + it("rejects an empty topics array", () => { + expect(isLpShareEvent({ ...COMMON, topic: [], value: xdr.ScVal.scvVoid() })).toBe(false); + }); + + it("rejects a deposit with no decodable share amount", () => { + const ev: RawEvent = { + ...COMMON, + topic: [nativeToScVal("deposit", { type: "symbol" }), Address.fromString(ALICE).toScVal()], + value: xdr.ScVal.scvVoid(), + }; + expect(isLpShareEvent(ev)).toBe(false); + }); + + it("rejects a mint with no recipient topic", () => { + const ev: RawEvent = { + ...COMMON, + topic: [nativeToScVal("mint", { type: "symbol" }), Address.fromString(POOL_ADMIN).toScVal()], + value: nativeToScVal(1n, { type: "i128" }), + }; + expect(isLpShareEvent(ev)).toBe(false); + }); +}); + +// ─── parseLpShareEvent ───────────────────────────────────────────────────────── +describe("parseLpShareEvent", () => { + it("decodes a deposit as shares minted to the provider, tagged with the pool", () => { + const r = parseLpShareEvent(makeDepositEvent(1_000n)); + expect(r).not.toBeNull(); + expect(r?.poolId).toBe(POOL); + expect(r?.action).toBe("deposit"); + expect(r?.fromAddress).toBeNull(); + expect(r?.toAddress).toBe(ALICE); + expect(r?.shares).toBe("1000"); + expect(r?.ledger).toBe(100); + expect(r?.txHash).toBe("abc123txhash"); + expect(r?.eventId).toBe(COMMON.id); + expect(r?.ledgerClosedAt).toBeInstanceOf(Date); + }); + + it("decodes a withdraw as shares burned from the provider", () => { + const r = parseLpShareEvent(makeWithdrawEvent(500n)); + expect(r?.action).toBe("withdraw"); + expect(r?.fromAddress).toBe(BOB); + expect(r?.toAddress).toBeNull(); + expect(r?.shares).toBe("500"); + expect(r?.poolId).toBe(POOL); + }); + + it("decodes a SEP-41 share mint to the recipient (topics[2]), not the admin", () => { + const r = parseLpShareEvent(makeShareMintEvent(750n)); + expect(r?.action).toBe("deposit"); + expect(r?.toAddress).toBe(ALICE); + expect(r?.fromAddress).toBeNull(); + expect(r?.shares).toBe("750"); + }); + + it("decodes a SEP-41 share burn from the holder (topics[1])", () => { + const r = parseLpShareEvent(makeShareBurnEvent(250n)); + expect(r?.action).toBe("withdraw"); + expect(r?.fromAddress).toBe(ALICE); + expect(r?.toAddress).toBeNull(); + expect(r?.shares).toBe("250"); + }); + + it("returns null for an unrelated event", () => { + expect(parseLpShareEvent(makeSwapEvent())).toBeNull(); + }); + + it("never throws on a malformed event", () => { + const ev: RawEvent = { + ...COMMON, + topic: [nativeToScVal("deposit", { type: "symbol" }), xdr.ScVal.scvVoid()], + value: nativeToScVal(1n, { type: "i128" }), + }; + expect(parseLpShareEvent(ev)).toBeNull(); + }); +}); + +// ─── parseLpShareEvents ──────────────────────────────────────────────────────── +describe("parseLpShareEvents", () => { + it("extracts only LP events from a mixed batch", () => { + const batch: RawEvent[] = [ + makeDepositEvent(10n, "id-001"), + makeSwapEvent(), + makeWithdrawEvent(20n, "id-002"), + ]; + const records = parseLpShareEvents(batch); + expect(records).toHaveLength(2); + expect(records[0].action).toBe("deposit"); + expect(records[1].action).toBe("withdraw"); + }); + + it("returns an empty array when there are no LP events", () => { + expect(parseLpShareEvents([makeSwapEvent()])).toHaveLength(0); + }); + + it("returns an empty array for an empty batch", () => { + expect(parseLpShareEvents([])).toHaveLength(0); + }); +}); + +// ─── upsertLpShareTransfers ──────────────────────────────────────────────────── +describe("upsertLpShareTransfers", () => { + it("bulk-inserts idempotently, skipping duplicates", async () => { + createMany.mockResolvedValue({ count: 2 }); + const records = parseLpShareEvents([makeDepositEvent(1n, "a"), makeWithdrawEvent(2n, "b")]); + + const inserted = await upsertLpShareTransfers(records); + + expect(inserted).toBe(2); + expect(createMany).toHaveBeenCalledWith({ data: records, skipDuplicates: true }); + }); + + it("skips the DB entirely for an empty batch", async () => { + const inserted = await upsertLpShareTransfers([]); + expect(inserted).toBe(0); + expect(createMany).not.toHaveBeenCalled(); + }); + + it("returns the count reported by the database", async () => { + createMany.mockResolvedValue({ count: 1 }); + const records: LpShareTransferRecord[] = parseLpShareEvents([makeDepositEvent(5n, "dup")]); + expect(await upsertLpShareTransfers(records)).toBe(1); + }); +}); diff --git a/src/db.ts b/src/db.ts index 338f4f7f..b8e91c13 100644 --- a/src/db.ts +++ b/src/db.ts @@ -436,10 +436,11 @@ export async function getNftMetadata( */ export async function rollbackToLedger(targetLedger: number): Promise { // Perform deletes and state update atomically. - const [deletedTransfers, deletedNftTransfers, deletedHostFnLogs, _state] = await prisma.$transaction([ + const [deletedTransfers, deletedNftTransfers, deletedHostFnLogs, deletedLpTransfers, _state] = await prisma.$transaction([ prisma.tokenTransfer.deleteMany({ where: { ledger: { gt: targetLedger } } }), prisma.nftTransfer.deleteMany({ where: { ledger: { gt: targetLedger } } }), prisma.hostFnLog.deleteMany({ where: { ledger: { gt: targetLedger } } }), + prisma.lpShareTransfer.deleteMany({ where: { ledger: { gt: targetLedger } } }), prisma.indexerState.upsert({ where: { id: 1 }, create: { id: 1, lastIndexedLedger: targetLedger }, @@ -448,7 +449,7 @@ export async function rollbackToLedger(targetLedger: number): Promise { ]); const totalDeleted = - (deletedTransfers?.count ?? 0) + (deletedNftTransfers?.count ?? 0) + (deletedHostFnLogs?.count ?? 0); + (deletedTransfers?.count ?? 0) + (deletedNftTransfers?.count ?? 0) + (deletedHostFnLogs?.count ?? 0) + (deletedLpTransfers?.count ?? 0); if (totalDeleted > 0) { console.log(`[reorg] Rolled back to ledger ${targetLedger}, deleted ${totalDeleted} rows`); diff --git a/src/indexer.ts b/src/indexer.ts index 3cdb051c..606afeae 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -14,6 +14,7 @@ import { import { emitTransfer } from "./events"; import { parseHostFnEvent, upsertHostFnLogs, type HostFnRecord } from "./indexer/host-fn-log"; import { tagSacTransfers } from "./indexer/sac-detect"; +import { parseLpShareEvents, upsertLpShareTransfers } from "./indexer/lp-shares"; import { pollParallel } from "./indexer/parallel"; import { isNftTransferEvent, parseNftEvents, fetchNftMetadata } from "./ingester/nft"; import { createSourceSwitcherWithConfig } from "./indexer/sources"; @@ -170,6 +171,18 @@ async function pollOnce( ); } + // ── LP-share path ────────────────────────────────────────────────────────── + // Decode pool deposits/withdrawals as LP-share transfers tagged with the pool + // ID. Best-effort and additive: deposit/withdraw events are ignored by the + // fungible path, while a pool's own share mint/burn is recorded here in + // addition to its token-transfer row. + const lpRecords = parseLpShareEvents(events); + const lpInserted = await upsertLpShareTransfers(lpRecords).catch((e) => { + console.error("[indexer] LP-share upsert failed:", e); + return 0; + }); + totalIndexed += lpInserted; + // ── NFT path ───────────────────────────────────────────────────────────────── const nftParsed = parseNftEvents(nftRawEvents); const nftRecords = nftParsed.map((p) => p.record); @@ -196,7 +209,7 @@ async function pollOnce( await setLastIndexedLedger(highestLedger); console.log( - `[indexer] Processed ${events.length} events → ${inserted} fungible + ${nftInserted} NFT records saved (ledger ${highestLedger})` + `[indexer] Processed ${events.length} events → ${inserted} fungible + ${nftInserted} NFT + ${lpInserted} LP-share records saved (ledger ${highestLedger})` ); return highestLedger; diff --git a/src/indexer/lp-shares.ts b/src/indexer/lp-shares.ts new file mode 100644 index 00000000..d5bfe2a4 --- /dev/null +++ b/src/indexer/lp-shares.ts @@ -0,0 +1,225 @@ +/** + * LP-share transfer indexer (liquidity-pool deposits & withdrawals). + * + * When a liquidity provider deposits into a Soroban AMM pool the pool *mints* + * LP-share tokens to them; when they withdraw the pool *burns* those shares. + * The shares are a real token, but they are non-trivially named — an LP position + * in pool `C…` is just "shares of C…", with no symbol of its own — so plain + * SEP-41 indexing buries them. This module surfaces them as first-class + * LP-share transfers, each tagged with the pool whose shares moved. + * + * The pool *is* the contract that emits the event, so the pool ID is always the + * event's `contractId`. We normalise both event dialects seen in the wild: + * + * Explicit deposit/withdraw (Soroswap, Phoenix, …): + * topics[0] = Symbol("deposit" | "withdraw") + * topics[1] = Address(provider) ← the LP + * value = i128(shares) | Map{ shares / amount: i128, … } + * + * Bare SEP-41 mint/burn of the pool's own share token (native liquidity_pool + * SAC and pools that emit the token event directly): + * mint: topics[0]=Symbol("mint"), topics[2]=Address(to), value=i128(shares) + * burn: topics[0]=Symbol("burn"), topics[1]=Address(from), value=i128(shares) + * + * Deposits are modelled as a transfer *to* the provider (shares minted, no + * sender); withdrawals as a transfer *from* the provider (shares burned, no + * recipient) — mirroring how `decoder.ts` treats mint/burn. + * + * Decoding is pure and never throws; a malformed or unrecognised event yields + * null so a single bad event can never stall ingest. + */ + +import * as StellarSdk from "@stellar/stellar-sdk"; +import { prisma } from "../db"; +import type { RawEvent } from "../rpc"; + +const { xdr, Address, scValToNative } = StellarSdk; + +// ─── Recognised event types ───────────────────────────────────────────────── +// Symbols (topic[0]) that denote a change in a provider's LP-share balance. +// "deposit"/"withdraw" are the explicit AMM dialect; "mint"/"burn" are the +// SEP-41 share-token dialect emitted by pools that surface the token directly. +const DEPOSIT_EVENTS = new Set(["deposit", "mint"]); +const WITHDRAW_EVENTS = new Set(["withdraw", "burn"]); + +// ─── Types ────────────────────────────────────────────────────────────────── + +/** Direction of an LP-share movement relative to the provider. */ +export type LpAction = "deposit" | "withdraw"; + +/** + * A normalised LP-share transfer. `poolId` is the pool contract whose shares + * moved (always the emitting contract). For a deposit the shares are minted to + * `toAddress` (no `fromAddress`); for a withdrawal they are burned from + * `fromAddress` (no `toAddress`). + */ +export interface LpShareTransferRecord { + poolId: string; + action: LpAction; + fromAddress: string | null; + toAddress: string | null; + shares: string; // i128 share amount as a decimal string + ledger: number; + ledgerClosedAt: Date; + txHash: string; + eventId: string; +} + +// ─── Pure helpers ───────────────────────────────────────────────────────────── + +/** Decode topic[0] to its symbol string, lower-cased, or null if not a symbol. */ +function eventSymbol(raw: RawEvent): string | null { + if (!raw.topic || raw.topic.length === 0) return null; + try { + const native = scValToNative(raw.topic[0]); + return typeof native === "string" ? native.toLowerCase() : null; + } catch { + return null; + } +} + +/** Decode an Address ScVal to a G…/C… string, or null if it is not an address. */ +function decodeAddress(scVal: StellarSdk.xdr.ScVal): string | null { + try { + if (scVal.switch() !== xdr.ScValType.scvAddress()) return null; + return Address.fromScVal(scVal).toString(); + } catch { + return null; + } +} + +/** + * Pull the share amount out of an event value. Pools emit it either as a bare + * i128 or wrapped in a map under a `share_amount` / `shares` / `amount` key + * (Soroswap and friends bundle several figures into the deposit/withdraw value). + * Returns a non-negative decimal string, or null if no amount can be found. + */ +export function extractShares(value: StellarSdk.xdr.ScVal): string | null { + let native: unknown; + try { + native = scValToNative(value); + } catch { + return null; + } + return sharesFromNative(native); +} + +const SHARE_KEYS = ["share_amount", "shares", "share", "amount", "liquidity"]; + +function sharesFromNative(native: unknown): string | null { + if (typeof native === "bigint") return absString(native); + if (typeof native === "number" && Number.isFinite(native)) return absString(BigInt(native)); + if (native !== null && typeof native === "object" && !Array.isArray(native)) { + const map = native as Record; + for (const key of SHARE_KEYS) { + if (key in map) { + const found = sharesFromNative(map[key]); + if (found !== null) return found; + } + } + } + return null; +} + +/** Absolute value of an i128 as a decimal string — shares are never negative. */ +function absString(v: bigint): string { + return (v < 0n ? -v : v).toString(); +} + +// ─── Detection ──────────────────────────────────────────────────────────────── + +/** + * True when an event looks like a liquidity-pool deposit or withdrawal that + * moves LP shares: a recognised symbol, at least one address topic, and a + * decodable share amount. + */ +export function isLpShareEvent(raw: RawEvent): boolean { + const sym = eventSymbol(raw); + if (sym === null) return false; + if (!DEPOSIT_EVENTS.has(sym) && !WITHDRAW_EVENTS.has(sym)) return false; + if (extractShares(raw.value) === null) return false; + return providerAddress(raw, sym) !== null; +} + +/** + * Resolve the provider (the LP whose share balance changed) for an event. + * + * For mint the recipient is topics[2] (topics[1] is the admin/pool, ignored); + * every other recognised dialect carries the provider in topics[1]. + */ +function providerAddress(raw: RawEvent, sym: string): string | null { + const topic = raw.topic ?? []; + if (sym === "mint") { + return topic.length >= 3 ? decodeAddress(topic[2]) : null; + } + return topic.length >= 2 ? decodeAddress(topic[1]) : null; +} + +// ─── Decoding ───────────────────────────────────────────────────────────────── + +/** + * Decode a single raw event into an LpShareTransferRecord, or null if it is not + * a recognised LP deposit/withdraw. Never throws. + */ +export function parseLpShareEvent(raw: RawEvent): LpShareTransferRecord | null { + const sym = eventSymbol(raw); + if (sym === null) return null; + + const isDeposit = DEPOSIT_EVENTS.has(sym); + const isWithdraw = WITHDRAW_EVENTS.has(sym); + if (!isDeposit && !isWithdraw) return null; + + const shares = extractShares(raw.value); + if (shares === null) return null; + + const provider = providerAddress(raw, sym); + if (provider === null) return null; + + const { contractId, ledger, ledgerClosedAt, txHash, id: eventId } = raw; + + return { + poolId: contractId, + action: isDeposit ? "deposit" : "withdraw", + // Deposit mints shares *to* the provider; withdraw burns them *from* it. + fromAddress: isDeposit ? null : provider, + toAddress: isDeposit ? provider : null, + shares, + ledger, + ledgerClosedAt: new Date(ledgerClosedAt), + txHash, + eventId, + }; +} + +/** + * Decode a batch of raw events into LP-share transfers, skipping any that are + * not recognised LP deposits/withdrawals. + */ +export function parseLpShareEvents(rawEvents: RawEvent[]): LpShareTransferRecord[] { + const records: LpShareTransferRecord[] = []; + for (const raw of rawEvents) { + const record = parseLpShareEvent(raw); + if (record) records.push(record); + } + return records; +} + +// ─── Persistence ──────────────────────────────────────────────────────────── + +/** + * Idempotently insert LP-share transfers. Conflicts on `eventId` are ignored — + * replaying an overlapping ledger range never duplicates a row. Returns the + * number of rows newly inserted. + */ +export async function upsertLpShareTransfers( + records: LpShareTransferRecord[], +): Promise { + if (records.length === 0) return 0; + + const result = await prisma.lpShareTransfer.createMany({ + data: records, + skipDuplicates: true, + }); + + return result.count; +}