diff --git a/agent_docs/tasks/2026-06-13-fresh-client-load-followups.md b/agent_docs/tasks/2026-06-13-fresh-client-load-followups.md
new file mode 100644
index 00000000..d51386d6
--- /dev/null
+++ b/agent_docs/tasks/2026-06-13-fresh-client-load-followups.md
@@ -0,0 +1,97 @@
+# Fresh-Client Load Follow-ups: Map Asset Preload + News Feed SWR
+
+Follow-up to the WebSocket snapshot work (`2026-06-13-fresh-client-snapshot-replay.md`).
+Two further fresh-client slowness sources: the map JS bundle waterfall and the
+dashboard's slow text feeds.
+
+## Issue
+
+1. **Map asset waterfall.** The default view is `TACTICAL` → `TacticalMap`,
+   which depends on the `deck-gl` (~1 MB) and map-engine (~1 MB MapLibre /
+   ~1.7 MB Mapbox) vendor chunks. Since the views are lazy-loaded and the entry
+   was deliberately stopped from preloading vendors (`f32c30d`), a cold-cache
+   client only discovers these chunks **after** the entry + App chunks download,
+   parse, and the dynamic import fires — a multi-hop request waterfall before the
+   default view can paint.
+2. **Dashboard text feeds slow to load.** `GET /api/news/feed` (NewsWidget)
+   fetched the 5 configured RSS feeds **sequentially**, each with a 10 s timeout
+   (up to ~50 s worst case), and the Redis cache was populated **lazily** by the
+   requesting client. So every 15-minute cache expiry made the next caller block
+   on the full upstream fetch. There is no news poller pre-warming the cache.
+
+## Solution
+
+1. **Hoist `modulepreload` hints for the critical map chunks.** A small build-only
+   Vite plugin (`mapCriticalPreloadPlugin`) injects
+   `<link rel="modulepreload">` into `index.html` for `deck-gl`, the active GL
+   engine, and the `TacticalMap` view chunk, so the browser fetches them in
+   parallel with the entry instead of serially after it. The engine is chosen at
+   build time to mirror `mapStyles.ts`: Mapbox when a valid `VITE_MAPBOX_TOKEN`
+   is set (+ `VITE_ENABLE_MAPBOX !== "false"`), MapLibre otherwise. Only the
+   default view's engine is preloaded — the globe-only MapLibre in a Mapbox build
+   still loads on demand. The cacheable-vendor split is otherwise unchanged.
+2. **Concurrent fetch + stale-while-revalidate for the news feed.**
+   - `_fetch_feeds` now fetches all sources with `asyncio.gather` (latency bounded
+     by the slowest single feed, not the sum) via a non-raising `_fetch_one`.
+   - The endpoint serves the cached payload immediately and, once it ages past the
+     15-minute freshness window, kicks off a **background** refresh
+     (`_trigger_refresh`) so callers never block on the upstream fetch. The data
+     is kept for `CACHE_HARD_TTL` (6 h) for stale serving; a `CACHE_FRESH_KEY`
+     marks freshness. Background refreshes are deduped within a worker (a held
+     task ref) and across workers (a Redis `SET NX` lock). Only a truly cold
+     cache (no data at all, or Redis down) fetches synchronously — and that fetch
+     is now concurrent.
+
+## Changes
+
+- **`frontend/vite.config.ts`**
+  - Switched to the function form of `defineConfig` to read build env via
+    `loadEnv` and pick the engine chunk.
+  - Added `mapCriticalPreloadPlugin(engineChunk)` (uses `transformIndexHtml` with
+    `ctx.bundle` to resolve hashed chunk filenames and inject preload links).
+- **`backend/api/routers/news.py`**
+  - Added `CACHE_FRESH_KEY`, `CACHE_REFRESH_LOCK`, `CACHE_HARD_TTL`,
+    `CACHE_REFRESH_LOCK_TTL`.
+  - Split `_fetch_feeds` into `_fetch_one` (per-feed, never raises) +
+    concurrent `gather`.
+  - Added `_store_feed`, `_refresh_and_release`, `_trigger_refresh`, and a module
+    `_refresh_task` ref.
+  - Rewrote `get_news_feed` for stale-while-revalidate.
+  - Added `warm_cache()` (non-blocking refresh delegating to the deduped
+    background refresh) and `prewarm_loop()` — a continuous pre-warmer that
+    refreshes on startup and then every `NEWS_PREWARM_INTERVAL`
+    (`NEWS_PREWARM_INTERVAL_SECONDS`, default 600 s — comfortably inside the
+    900 s freshness window so the cache is always warm even with no traffic).
+- **`backend/api/main.py`**
+  - Lifespan launches `news.prewarm_loop()` as a supervised background task
+    after `broadcast_service.start()` and cancels it on shutdown alongside the
+    historian / RF-cleanup tasks. The feed cache is therefore kept warm
+    independent of client traffic, so a fresh dashboard never blocks on the
+    upstream RSS fetch.
+- **`backend/api/tests/test_news_router.py`**
+  - Added tests: fresh cache served without refresh; stale cache served +
+    triggers refresh; cold cache fetches synchronously; `_fetch_feeds` merges +
+    sorts newest-first and strips `_ts`; `_trigger_refresh` NX-lock dedupe.
+
+## Verification
+
+- **Frontend** (`frontend`): `pnpm run typecheck` (covers `vite.config.ts`),
+  `pnpm run lint`, `pnpm run test` → 278 passed. `pnpm run build` succeeded;
+  `dist/index.html` now contains `modulepreload` links for `deck-gl`,
+  `maplibre` (no Mapbox token in this build → engine = MapLibre, and `mapbox`
+  is correctly *not* preloaded), and `TacticalMap`.
+- **Backend API** (`backend/api`): `ruff check` on changed files passed;
+  `pytest` full suite → 172 passed (was 167; +5 news tests).
+
+## Benefits
+
+- **Map paints sooner on a cold client**: the two multi-MB critical chunks and
+  the default view chunk download in parallel with the entry, collapsing the
+  discover-then-fetch waterfall — without reverting the cacheable vendor split
+  or preloading the unused engine.
+- **Dashboard text feeds load fast and stay fast**: concurrent fetching cuts
+  cold-cache latency from the sum of feed latencies to the slowest single feed,
+  and stale-while-revalidate means the periodic 15-minute cache expiry no longer
+  blocks a user — they get instant (slightly stale) data while a background
+  refresh runs. Background refreshes are deduped so a burst of clients triggers
+  at most one upstream fetch.
diff --git a/agent_docs/tasks/2026-06-13-fresh-client-snapshot-replay.md b/agent_docs/tasks/2026-06-13-fresh-client-snapshot-replay.md
new file mode 100644
index 00000000..c93eab4d
--- /dev/null
+++ b/agent_docs/tasks/2026-06-13-fresh-client-snapshot-replay.md
@@ -0,0 +1,89 @@
+# Fresh-Client Snapshot Replay (Last-Value Cache)
+
+## Issue
+
+After the recent frontend rendering optimizations (cached static layers, lazy
+map loading), the map paints almost instantly — but on a **fresh client** it
+stays empty for a long time while entities trickle in. Aircraft, ships, and
+satellites only arrive over the live WebSocket (`/api/tracks/live`), and the
+broadcast consumer reads Kafka with `auto_offset_reset="latest"`. A late joiner
+therefore receives **no backlog** — it must wait for each poller to re-emit its
+next full sweep before the world populates. The orbital sweep alone is a
+~15–37 s cycle (≈11k satellites), so a fresh client can sit on a near-empty map
+for tens of seconds. The faster (now near-instant) render made this pre-existing
+gap glaringly obvious.
+
+## Solution
+
+Add a **last-value cache (LVC)** to `BroadcastManager` and replay it to every
+newly-connected client before live streaming begins.
+
+- As the Kafka consume loop transforms each message to its TAK frame, it also
+  stores the latest frame per `uid` in an in-memory cache, keyed by entity id
+  and stamped with a monotonic receive time. The cache is kept warm even when
+  no clients are connected, so it is ready the instant someone joins.
+- On WebSocket connect, the per-client worker first replays the current cache
+  (the "snapshot") directly to that client, then enters the normal live-stream
+  drain loop. Frames are sent on the existing one-frame-per-entity wire format,
+  so **the frontend needs no changes** — a snapshot frame is indistinguishable
+  from a live update, and the client's existing `lastSourceTime` de-dup guard
+  harmlessly ignores any overlap between snapshot and live deltas.
+- Stale entries (not re-emitted within `LIVE_SNAPSHOT_TTL_SECONDS`, default
+  300 s) are excluded from snapshots and periodically pruned; a hard cap
+  (`LIVE_SNAPSHOT_MAX_ENTITIES`, default 20 000) bounds memory.
+
+### Why direct send, not the live queue
+
+The per-client live queue is bounded at 256 messages (it intentionally drops
+oldest under back-pressure). A multi-thousand-entity snapshot pushed through it
+would be almost entirely dropped, so the snapshot is sent directly via
+`send_bytes` with the same 3 s per-frame timeout, yielding to the event loop
+every 256 frames so a large replay never starves the consume loop or other
+clients.
+
+## Changes
+
+- **`backend/api/core/config.py`**
+  - Added `LIVE_SNAPSHOT_TTL_SECONDS` (default 300) and
+    `LIVE_SNAPSHOT_MAX_ENTITIES` (default 20 000).
+- **`backend/api/services/broadcast.py`**
+  - Added `import time` and the `_LVC_PRUNE_INTERVAL_S` constant.
+  - `BroadcastManager.__init__`: added the `_lvc` cache and `_last_prune`.
+  - `_consume`: records every transformed frame into the LVC
+    (`_record_live`) before the early-out on zero clients.
+  - New helpers: `_record_live`, `_maybe_prune` (TTL sweep + hard cap),
+    `_snapshot_frames` (fresh frames, copied for safe concurrent iteration),
+    and `_send_snapshot` (direct, yielding, disconnect-aware replay).
+  - `_client_worker`: replays the snapshot before the live drain loop; bails
+    out cleanly if the client disconnects mid-snapshot.
+  - `stop()`: clears the cache.
+- **`backend/api/tests/test_broadcast_snapshot.py`** (new)
+  - Covers LVC population/overwrite, blank-uid rejection, TTL exclusion, prune
+    (stale drop + hard cap), and snapshot send (all frames, empty no-op,
+    mid-stream disconnect, stale exclusion).
+
+## Verification
+
+Run on host (`backend/api`):
+
+- `uv tool run ruff check services/broadcast.py core/config.py tests/test_broadcast_snapshot.py` → All checks passed.
+- `uv run python -m pytest tests/test_broadcast_snapshot.py tests/test_tracks_validation.py -q` → 16 passed.
+- `uv run python -m pytest -q` (full API suite) → 167 passed.
+
+No frontend changes were required, so frontend suites were not run (per the
+Targeted Verification rule).
+
+## Benefits
+
+- **Fresh clients paint the full picture immediately** instead of waiting up to
+  a full poller sweep (tens of seconds for satellites). The data-load latency a
+  late joiner perceives drops from "next sweep" to "one connect round-trip."
+- **Backend-only, wire-compatible**: no frontend changes, no proto/worker
+  changes, no new service or DB query on connect — the snapshot is served from
+  memory.
+- **Bounded and self-healing**: TTL + hard cap bound memory; stale entities
+  (landed aircraft, departed vessels) age out automatically and never appear in
+  a snapshot.
+- **Back-pressure safe**: the snapshot bypasses the bounded live queue and
+  yields regularly, so a large replay cannot starve the consume loop or slow
+  other connected clients.
diff --git a/backend/api/core/config.py b/backend/api/core/config.py
index 16665a19..b4d01e32 100644
--- a/backend/api/core/config.py
+++ b/backend/api/core/config.py
@@ -43,6 +43,15 @@ def DB_DSN(self) -> str:
     # Kafka
     KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "sovereign-redpanda:9092")
 
+    # Live-stream snapshot (last-value cache).
+    # A freshly-connected WebSocket client is replayed the current world state
+    # so the map paints immediately instead of waiting for each poller's next
+    # full sweep (the orbital sweep alone is a ~15-37 s cycle). Entities not
+    # re-emitted within the TTL are dropped from the snapshot; the hard cap
+    # bounds memory if the uid space ever runs away.
+    LIVE_SNAPSHOT_TTL_SECONDS = int(os.getenv("LIVE_SNAPSHOT_TTL_SECONDS", "300"))
+    LIVE_SNAPSHOT_MAX_ENTITIES = int(os.getenv("LIVE_SNAPSHOT_MAX_ENTITIES", "20000"))
+
     # Authentication
     # When AUTH_ENABLED=false all authentication checks are skipped (local dev only — NEVER in production).
     AUTH_ENABLED: bool = os.getenv("AUTH_ENABLED", "true").lower() not in (
diff --git a/backend/api/main.py b/backend/api/main.py
index 1865e477..3e53201a 100644
--- a/backend/api/main.py
+++ b/backend/api/main.py
@@ -78,6 +78,7 @@ async def _historian_supervisor():
 # Global task handles
 historian_task_handle: asyncio.Task | None = None
 rf_cleanup_task_handle: asyncio.Task | None = None
+news_prewarm_task_handle: asyncio.Task | None = None
 
 
 @asynccontextmanager
@@ -86,7 +87,7 @@ async def lifespan(app: FastAPI):
     BUG-017: Replaced deprecated @app.on_event("startup") / @app.on_event("shutdown")
     decorators with the modern lifespan context manager pattern (FastAPI >= 0.93).
     """
-    global historian_task_handle, rf_cleanup_task_handle
+    global historian_task_handle, rf_cleanup_task_handle, news_prewarm_task_handle
     # --- Startup ---
     settings.validate()
     await db.connect()
@@ -121,12 +122,23 @@ async def lifespan(app: FastAPI):
     historian_task_handle = asyncio.create_task(_historian_supervisor())
     rf_cleanup_task_handle = asyncio.create_task(rf_sites_cleanup_task())
     await broadcast_service.start()
-    logger.info("Database, Redis, Historian, RF Cleanup, and Broadcast Service started")
+    # Continuously pre-warm the news feed cache in the background so a fresh
+    # dashboard always hits a warm cache instead of blocking on the upstream
+    # RSS fetch (refreshes on startup, then on an interval).
+    news_prewarm_task_handle = asyncio.create_task(news.prewarm_loop())
+    logger.info(
+        "Database, Redis, Historian, RF Cleanup, Broadcast Service, "
+        "and News Pre-warm started"
+    )
 
     yield
 
     # --- Shutdown ---
-    for handle in (historian_task_handle, rf_cleanup_task_handle):
+    for handle in (
+        historian_task_handle,
+        rf_cleanup_task_handle,
+        news_prewarm_task_handle,
+    ):
         if handle:
             handle.cancel()
             try:
diff --git a/backend/api/routers/news.py b/backend/api/routers/news.py
index a59aed02..843ecb32 100644
--- a/backend/api/routers/news.py
+++ b/backend/api/routers/news.py
@@ -20,7 +20,16 @@
 logger = logging.getLogger("SovereignWatch.News")
 
 CACHE_KEY = "news:feed"
-CACHE_TTL = 900  # 15 minutes
+# Presence of this key marks the cached payload as still fresh. The data itself
+# is kept for CACHE_HARD_TTL so it can be served stale while a refresh runs.
+CACHE_FRESH_KEY = "news:feed:fresh"
+CACHE_REFRESH_LOCK = "news:feed:refreshing"
+CACHE_TTL = 900  # 15 minutes — freshness window
+CACHE_HARD_TTL = 6 * 3600  # keep stale data servable (stale-while-revalidate)
+CACHE_REFRESH_LOCK_TTL = 120
+# Pre-warm interval: refresh proactively, comfortably inside the freshness
+# window, so the cache stays warm even when no client is polling the feed.
+NEWS_PREWARM_INTERVAL = int(os.getenv("NEWS_PREWARM_INTERVAL_SECONDS", "600"))
 
 # Default RSS feeds — world/conflict/OSINT relevant, all public
 DEFAULT_RSS_URLS = ",".join(
@@ -138,51 +147,61 @@ def _parse_rss(xml_text: str, source: str) -> list[dict]:
 PER_SOURCE_CAP = 20
 
 
+async def _fetch_one(client: httpx.AsyncClient, url: str) -> list[dict]:
+    """Fetch and parse a single RSS feed. Never raises — returns [] on failure."""
+    source = _source_name(url)
+    try:
+        resp = await client.get(url, headers={"User-Agent": "SovereignWatch/1.0"})
+        if resp.status_code == 200:
+            items = _parse_rss(resp.text, source)
+            # Sort per-source by recency, then cap so no single feed crowds others
+            items.sort(key=lambda x: x["_ts"], reverse=True)
+            logger.info(
+                f"Fetched {len(items)} items from {source} (kept {min(len(items), PER_SOURCE_CAP)})"
+            )
+            return items[:PER_SOURCE_CAP]
+        logger.warning(f"Non-200 response from {source}: {resp.status_code}")
+    except Exception as e:
+        logger.warning(f"Failed to fetch feed from {source}: {e}")
+        if db.redis_client:
+            try:
+                await db.redis_client.set(
+                    "poller:news:last_error",
+                    json.dumps(
+                        {"ts": time.time(), "msg": f"{source}: Internal fetch error"}
+                    ),
+                    ex=3600,
+                )
+            except Exception:
+                pass
+    return []
+
+
 async def _fetch_feeds() -> list[dict]:
-    """Fetch all configured RSS feeds and return merged, date-sorted items."""
+    """Fetch all configured RSS feeds concurrently; merged, date-sorted items.
+
+    Feeds are fetched in parallel (gather) rather than serially so total latency
+    is bounded by the slowest single feed, not the sum of all of them.
+    """
     raw_urls = os.getenv("NEWS_RSS_URLS", DEFAULT_RSS_URLS)
     urls = [u.strip() for u in raw_urls.split(",") if u.strip()]
 
-    all_items: list[dict] = []
-    async with httpx.AsyncClient(transport=SSRFSafeTransport(), timeout=10.0, follow_redirects=True) as client:
-        for url in urls:
-            source = _source_name(url)
-            try:
-                resp = await client.get(
-                    url, headers={"User-Agent": "SovereignWatch/1.0"}
-                )
-                if resp.status_code == 200:
-                    items = _parse_rss(resp.text, source)
-                    # Sort per-source by recency, then cap so no single feed crowds others
-                    items.sort(key=lambda x: x["_ts"], reverse=True)
-                    all_items.extend(items[:PER_SOURCE_CAP])
-                    logger.info(
-                        f"Fetched {len(items)} items from {source} (kept {min(len(items), PER_SOURCE_CAP)})"
-                    )
-                else:
-                    logger.warning(
-                        f"Non-200 response from {source}: {resp.status_code}"
-                    )
-            except Exception as e:
-                logger.warning(f"Failed to fetch feed from {source}: {e}")
-                if db.redis_client:
-                    try:
-                        await db.redis_client.set(
-                            "poller:news:last_error",
-                            json.dumps({"ts": time.time(), "msg": f"{source}: Internal fetch error"}),
-                            ex=3600
-                        )
-                    except Exception:
-                        pass
+    async with httpx.AsyncClient(
+        transport=SSRFSafeTransport(), timeout=10.0, follow_redirects=True
+    ) as client:
+        results = await asyncio.gather(*(_fetch_one(client, url) for url in urls))
 
-    # Sort all collected items newest-first so the feed is interleaved by recency
-    if all_items:
-        if db.redis_client:
-            try:
-                await db.redis_client.set("news:last_fetch", str(time.time()), ex=CACHE_TTL * 2)
-            except Exception:
-                pass
+    all_items: list[dict] = [item for feed_items in results for item in feed_items]
+    if all_items and db.redis_client:
+        try:
+            await db.redis_client.set(
+                "news:last_fetch", str(time.time()), ex=CACHE_TTL * 2
+            )
+        except Exception:
+            pass
+
+    # Sort all collected items newest-first so the feed is interleaved by recency
     all_items.sort(key=lambda x: x["_ts"], reverse=True)
 
     # Strip internal timestamp before returning
@@ -192,39 +211,123 @@ async def _fetch_feeds() -> list[dict]:
     return all_items
 
 
+# Holds the in-flight background refresh so it isn't garbage-collected and so a
+# new refresh isn't spawned while one is already running.
+_refresh_task: "asyncio.Task | None" = None
+
+
+async def _store_feed(items: list[dict]) -> None:
+    """Persist a freshly-fetched feed and mark it fresh."""
+    if not (db.redis_client and items):
+        return
+    try:
+        payload = json.dumps(items)
+        await db.redis_client.setex(CACHE_KEY, CACHE_HARD_TTL, payload)
+        await db.redis_client.setex(CACHE_FRESH_KEY, CACHE_TTL, "1")
+    except Exception as e:
+        logger.warning(f"Redis cache write failed: {e}")
+
+
+async def _refresh_and_release() -> None:
+    """Background refresh body: fetch, store, then release the dedupe lock."""
+    try:
+        items = await _fetch_feeds()
+        await _store_feed(items)
+    except Exception as e:
+        logger.warning(f"Background news refresh failed: {e}")
+    finally:
+        if db.redis_client:
+            try:
+                await db.redis_client.delete(CACHE_REFRESH_LOCK)
+            except Exception:
+                pass
+
+
+async def _trigger_refresh() -> None:
+    """Kick off a single background refresh, deduped within and across workers."""
+    global _refresh_task
+    if _refresh_task is not None and not _refresh_task.done():
+        return
+    # Best-effort cross-worker dedupe: only the worker that wins the NX lock
+    # spawns the refresh; the lock is released in _refresh_and_release.
+    if db.redis_client:
+        try:
+            got = await db.redis_client.set(
+                CACHE_REFRESH_LOCK, "1", nx=True, ex=CACHE_REFRESH_LOCK_TTL
+            )
+            if not got:
+                return
+        except Exception:
+            pass
+    _refresh_task = asyncio.create_task(_refresh_and_release())
+
+
+async def warm_cache() -> None:
+    """Populate the feed cache in the background (non-blocking).
+
+    Delegates to the same deduped background refresh used by the
+    stale-while-revalidate path, so it returns immediately.
+    """
+    await _trigger_refresh()
+
+
+async def prewarm_loop() -> None:
+    """Keep the news cache warm independent of client traffic.
+
+    Refreshes immediately on startup and then every NEWS_PREWARM_INTERVAL, so a
+    fresh dashboard always hits a warm cache instead of waiting on the upstream
+    RSS fetch — even if no one has polled the feed recently. Per-cycle errors
+    are logged and the loop continues; cancellation (lifespan shutdown)
+    propagates for a clean stop.
+    """
+    while True:
+        try:
+            await warm_cache()
+        except Exception as e:  # never let one bad cycle kill the loop
+            logger.warning(f"News pre-warm cycle failed: {e}")
+        await asyncio.sleep(NEWS_PREWARM_INTERVAL)
+
+
 @router.get("/api/news/feed")
 async def get_news_feed(limit: int = Query(default=40, le=100)):
     """
     Returns aggregated news items from configured RSS feeds.
-    Results are cached in Redis for 15 minutes.
+
+    Served stale-while-revalidate: a cached payload is returned immediately and,
+    once it ages past the 15-minute freshness window, a background refresh is
+    kicked off so callers never block on the upstream RSS fetch. Only a cold
+    cache (no data at all) fetches synchronously — and that fetch is concurrent.
     """
     # Try cache first
     if db.redis_client:
         try:
             # Set/Update heartbeat on every access to show aggregator is alive
-            await db.redis_client.set("news:last_fetch", str(time.time()), ex=CACHE_TTL * 2)
+            await db.redis_client.set(
+                "news:last_fetch", str(time.time()), ex=CACHE_TTL * 2
+            )
             cached = await db.redis_client.get(CACHE_KEY)
             if cached:
                 items = json.loads(cached)
+                # Serve immediately; refresh in the background if stale.
+                try:
+                    is_fresh = await db.redis_client.exists(CACHE_FRESH_KEY)
+                except Exception:
+                    is_fresh = True
+                if not is_fresh:
+                    await _trigger_refresh()
                 return items[:limit]
         except Exception as e:
             logger.warning(f"Redis cache read failed: {e}")
 
-    # Fetch fresh data
+    # Cold cache — fetch synchronously (concurrent across feeds, so still fast).
     try:
         items = await _fetch_feeds()
     except Exception as e:
         logger.error(f"News feed fetch failed: {e}")
         raise HTTPException(status_code=503, detail="Failed to fetch news feeds")
 
-    # Store in cache
-    if db.redis_client and items:
-        try:
-            await db.redis_client.setex(CACHE_KEY, CACHE_TTL, json.dumps(items))
-        except Exception as e:
-            logger.warning(f"Redis cache write failed: {e}")
-
+    await _store_feed(items)
     return items[:limit]
diff --git a/backend/api/services/broadcast.py b/backend/api/services/broadcast.py
index 0d417cc2..d0ffe328 100644
--- a/backend/api/services/broadcast.py
+++ b/backend/api/services/broadcast.py
@@ -1,6 +1,7 @@
 import asyncio
 import logging
 import json
+import time
 from typing import Dict
 from fastapi import WebSocket, WebSocketDisconnect
 from aiokafka import AIOKafkaConsumer
@@ -17,6 +18,9 @@
 # At ~37s orbital cycles emitting 11k messages, 256 gives ~23ms grace before dropping.
 _CLIENT_QUEUE_SIZE = 256
 
+# How often the last-value cache is swept for stale entities (seconds).
+_LVC_PRUNE_INTERVAL_S = 30.0
+
 
 class BroadcastManager:
     def __init__(self):
@@ -28,6 +32,13 @@ def __init__(self):
         self.redis_pubsub: aioredis.client.PubSub | None = None
         self.redis_task: asyncio.Task | None = None
         self.running = False
+        # Last-value cache: uid → (TAK frame bytes, monotonic receive time).
+        # Replayed to freshly-connected clients so a late joiner paints the
+        # current world immediately instead of waiting for each poller's next
+        # full sweep. Populated even when no clients are connected so the cache
+        # is warm by the time someone joins.
+        self._lvc: Dict[str, tuple[bytes, float]] = {}
+        self._last_prune: float = 0.0
 
     @property
     def active_connections(self):
@@ -137,6 +148,8 @@ async def stop(self):
         for ws in list(self._clients.keys()):
             await self.disconnect(ws)
 
+        self._lvc.clear()
+
     async def _consume_redis_alerts(self):
         """Consume from Redis pub/sub and broadcast alerts (jamming zones, holding patterns)."""
         if not self.redis_pubsub:
@@ -211,6 +224,10 @@ async def _consume(self):
                     logger.error(f"Error transforming message: {e}")
                     continue
 
+                # Keep the last-value cache warm regardless of client count, so
+                # the next client to connect can be bootstrapped immediately.
+                self._record_live(data.get("uid"), tak_bytes)
+
                 if not self._clients:
                     continue
 
@@ -237,9 +254,81 @@ async def _consume(self):
                     pass
             self._clients.clear()
 
+    def _record_live(self, uid, frame: bytes) -> None:
+        """Store the latest frame for an entity, pruning stale ones periodically."""
+        if not uid:
+            return
+        now = time.monotonic()
+        self._lvc[str(uid)] = (frame, now)
+        self._maybe_prune(now)
+
+    def _maybe_prune(self, now: float) -> None:
+        """Drop entities not seen within the TTL; enforce a hard size cap."""
+        over_cap = len(self._lvc) > settings.LIVE_SNAPSHOT_MAX_ENTITIES
+        if not over_cap and now - self._last_prune < _LVC_PRUNE_INTERVAL_S:
+            return
+        self._last_prune = now
+        cutoff = now - settings.LIVE_SNAPSHOT_TTL_SECONDS
+        self._lvc = {
+            uid: entry for uid, entry in self._lvc.items() if entry[1] >= cutoff
+        }
+        # If a runaway uid space still blows the budget, keep the freshest.
+        if len(self._lvc) > settings.LIVE_SNAPSHOT_MAX_ENTITIES:
+            freshest = sorted(
+                self._lvc.items(), key=lambda kv: kv[1][1], reverse=True
+            )[: settings.LIVE_SNAPSHOT_MAX_ENTITIES]
+            self._lvc = dict(freshest)
+
+    def _snapshot_frames(self) -> list[bytes]:
+        """Current, non-stale frames for bootstrapping a freshly-connected client."""
+        cutoff = time.monotonic() - settings.LIVE_SNAPSHOT_TTL_SECONDS
+        # Copy first — _consume mutates _lvc from another task.
+        return [frame for (frame, ts) in list(self._lvc.values()) if ts >= cutoff]
+
+    async def _send_snapshot(self, ws: WebSocket) -> bool:
+        """Replay the last-value cache to a newly connected client.
+
+        Frames are sent directly rather than through the bounded live queue,
+        which would otherwise drop most of a multi-thousand-entity snapshot. We
+        yield periodically so a large replay never starves the event loop or
+        other clients. Returns False if the client went away mid-snapshot so the
+        caller can stop.
+        """
+        frames = self._snapshot_frames()
+        if not frames:
+            return True
+
+        sent = 0
+        for frame in frames:
+            try:
+                await asyncio.wait_for(ws.send_bytes(frame), timeout=3.0)
+            except asyncio.TimeoutError:
+                logger.warning("Snapshot send timed out — disconnecting")
+                return False
+            except (
+                WebSocketDisconnect,
+                ConnectionClosedOK,
+                ConnectionClosedError,
+                ClientDisconnected,
+            ):
+                return False
+            except Exception as e:
+                logger.error(f"Snapshot send error: {e}")
+                return False
+            sent += 1
+            if sent % 256 == 0:
+                await asyncio.sleep(0)
+
+        logger.info(f"Replayed snapshot of {sent} entities to new client")
+        return True
+
     async def _client_worker(self, ws: WebSocket, q: asyncio.Queue):
         """Background task per client: dequeue and send, with a generous timeout."""
         try:
+            # Bootstrap the late joiner with the current world before streaming
+            # live deltas, so the map isn't empty until the next poller sweep.
+            if not await self._send_snapshot(ws):
+                return
             while True:
                 msg = await q.get()
                 try:
diff --git a/backend/api/tests/test_broadcast_snapshot.py b/backend/api/tests/test_broadcast_snapshot.py
new file mode 100644
index 00000000..1efb74e3
--- /dev/null
+++ b/backend/api/tests/test_broadcast_snapshot.py
@@ -0,0 +1,139 @@
+"""Tests for the broadcast last-value cache and fresh-client snapshot replay.
+
+A freshly-connected WebSocket client must be bootstrapped with the current
+world state immediately, rather than waiting for each poller's next full sweep
+(the orbital sweep alone is a ~15-37 s cycle). These tests exercise the
+last-value cache (LVC) and the snapshot sender directly, without Kafka/Redis.
+"""
+
+import os
+import sys
+import time
+
+import pytest
+
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+from .test_stubs import install_common_test_stubs  # noqa: E402
+
+install_common_test_stubs()
+
+from core.config import settings  # noqa: E402
+from services.broadcast import BroadcastManager  # noqa: E402
+
+
+def _frame(tag: str) -> bytes:
+    """Build a TAK-shaped opaque frame (magic header + tag) for assertions."""
+    return bytes([0xBF, 0x01, 0xBF]) + tag.encode()
+
+
+class FakeWS:
+    """Minimal WebSocket double capturing sent binary frames."""
+
+    def __init__(self, fail_after: int | None = None):
+        self.sent: list[bytes] = []
+        self.fail_after = fail_after
+
+    async def send_bytes(self, data: bytes) -> None:
+        if self.fail_after is not None and len(self.sent) >= self.fail_after:
+            raise ConnectionError("client gone")
+        self.sent.append(data)
+
+
+def test_record_live_populates_and_overwrites():
+    mgr = BroadcastManager()
+    mgr._record_live("SAT-1", _frame("a"))
+    mgr._record_live("AIR-1", _frame("b"))
+    assert set(mgr._lvc.keys()) == {"SAT-1", "AIR-1"}
+
+    # Latest frame wins for the same uid.
+    mgr._record_live("SAT-1", _frame("a2"))
+    assert mgr._lvc["SAT-1"][0] == _frame("a2")
+
+
+def test_record_live_ignores_blank_uid():
+    mgr = BroadcastManager()
+    mgr._record_live(None, _frame("x"))
+    mgr._record_live("", _frame("y"))
+    assert mgr._lvc == {}
+
+
+def test_snapshot_frames_excludes_stale():
+    mgr = BroadcastManager()
+    mgr._record_live("FRESH", _frame("f"))
+    # Backdate a second entity beyond the TTL.
+    stale_ts = mgr._lvc["FRESH"][1] - settings.LIVE_SNAPSHOT_TTL_SECONDS - 10
+    mgr._lvc["STALE"] = (_frame("s"), stale_ts)
+
+    frames = mgr._snapshot_frames()
+    assert _frame("f") in frames
+    assert _frame("s") not in frames
+
+
+def test_maybe_prune_drops_stale():
+    mgr = BroadcastManager()
+    now = time.monotonic()
+    for i in range(3):
+        mgr._lvc[f"fresh{i}"] = (_frame(f"fresh{i}"), now)
+    for i in range(2):
+        mgr._lvc[f"stale{i}"] = (
+            _frame(f"stale{i}"),
+            now - settings.LIVE_SNAPSHOT_TTL_SECONDS - 1,
+        )
+    mgr._last_prune = 0.0  # force the interval gate open
+    mgr._maybe_prune(now)
+    assert set(mgr._lvc.keys()) == {"fresh0", "fresh1", "fresh2"}
+
+
+def test_maybe_prune_enforces_hard_cap(monkeypatch):
+    mgr = BroadcastManager()
+    monkeypatch.setattr(settings, "LIVE_SNAPSHOT_MAX_ENTITIES", 2)
+    now = time.monotonic()
+    for i in range(5):
+        mgr._lvc[f"e{i}"] = (_frame(f"e{i}"), now)
+    # Over cap → prune runs even though the interval just reset.
+    mgr._maybe_prune(now)
+    assert len(mgr._lvc) == 2
+
+
+@pytest.mark.asyncio
+async def test_send_snapshot_sends_all_frames():
+    mgr = BroadcastManager()
+    for i in range(10):
+        mgr._record_live(f"e{i}", _frame(f"e{i}"))
+    ws = FakeWS()
+    ok = await mgr._send_snapshot(ws)
+    assert ok is True
+    assert len(ws.sent) == 10
+
+
+@pytest.mark.asyncio
+async def test_send_snapshot_empty_is_noop():
+    mgr = BroadcastManager()
+    ws = FakeWS()
+    ok = await mgr._send_snapshot(ws)
+    assert ok is True
+    assert ws.sent == []
+
+
+@pytest.mark.asyncio
+async def test_send_snapshot_handles_disconnect_midway():
+    mgr = BroadcastManager()
+    for i in range(10):
+        mgr._record_live(f"e{i}", _frame(f"e{i}"))
+    ws = FakeWS(fail_after=3)
+    ok = await mgr._send_snapshot(ws)
+    assert ok is False
+    assert len(ws.sent) == 3
+
+
+@pytest.mark.asyncio
+async def test_send_snapshot_excludes_stale_entities():
+    mgr = BroadcastManager()
+    mgr._record_live("FRESH", _frame("fresh"))
+    stale_ts = mgr._lvc["FRESH"][1] - settings.LIVE_SNAPSHOT_TTL_SECONDS - 10
+    mgr._lvc["STALE"] = (_frame("stale"), stale_ts)
+
+    ws = FakeWS()
+    ok = await mgr._send_snapshot(ws)
+    assert ok is True
+    assert ws.sent == [_frame("fresh")]
diff --git a/backend/api/tests/test_news_router.py b/backend/api/tests/test_news_router.py
index f4d1388e..9a54b2c1 100644
--- a/backend/api/tests/test_news_router.py
+++ b/backend/api/tests/test_news_router.py
@@ -1,15 +1,195 @@
 from __future__ import annotations
 
+import asyncio
+import json
 import os
 import sys
 
+import pytest
+
 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
 from .test_stubs import install_common_test_stubs  # noqa: E402
 
 install_common_test_stubs()
 
+import routers.news as news  # noqa: E402
 from routers.news import DEFAULT_RSS_URLS  # noqa: E402
 
 
 def test_default_news_feeds_exclude_defensenews() -> None:
-    assert "defensenews.com" not in DEFAULT_RSS_URLS
\ No newline at end of file
+    assert "defensenews.com" not in DEFAULT_RSS_URLS
+
+
+class FakeRedis:
+    """Minimal async Redis double for the news cache paths."""
+
+    def __init__(self, store: dict | None = None):
+        self.store = dict(store or {})
+
+    async def set(self, key, value, ex=None, nx=False):
+        if nx and key in self.store:
+            return None
+        self.store[key] = value
+        return True
+
+    async def setex(self, key, ttl, value):
+        self.store[key] = value
+        return True
+
+    async def get(self, key):
+        return self.store.get(key)
+
+    async def exists(self, key):
+        return 1 if key in self.store else 0
+
+    async def delete(self, key):
+        self.store.pop(key, None)
+        return True
+
+
+@pytest.mark.asyncio
+async def test_fresh_cache_served_without_refresh(monkeypatch):
+    """A fresh cache is returned directly and triggers no background refresh."""
+    cached = [{"title": "x", "link": "", "pub_date": "", "source": "S"}]
+    fake = FakeRedis({news.CACHE_KEY: json.dumps(cached), news.CACHE_FRESH_KEY: "1"})
+    monkeypatch.setattr(news.db, "redis_client", fake)
+
+    triggered = {"n": 0}
+
+    async def _spy():
+        triggered["n"] += 1
+
+    monkeypatch.setattr(news, "_trigger_refresh", _spy)
+
+    result = await news.get_news_feed(limit=40)
+    assert result == cached
+    assert triggered["n"] == 0
+
+
+@pytest.mark.asyncio
+async def test_stale_cache_served_and_triggers_refresh(monkeypatch):
+    """A stale cache (no fresh marker) is served immediately and refreshed in bg."""
+    cached = [{"title": "x", "link": "", "pub_date": "", "source": "S"}]
+    fake = FakeRedis({news.CACHE_KEY: json.dumps(cached)})  # no CACHE_FRESH_KEY
+    monkeypatch.setattr(news.db, "redis_client", fake)
+
+    triggered = {"n": 0}
+
+    async def _spy():
+        triggered["n"] += 1
+
+    monkeypatch.setattr(news, "_trigger_refresh", _spy)
+
+    result = await news.get_news_feed(limit=40)
+    assert result == cached
+    assert triggered["n"] == 1
+
+
+@pytest.mark.asyncio
+async def test_cold_cache_fetches_synchronously(monkeypatch):
+    """With no cache available, the endpoint fetches synchronously."""
+    monkeypatch.setattr(news.db, "redis_client", None)
+    fetched = [{"title": "live", "link": "", "pub_date": "", "source": "S"}]
+
+    async def _fake_fetch():
+        return fetched
+
+    monkeypatch.setattr(news, "_fetch_feeds", _fake_fetch)
+
+    result = await news.get_news_feed(limit=40)
+    assert result == fetched
+
+
+@pytest.mark.asyncio
+async def test_fetch_feeds_merges_and_sorts_concurrently(monkeypatch):
+    """_fetch_feeds gathers every source, merges, sorts newest-first, strips _ts."""
+    monkeypatch.setenv("NEWS_RSS_URLS", "https://a.example,https://b.example")
+    monkeypatch.setattr(news.db, "redis_client", None)
+
+    per_url = {
+        "https://a.example": [
+            {"title": "old", "link": "", "pub_date": "", "source": "A", "_ts": 100.0}
+        ],
+        "https://b.example": [
+            {"title": "new", "link": "", "pub_date": "", "source": "B", "_ts": 200.0}
+        ],
+    }
+
+    async def _fake_fetch_one(_client, url):
+        return per_url[url]
+
+    monkeypatch.setattr(news, "_fetch_one", _fake_fetch_one)
+
+    result = await news._fetch_feeds()
+    assert [r["title"] for r in result] == ["new", "old"]  # sorted newest-first
+    assert all("_ts" not in r for r in result)  # internal field stripped
+
+
+@pytest.mark.asyncio
+async def test_trigger_refresh_dedupes_with_nx_lock(monkeypatch):
+    """Only the worker that wins the NX lock spawns the refresh task."""
+    # Lock already held → no task spawned.
+    fake = FakeRedis({news.CACHE_REFRESH_LOCK: "1"})
+    monkeypatch.setattr(news.db, "redis_client", fake)
+    monkeypatch.setattr(news, "_refresh_task", None)
+
+    spawned = {"n": 0}
+
+    async def _fake_refresh():
+        spawned["n"] += 1
+
+    monkeypatch.setattr(news, "_refresh_and_release", _fake_refresh)
+
+    await news._trigger_refresh()
+    assert spawned["n"] == 0  # lock contended → nothing spawned
+
+
+@pytest.mark.asyncio
+async def test_warm_cache_triggers_refresh(monkeypatch):
+    """Startup warm-up delegates to the background refresh and never blocks."""
+    called = {"n": 0}
+
+    async def _spy():
+        called["n"] += 1
+
+    monkeypatch.setattr(news, "_trigger_refresh", _spy)
+    await news.warm_cache()
+    assert called["n"] == 1
+
+
+@pytest.mark.asyncio
+async def test_prewarm_loop_refreshes_then_sleeps(monkeypatch):
+    """The loop warms the cache, then waits the interval (cancelled to exit)."""
+    calls = {"warm": 0}
+
+    async def _warm():
+        calls["warm"] += 1
+
+    async def _sleep(_seconds):
+        raise asyncio.CancelledError  # break out after the first cycle
+
+    monkeypatch.setattr(news, "warm_cache", _warm)
+    monkeypatch.setattr(news.asyncio, "sleep", _sleep)
+
+    with pytest.raises(asyncio.CancelledError):
+        await news.prewarm_loop()
+    assert calls["warm"] == 1
+
+
+@pytest.mark.asyncio
+async def test_prewarm_loop_survives_refresh_error(monkeypatch):
+    """A failing refresh cycle is swallowed; the loop still reaches the sleep."""
+
+    async def _warm():
+        raise RuntimeError("boom")
+
+    async def _sleep(_seconds):
+        raise asyncio.CancelledError
+
+    monkeypatch.setattr(news, "warm_cache", _warm)
+    monkeypatch.setattr(news.asyncio, "sleep", _sleep)
+
+    # If the RuntimeError weren't
caught it would surface here instead of the + # CancelledError from sleep. + with pytest.raises(asyncio.CancelledError): + await news.prewarm_loop() diff --git a/frontend/vite.config.ts b/frontend/vite.config.ts index 3e9bd89b..12288185 100644 --- a/frontend/vite.config.ts +++ b/frontend/vite.config.ts @@ -1,69 +1,125 @@ import tailwindcss from "@tailwindcss/vite"; import react from "@vitejs/plugin-react"; -import { defineConfig } from "vite"; +import { defineConfig, loadEnv, type HtmlTagDescriptor, type Plugin } from "vite"; + +/** + * Hoist modulepreload hints for the critical map vendors into index.html. + * + * The map/globe views are lazy-loaded (so heavy vendors stay out of the entry + * and remain cacheable across releases), but that means a fresh client only + * discovers `deck-gl` and the GL engine AFTER the entry + App chunks download, + * parse, and the dynamic import fires — a multi-hop request waterfall before + * the default view can paint. These chunks are needed on virtually every first + * load, so we emit `<link rel="modulepreload">` for them; the browser then + * fetches them in parallel with the entry instead of serially after it. + * + * Only the engine the default TACTICAL view actually uses is preloaded: Mapbox + * when a valid token is configured, MapLibre otherwise (globe-only MapLibre in + * a Mapbox build loads on demand when the user switches to a globe view). + */ +function mapCriticalPreloadPlugin(engineChunk: "mapbox" | "maplibre"): Plugin { + const wantedChunks = new Set(["deck-gl", engineChunk]); + return { + name: "map-critical-preload", + apply: "build", + transformIndexHtml(html, ctx) { + const bundle = ctx.bundle; + if (!bundle) return html; + const tags: HtmlTagDescriptor[] = []; + for (const file of Object.values(bundle)) { + if (file.type !== "chunk") continue; + const isVendor = wantedChunks.has(file.name); + const isDefaultView = + file.facadeModuleId?.includes("/components/map/TacticalMap") ??
false; + if (isVendor || isDefaultView) { + tags.push({ + tag: "link", + attrs: { + rel: "modulepreload", + href: `/${file.fileName}`, + crossorigin: true, + }, + injectTo: "head", + }); + } + } + return { html, tags }; + }, + }; +} // https://vitejs.dev/config/ -export default defineConfig({ - plugins: [react(), tailwindcss()], - build: { - rollupOptions: { - output: { - // Split the heavyweight vendors into their own chunks. Each chunk is - // only fetched when a module that needs it loads (the map views, the - // stats dashboard, …), and vendor chunks stay byte-identical across - // app releases so browsers keep them cached. - // - // Function form (not the object form) so that ONLY these packages' - // own modules land in the vendor chunks — the object form hoists - // shared helper modules into them, which silently makes the entry - // chunk depend on (and eagerly preload) the multi-MB vendors. - manualChunks(id: string) { - // Vite's dynamic-import preload helper is a virtual module shared - // by every code-split chunk; isolate it so Rollup can't park it - // inside a vendor chunk and drag that vendor into the entry. - if (id.includes("vite/preload-helper")) return "preload-helper"; - if (!id.includes("node_modules")) return; - if (id.includes("echarts") || id.includes("zrender")) { - return "echarts"; - } - if (id.includes("maplibre-gl")) return "maplibre"; - if (id.includes("mapbox-gl")) return "mapbox"; - if ( - id.includes("@deck.gl") || - id.includes("@luma.gl") || - id.includes("@loaders.gl") || - id.includes("@math.gl") - ) { - return "deck-gl"; - } - if ( - id.includes("/react-dom/") || - id.includes("/react/") || - id.includes("/scheduler/") - ) { - return "react-vendor"; - } +export default defineConfig(({ mode }) => { + const env = loadEnv(mode, process.cwd(), ""); + const mapboxToken = env.VITE_MAPBOX_TOKEN ?? 
""; + const mapboxEnabled = env.VITE_ENABLE_MAPBOX !== "false"; + // Mirror the runtime engine choice in mapStyles.ts so we preload the engine + // the default view will actually instantiate. + const useMapbox = mapboxEnabled && mapboxToken.startsWith("pk."); + const engineChunk: "mapbox" | "maplibre" = useMapbox ? "mapbox" : "maplibre"; + + return { + plugins: [react(), tailwindcss(), mapCriticalPreloadPlugin(engineChunk)], + build: { + rollupOptions: { + output: { + // Split the heavyweight vendors into their own chunks. Each chunk is + // only fetched when a module that needs it loads (the map views, the + // stats dashboard, …), and vendor chunks stay byte-identical across + // app releases so browsers keep them cached. + // + // Function form (not the object form) so that ONLY these packages' + // own modules land in the vendor chunks — the object form hoists + // shared helper modules into them, which silently makes the entry + // chunk depend on (and eagerly preload) the multi-MB vendors. + manualChunks(id: string) { + // Vite's dynamic-import preload helper is a virtual module shared + // by every code-split chunk; isolate it so Rollup can't park it + // inside a vendor chunk and drag that vendor into the entry. + if (id.includes("vite/preload-helper")) return "preload-helper"; + if (!id.includes("node_modules")) return; + if (id.includes("echarts") || id.includes("zrender")) { + return "echarts"; + } + if (id.includes("maplibre-gl")) return "maplibre"; + if (id.includes("mapbox-gl")) return "mapbox"; + if ( + id.includes("@deck.gl") || + id.includes("@luma.gl") || + id.includes("@loaders.gl") || + id.includes("@math.gl") + ) { + return "deck-gl"; + } + if ( + id.includes("/react-dom/") || + id.includes("/react/") || + id.includes("/scheduler/") + ) { + return "react-vendor"; + } + }, }, }, + // deck.gl and the map engines are inherently large single libraries; + // raise the warning threshold so CI noise reflects real regressions. 
+ chunkSizeWarningLimit: 1600, }, - // deck.gl and the map engines are inherently large single libraries; - // raise the warning threshold so CI noise reflects real regressions. - chunkSizeWarningLimit: 1600, - }, - test: { - // Exclude Playwright E2E specs — those run via `pnpm exec playwright test` - exclude: ["e2e/**", "node_modules/**"], - }, - server: { - port: 3700, - host: true, // Listen on all interfaces (required for Docker) - watch: { - usePolling: true, // Required for Docker on Windows/Mac - interval: 1000, // Poll every 1s (reduces CPU usage) + test: { + // Exclude Playwright E2E specs — those run via `pnpm exec playwright test` + exclude: ["e2e/**", "node_modules/**"], }, - hmr: { - // clientPort: 80, // Removed to allow auto-detection on HTTPS VPS + server: { + port: 3700, + host: true, // Listen on all interfaces (required for Docker) + watch: { + usePolling: true, // Required for Docker on Windows/Mac + interval: 1000, // Poll every 1s (reduces CPU usage) + }, + hmr: { + // clientPort: 80, // Removed to allow auto-detection on HTTPS VPS + }, + allowedHosts: true, // Allow nginx to proxy requests (host header will be 'frontend') }, - allowedHosts: true, // Allow nginx to proxy requests (host header will be 'frontend') - }, + }; });
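Reviewer note: the tests in this diff exercise `get_news_feed`, `_trigger_refresh`, and `_refresh_and_release`, but their bodies are not visible in this part of the change. The following is a minimal sketch of the stale-while-revalidate flow those tests imply, not the router's actual code. The `FeedCache` and `InMemoryRedis` classes, the key strings, and the lock TTL are hypothetical; the 15-minute freshness window and 6-hour hard TTL come from the task doc.

```python
import asyncio
import json

# Hypothetical key names and lock TTL; the real constants live in routers/news.py.
CACHE_KEY = "news:feed"
CACHE_FRESH_KEY = "news:feed:fresh"
CACHE_REFRESH_LOCK = "news:feed:refresh_lock"
FRESH_TTL = 15 * 60        # freshness window (from the task doc)
HARD_TTL = 6 * 60 * 60     # how long stale data stays servable (from the task doc)
LOCK_TTL = 60              # assumption: upper bound on one refresh


class InMemoryRedis:
    """Tiny async stand-in for Redis; TTLs are accepted but not enforced."""

    def __init__(self):
        self.store = {}

    async def set(self, key, value, ex=None, nx=False):
        if nx and key in self.store:
            return None  # SET NX fails when the key already exists
        self.store[key] = value
        return True

    async def get(self, key):
        return self.store.get(key)

    async def exists(self, key):
        return 1 if key in self.store else 0

    async def delete(self, key):
        self.store.pop(key, None)


class FeedCache:
    def __init__(self, redis, fetch):
        self.redis = redis
        self.fetch = fetch   # async callable returning a list of items
        self._task = None    # held ref: dedupes refreshes within this worker

    async def _store(self, items):
        await self.redis.set(CACHE_KEY, json.dumps(items), ex=HARD_TTL)
        await self.redis.set(CACHE_FRESH_KEY, "1", ex=FRESH_TTL)

    async def _refresh_and_release(self):
        try:
            await self._store(await self.fetch())
        finally:
            # Always drop the lock so a failed refresh does not block the next one.
            await self.redis.delete(CACHE_REFRESH_LOCK)

    async def trigger_refresh(self):
        if self._task is not None and not self._task.done():
            return  # a refresh is already running in this worker
        won = await self.redis.set(CACHE_REFRESH_LOCK, "1", ex=LOCK_TTL, nx=True)
        if not won:
            return  # another worker holds the lock
        self._task = asyncio.create_task(self._refresh_and_release())

    async def get(self):
        raw = await self.redis.get(CACHE_KEY)
        if raw is None:
            # Truly cold cache: the caller has to wait for the upstream fetch.
            items = await self.fetch()
            await self._store(items)
            return items
        if not await self.redis.exists(CACHE_FRESH_KEY):
            await self.trigger_refresh()  # stale: refresh in the background
        return json.loads(raw)
```

Keeping the freshness marker in a separate key with a shorter TTL than the payload is what lets a stale payload keep being served while exactly one worker refreshes it.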