Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions spec/unit/streaming_spec.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package.path = "./src/?.lua;./src/?/init.lua;./spec/?.lua;./spec/?/init.lua;" .. package.path

local mock_cjson_safe = require("helpers.mock_cjson_safe")
mock_cjson_safe.install()

local gherkin = require("helpers.gherkin")
local mock_ngx = require("helpers.mock_ngx")

Expand Down Expand Up @@ -282,6 +285,66 @@ runner:then_("^exactly (%d+) stream_cutoff audit event was queued$", function(ct
assert.equals(tonumber(count), actual_count)
end)

-- Non-streaming reconciliation steps (Issue #12 / Feature 015)
-- Given-step: build a minimal non-streaming request context plus a token
-- reservation, then initialise the streaming module's per-request context.
runner:given("^a non%-streaming context with reservation key ([%a%d_%-]+) and estimated_total (%d+)$",
  function(ctx, key, estimated_total)
    local reservation = {
      key = key,
      estimated_total = tonumber(estimated_total),
      prompt_tokens = 0,
      is_shadow = false,
    }
    local request_context = {
      body = '{"model":"gpt","stream":false}',
      headers = { Accept = "application/json" },
    }
    ctx.config = {}
    ctx.request_context = request_context
    ctx.reservation = reservation
    ctx.stream_ctx = streaming.init_stream(ctx.config, request_context, reservation)
  end)

-- When-step: feed a complete non-streaming JSON body (with a usage block)
-- through body_filter, then signal end-of-stream with an empty chunk.
runner:when("^I run body_filter with non%-streaming response body having total_tokens (%d+)$",
  function(ctx, total_tokens)
    local payload = string.format('{"usage":{"total_tokens":%s}}', total_tokens)
    ctx.output = streaming.body_filter(payload, false)
    ctx.output_eof = streaming.body_filter("", true)
  end)

-- When-step: same JSON body, but delivered in two pieces to exercise the
-- chunk-buffering path; eof arrives together with the second half.
runner:when("^I run body_filter with non%-streaming response body in two chunks having total_tokens (%d+)$",
  function(ctx, total_tokens)
    local payload = '{"usage":{"total_tokens":' .. total_tokens .. '}}'
    local split_at = math.floor(#payload / 2)
    local head = string.sub(payload, 1, split_at)
    local tail = string.sub(payload, split_at + 1)
    ctx.output = streaming.body_filter(head, false)
    ctx.output_eof = streaming.body_filter(tail, true)
  end)

-- When-step: a well-formed JSON body that carries no usage information,
-- followed by an empty eof chunk.
runner:when("^I run body_filter with non%-streaming response body with no usage field$", function(ctx)
  ctx.output = streaming.body_filter('{"result":"ok"}', false)
  ctx.output_eof = streaming.body_filter("", true)
end)

-- Then-step: exactly one reconcile call was recorded, and it carried the
-- expected actual/estimated token totals.
runner:then_("^reconcile is called for non%-streaming with actual_total (%d+) and estimated_total (%d+)$",
  function(_, actual_total, estimated_total)
    assert.equals(1, #reconcile_calls)
    local call = reconcile_calls[1]
    assert.equals(tonumber(actual_total), call.actual_total)
    assert.equals(tonumber(estimated_total), call.estimated_total)
  end)

-- Then-step: one reconcile call whose estimated and actual totals match,
-- i.e. no refund was issued.
runner:then_("^reconcile is called for non%-streaming with no refund %(estimated equals actual%)$",
  function(_)
    assert.equals(1, #reconcile_calls)
    local call = reconcile_calls[1]
    assert.equals(call.estimated_total, call.actual_total)
  end)

-- Then-step: the non-streaming body must pass through body_filter unchanged.
-- The previous check ended in "or #ctx.output >= 0", which is a tautology
-- (a string length is never negative), so the assertion could never fail.
-- Assert the real contract instead: body_filter returns each chunk as-is,
-- so the returned chunks reassemble into exactly the body that the module
-- buffered for reconciliation (stream_ctx.body_buffer).
runner:then_("^non%-streaming body passes through unchanged$", function(ctx)
  assert.is_not_nil(ctx.output)
  local returned = ctx.output .. (ctx.output_eof or "")
  assert.equals(ctx.stream_ctx.body_buffer, returned)
end)

-- Then-step: no reconcile call may have been recorded.
runner:then_("^reconcile is not called$", function(_)
  local call_count = #reconcile_calls
  assert.equals(0, call_count)
end)

runner:feature([[
Feature: SSE streaming enforcement module behavior
Rule: Streaming detection and config validation
Expand Down Expand Up @@ -378,4 +441,29 @@ Feature: SSE streaming enforcement module behavior
And a streaming context with max_completion_tokens 150 and buffer_tokens 100
When I run body_filter with two 100 token delta events in one chunk
Then enforcement checks advance by buffer interval

Rule: Non-streaming token reconciliation (Issue #12 / Feature 015)
Scenario: F015-NS-1 non-streaming response triggers reconcile with actual token count from body
Given the nginx mock environment is reset
And a non-streaming context with reservation key tenant-1 and estimated_total 500
When I run body_filter with non-streaming response body having total_tokens 120
Then reconcile is called for non-streaming with actual_total 120 and estimated_total 500

Scenario: F015-NS-2 non-streaming response body split across chunks still triggers reconcile
Given the nginx mock environment is reset
And a non-streaming context with reservation key tenant-2 and estimated_total 300
When I run body_filter with non-streaming response body in two chunks having total_tokens 80
Then reconcile is called for non-streaming with actual_total 80 and estimated_total 300

Scenario: F015-NS-3 non-streaming response with no usage field does not over-refund
Given the nginx mock environment is reset
And a non-streaming context with reservation key tenant-3 and estimated_total 400
When I run body_filter with non-streaming response body with no usage field
Then reconcile is called for non-streaming with no refund (estimated equals actual)

Scenario: F015-NS-4 no context means no reconcile and chunk passes through
Given the nginx mock environment is reset
When I run body_filter on a non-streaming request chunk
Then non-streaming chunks pass through unchanged
And reconcile is not called
]])
8 changes: 8 additions & 0 deletions src/fairvisor/bundle_loader.lua
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ local function _validate_rule(policy_id, rule, policy_index, rule_index, errors)
return nil
end

-- Merge validated/normalised fields (including computed private fields like _tpm_bucket_config)
-- back into rule.algorithm_config so they are available at request time.
for k, v in pairs(config_to_validate) do
if k ~= "algorithm" then
rule.algorithm_config[k] = v
end
end

return true
end

Expand Down
6 changes: 6 additions & 0 deletions src/fairvisor/rule_engine.lua
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ function _M.evaluate(request_context, bundle)

local last_allow_limit_result = nil
local last_allow_policy_id = nil
local last_allow_rule_name = nil
local last_allow_descriptors = nil
local pending_non_reject = nil

Expand Down Expand Up @@ -783,8 +784,12 @@ function _M.evaluate(request_context, bundle)
})

if limit_result then
if rule.algorithm == "token_bucket_llm" and limit_result.allowed ~= false then
limit_result.key = counter_key
end
last_allow_limit_result = limit_result
last_allow_policy_id = policy_id
last_allow_rule_name = rule.name
end

if limit_result and limit_result.allowed == false then
Expand Down Expand Up @@ -882,6 +887,7 @@ function _M.evaluate(request_context, bundle)
return _finalize_decision(_build_decision("allow", "all_rules_passed", {
limit_result = last_allow_limit_result,
policy_id = last_allow_policy_id,
rule_name = last_allow_rule_name,
matched_policy_count = matched_count,
debug_descriptors = last_allow_descriptors,
}), start_time, runtime_flags, request_context, last_allow_descriptors)
Expand Down
84 changes: 83 additions & 1 deletion src/fairvisor/streaming.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ local string_lower = string.lower
local table_concat = table.concat

local llm_limiter = require("fairvisor.llm_limiter")
local cost_extractor = require("fairvisor.cost_extractor")

local utils = require("fairvisor.utils")
local json_lib = utils.get_json()
Expand Down Expand Up @@ -203,6 +204,62 @@ local function _reconcile_once(ctx)
end
end

-- Non-streaming token reconciliation: take the buffered response body,
-- extract the actual token usage, and refund unused tokens via
-- llm_limiter.reconcile (Feature 015 / Issue #12).
--
-- Runs at most once per request context (guarded by ctx.reconciled).
-- Bails out silently when any precondition is missing: no reconcile API,
-- no fairvisor_counters shared dict, no positive reservation, or no key.
-- On extraction failure the actual total defaults to the reserved amount,
-- which results in no refund — never over-refund on bad data.
local function _reconcile_non_streaming(ctx)
  if ctx.reconciled then
    return
  end
  ctx.reconciled = true

  if not llm_limiter or type(llm_limiter.reconcile) ~= "function" then
    return
  end

  local dict = ngx.shared and ngx.shared.fairvisor_counters
  if not dict then
    return
  end

  local reserved = ctx.reserved
  if type(reserved) ~= "number" or reserved <= 0 then
    return
  end

  local key = ctx.key
  if type(key) ~= "string" or key == "" then
    return
  end

  -- Attempt to extract the actual token count from the buffered response
  -- body. Default to the reserved amount (no refund) on extraction failure.
  local actual_total = reserved
  local body = ctx.body_buffer or ""
  if body ~= "" then
    local extractor_config = {}
    if type(ctx.config) == "table" and type(ctx.config.cost_extractor) == "table" then
      for k, v in pairs(ctx.config.cost_extractor) do
        extractor_config[k] = v
      end
    end
    local ok_cfg, cfg_err = cost_extractor.validate_config(extractor_config)
    if ok_cfg then
      local result, err = cost_extractor.extract_from_response(body, extractor_config)
      if result and type(result.total_tokens) == "number" then
        actual_total = result.total_tokens
      else
        _log_err("_reconcile_non_streaming key=", key, " extraction_error=", tostring(err))
      end
    else
      -- Previously this branch fell through silently; log the misconfiguration
      -- so operators can tell why reconciliation used the no-refund default.
      _log_err("_reconcile_non_streaming key=", key, " invalid_extractor_config=", tostring(cfg_err))
    end
  end

  local now = utils.now()
  local ok, err = pcall(llm_limiter.reconcile, dict, key, ctx.config, reserved, actual_total, now)
  if not ok then
    _log_err("_reconcile_non_streaming reconcile_failed key=", key, " err=", tostring(err))
  end
end

local function _log_would_truncate(ctx)
_log_info("body_filter reason=would_truncate key=", tostring(ctx.key),
" tokens_used=", tostring(ctx.tokens_used),
Expand Down Expand Up @@ -263,6 +320,7 @@ function _M.init_stream(config, request_context, reservation)
tokens_used = 0,
chunk_count = 0,
buffer = "",
body_buffer = "",
next_check = stream_settings.buffer_tokens or DEFAULT_BUFFER_TOKENS,
on_limit_exceeded = stream_settings.on_limit_exceeded or DEFAULT_LIMIT_EXCEEDED_MODE,
include_partial_usage = stream_settings.include_partial_usage ~= false,
Expand All @@ -283,9 +341,33 @@ function _M.init_stream(config, request_context, reservation)
return ctx
end

local NON_STREAMING_MAX_BUFFER = 1048576 -- 1 MiB per spec; cap on buffered non-streaming body bytes (excess is truncated, not an error)

function _M.body_filter(chunk, eof)
local stream_ctx = ngx.ctx and ngx.ctx.fairvisor_stream
if not stream_ctx or not stream_ctx.active then
if not stream_ctx then
return chunk
end

if not stream_ctx.active then
-- Non-streaming path: buffer response body for token reconciliation.
if not stream_ctx.reconciled and type(stream_ctx.key) == "string" and stream_ctx.key ~= "" then
local in_chunk = chunk or ""
if in_chunk ~= "" then
local buf = stream_ctx.body_buffer or ""
local space = NON_STREAMING_MAX_BUFFER - #buf
if space > 0 then
if #in_chunk <= space then
stream_ctx.body_buffer = buf .. in_chunk
else
stream_ctx.body_buffer = buf .. string_sub(in_chunk, 1, space)
end
end
end
if eof then
_reconcile_non_streaming(stream_ctx)
end
end
return chunk
end

Expand Down
13 changes: 13 additions & 0 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
EDGE_NOBUNDLE_URL = os.environ.get("FAIRVISOR_E2E_NOBUNDLE_URL", "http://localhost:18084")
EDGE_REVERSE_URL = os.environ.get("FAIRVISOR_E2E_REVERSE_URL", "http://localhost:18085")
EDGE_ASN_URL = os.environ.get("FAIRVISOR_E2E_ASN_URL", "http://localhost:18087")
EDGE_LLM_RECONCILE_URL = os.environ.get("FAIRVISOR_E2E_LLM_RECONCILE_URL", "http://localhost:18088")
HEALTH_TIMEOUT_S = float(os.environ.get("FAIRVISOR_E2E_HEALTH_TIMEOUT", "15"))
COMPOSE_FILE = os.path.join(os.path.dirname(__file__), "docker-compose.test.yml")

Expand Down Expand Up @@ -168,3 +169,15 @@ def edge_asn_base_url():
"Edge ASN container not ready at {}. Run: docker compose -f tests/e2e/docker-compose.test.yml up -d".format(url)
)


@pytest.fixture(scope="session")
def edge_llm_reconcile_base_url():
    """Base URL of the LLM token reconciliation profile container (reverse_proxy + mock LLM backend)."""
    base = EDGE_LLM_RECONCILE_URL
    # Poll the container's health endpoint; skip the whole session's dependent
    # tests (rather than fail) when the compose profile is not running.
    is_ready, _ = _wait_ready(base, HEALTH_TIMEOUT_S)
    if not is_ready:
        pytest.skip(
            "Edge LLM reconcile container not ready at {}. Run: docker compose -f tests/e2e/docker-compose.test.yml up -d".format(base)
        )
    return base

36 changes: 36 additions & 0 deletions tests/e2e/docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,39 @@ services:
timeout: 2s
retries: 10
start_period: 5s

# Static nginx that impersonates an LLM backend: every request gets a fixed
# chat-completion JSON response (see mock-llm-backend.conf).
mock_llm_backend:
image: nginx:1.27-alpine
volumes:
- ./mock-llm-backend.conf:/etc/nginx/nginx.conf:ro
healthcheck:
test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:80/"]
interval: 2s
timeout: 2s
retries: 10
start_period: 5s

# FairVisor edge in reverse_proxy mode, fronting the mock LLM backend.
# Exposed on host port 18088 (consumed by the edge_llm_reconcile_base_url
# fixture in conftest.py) and driven by policy_llm_reconcile.json.
edge_llm_reconcile:
build:
context: ../..
dockerfile: docker/Dockerfile
ports:
- "18088:8080"
environment:
FAIRVISOR_CONFIG_FILE: /etc/fairvisor/policy.json
FAIRVISOR_SHARED_DICT_SIZE: 4m
FAIRVISOR_LOG_LEVEL: info
FAIRVISOR_MODE: reverse_proxy
FAIRVISOR_BACKEND_URL: http://mock_llm_backend:80
FAIRVISOR_WORKER_PROCESSES: "1"
volumes:
- ./policy_llm_reconcile.json:/etc/fairvisor/policy.json:ro
# Only start once the backend answers its own healthcheck, so the edge never
# proxies to a half-started container.
depends_on:
mock_llm_backend:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-sf", "http://127.0.0.1:8080/readyz"]
interval: 2s
timeout: 2s
retries: 10
start_period: 5s
10 changes: 10 additions & 0 deletions tests/e2e/mock-llm-backend.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Mock LLM backend for the e2e reconcile profile: answers every request on
# port 80 with a fixed chat-completion payload whose usage block reports
# total_tokens=50 (prompt_tokens=10 + completion_tokens=40).
events {}
http {
server {
listen 80;
location / {
default_type application/json;
# Fixed response body; tests rely on the usage totals below.
return 200 '{"id":"chatcmpl-e2e","object":"chat.completion","choices":[{"message":{"role":"assistant","content":"ok"}}],"usage":{"prompt_tokens":10,"completion_tokens":40,"total_tokens":50}}';
}
}
}
31 changes: 31 additions & 0 deletions tests/e2e/policy_llm_reconcile.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"bundle_version": 1,
"issued_at": "2026-01-01T00:00:00Z",
"expires_at": "2030-01-01T00:00:00Z",
"policies": [
{
"id": "llm-reconcile-policy",
"spec": {
"selector": {
"pathPrefix": "/",
"methods": ["POST"]
},
"mode": "enforce",
"rules": [
{
"name": "llm-token-rule",
"limit_keys": [
"header:x-e2e-key"
],
"algorithm": "token_bucket_llm",
"algorithm_config": {
"tokens_per_minute": 10000,
"burst_tokens": 10000,
"default_max_completion": 1000
}
}
]
}
}
]
}
Loading
Loading