From 935e01c116f12c4788073219d0fbae25e08d3e12 Mon Sep 17 00:00:00 2001 From: Leo Developer Date: Sat, 4 Nov 2023 12:50:06 +0100 Subject: [PATCH 1/8] do the thing --- src/index.ts | 9 +++++---- src/storage.ts | 46 ++++++++++++++++++++++++++++++++++++++++++++++ src/types.d.ts | 4 +++- 3 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 src/storage.ts diff --git a/src/index.ts b/src/index.ts index 146f686..da7f660 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,7 @@ import { publishMessage, sendToDiscord } from './discord'; import { attachDebug } from './utils'; import Config from './config'; +import { retrieveFromStorage, saveToStorage } from './storage'; export default { async fetch(req: Request, env: Env) { @@ -35,7 +36,7 @@ export default { const json = await res.json(); await Promise.all(json.incidents.map(async incident => { - const kv = await env.KV.get(incident.id, 'json'); + const kv = await retrieveFromStorage(env, incident.id); console.log('-----\nIncident ' + incident.id + ' in KV: ' + (kv !== null) + '\n-----'); @@ -62,8 +63,8 @@ export default { if (messageId !== null) { incident.messageId = messageId; } - // Update KV - await env.KV.put(incident.id, JSON.stringify(incident)); + // Update storage + await saveToStorage(env, incident.id, incident); // Check if we can publish if (messageId !== null && Config.PUBLISH_CHANNEL_ID !== '') { @@ -90,7 +91,7 @@ export default { console.log('Updating incident:', incident.id); // Update KV - await env.KV.put(incident.id, JSON.stringify(incident)); + await saveToStorage(env, incident.id, incident); // Update Discord await sendToDiscord(incident, env); } diff --git a/src/storage.ts b/src/storage.ts new file mode 100644 index 0000000..78345d3 --- /dev/null +++ b/src/storage.ts @@ -0,0 +1,46 @@ +export async function saveToStorage(env: Env, key: string, value: any) { + const now = new Date().getTime(); + const data = JSON.stringify({ ts: now, value }); + + await Promise.allSettled([ + env.KV && env.KV.put(key, data), + env.D1 && + env.D1.prepare( + `INSERT INTO KV(key, value) VALUES(?1, ?2) ON CONFLICT(key) DO UPDATE SET value = ?2`, + ) + .bind(key, data) + .run(), + env.R2 && env.R2.put(key, data), + ]); +} + +export async function retrieveFromStorage( + env: Env, + key: string, +): Promise { + const rawValues = await Promise.allSettled([ + env.KV && env.KV.get(key), + env.D1 && + env.D1.prepare(`SELECT value WHERE key = ?`).bind(key).first("value"), + env.R2 && env.R2.get(key).then((object) => object && object.text()), + ]); + const values = rawValues.filter( + (x) => x.status === "fulfilled" && x.value, + ) as Array>; + // none of our storage providers had the value in stock, so either all of them are + // broken, or its just a new key + if (!values.length) return null; + + const parsed: Array<{ ts: number; value: T }> = []; + for (const value of values) { + // lets make sure we handle data corruption cases + try { + parsed.push(JSON.parse(value.value)); + } catch {} + } + + // to account for cases where a write failed, we have the timestamp! + // so we use whatever has the newest timestamp + const freshness = parsed.sort((a, b) => a.ts - b.ts); + return freshness[0].value; +} diff --git a/src/types.d.ts b/src/types.d.ts index 9ca3579..9ddee5b 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -6,6 +6,8 @@ declare module globalThis { interface Env { KV: KVNamespace; + D1?: D1Database; + R2?: R2Bucket; DISCORD_WEBHOOK: string; DISCORD_TOKEN: string; } @@ -87,4 +89,4 @@ interface DiscordResponse { interface Debug { // Allows me to trigger an update for a specific incident updateIncident?: string; -} \ No newline at end of file +} From 8dad62124e709ca785c6502309f09f09b9e5152c Mon Sep 17 00:00:00 2001 From: Leo Developer Date: Sat, 4 Nov 2023 12:51:14 +0100 Subject: [PATCH 2/8] tabs --- src/types.d.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/types.d.ts b/src/types.d.ts index 9ddee5b..32fb97c 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -6,8 +6,8 @@ declare module globalThis { interface Env { KV: KVNamespace; - D1?: D1Database; - R2?: R2Bucket; + D1?: D1Database; + R2?: R2Bucket; DISCORD_WEBHOOK: string; DISCORD_TOKEN: string; } From 34e54f50c6fde32dfab9ebfc58db44278fe65bee Mon Sep 17 00:00:00 2001 From: Leo Developer Date: Sat, 4 Nov 2023 12:53:30 +0100 Subject: [PATCH 3/8] remove last KV references --- src/index.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/index.ts b/src/index.ts index da7f660..c358f68 100644 --- a/src/index.ts +++ b/src/index.ts @@ -36,19 +36,19 @@ export default { const json = await res.json(); await Promise.all(json.incidents.map(async incident => { - const kv = await retrieveFromStorage(env, incident.id); + const stored = await retrieveFromStorage(env, incident.id); - console.log('-----\nIncident ' + incident.id + ' in KV: ' + (kv !== null) + '\n-----'); + console.log('-----\nIncident ' + incident.id + ' in storage: ' + (stored !== null) + '\n-----'); if (globalThis.DEBUG?.updateIncident === incident.id) { // Set update to now so we force an update incident.updated_at = new Date().toISOString(); } - if (kv === null) { + if (stored === null) { await this.postNew(incident, env); } else { - await this.postUpdate(incident, kv, env); + await this.postUpdate(incident, stored, env); } })); @@ -90,7 +90,7 @@ export default { if (incident.updated_at !== cachedIncident.updated_at) { console.log('Updating incident:', incident.id); - // Update KV + // Update storage await saveToStorage(env, incident.id, incident); // Update Discord await sendToDiscord(incident, env); From 7c7dd979db0869cf26a2a80b391d42f0b9622389 Mon Sep 17 00:00:00 2001 From: Leo Developer Date: Sat, 4 Nov 2023 12:55:24 +0100 Subject: [PATCH 4/8] WRONG ORDER WHOOPS --- src/storage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage.ts b/src/storage.ts index 78345d3..3ef1cae 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -41,6 +41,6 @@ export async function retrieveFromStorage( // to account for cases where a write failed, we have the timestamp! // so we use whatever has the newest timestamp - const freshness = parsed.sort((a, b) => a.ts - b.ts); + const freshness = parsed.sort((a, b) => b.ts - a.ts); return freshness[0].value; } From ac38e19b01247cf31f52c0c111d6cb1371e1d965 Mon Sep 17 00:00:00 2001 From: Leo Developer Date: Sat, 4 Nov 2023 12:57:09 +0100 Subject: [PATCH 5/8] add error if all stores are corrupted --- src/storage.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/storage.ts b/src/storage.ts index 3ef1cae..ef32819 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -38,6 +38,10 @@ export async function retrieveFromStorage( parsed.push(JSON.parse(value.value)); } catch {} } + if (!parsed.length) { + console.error("all data stores hold corrupted data, somehow...", values); + return null; + } // to account for cases where a write failed, we have the timestamp! // so we use whatever has the newest timestamp From a523fa39b3f5bff737f5a215a8ed27e912dfcabe Mon Sep 17 00:00:00 2001 From: Leo Developer Date: Sat, 4 Nov 2023 13:24:08 +0100 Subject: [PATCH 6/8] add to README --- README.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/README.md b/README.md index 30abf8a..76961a4 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,25 @@ There are a few steps to the setup but it should hopefully be pretty straightfor 4b. (optional) If you want publishing, you'll also need to add a Discord bot token with `wrangler secret put DISCORD_TOKEN` 5. Run `npm run publish` :) +## Data redundancy + +This worker is using KV as its main store, but that can go down, so you can additionally +save data to R2 or D1. + +```toml +[[d1_databases]] +binding = "D1" +database_name = "" +database_id = "" + +[[r2_buckets]] +binding = "R2" +bucket_name = "" +``` + +To setup D1, you need to run the following SQL statement: +`CREATE TABLE IF NOT EXISTS KV (key TEXT UNIQUE, value TEXT)` + ## Example ### New Incident ![New Incident](https://user-images.githubusercontent.com/8492901/131903623-352dd6ec-bd7f-470f-9468-4a271c4ddc69.png) From f4cbf05220b71b0eae9e4729bc7d21c7ade8551a Mon Sep 17 00:00:00 2001 From: Leo Developer Date: Sat, 4 Nov 2023 13:29:33 +0100 Subject: [PATCH 7/8] fix query --- src/storage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage.ts b/src/storage.ts index ef32819..9026c6f 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -21,7 +21,7 @@ export async function retrieveFromStorage( const rawValues = await Promise.allSettled([ env.KV && env.KV.get(key), env.D1 && - env.D1.prepare(`SELECT value WHERE key = ?`).bind(key).first("value"), + env.D1.prepare(`SELECT value FROM KV WHERE key = ?`).bind(key).first("value"), env.R2 && env.R2.get(key).then((object) => object && object.text()), ]); const values = rawValues.filter( From 6da34517546dc4da4384d01866e30e861dcd50f7 Mon Sep 17 00:00:00 2001 From: Leo Developer Date: Mon, 6 Nov 2023 15:57:02 +0100 Subject: [PATCH 8/8] Update README.md Co-authored-by: Daniel Walsh --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 76961a4..69fc67d 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ There are a few steps to the setup but it should hopefully be pretty straightfor ## Data redundancy -This worker is using KV as its main store, but that can go down, so you can additionally +This worker is using KV as its main store, but if that was to suffer a problem, you can additionally save data to R2 or D1. ```toml