From 8e731f9efc3ce7543b64ba3f2eaf195ba5b1ab7a Mon Sep 17 00:00:00 2001 From: Miguel Diaz Date: Wed, 15 Apr 2026 17:02:11 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=A6=BA=20server:=20filter=20notifications?= =?UTF-8?q?=20to=20known=20tokens=20only?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .changeset/sharp-sails-guess.md | 5 + cspell.json | 3 + server/hooks/activity.ts | 75 ++++++++--- server/test/hooks/activity.test.ts | 200 ++++++++++++++++++++++++++++- 4 files changed, 263 insertions(+), 20 deletions(-) create mode 100644 .changeset/sharp-sails-guess.md diff --git a/.changeset/sharp-sails-guess.md b/.changeset/sharp-sails-guess.md new file mode 100644 index 0000000000..f4c54dc1ed --- /dev/null +++ b/.changeset/sharp-sails-guess.md @@ -0,0 +1,5 @@ +--- +"@exactly/server": patch +--- + +🦺 filter notifications to known tokens only diff --git a/cspell.json b/cspell.json index 54e7eee97a..c8c8072a64 100644 --- a/cspell.json +++ b/cspell.json @@ -141,10 +141,13 @@ "reentrancy", "rpid", "rustup", + "sadd", "scannability", + "scard", "serde", "simctl", "simplewebauthn", + "sismember", "siwe", "sixalime", "solady", diff --git a/server/hooks/activity.ts b/server/hooks/activity.ts index 5fdaafb9a1..745b20b233 100644 --- a/server/hooks/activity.ts +++ b/server/hooks/activity.ts @@ -17,6 +17,7 @@ import { eq, inArray } from "drizzle-orm"; import { Hono } from "hono"; import * as v from "valibot"; import { bytesToBigInt, createPublicClient, createWalletClient, hexToBigInt, http, rpcSchema, withRetry } from "viem"; +import { anvil } from "viem/chains"; import alchemyAPIKey from "@exactly/common/alchemyAPIKey"; import exaChain, { @@ -40,6 +41,7 @@ import keeper, { extender } from "../utils/keeper"; import { sendPushNotification } from "../utils/onesignal"; import { autoCredit } from "../utils/panda"; import publicClient, { captureRequests, Request } from "../utils/publicClient"; +import redis from "../utils/redis"; import revertFingerprint from "../utils/revertFingerprint"; import { track } from "../utils/segment"; import { trace, type RpcSchema } from "../utils/traceClient"; @@ -138,25 +140,27 @@ export default new Hono().post( if (chain.id === exaChain.id && rawContract?.address && markets.has(rawContract.address)) continue; const asset = rawContract?.address ?? ETH; const underlying = asset === ETH ? WETH : asset; - sendPushNotification({ - userId: account, - headings: t("Funds received"), - contents: t( - chain.id === exaChain.id && marketsByAsset.has(underlying) - ? "{{amount}} received and instantly started earning yield" - : "{{amount}} received", - { - amount: value - ? Object.fromEntries( - Object.entries(f(value)).map(([language, amount]) => [ - language, - assetSymbol ? `${amount} ${assetSymbol}` : amount, - ]), - ) - : assetSymbol, - }, - ), - }).catch((error: unknown) => captureException(error)); + if (marketsByAsset.has(underlying) || (await isKnownToken(chain.id, underlying))) { + sendPushNotification({ + userId: account, + headings: t("Funds received"), + contents: t( + chain.id === exaChain.id && marketsByAsset.has(underlying) + ? "{{amount}} received and instantly started earning yield" + : "{{amount}} received", + { + amount: value + ? Object.fromEntries( + Object.entries(f(value)).map(([language, amount]) => [ + language, + assetSymbol ? `${amount} ${assetSymbol}` : amount, + ]), + ) + : assetSymbol, + }, + ), + }).catch((error: unknown) => captureException(error, { level: "error" })); + } if (pokes.has(account)) { pokes.get(account)?.assets.add(asset); @@ -330,3 +334,36 @@ findWebhook(({ webhook_type, webhook_url }) => webhook_type === "ADDRESS_ACTIVIT signingKeys.add(newHook.signing_key); }) .catch((error: unknown) => captureException(error)); + +async function isKnownToken(chainId: number, address: Address) { + if (chainId === anvil.id) return true; + const key = `lifi:tokens:${chainId}`; + try { + const [[, isMember], [, count]] = v.parse( + v.tuple([v.tuple([v.null(), v.number()]), v.tuple([v.null(), v.number()])]), + await redis.pipeline().sismember(key, address).scard(key).exec(), + ); + if (isMember) return true; + if (count > 0) return false; + const response = await fetch(`https://li.quest/v1/tokens?chains=${chainId}`, { + signal: AbortSignal.timeout(5000), + }); + if (!response.ok) throw new Error(`lifi tokens ${response.status}`); + const { tokens } = v.parse( + v.object({ tokens: v.record(v.string(), v.array(v.object({ address: v.string() }))) }), + await response.json(), + ); + const addresses = (tokens[String(chainId)] ?? []).map((token) => v.parse(Address, token.address)); + if (addresses.length === 0) return true; + await redis + .multi() + .del(key) + .sadd(key, ...addresses) + .expire(key, 3600) + .exec(); + return addresses.includes(address); + } catch (error: unknown) { + captureException(error, { level: "error" }); + return true; + } +} diff --git a/server/test/hooks/activity.test.ts b/server/test/hooks/activity.test.ts index 7fd209720d..eb04394416 100644 --- a/server/test/hooks/activity.test.ts +++ b/server/test/hooks/activity.test.ts @@ -36,6 +36,7 @@ import keeper, * as keeperUtilities from "../../utils/keeper"; import * as onesignal from "../../utils/onesignal"; import * as panda from "../../utils/panda"; import publicClient from "../../utils/publicClient"; +import redis from "../../utils/redis"; import anvilClient from "../anvilClient"; const appClient = testClient(app); @@ -59,6 +60,11 @@ describe("address activity", () => { ]); }); + afterEach(async () => { + const keys = await redis.keys("lifi:tokens:*"); + if (keys.length > 0) await redis.del(...keys); + }); + it("captures no balance once after retries", async () => { vi.spyOn(keeper, "exaSend").mockImplementation((spanOptions) => Promise.resolve( @@ -788,6 +794,7 @@ describe("address activity", () => { const eventExaSend = vi.fn().mockResolvedValue(null); vi.spyOn(keeperUtilities, "extender").mockReturnValueOnce({ exaSend: eventExaSend }); const keeperSend = vi.spyOn(keeper, "exaSend"); + mockLifiTokens({ 1: [{ address: inject("WETH") }] }); const response = await appClient.index.$post({ ...activityPayload, @@ -838,6 +845,7 @@ describe("address activity", () => { const eventExaSend = vi.fn().mockResolvedValue(null); vi.spyOn(keeperUtilities, "extender").mockReturnValueOnce({ exaSend: eventExaSend }); const keeperSend = vi.spyOn(keeper, "exaSend"); + mockLifiTokens({ 1: [{ address: inject("WETH") }] }); const response = await appClient.index.$post({ ...activityPayload, @@ -933,7 +941,7 @@ describe("address activity", () => { await vi.waitUntil(() => vi.mocked(captureException).mock.calls.some(([captured]) => captured === error)); - expect(captureException).toHaveBeenCalledWith(error); + expect(captureException).toHaveBeenCalledWith(error, { level: "error" }); expect(response.status).toBe(200); }); @@ -1067,6 +1075,182 @@ describe("address activity", () => { expect(setUser).toHaveBeenCalledWith({ id: account }); expect(response.status).toBe(200); }); + + describe("lifi token filter", () => { + const optMainnet = NETWORKS.get("OPT_MAINNET"); + if (!optMainnet) throw new Error("missing OPT_MAINNET"); + const tokenAddress = "0x1111111111111111111111111111111111111111" as const; + const optKey = `lifi:tokens:${optMainnet.id}`; + + function lifiPayload(toAddress: Address) { + return { + ...activityPayload, + json: { + ...activityPayload.json, + event: { + network: "OPT_MAINNET", + activity: [ + { + ...activityPayload.json.event.activity[2], + toAddress, + rawContract: { + rawValue: "0x00000000000000000000000000000000000000000000000000000000004c4b40" as const, + address: tokenAddress, + }, + }, + ], + }, + }, + }; + } + + it("fetches from lifi on cache miss and sends notification for known token", async () => { + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + mockLifiTokens({ [optMainnet.id]: [{ address: tokenAddress }] }); + + const response = await appClient.index.$post(lifiPayload(account)); + + await vi.waitUntil(() => sendPushNotification.mock.calls.length > 0, 5000); + + expect(globalThis.fetch).toHaveBeenCalledWith( + `https://li.quest/v1/tokens?chains=${optMainnet.id}`, + expect.objectContaining({ signal: expect.any(AbortSignal) }), // eslint-disable-line @typescript-eslint/no-unsafe-assignment + ); + expect(sendPushNotification).toHaveBeenCalledExactlyOnceWith({ + userId: account, + headings: t("Funds received"), + contents: t("{{amount}} received", { amount: { en: "5 USDT", es: "5 USDT", pt: "5 USDT" } }), + }); + expect(response.status).toBe(200); + }); + + it("uses redis cache and skips fetch for known token on cache hit", async () => { + const fetchSpy = vi.spyOn(globalThis, "fetch"); + await redis.multi().sadd(optKey, tokenAddress).expire(optKey, 120).exec(); + + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + const response = await appClient.index.$post(lifiPayload(account)); + + await vi.waitUntil(() => sendPushNotification.mock.calls.length > 0, 5000); + + expect(fetchSpy).not.toHaveBeenCalledWith(expect.stringContaining("li.quest"), expect.anything()); + expect(sendPushNotification).toHaveBeenCalledExactlyOnceWith({ + userId: account, + headings: t("Funds received"), + contents: t("{{amount}} received", { amount: { en: "5 USDT", es: "5 USDT", pt: "5 USDT" } }), + }); + expect(response.status).toBe(200); + }); + + it("suppresses notification for unknown token when cache is initialized", async () => { + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + + await redis.multi().sadd(optKey, "0x2222222222222222222222222222222222222222").expire(optKey, 120).exec(); + + const response = await appClient.index.$post(lifiPayload(account)); + + expect(sendPushNotification).not.toHaveBeenCalled(); + expect(response.status).toBe(200); + }); + + it("fails open and captures exception when lifi fetch throws", async () => { + const fetchError = new Error("network failure"); + mockLifiTokens(fetchError); + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + + const response = await appClient.index.$post(lifiPayload(account)); + + await vi.waitUntil(() => sendPushNotification.mock.calls.length > 0, 5000); + + expect(captureException).toHaveBeenCalledWith(fetchError, { level: "error" }); + expect(sendPushNotification).toHaveBeenCalledExactlyOnceWith({ + userId: account, + headings: t("Funds received"), + contents: t("{{amount}} received", { amount: { en: "5 USDT", es: "5 USDT", pt: "5 USDT" } }), + }); + expect(response.status).toBe(200); + }); + + it("fails open and captures exception when lifi returns non ok", async () => { + mockLifiTokens(Response.json({}, { status: 503 })); + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + + const response = await appClient.index.$post(lifiPayload(account)); + + await vi.waitUntil(() => sendPushNotification.mock.calls.length > 0, 5000); + + expect(captureException).toHaveBeenCalledWith(expect.objectContaining({ message: "lifi tokens 503" }), { + level: "error", + }); + expect(sendPushNotification).toHaveBeenCalledExactlyOnceWith({ + userId: account, + headings: t("Funds received"), + contents: t("{{amount}} received", { amount: { en: "5 USDT", es: "5 USDT", pt: "5 USDT" } }), + }); + expect(response.status).toBe(200); + }); + + it("fails open and captures exception when redis errors", async () => { + const redisError = new Error("redis connection refused"); + vi.spyOn(redis, "pipeline").mockImplementationOnce(() => { + throw redisError; + }); + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + + const response = await appClient.index.$post(lifiPayload(account)); + + await vi.waitUntil(() => sendPushNotification.mock.calls.length > 0, 5000); + + expect(captureException).toHaveBeenCalledWith(redisError, { level: "error" }); + expect(sendPushNotification).toHaveBeenCalledExactlyOnceWith({ + userId: account, + headings: t("Funds received"), + contents: t("{{amount}} received", { amount: { en: "5 USDT", es: "5 USDT", pt: "5 USDT" } }), + }); + expect(response.status).toBe(200); + }); + + it("fails open when lifi returns empty token list", async () => { + mockLifiTokens({}); + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + + const response = await appClient.index.$post(lifiPayload(account)); + + await vi.waitUntil(() => sendPushNotification.mock.calls.length > 0, 5000); + + expect(sendPushNotification).toHaveBeenCalledExactlyOnceWith({ + userId: account, + headings: t("Funds received"), + contents: t("{{amount}} received", { amount: { en: "5 USDT", es: "5 USDT", pt: "5 USDT" } }), + }); + expect(response.status).toBe(200); + }); + + it("fetches separately per chain and does not share cache between chains", async () => { + const arbMainnet = NETWORKS.get("ARB_MAINNET"); + if (!arbMainnet) throw new Error("missing ARB_MAINNET"); + const arbKey = `lifi:tokens:${arbMainnet.id}`; + await redis.multi().sadd(arbKey, tokenAddress).expire(arbKey, 120).exec(); + + mockLifiTokens({ [optMainnet.id]: [{ address: tokenAddress }] }); + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + + const response = await appClient.index.$post(lifiPayload(account)); + + await vi.waitUntil(() => sendPushNotification.mock.calls.length > 0, 5000); + + expect(globalThis.fetch).toHaveBeenCalledWith( + `https://li.quest/v1/tokens?chains=${optMainnet.id}`, + expect.objectContaining({ signal: expect.any(AbortSignal) }), // eslint-disable-line @typescript-eslint/no-unsafe-assignment + ); + expect(sendPushNotification).toHaveBeenCalledExactlyOnceWith({ + userId: account, + headings: t("Funds received"), + contents: t("{{amount}} received", { amount: { en: "5 USDT", es: "5 USDT", pt: "5 USDT" } }), + }); + expect(response.status).toBe(200); + }); + }); }); async function getWETHMarket(account: Address) { @@ -1108,6 +1292,20 @@ function isNoBalance(error: unknown, hint: unknown, level: "error" | "warning") ); } +function mockLifiTokens(response: Error | Record | Response) { + const originalFetch = globalThis.fetch; + vi.spyOn(globalThis, "fetch").mockImplementation((input, init) => { + if ((input instanceof Request ? input.url : String(input)).includes("li.quest")) { + return response instanceof Error + ? Promise.reject(response) + : Promise.resolve( + response instanceof Response ? response : Response.json({ tokens: response }, { status: 200 }), + ); + } + return originalFetch(input, init); + }); +} + const activityPayload = { header: {}, json: {