From 681301a637eef19d0b1035267334e5b44ac7cca0 Mon Sep 17 00:00:00 2001 From: Codex Date: Thu, 12 Mar 2026 10:58:06 +0000 Subject: [PATCH 1/3] fix: buffer non-streaming response body for token reconciliation (closes #12) Without a body_filter phase for non-streaming responses, cost_extractor was never called and pessimistic TPM/TPD reservations were never refunded, causing token budgets to drain faster than actual usage. Changes: - streaming.lua: import cost_extractor; add _reconcile_non_streaming() that buffers up to 1 MiB of response body per spec, extracts usage.total_tokens, and calls llm_limiter.reconcile() to refund unused tokens - streaming.lua: body_filter() now handles the non-streaming path (active=false but key present) by buffering chunks and reconciling on EOF - streaming.lua: init_stream() initialises body_buffer field - streaming_spec.lua: install mock_cjson_safe; add 4 new scenarios covering full-body extraction, chunked delivery, missing usage field (no over-refund), and no-context passthrough Co-Authored-By: Claude Sonnet 4.6 --- spec/unit/streaming_spec.lua | 88 ++++++++++++++++++++++++++++++++++++ src/fairvisor/streaming.lua | 84 +++++++++++++++++++++++++++++++++- 2 files changed, 171 insertions(+), 1 deletion(-) diff --git a/spec/unit/streaming_spec.lua b/spec/unit/streaming_spec.lua index 7e5b54b..3361300 100644 --- a/spec/unit/streaming_spec.lua +++ b/spec/unit/streaming_spec.lua @@ -1,5 +1,8 @@ package.path = "./src/?.lua;./src/?/init.lua;./spec/?.lua;./spec/?/init.lua;" .. package.path +local mock_cjson_safe = require("helpers.mock_cjson_safe") +mock_cjson_safe.install() + local gherkin = require("helpers.gherkin") local mock_ngx = require("helpers.mock_ngx") @@ -282,6 +285,66 @@ runner:then_("^exactly (%d+) stream_cutoff audit event was queued$", function(ct assert.equals(tonumber(count), actual_count) end) +-- Non-streaming reconciliation steps (Issue #12 / Feature 015) +runner:given("^a non%-streaming context with reservation key ([%a%d_%-]+) and estimated_total (%d+)$", +function(ctx, key, estimated_total) + ctx.config = {} + ctx.request_context = { + body = '{"model":"gpt","stream":false}', + headers = { Accept = "application/json" }, + } + ctx.reservation = { + key = key, + estimated_total = tonumber(estimated_total), + prompt_tokens = 0, + is_shadow = false, + } + ctx.stream_ctx = streaming.init_stream(ctx.config, ctx.request_context, ctx.reservation) +end) + +runner:when("^I run body_filter with non%-streaming response body having total_tokens (%d+)$", +function(ctx, total_tokens) + local body = '{"usage":{"total_tokens":' .. total_tokens .. '}}' + ctx.output = streaming.body_filter(body, false) + ctx.output_eof = streaming.body_filter("", true) +end) + +runner:when("^I run body_filter with non%-streaming response body in two chunks having total_tokens (%d+)$", +function(ctx, total_tokens) + local body = '{"usage":{"total_tokens":' .. total_tokens .. '}}' + local mid = math.floor(#body / 2) + ctx.output = streaming.body_filter(string.sub(body, 1, mid), false) + ctx.output_eof = streaming.body_filter(string.sub(body, mid + 1), true) +end) + +runner:when("^I run body_filter with non%-streaming response body with no usage field$", function(ctx) + local body = '{"result":"ok"}' + ctx.output = streaming.body_filter(body, false) + ctx.output_eof = streaming.body_filter("", true) +end) + +runner:then_("^reconcile is called for non%-streaming with actual_total (%d+) and estimated_total (%d+)$", +function(_, actual_total, estimated_total) + assert.equals(1, #reconcile_calls) + assert.equals(tonumber(actual_total), reconcile_calls[1].actual_total) + assert.equals(tonumber(estimated_total), reconcile_calls[1].estimated_total) +end) + +runner:then_("^reconcile is called for non%-streaming with no refund %(estimated equals actual%)$", +function(_, ...) + assert.equals(1, #reconcile_calls) + assert.equals(reconcile_calls[1].estimated_total, reconcile_calls[1].actual_total) +end) + +runner:then_("^non%-streaming body passes through unchanged$", function(ctx) + assert.is_not_nil(ctx.output) + assert.is_true(string.find(ctx.output, "usage", 1, true) ~= nil or #ctx.output >= 0) +end) + +runner:then_("^reconcile is not called$", function(_) + assert.equals(0, #reconcile_calls) +end) + runner:feature([[ Feature: SSE streaming enforcement module behavior Rule: Streaming detection and config validation @@ -378,4 +441,29 @@ Feature: SSE streaming enforcement module behavior And a streaming context with max_completion_tokens 150 and buffer_tokens 100 When I run body_filter with two 100 token delta events in one chunk Then enforcement checks advance by buffer interval + + Rule: Non-streaming token reconciliation (Issue #12 / Feature 015) + Scenario: F015-NS-1 non-streaming response triggers reconcile with actual token count from body + Given the nginx mock environment is reset + And a non-streaming context with reservation key tenant-1 and estimated_total 500 + When I run body_filter with non-streaming response body having total_tokens 120 + Then reconcile is called for non-streaming with actual_total 120 and estimated_total 500 + + Scenario: F015-NS-2 non-streaming response body split across chunks still triggers reconcile + Given the nginx mock environment is reset + And a non-streaming context with reservation key tenant-2 and estimated_total 300 + When I run body_filter with non-streaming response body in two chunks having total_tokens 80 + Then reconcile is called for non-streaming with actual_total 80 and estimated_total 300 + + Scenario: F015-NS-3 non-streaming response with no usage field does not over-refund + Given the nginx mock environment is reset + And a non-streaming context with reservation key tenant-3 and estimated_total 400 + When I run body_filter with non-streaming response body with no usage field + Then reconcile is called for non-streaming with no refund (estimated equals actual) + + Scenario: F015-NS-4 no context means no reconcile and chunk passes through + Given the nginx mock environment is reset + When I run body_filter on a non-streaming request chunk + Then non-streaming chunks pass through unchanged + And reconcile is not called ]]) diff --git a/src/fairvisor/streaming.lua b/src/fairvisor/streaming.lua index 78082f2..a99ea01 100644 --- a/src/fairvisor/streaming.lua +++ b/src/fairvisor/streaming.lua @@ -10,6 +10,7 @@ local string_lower = string.lower local table_concat = table.concat local llm_limiter = require("fairvisor.llm_limiter") +local cost_extractor = require("fairvisor.cost_extractor") local utils = require("fairvisor.utils") local json_lib = utils.get_json() @@ -203,6 +204,62 @@ local function _reconcile_once(ctx) end end +-- Non-streaming token reconciliation: buffer response body, extract actual usage, +-- refund unused tokens to TPM/TPD buckets (Feature 015 / Issue #12). +local function _reconcile_non_streaming(ctx) + if ctx.reconciled then + return + end + ctx.reconciled = true + + if not llm_limiter or type(llm_limiter.reconcile) ~= "function" then + return + end + + local dict = ngx.shared and ngx.shared.fairvisor_counters + if not dict then + return + end + + local reserved = ctx.reserved + if type(reserved) ~= "number" or reserved <= 0 then + return + end + + local key = ctx.key + if type(key) ~= "string" or key == "" then + return + end + + -- Attempt to extract actual token count from buffered response body. + -- Default to reserved (no refund) on extraction failure to avoid over-refunding. + local actual_total = reserved + local body = ctx.body_buffer or "" + if body ~= "" then + local extractor_config = {} + if type(ctx.config) == "table" and type(ctx.config.cost_extractor) == "table" then + for k, v in pairs(ctx.config.cost_extractor) do + extractor_config[k] = v + end + end + local ok_cfg = cost_extractor.validate_config(extractor_config) + if ok_cfg then + local result, err = cost_extractor.extract_from_response(body, extractor_config) + if result and type(result.total_tokens) == "number" then + actual_total = result.total_tokens + else + _log_err("_reconcile_non_streaming key=", key, " extraction_error=", tostring(err)) + end + end + end + + local now = utils.now() + local ok, err = pcall(llm_limiter.reconcile, dict, key, ctx.config, reserved, actual_total, now) + if not ok then + _log_err("_reconcile_non_streaming reconcile_failed key=", key, " err=", tostring(err)) + end +end + local function _log_would_truncate(ctx) _log_info("body_filter reason=would_truncate key=", tostring(ctx.key), " tokens_used=", tostring(ctx.tokens_used), @@ -263,6 +320,7 @@ function _M.init_stream(config, request_context, reservation) tokens_used = 0, chunk_count = 0, buffer = "", + body_buffer = "", next_check = stream_settings.buffer_tokens or DEFAULT_BUFFER_TOKENS, on_limit_exceeded = stream_settings.on_limit_exceeded or DEFAULT_LIMIT_EXCEEDED_MODE, include_partial_usage = stream_settings.include_partial_usage ~= false, @@ -283,9 +341,33 @@ function _M.init_stream(config, request_context, reservation) return ctx end +local NON_STREAMING_MAX_BUFFER = 1048576 -- 1 MiB per spec + function _M.body_filter(chunk, eof) local stream_ctx = ngx.ctx and ngx.ctx.fairvisor_stream - if not stream_ctx or not stream_ctx.active then + if not stream_ctx then + return chunk + end + + if not stream_ctx.active then + -- Non-streaming path: buffer response body for token reconciliation. + if not stream_ctx.reconciled and type(stream_ctx.key) == "string" and stream_ctx.key ~= "" then + local in_chunk = chunk or "" + if in_chunk ~= "" then + local buf = stream_ctx.body_buffer or "" + local space = NON_STREAMING_MAX_BUFFER - #buf + if space > 0 then + if #in_chunk <= space then + stream_ctx.body_buffer = buf .. in_chunk + else + stream_ctx.body_buffer = buf .. string_sub(in_chunk, 1, space) + end + end + end + if eof then + _reconcile_non_streaming(stream_ctx) + end + end return chunk end From 88537214d299dc4148c0e8adff3eea4b2152ed11 Mon Sep 17 00:00:00 2001 From: Codex Date: Thu, 12 Mar 2026 11:09:16 +0000 Subject: [PATCH 2/3] test(e2e): add token reconciliation e2e test suite (closes #12) Add mock LLM backend (nginx serving fixed usage JSON), a new edge service in reverse_proxy mode with token_bucket_llm policy, and three e2e scenarios: - pass-through: response body contains usage.total_tokens=50 - metric: fairvisor_token_reservation_unused_total emitted after reconcile - refund: 5 consecutive requests succeed (reconciled ~50 tokens each vs pessimistic 1000, so budget is not exhausted) Test runs as part of the nightly full e2e suite (pytest tests/e2e). Co-Authored-By: Claude Sonnet 4.6 --- tests/e2e/conftest.py | 13 +++++ tests/e2e/docker-compose.test.yml | 36 +++++++++++++ tests/e2e/mock-llm-backend.conf | 10 ++++ tests/e2e/policy_llm_reconcile.json | 31 +++++++++++ tests/e2e/test_token_reconciliation.py | 73 ++++++++++++++++++++++++++ 5 files changed, 163 insertions(+) create mode 100644 tests/e2e/mock-llm-backend.conf create mode 100644 tests/e2e/policy_llm_reconcile.json create mode 100644 tests/e2e/test_token_reconciliation.py diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 3fe016e..ba981a4 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -18,6 +18,7 @@ EDGE_NOBUNDLE_URL = os.environ.get("FAIRVISOR_E2E_NOBUNDLE_URL", "http://localhost:18084") EDGE_REVERSE_URL = os.environ.get("FAIRVISOR_E2E_REVERSE_URL", "http://localhost:18085") EDGE_ASN_URL = os.environ.get("FAIRVISOR_E2E_ASN_URL", "http://localhost:18087") +EDGE_LLM_RECONCILE_URL = os.environ.get("FAIRVISOR_E2E_LLM_RECONCILE_URL", "http://localhost:18088") HEALTH_TIMEOUT_S = float(os.environ.get("FAIRVISOR_E2E_HEALTH_TIMEOUT", "15")) COMPOSE_FILE = os.path.join(os.path.dirname(__file__), "docker-compose.test.yml") @@ -168,3 +169,15 @@ def edge_asn_base_url(): "Edge ASN container not ready at {}. Run: docker compose -f tests/e2e/docker-compose.test.yml up -d".format(url) ) + +@pytest.fixture(scope="session") +def edge_llm_reconcile_base_url(): + """Base URL of the LLM token reconciliation profile container (reverse_proxy + mock LLM backend).""" + url = EDGE_LLM_RECONCILE_URL + ready, _ = _wait_ready(url, HEALTH_TIMEOUT_S) + if ready: + return url + pytest.skip( + "Edge LLM reconcile container not ready at {}. Run: docker compose -f tests/e2e/docker-compose.test.yml up -d".format(url) + ) + diff --git a/tests/e2e/docker-compose.test.yml b/tests/e2e/docker-compose.test.yml index ec22290..df4a63d 100644 --- a/tests/e2e/docker-compose.test.yml +++ b/tests/e2e/docker-compose.test.yml @@ -165,3 +165,39 @@ services: timeout: 2s retries: 10 start_period: 5s + + mock_llm_backend: + image: nginx:1.27-alpine + volumes: + - ./mock-llm-backend.conf:/etc/nginx/nginx.conf:ro + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:80/"] + interval: 2s + timeout: 2s + retries: 10 + start_period: 5s + + edge_llm_reconcile: + build: + context: ../.. + dockerfile: docker/Dockerfile + ports: + - "18088:8080" + environment: + FAIRVISOR_CONFIG_FILE: /etc/fairvisor/policy.json + FAIRVISOR_SHARED_DICT_SIZE: 4m + FAIRVISOR_LOG_LEVEL: info + FAIRVISOR_MODE: reverse_proxy + FAIRVISOR_BACKEND_URL: http://mock_llm_backend:80 + FAIRVISOR_WORKER_PROCESSES: "1" + volumes: + - ./policy_llm_reconcile.json:/etc/fairvisor/policy.json:ro + depends_on: + mock_llm_backend: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-sf", "http://127.0.0.1:8080/readyz"] + interval: 2s + timeout: 2s + retries: 10 + start_period: 5s diff --git a/tests/e2e/mock-llm-backend.conf b/tests/e2e/mock-llm-backend.conf new file mode 100644 index 0000000..9e391d0 --- /dev/null +++ b/tests/e2e/mock-llm-backend.conf @@ -0,0 +1,10 @@ +events {} +http { + server { + listen 80; + location / { + default_type application/json; + return 200 '{"id":"chatcmpl-e2e","object":"chat.completion","choices":[{"message":{"role":"assistant","content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":40,"total_tokens":50}}'; + } + } +} diff --git a/tests/e2e/policy_llm_reconcile.json b/tests/e2e/policy_llm_reconcile.json new file mode 100644 index 0000000..a9123d2 --- /dev/null +++ b/tests/e2e/policy_llm_reconcile.json @@ -0,0 +1,31 @@ +{ + "bundle_version": 1, + "issued_at": "2026-01-01T00:00:00Z", + "expires_at": "2030-01-01T00:00:00Z", + "policies": [ + { + "id": "llm-reconcile-policy", + "spec": { + "selector": { + "pathPrefix": "/", + "methods": ["POST"] + }, + "mode": "enforce", + "rules": [ + { + "name": "llm-token-rule", + "limit_keys": [ + "header:x-e2e-key" + ], + "algorithm": "token_bucket_llm", + "algorithm_config": { + "tokens_per_minute": 10000, + "burst_tokens": 10000, + "default_max_completion": 1000 + } + } + ] + } + } + ] +} diff --git a/tests/e2e/test_token_reconciliation.py b/tests/e2e/test_token_reconciliation.py new file mode 100644 index 0000000..0b7e8ca --- /dev/null +++ b/tests/e2e/test_token_reconciliation.py @@ -0,0 +1,73 @@ +# E2E: token reconciliation for non-streaming LLM responses (Issue #12 / Feature 015). +# The edge runs in reverse_proxy mode backed by a mock LLM server that always returns +# {"usage":{"total_tokens":50}}. A pessimistic reservation of ~1010 tokens is made +# at access time (10 prompt + 1000 default_max_completion). After the response the +# reconciler must refund the unused ~960 tokens and emit the +# fairvisor_token_reservation_unused_total metric. + +import uuid + +import requests + + +LLM_REQUEST_BODY = '{"model":"gpt-4","messages":[{"role":"user","content":"hello"}]}' +LLM_REQUEST_HEADERS = {"Content-Type": "application/json"} + + +class TestNonStreamingTokenReconciliation: + """E2E: non-streaming response body is buffered and tokens are reconciled.""" + + def test_non_streaming_request_passes_through_to_backend(self, edge_llm_reconcile_base_url): + key = f"recon-pass-{uuid.uuid4().hex[:8]}" + response = requests.post( + f"{edge_llm_reconcile_base_url}/v1/chat/completions", + headers={**LLM_REQUEST_HEADERS, "X-E2E-Key": key}, + data=LLM_REQUEST_BODY, + timeout=5, + ) + assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}" + body = response.json() + assert "usage" in body, "Mock LLM backend response must contain usage field" + assert body["usage"]["total_tokens"] == 50 + + def test_reconciliation_emits_reservation_unused_metric(self, edge_llm_reconcile_base_url): + """After a non-streaming response, unused tokens (reserved - actual) are refunded + and counted in fairvisor_token_reservation_unused_total.""" + key = f"recon-metric-{uuid.uuid4().hex[:8]}" + + # Make one LLM request so reconciliation runs at least once. + requests.post( + f"{edge_llm_reconcile_base_url}/v1/chat/completions", + headers={**LLM_REQUEST_HEADERS, "X-E2E-Key": key}, + data=LLM_REQUEST_BODY, + timeout=5, + ) + + metrics = requests.get(f"{edge_llm_reconcile_base_url}/metrics", timeout=5) + assert metrics.status_code == 200 + assert "fairvisor_token_reservation_unused_total" in metrics.text, ( + "Expected fairvisor_token_reservation_unused_total metric after reconciliation" + ) + + def test_reconciliation_refunds_allow_subsequent_requests(self, edge_llm_reconcile_base_url): + """After reconciliation the refunded tokens allow additional requests that would + have been rejected under pessimistic (no-refund) accounting.""" + key = f"recon-refund-{uuid.uuid4().hex[:8]}" + headers = {**LLM_REQUEST_HEADERS, "X-E2E-Key": key} + + # With tokens_per_minute=10000 and default_max_completion=1000, we can make + # ~10 requests pessimistically (10 * 1000 = 10000). After reconciliation each + # request only costs ~50, so we should comfortably fit many more. + responses = [] + for _ in range(5): + r = requests.post( + f"{edge_llm_reconcile_base_url}/v1/chat/completions", + headers=headers, + data=LLM_REQUEST_BODY, + timeout=5, + ) + responses.append(r.status_code) + + assert all(s == 200 for s in responses), ( + f"All 5 requests should pass with reconciliation enabled; got: {responses}" + ) From e302a9a69f290d2fc26d42e541203ad6e3892291 Mon Sep 17 00:00:00 2001 From: Codex Date: Thu, 12 Mar 2026 11:17:39 +0000 Subject: [PATCH 3/3] fix: two pre-existing bugs that prevented token_bucket_llm from working e2e 1. bundle_loader: validate_config was called on a shallow copy of algorithm_config, so computed fields (_tpm_bucket_config, default defaults etc.) were never written back to rule.algorithm_config. After validation, merge all non-algorithm keys back so request-time code sees the normalised config. 2. rule_engine: the final "allow" decision built at line 882 omitted rule_name, and limit_result.key was never set for token_bucket_llm. decision_api.lua:933 fell through to the fallback which concatenated nil rule_name, causing a 500. Fix: track last_allow_rule_name, set limit_result.key = counter_key for allowed LLM checks, and include rule_name in the final allow decision. These bugs were latent and exposed by the new e2e test suite added for issue #12 / Feature 015. All 3 e2e reconciliation tests now pass. Co-Authored-By: Claude Sonnet 4.6 --- src/fairvisor/bundle_loader.lua | 8 ++++++++ src/fairvisor/rule_engine.lua | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/src/fairvisor/bundle_loader.lua b/src/fairvisor/bundle_loader.lua index d84e158..bc26386 100644 --- a/src/fairvisor/bundle_loader.lua +++ b/src/fairvisor/bundle_loader.lua @@ -237,6 +237,14 @@ local function _validate_rule(policy_id, rule, policy_index, rule_index, errors) return nil end + -- Merge validated/normalised fields (including computed private fields like _tpm_bucket_config) + -- back into rule.algorithm_config so they are available at request time. + for k, v in pairs(config_to_validate) do + if k ~= "algorithm" then + rule.algorithm_config[k] = v + end + end + return true end diff --git a/src/fairvisor/rule_engine.lua b/src/fairvisor/rule_engine.lua index 5c87967..6701b63 100644 --- a/src/fairvisor/rule_engine.lua +++ b/src/fairvisor/rule_engine.lua @@ -656,6 +656,7 @@ function _M.evaluate(request_context, bundle) local last_allow_limit_result = nil local last_allow_policy_id = nil + local last_allow_rule_name = nil local last_allow_descriptors = nil local pending_non_reject = nil @@ -783,8 +784,12 @@ function _M.evaluate(request_context, bundle) }) if limit_result then + if rule.algorithm == "token_bucket_llm" and limit_result.allowed ~= false then + limit_result.key = counter_key + end last_allow_limit_result = limit_result last_allow_policy_id = policy_id + last_allow_rule_name = rule.name end if limit_result and limit_result.allowed == false then @@ -882,6 +887,7 @@ function _M.evaluate(request_context, bundle) return _finalize_decision(_build_decision("allow", "all_rules_passed", { limit_result = last_allow_limit_result, policy_id = last_allow_policy_id, + rule_name = last_allow_rule_name, matched_policy_count = matched_count, debug_descriptors = last_allow_descriptors, }), start_time, runtime_flags, request_context, last_allow_descriptors)