diff --git a/bin/ci/run_e2e_smoke.sh b/bin/ci/run_e2e_smoke.sh index 9ddf09a..dad4781 100755 --- a/bin/ci/run_e2e_smoke.sh +++ b/bin/ci/run_e2e_smoke.sh @@ -18,4 +18,6 @@ pytest -v \ tests/e2e/test_health.py \ tests/e2e/test_decision_api.py \ tests/e2e/test_metrics.py \ + tests/e2e/test_llm_openai_contract.py \ + tests/e2e/test_llm_streaming.py \ --junitxml="${ARTIFACTS_DIR}/junit.xml" diff --git a/spec/integration/features/rule_engine.feature b/spec/integration/features/rule_engine.feature index 36ecf74..2f0b67b 100644 --- a/spec/integration/features/rule_engine.feature +++ b/spec/integration/features/rule_engine.feature @@ -53,3 +53,52 @@ Feature: Rule engine golden integration behavior And request context is path /v1/chat with jwt org_id org-1 and no precomputed descriptors When I run rule engine evaluation Then integration decision is reject with reason kill_switch + + Rule: Issue #4 regression and P0 coverage + Scenario: header_hint estimator works in full integration for Circuit Breaker + Given the full chain integration is reset with real bundle_loader and token_bucket + And a real bundle with token_bucket_llm rule and header_hint estimator is loaded + And request context is path /v1/chat with jwt org_id org-1 + And request context has header X-Token-Estimate 5000 + When I run rule engine evaluation + Then integration decision is allow all rules passed + And circuit breaker was checked with cost 6000 + + Scenario: UC-09 SaaS Unavailable - Edge enforces policy even when audit event queue fails + # Uses a reject decision (tpm_exceeded) so queue_event IS called, proving + # that a saas_client failure does not block or corrupt the enforcement decision. 
+ Given the full chain integration is reset with real bundle_loader and token_bucket + And a real bundle with token_bucket_llm rule and TPM 1000 is loaded + And SaaS client is configured but unreachable + And request context is path /v1/chat with jwt org_id org-1 + And request context has header X-Token-Estimate 1500 + When I run rule engine evaluation + Then integration decision is reject with reason "tpm_exceeded" + And saas queue_event was attempted but did not block the decision + + Scenario: UC-16 OpenAI-compatible rate limiting headers + # Tests rule_engine decision.headers output (pre-HTTP layer). + # X-Fairvisor-Reason is stripped by decision_api in non-debug mode — see decision_api_spec.lua:519. + Given the full chain integration is reset with real bundle_loader and token_bucket + And a real bundle with token_bucket_llm rule and TPM 1000 is loaded + And request context is path /v1/chat with jwt org_id org-1 + And request context has header X-Token-Estimate 1500 + When I run rule engine evaluation + Then integration decision is reject with reason "tpm_exceeded" + And decision headers include "X-Fairvisor-Reason" with value "tpm_exceeded" + And decision headers include "Retry-After" + And decision headers include "RateLimit-Limit" with value "1000" + And decision headers include "RateLimit-Remaining" with value "0" + And decision headers include "RateLimit-Reset" + And decision headers include "RateLimit" matching pattern p_tpm.*;r=0;t=%d+ + + Scenario: UC-18 Token usage shadow mode + Given the full chain integration is reset with real bundle_loader and token_bucket + And a real bundle with token_bucket_llm rule in shadow mode is loaded + And request context is path /v1/chat with jwt org_id org-1 + And request context has header X-Token-Estimate 15000 + When I run rule engine evaluation + Then integration decision is allow + And integration decision mode is "shadow" + And would_reject is true + diff --git a/spec/integration/rule_engine_spec.lua 
b/spec/integration/rule_engine_spec.lua index 7a43db5..9d64d25 100644 --- a/spec/integration/rule_engine_spec.lua +++ b/spec/integration/rule_engine_spec.lua @@ -217,8 +217,22 @@ local function _setup_full_chain(ctx) local health = require("fairvisor.health") ctx.loader = loader ctx.health = health + + -- Mock saas_client that can be toggled to fail + ctx.saas_client = { + queue_event = function(event) + ctx.saas_event_attempts = (ctx.saas_event_attempts or 0) + 1 + if ctx.saas_unreachable then + return nil, "SaaS is unreachable" + end + ctx.queued_events = ctx.queued_events or {} + ctx.queued_events[#ctx.queued_events + 1] = event + return true + end + } + ctx.engine = require("fairvisor.rule_engine") - ctx.engine.init({ dict = ctx.dict }) + ctx.engine.init({ dict = ctx.dict, health = ctx.health, saas_client = ctx.saas_client }) end runner:given("^the full chain integration is reset with real bundle_loader and token_bucket$", function(ctx) @@ -422,6 +436,10 @@ runner:then_("^integration decision is allow all rules passed$", function(ctx) assert.equals("all_rules_passed", ctx.decision.reason) end) +runner:then_("^integration decision is allow$", function(ctx) + assert.equals("allow", ctx.decision.action) +end) + runner:then_("^missing descriptor log was emitted$", function(ctx) local found = false for i = 1, #ctx.logs do @@ -459,4 +477,219 @@ runner:then_("^loop short%-circuited circuit and limiter checks$", function(ctx) assert.is_true(saw_loop) end) +runner:given("^a real bundle with token_bucket_llm rule and header_hint estimator is loaded$", function(ctx) + local bundle = mock_bundle.new_bundle({ + bundle_version = 102, + policies = { + { + id = "p_llm", + spec = { + selector = { pathPrefix = "/v1/", methods = { "POST" } }, + mode = "enforce", + circuit_breaker = { + enabled = true, + spend_rate_threshold_per_minute = 10000, + window_seconds = 60 + }, + rules = { + { + name = "r_llm", + limit_keys = { "jwt:org_id" }, + algorithm = "token_bucket_llm", + 
algorithm_config = { + tokens_per_minute = 10000, + default_max_completion = 1000, + token_source = { estimator = "header_hint" } + }, + }, + }, + }, + }, + }, + }) + local payload = mock_bundle.encode(bundle) + local compiled, err = ctx.loader.load_from_string(payload, nil, nil) + assert.is_table(compiled, tostring(err)) + local ok, apply_err = ctx.loader.apply(compiled) + assert.is_true(ok, tostring(apply_err)) + ctx.bundle = ctx.loader.get_current() + + -- Mock circuit_breaker to record calls + local real_cb = require("fairvisor.circuit_breaker") + ctx.circuit_calls = {} + package.loaded["fairvisor.circuit_breaker"] = { + check = function(dict, config, key, cost, now) + ctx.circuit_calls[#ctx.circuit_calls + 1] = { key = key, cost = cost } + return real_cb.check(dict, config, key, cost, now) + end, + } + -- Reload rule_engine to use mocked circuit_breaker + package.loaded["fairvisor.rule_engine"] = nil + ctx.engine = require("fairvisor.rule_engine") + ctx.engine.init({ dict = ctx.dict }) +end) + +runner:given("^request context has header X%-Token%-Estimate (%d+)$", function(ctx, estimate) + ctx.request_context.headers["X-Token-Estimate"] = tostring(estimate) +end) + +runner:then_("^circuit breaker was checked with cost (%d+)$", function(ctx, expected_cost) + local found = false + for i = 1, #ctx.circuit_calls do + if ctx.circuit_calls[i].cost == tonumber(expected_cost) then + found = true + break + end + end + assert.is_true(found, "Circuit breaker should be called with cost " .. 
expected_cost) +end) + +runner:given("^a real bundle is loaded and applied$", function(ctx) + local bundle = mock_bundle.new_bundle({ bundle_version = 103 }) + local payload = mock_bundle.encode(bundle) + local compiled, _ = ctx.loader.load_from_string(payload, nil, nil) + ctx.loader.apply(compiled) + ctx.bundle = ctx.loader.get_current() +end) + +runner:given("^SaaS client is configured but unreachable$", function(ctx) + ctx.saas_unreachable = true +end) + +runner:then_("^saas queue_event was attempted but did not block the decision$", function(ctx) + -- queue_event was called (reject path always audits) and failed, but decision was still returned + assert.is_true((ctx.saas_event_attempts or 0) > 0, "saas queue_event should have been attempted") + assert.is_table(ctx.decision) + assert.is_nil(ctx.decision.error) +end) + +runner:given("^a real bundle with token_bucket_llm rule and TPM (%d+) is loaded$", function(ctx, tpm) + local bundle = mock_bundle.new_bundle({ + bundle_version = 104, + policies = { + { + id = "p_tpm", + spec = { + selector = { pathPrefix = "/v1/", methods = { "POST" } }, + mode = "enforce", + rules = { + { + name = "r_tpm", + limit_keys = { "jwt:org_id" }, + algorithm = "token_bucket_llm", + algorithm_config = { + tokens_per_minute = tonumber(tpm), + default_max_completion = 100, + token_source = { estimator = "header_hint" } + }, + }, + }, + }, + }, + }, + }) + local payload = mock_bundle.encode(bundle) + local compiled, _ = ctx.loader.load_from_string(payload, nil, nil) + ctx.loader.apply(compiled) + ctx.bundle = ctx.loader.get_current() +end) + +runner:then_('^integration decision is reject with reason "([^"]+)"$', function(ctx, reason) + assert.equals("reject", ctx.decision.action) + assert.equals(reason, ctx.decision.reason) +end) + +runner:then_('^decision headers include "([^"]+)" with value "([^"]+)"$', function(ctx, name, value) + assert.equals(value, ctx.decision.headers[name]) +end) + +runner:then_('^decision headers include 
"([^"]+)"$', function(ctx, name) + assert.is_not_nil(ctx.decision.headers[name]) +end) + +runner:then_('^decision headers include "([^"]+)" matching pattern (.+)$', function(ctx, name, pattern) + assert.is_not_nil(ctx.decision.headers[name]) + assert.matches(pattern, ctx.decision.headers[name]) +end) + +runner:given("^a real bundle with token_bucket_llm rule in shadow mode is loaded$", function(ctx) + local bundle = mock_bundle.new_bundle({ + bundle_version = 105, + policies = { + { + id = "p_shadow", + spec = { + selector = { pathPrefix = "/v1/", methods = { "POST" } }, + mode = "shadow", + rules = { + { + name = "r_shadow", + limit_keys = { "jwt:org_id" }, + algorithm = "token_bucket_llm", + algorithm_config = { + tokens_per_minute = 1000, + default_max_completion = 100, + token_source = { estimator = "header_hint" } + }, + }, + }, + }, + }, + }, + }) + local payload = mock_bundle.encode(bundle) + local compiled, _ = ctx.loader.load_from_string(payload, nil, nil) + ctx.loader.apply(compiled) + ctx.bundle = ctx.loader.get_current() +end) + +runner:then_('^integration decision mode is "([^"]+)"$', function(ctx, mode) + assert.equals(mode, ctx.decision.mode) +end) + +runner:then_("^would_reject is true$", function(ctx) + assert.is_true(ctx.decision.would_reject) +end) + +runner:given("^a real bundle with token_bucket_llm rule and TPM 0 is loaded$", function(ctx) + local bundle = mock_bundle.new_bundle({ + bundle_version = 106, + policies = { + { + id = "p_bot", + spec = { + selector = { pathPrefix = "/v1/", methods = { "POST" } }, + mode = "enforce", + rules = { + { + name = "bot_rule", + limit_keys = { "jwt:org_id" }, + algorithm = "token_bucket_llm", + algorithm_config = { + tokens_per_minute = 0, + burst_tokens = 0, + default_max_completion = 100, + }, + }, + }, + }, + }, + }, + }) + local payload = mock_bundle.encode(bundle) + local compiled, _ = ctx.loader.load_from_string(payload, nil, nil) + ctx.loader.apply(compiled) + ctx.bundle = ctx.loader.get_current() 
+end) + +runner:given("^request context is path /v1/chat with jwt org_id bot%-org$", function(ctx) + ctx.request_context = { + method = "POST", + path = "/v1/chat", + headers = {}, + query_params = {}, + jwt_claims = { org_id = "bot-org" }, + } +end) + runner:feature_file_relative("features/rule_engine.feature") diff --git a/spec/integration/streaming_spec.lua b/spec/integration/streaming_spec.lua index 6bc1920..401c977 100644 --- a/spec/integration/streaming_spec.lua +++ b/spec/integration/streaming_spec.lua @@ -64,6 +64,14 @@ runner:given("^the nginx integration mock environment is reset$", function(ctx) } end) +runner:given("^stream limit behavior is error_chunk$", function(ctx) + ctx.config.streaming.on_limit_exceeded = "error_chunk" +end) + +runner:given("^shadow mode is enabled for the request$", function(ctx) + ctx.reservation.is_shadow = true +end) + runner:when("^I initialize streaming for the request$", function(ctx) ctx.stream_ctx = streaming.init_stream(ctx.config, ctx.request_context, ctx.reservation) end) @@ -89,11 +97,37 @@ runner:then_("^TK%-005 truncates with length finish reason and done marker$", fu assert.is_true(ctx.stream_ctx.truncated) end) +runner:then_("^TK%-006 truncates with rate_limit_error and done marker$", function(ctx) + assert.matches('"type":"rate_limit_error"', ctx.out2) + assert.matches('data: %[DONE%]\n\n$', ctx.out2) + assert.is_true(ctx.stream_ctx.truncated) +end) + +runner:then_("^TK%-007 passes through both chunks and reconciles later$", function(ctx) + assert.matches('^data: %{"choices":%[%{"delta":%{"content":"y+"%}%}%]%}%\n\n$', ctx.out1) + assert.matches('^data: %{"choices":%[%{"delta":%{"content":"y+"%}%}%]%}%\n\n$', ctx.out2) + assert.is_not_true(ctx.stream_ctx.truncated) + -- End the stream to check reconciliation + streaming.body_filter("data: [DONE]\n\n", false) + assert.equals(1, #reconcile_calls) + assert.equals(120 + ctx.reservation.prompt_tokens, reconcile_calls[1].actual_total) +end) + runner:then_("^TK%-004 
completes stream and reconciles once$", function(ctx) assert.matches('^data: %{"choices":%[%{"delta":%{"content":"y+"%}%}%]%}%\n\n$', ctx.out1) assert.equals("data: [DONE]\n\n", ctx.out2) assert.equals(1, #reconcile_calls) - assert.equals(60, reconcile_calls[1].actual_total) + assert.equals(20 + ctx.reservation.prompt_tokens, reconcile_calls[1].actual_total) +end) + +runner:given("^include_partial_usage is disabled for the stream$", function(ctx) + ctx.config.streaming.include_partial_usage = false +end) + +runner:then_("^TK%-008 truncates without usage fragment$", function(ctx) + assert.matches('finish_reason":"length"', ctx.out2) + assert.not_matches('"usage":', ctx.out2) + assert.matches('data: %[DONE%]\n\n$', ctx.out2) end) runner:feature([[ @@ -110,4 +144,25 @@ Feature: Streaming enforcement integration flows When I initialize streaming for the request And I process two chunks with 60 and 60 completion tokens Then TK-005 truncates with length finish reason and done marker + + Scenario: TK-006 Mid-Stream Truncation with Error Chunk + Given the nginx integration mock environment is reset + And stream limit behavior is error_chunk + When I initialize streaming for the request + And I process two chunks with 60 and 60 completion tokens + Then TK-006 truncates with rate_limit_error and done marker + + Scenario: TK-007 Shadow Mode does not truncate + Given the nginx integration mock environment is reset + And shadow mode is enabled for the request + When I initialize streaming for the request + And I process two chunks with 60 and 60 completion tokens + Then TK-007 passes through both chunks and reconciles later + + Scenario: TK-008 Truncation without partial usage + Given the nginx integration mock environment is reset + And include_partial_usage is disabled for the stream + When I initialize streaming for the request + And I process two chunks with 60 and 60 completion tokens + Then TK-008 truncates without usage fragment ]]) diff --git 
a/spec/unit/features/rule_engine.feature b/spec/unit/features/rule_engine.feature index a2a2f4b..8657fd9 100644 --- a/spec/unit/features/rule_engine.feature +++ b/spec/unit/features/rule_engine.feature @@ -132,6 +132,24 @@ Feature: Rule evaluation engine orchestration And kill switch check was skipped And decision does not expose override headers + Rule: Circuit Breaker Cost Resolution + Scenario: AC-12 Circuit breaker uses LLM estimator for cost + Given the rule engine test environment is reset + And fixture policy with circuit breaker and token_bucket_llm rule + And the llm prompt estimate is 120 + And the request context max_tokens is 300 + When I evaluate the request + Then llm prompt estimation was called + And circuit breaker was checked with cost 420 + + Scenario: AC-12b Circuit breaker uses default_max_completion when max_tokens missing + Given the rule engine test environment is reset + And fixture policy with circuit breaker and token_bucket_llm rule + And the llm prompt estimate is 120 + When I evaluate the request + Then llm prompt estimation was called + And circuit breaker was checked with cost 620 + Rule: Audit event emission Scenario: Decision events are emitted for every evaluation Given the rule engine test environment is reset diff --git a/spec/unit/rule_engine_spec.lua b/spec/unit/rule_engine_spec.lua index 37061ff..6b81814 100644 --- a/spec/unit/rule_engine_spec.lua +++ b/spec/unit/rule_engine_spec.lua @@ -195,15 +195,15 @@ local function _setup_engine(ctx) } local circuit_breaker = { - check = function(_dict, _config, _key, _cost, _now) + check = function(_dict, _config, _key, cost, _now) ctx.calls[#ctx.calls + 1] = "circuit_check" + ctx.last_circuit_cost = cost if ctx.circuit_tripped then return { tripped = true, retry_after = 30 } end return { tripped = false } end, } - local kill_switch = { check = function(_kill_switches, _descriptors, _path, _now) ctx.calls[#ctx.calls + 1] = "kill_switch_check" @@ -235,8 +235,14 @@ local function 
_setup_engine(ctx) ctx.calls[#ctx.calls + 1] = "llm_check" return { allowed = true } end, + estimate_prompt_tokens = function(_config, _request_context) + ctx.calls[#ctx.calls + 1] = "llm_estimate" + return ctx.llm_prompt_estimate or 0 + end, + build_error_response = function(_reason, _extra) + return '{"error":"mock"}' + end, } - local health = { inc = function(_self, name, labels, value) ctx.metrics[#ctx.metrics + 1] = { @@ -552,10 +558,46 @@ runner:given("^fixture kill switch override skips kill switch$", function(ctx) ctx.rule_results.allow_rule = { allowed = true, limit = 100, remaining = 90, reset = 1 } end) +runner:given("^the llm prompt estimate is (%d+)$", function(ctx, estimate) + ctx.llm_prompt_estimate = tonumber(estimate) +end) + +runner:given("^the request context max_tokens is (%d+)$", function(ctx, max_tokens) + ctx.request_context.max_tokens = tonumber(max_tokens) +end) + +runner:given("^fixture policy with circuit breaker and token_bucket_llm rule$", function(ctx) + ctx.matching_policy_ids = { "p_llm" } + ctx.request_context._descriptors["jwt:org_id"] = "org-llm" + ctx.bundle.policies_by_id.p_llm = { + id = "p_llm", + spec = { + mode = "enforce", + circuit_breaker = { enabled = true, threshold = 10, window_seconds = 60 }, + rules = { + { + name = "llm_rule", + algorithm = "token_bucket_llm", + limit_keys = { "jwt:org_id" }, + algorithm_config = { tokens_per_minute = 1000, default_max_completion = 500 } + } + } + } + } +end) + +runner:then_("^llm prompt estimation was called$", function(ctx) + assert.is_true(_contains(ctx.calls, "llm_estimate")) +end) + +runner:then_("^circuit breaker was checked with cost (%d+)$", function(ctx, expected_cost) + assert.is_true(_contains(ctx.calls, "circuit_check")) + assert.equals(tonumber(expected_cost), ctx.last_circuit_cost) +end) + runner:when("^I evaluate the request$", function(ctx) ctx.decision = ctx.engine.evaluate(ctx.request_context, ctx.bundle) end) - runner:then_("^decision action is \"([^\"]+)\"$", 
function(ctx, action) assert.equals(action, ctx.decision.action) end) diff --git a/src/fairvisor/decision_api.lua b/src/fairvisor/decision_api.lua index bc9fd3a..3b904b7 100644 --- a/src/fairvisor/decision_api.lua +++ b/src/fairvisor/decision_api.lua @@ -21,6 +21,7 @@ local os_time = os.time local utils = require("fairvisor.utils") local json_lib = utils.get_json() local streaming = require("fairvisor.streaming") +local llm_limiter = require("fairvisor.llm_limiter") local _M = {} @@ -913,6 +914,12 @@ function _M.access_handler() _maybe_emit_retry_after_metric(reject_headers["Retry-After"]) _set_response_headers(reject_headers) ngx.status = HTTP_TOO_MANY_REQUESTS + local reason = decision.reason or "" + if reason == "tpm_exceeded" or reason == "tpd_exceeded" + or reason == "prompt_tokens_exceeded" or reason == "max_tokens_per_request_exceeded" then + ngx.header["Content-Type"] = "application/json" + ngx.say(llm_limiter.build_error_response(reason)) + end return ngx.exit(HTTP_TOO_MANY_REQUESTS) end diff --git a/src/fairvisor/llm_limiter.lua b/src/fairvisor/llm_limiter.lua index 63ce920..3b29bc0 100644 --- a/src/fairvisor/llm_limiter.lua +++ b/src/fairvisor/llm_limiter.lua @@ -45,6 +45,8 @@ local RESULT_FIELDS = { "remaining_tpd", "limit_tpm", "limit_tpd", + "limit", + "remaining", } local _result = {} @@ -382,6 +384,8 @@ function _M.check(dict, key, config, request_context, now) local result = _set_reject("tpm_exceeded") result.remaining_tokens = tpm_result.remaining result.limit_tokens = config.tokens_per_minute + result.remaining = tpm_result.remaining + result.limit = config.tokens_per_minute result.retry_after = tpm_result.retry_after result.estimated_total = estimated_total return result @@ -396,6 +400,8 @@ function _M.check(dict, key, config, request_context, now) local result = _set_reject("tpd_exceeded") result.remaining_tokens = tpd_result.remaining result.limit_tokens = config.tokens_per_day + result.remaining = tpd_result.remaining + result.limit = 
config.tokens_per_day result.retry_after = tpd_result.retry_after result.estimated_total = estimated_total return result @@ -411,6 +417,8 @@ function _M.check(dict, key, config, request_context, now) _result.remaining_tpd = tpd_result and tpd_result.remaining or nil _result.limit_tpm = config.tokens_per_minute _result.limit_tpd = config.tokens_per_day + _result.remaining = tpm_result.remaining + _result.limit = config.tokens_per_minute return _result end diff --git a/src/fairvisor/rule_engine.lua b/src/fairvisor/rule_engine.lua index 6701b63..8808fdb 100644 --- a/src/fairvisor/rule_engine.lua +++ b/src/fairvisor/rule_engine.lua @@ -266,7 +266,12 @@ local function _resolve_request_cost(policy, request_context) end if rule.algorithm == "token_bucket_llm" then - return request_context and request_context.max_tokens or 1 + local prompt = _call(_llm_limiter.estimate_prompt_tokens, 0, config, request_context) + local max_completion = config.default_max_completion or 1000 + if request_context and type(request_context.max_tokens) == "number" and request_context.max_tokens > 0 then + max_completion = request_context.max_tokens + end + return prompt + max_completion end return 1 diff --git a/src/fairvisor/streaming.lua b/src/fairvisor/streaming.lua index a99ea01..79f5759 100644 --- a/src/fairvisor/streaming.lua +++ b/src/fairvisor/streaming.lua @@ -421,7 +421,7 @@ function _M.body_filter(chunk, eof) stream_ctx.truncated = true _reconcile_once(stream_ctx) _maybe_emit_cutoff_event(stream_ctx) - return _build_termination(stream_ctx) + return table_concat(out) .. 
_build_termination(stream_ctx) end end end diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index ba981a4..6acd924 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -19,6 +19,8 @@ EDGE_REVERSE_URL = os.environ.get("FAIRVISOR_E2E_REVERSE_URL", "http://localhost:18085") EDGE_ASN_URL = os.environ.get("FAIRVISOR_E2E_ASN_URL", "http://localhost:18087") EDGE_LLM_RECONCILE_URL = os.environ.get("FAIRVISOR_E2E_LLM_RECONCILE_URL", "http://localhost:18088") +EDGE_LLM_OPENAI_CONTRACT_URL = os.environ.get("FAIRVISOR_E2E_LLM_OPENAI_CONTRACT_URL", "http://localhost:18089") +EDGE_LLM_STREAMING_URL = os.environ.get("FAIRVISOR_E2E_LLM_STREAMING_URL", "http://localhost:18090") HEALTH_TIMEOUT_S = float(os.environ.get("FAIRVISOR_E2E_HEALTH_TIMEOUT", "15")) COMPOSE_FILE = os.path.join(os.path.dirname(__file__), "docker-compose.test.yml") @@ -181,3 +183,29 @@ def edge_llm_reconcile_base_url(): "Edge LLM reconcile container not ready at {}. Run: docker compose -f tests/e2e/docker-compose.test.yml up -d".format(url) ) + +@pytest.fixture(scope="session") +def edge_llm_openai_contract_base_url(): + """Base URL of the LLM OpenAI contract container (decision_service, header_hint estimator, low TPM). + Tests that tpm_exceeded rejection produces an OpenAI-compatible JSON error body.""" + url = EDGE_LLM_OPENAI_CONTRACT_URL + ready, _ = _wait_ready(url, HEALTH_TIMEOUT_S) + if not ready: + pytest.skip( + "Edge LLM OpenAI contract container not ready at {}. Run: docker compose -f tests/e2e/docker-compose.test.yml up -d".format(url) + ) + return url + + +@pytest.fixture(scope="session") +def edge_llm_streaming_base_url(): + """Base URL of the LLM streaming container (reverse_proxy + mock SSE backend, max_completion_tokens=10). + Tests mid-stream truncation at the edge body_filter layer.""" + url = EDGE_LLM_STREAMING_URL + ready, _ = _wait_ready(url, HEALTH_TIMEOUT_S) + if not ready: + pytest.skip( + "Edge LLM streaming container not ready at {}. 
Run: docker compose -f tests/e2e/docker-compose.test.yml up -d".format(url) + ) + return url + diff --git a/tests/e2e/docker-compose.test.yml b/tests/e2e/docker-compose.test.yml index df4a63d..9fd7459 100644 --- a/tests/e2e/docker-compose.test.yml +++ b/tests/e2e/docker-compose.test.yml @@ -201,3 +201,61 @@ services: timeout: 2s retries: 10 start_period: 5s + + edge_llm_openai_contract: + build: + context: ../.. + dockerfile: docker/Dockerfile + ports: + - "18089:8080" + environment: + FAIRVISOR_CONFIG_FILE: /etc/fairvisor/policy.json + FAIRVISOR_SHARED_DICT_SIZE: 4m + FAIRVISOR_LOG_LEVEL: info + FAIRVISOR_MODE: decision_service + FAIRVISOR_WORKER_PROCESSES: "1" + volumes: + - ./policy_llm_openai_contract.json:/etc/fairvisor/policy.json:ro + healthcheck: + test: ["CMD", "curl", "-sf", "http://127.0.0.1:8080/readyz"] + interval: 2s + timeout: 2s + retries: 10 + start_period: 5s + + mock_streaming_backend: + image: python:3.12-alpine + volumes: + - ./mock-streaming-server.py:/mock-streaming-server.py:ro + command: ["python3", "/mock-streaming-server.py"] + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:80/"] + interval: 2s + timeout: 2s + retries: 10 + start_period: 5s + + edge_llm_streaming: + build: + context: ../.. 
+ dockerfile: docker/Dockerfile + ports: + - "18090:8080" + environment: + FAIRVISOR_CONFIG_FILE: /etc/fairvisor/policy.json + FAIRVISOR_SHARED_DICT_SIZE: 4m + FAIRVISOR_LOG_LEVEL: info + FAIRVISOR_MODE: reverse_proxy + FAIRVISOR_BACKEND_URL: http://mock_streaming_backend:80 + FAIRVISOR_WORKER_PROCESSES: "1" + volumes: + - ./policy_llm_streaming.json:/etc/fairvisor/policy.json:ro + depends_on: + mock_streaming_backend: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-sf", "http://127.0.0.1:8080/readyz"] + interval: 2s + timeout: 2s + retries: 10 + start_period: 5s diff --git a/tests/e2e/mock-streaming-server.py b/tests/e2e/mock-streaming-server.py new file mode 100644 index 0000000..40857be --- /dev/null +++ b/tests/e2e/mock-streaming-server.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +"""Minimal mock streaming backend for e2e tests. + +Serves a static SSE fixture for any POST request (simulates an LLM streaming endpoint). +GET / returns 200 ok (used by healthcheck). 
+ +SSE token accounting (ceil(char_count / 4), buffer_tokens=1, max_completion_tokens=10): + event 1: "one two three" = 13 chars -> 4 tokens (cumulative 4, <= 10 pass) + event 2: "four five six seven" = 19 chars -> 5 tokens (cumulative 9, <= 10 pass) + event 3: "eight nine ten eleven"= 21 chars -> 6 tokens (cumulative 15, > 10 truncate) +""" +from http.server import BaseHTTPRequestHandler, HTTPServer + +_SSE_BODY = ( + b'data: {"id":"sse1","object":"chat.completion.chunk",' + b'"choices":[{"index":0,"delta":{"role":"assistant","content":"one two three"}}]}\n\n' + b'data: {"id":"sse2","object":"chat.completion.chunk",' + b'"choices":[{"index":0,"delta":{"content":"four five six seven"}}]}\n\n' + b'data: {"id":"sse3","object":"chat.completion.chunk",' + b'"choices":[{"index":0,"delta":{"content":"eight nine ten eleven"}}]}\n\n' + b'data: [DONE]\n\n' +) + + +class _Handler(BaseHTTPRequestHandler): + def do_POST(self): + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.send_header("X-Accel-Buffering", "no") + self.end_headers() + self.wfile.write(_SSE_BODY) + self.wfile.flush() + + def do_GET(self): + self.send_response(200) + self.send_header("Content-Type", "text/plain") + self.end_headers() + self.wfile.write(b"ok") + + def log_message(self, fmt, *args): # suppress per-request logs + pass + + +if __name__ == "__main__": + HTTPServer(("0.0.0.0", 80), _Handler).serve_forever() diff --git a/tests/e2e/policy_llm_openai_contract.json b/tests/e2e/policy_llm_openai_contract.json new file mode 100644 index 0000000..69f491a --- /dev/null +++ b/tests/e2e/policy_llm_openai_contract.json @@ -0,0 +1,33 @@ +{ + "bundle_version": 1, + "issued_at": "2026-01-01T00:00:00Z", + "expires_at": "2030-01-01T00:00:00Z", + "policies": [ + { + "id": "llm-openai-contract-policy", + "spec": { + "selector": { + "pathPrefix": "/" + }, + "mode": "enforce", + "rules": [ + { + "name": "llm-contract-rule", + 
"limit_keys": [ + "header:x-e2e-key" + ], + "algorithm": "token_bucket_llm", + "algorithm_config": { + "tokens_per_minute": 500, + "burst_tokens": 500, + "default_max_completion": 10, + "token_source": { + "estimator": "header_hint" + } + } + } + ] + } + } + ] +} diff --git a/tests/e2e/policy_llm_streaming.json b/tests/e2e/policy_llm_streaming.json new file mode 100644 index 0000000..b7049c4 --- /dev/null +++ b/tests/e2e/policy_llm_streaming.json @@ -0,0 +1,39 @@ +{ + "bundle_version": 1, + "issued_at": "2026-01-01T00:00:00Z", + "expires_at": "2030-01-01T00:00:00Z", + "defaults": { + "max_completion_tokens": 10, + "streaming": { + "enabled": true, + "enforce_mid_stream": true, + "buffer_tokens": 1, + "on_limit_exceeded": "graceful_close" + } + }, + "policies": [ + { + "id": "llm-streaming-policy", + "spec": { + "selector": { + "pathPrefix": "/" + }, + "mode": "enforce", + "rules": [ + { + "name": "llm-streaming-rule", + "limit_keys": [ + "header:x-e2e-key" + ], + "algorithm": "token_bucket_llm", + "algorithm_config": { + "tokens_per_minute": 100000, + "burst_tokens": 100000, + "default_max_completion": 1000 + } + } + ] + } + } + ] +} diff --git a/tests/e2e/test_llm_openai_contract.py b/tests/e2e/test_llm_openai_contract.py new file mode 100644 index 0000000..7965f77 --- /dev/null +++ b/tests/e2e/test_llm_openai_contract.py @@ -0,0 +1,92 @@ +# Feature: OpenAI wire contract for LLM rate limit rejections (Issue #4 AC / UC-16). +# When the edge rejects a request due to tpm_exceeded, the 429 response body must be +# an OpenAI-compatible JSON error: {"error":{"type":"rate_limit_error","code":"rate_limit_exceeded",...}} +# +# Policy: decision_service mode, token_bucket_llm with header_hint estimator, TPM=500. +# Trigger: X-Token-Estimate: 600 → estimated_total = 610 > burst(500) → tpm_exceeded. 
# tests/e2e/test_llm_openai_contract.py (new file)
# E2E contract tests: a tpm_exceeded rejection must surface as an
# OpenAI-compatible 429 error response (Issue #4 P0).

import uuid

import pytest
import requests


def _reject_request(base_url, limit_key, token_estimate=600):
    """Send a decision request that exceeds the TPM budget and return the 429 response.

    Args:
        base_url: Base URL of the edge decision API under test.
        limit_key: Unique per-test rate-limit key (X-E2E-Key) to isolate buckets.
        token_estimate: Value for X-Token-Estimate; default 600 exceeds the
            fixture's configured burst budget, forcing a rejection.

    Returns:
        The raw `requests.Response` (expected 429) for the caller to assert on.
    """
    return requests.post(
        f"{base_url}/v1/decision",
        headers={
            "X-Original-Method": "POST",
            "X-Original-URI": "/v1/chat/completions",
            "X-E2E-Key": limit_key,
            "X-Token-Estimate": str(token_estimate),
        },
        timeout=5,
    )


class TestLLMOpenAIContractE2E:
    """E2E: tpm_exceeded rejection produces OpenAI-compatible JSON error body (Issue #4 P0)."""

    def test_tpm_exceeded_returns_429(self, edge_llm_openai_contract_base_url):
        """Sending X-Token-Estimate exceeding burst_tokens yields 429."""
        key = f"openai-contract-{uuid.uuid4().hex[:8]}"
        r = _reject_request(edge_llm_openai_contract_base_url, key)
        assert r.status_code == 429, (
            f"Expected 429 for over-limit token estimate; got {r.status_code}; body: {r.text[:200]}"
        )

    def test_tpm_exceeded_body_is_openai_json(self, edge_llm_openai_contract_base_url):
        """429 response body is valid JSON with OpenAI error structure."""
        key = f"openai-body-{uuid.uuid4().hex[:8]}"
        r = _reject_request(edge_llm_openai_contract_base_url, key)
        assert r.status_code == 429

        # requests.Response.json() raises ValueError (json.JSONDecodeError is a
        # subclass) on a non-JSON body; catch only that so unrelated failures
        # propagate with their own traceback.
        try:
            body = r.json()
        except ValueError as exc:
            pytest.fail(
                f"429 body is not valid JSON: {exc}; raw body: {r.text[:300]}; "
                f"Content-Type: {r.headers.get('Content-Type')}"
            )
        else:
            # Assert the OpenAI error envelope: {"error": {type, code, message}}.
            assert "error" in body, f"Missing 'error' key in response: {body}"
            error = body["error"]
            assert error.get("type") == "rate_limit_error", (
                f"Expected error.type='rate_limit_error'; got: {error}"
            )
            assert error.get("code") == "rate_limit_exceeded", (
                f"Expected error.code='rate_limit_exceeded'; got: {error}"
            )
            assert "message" in error, f"Missing 'message' in error object: {error}"

    def test_tpm_exceeded_content_type_is_json(self, edge_llm_openai_contract_base_url):
        """429 response sets Content-Type: application/json."""
        key = f"openai-ctype-{uuid.uuid4().hex[:8]}"
        r = _reject_request(edge_llm_openai_contract_base_url, key)
        assert r.status_code == 429
        # Substring check: allows charset suffixes like "application/json; charset=utf-8".
        content_type = r.headers.get("Content-Type", "")
        assert "application/json" in content_type, (
            f"Expected Content-Type: application/json; got: {content_type}"
        )

    def test_tpm_exceeded_includes_rate_limit_headers(self, edge_llm_openai_contract_base_url):
        """429 response for tpm_exceeded still includes RateLimit-* and Retry-After headers."""
        key = f"openai-hdrs-{uuid.uuid4().hex[:8]}"
        r = _reject_request(edge_llm_openai_contract_base_url, key)
        assert r.status_code == 429
        for header in ("RateLimit-Limit", "RateLimit-Remaining", "Retry-After"):
            assert header in r.headers, (
                f"Missing '{header}' in 429 headers; headers: {dict(r.headers)}"
            )

    def test_tpm_exceeded_error_message_contains_reason(self, edge_llm_openai_contract_base_url):
        """The error.message field identifies the rejection reason."""
        key = f"openai-msg-{uuid.uuid4().hex[:8]}"
        r = _reject_request(edge_llm_openai_contract_base_url, key)
        assert r.status_code == 429
        body = r.json()
        message = body["error"]["message"]
        # Accept either the internal reason code or human-readable phrasing.
        assert "tpm_exceeded" in message or "rate limit" in message.lower(), (
            f"Error message should reference tpm_exceeded or rate limit; got: {message}"
        )


# diff --git a/tests/e2e/test_llm_streaming.py b/tests/e2e/test_llm_streaming.py
# new file mode 100644 index 0000000..3f78ba6 --- /dev/null
# +++ b/tests/e2e/test_llm_streaming.py @@ -0,0 +1,118 @@
#
# Feature: Mid-stream truncation at the Edge body_filter layer (Issue #4 AC / UC-18 streaming).
# The edge acts as a reverse proxy with max_completion_tokens=10 and buffer_tokens=1.
# The mock SSE backend emits 3 content events (4 + 5 + 6 = 15 tokens total).
# After event 2 (total 9 <= 10), events pass through.
# Event 3 pushes total to 15 > 10 -> edge truncates and sends a graceful_close termination.
#
# SSE fixture token accounting (ceil(char_count / 4)):
# event 1: "one two three" = 13 chars → 4 tokens (cumulative: 4, ≤ 10 → pass)
# event 2: "four five six seven" = 19 chars → 5 tokens (cumulative: 9, ≤ 10 → pass)
# event 3: "eight nine ten eleven" = 21 chars → 6 tokens (cumulative: 15, > 10 → truncate)

import uuid

import requests


# Minimal OpenAI-style streaming chat request; byte literal so the edge sees
# the exact payload the content-length/token estimator was tuned for.
LLM_STREAMING_BODY = (
    b'{"model":"gpt-4","messages":[{"role":"user","content":"hi"}],"stream":true}'
)
LLM_STREAMING_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "text/event-stream",
}


def _parse_sse_events(raw_text):
    """Return list of raw data payloads from SSE text (strips 'data: ' prefix).

    Blank keep-alive lines and non-data fields are ignored; each returned entry
    is the payload of one `data:` line (e.g. a JSON chunk or the literal
    "[DONE]" sentinel).
    """
    events = []
    for line in raw_text.splitlines():
        line = line.strip()
        if line.startswith("data:"):
            payload = line[5:].strip()
            if payload:
                events.append(payload)
    return events


def _stream_request(base_url, key):
    """POST the canonical streaming chat request through the edge and return the response.

    Mirrors `_reject_request` in test_llm_openai_contract.py: one private
    helper per module so each test only varies its unique rate-limit key.
    """
    return requests.post(
        f"{base_url}/v1/chat/completions",
        headers={**LLM_STREAMING_HEADERS, "X-E2E-Key": key},
        data=LLM_STREAMING_BODY,
        timeout=10,
    )


class TestLLMStreamingTruncationE2E:
    """E2E: Edge truncates mid-stream when completion token budget is exhausted (Issue #4 P0)."""

    def test_streaming_request_returns_200(self, edge_llm_streaming_base_url):
        """Streaming request is initially allowed (high TPM policy) and returns 200."""
        key = f"stream-ok-{uuid.uuid4().hex[:8]}"
        r = _stream_request(edge_llm_streaming_base_url, key)
        assert r.status_code == 200, (
            f"Expected 200 for allowed streaming request; got {r.status_code}: {r.text[:200]}"
        )

    def test_streaming_response_contains_first_two_events(self, edge_llm_streaming_base_url):
        """Events 1 and 2 from the backend pass through before truncation."""
        key = f"stream-evts-{uuid.uuid4().hex[:8]}"
        r = _stream_request(edge_llm_streaming_base_url, key)
        assert r.status_code == 200
        body = r.text
        assert "one two three" in body, (
            f"Event 1 content 'one two three' missing from truncated stream; body: {body[:400]}"
        )
        assert "four five six seven" in body, (
            f"Event 2 content 'four five six seven' missing from truncated stream; body: {body[:400]}"
        )

    def test_streaming_response_truncated_before_event_3(self, edge_llm_streaming_base_url):
        """Event 3 content must NOT appear — the edge truncates before forwarding it."""
        key = f"stream-trunc-{uuid.uuid4().hex[:8]}"
        r = _stream_request(edge_llm_streaming_base_url, key)
        assert r.status_code == 200
        assert "eight nine ten eleven" not in r.text, (
            f"Event 3 content should be suppressed by truncation; body: {r.text[:400]}"
        )

    def test_streaming_response_ends_with_done(self, edge_llm_streaming_base_url):
        """Truncated stream ends with 'data: [DONE]' from the termination event."""
        key = f"stream-done-{uuid.uuid4().hex[:8]}"
        r = _stream_request(edge_llm_streaming_base_url, key)
        assert r.status_code == 200
        assert "[DONE]" in r.text, (
            f"Truncated stream must end with [DONE]; body: {r.text[:400]}"
        )

    def test_streaming_response_contains_finish_reason_length(self, edge_llm_streaming_base_url):
        """Termination event includes finish_reason='length' (graceful_close mode)."""
        key = f"stream-fin-{uuid.uuid4().hex[:8]}"
        r = _stream_request(edge_llm_streaming_base_url, key)
        assert r.status_code == 200
        events = _parse_sse_events(r.text)
        # The finish event is a JSON chunk (not the [DONE] sentinel) carrying
        # finish_reason; presence check first, then value check.
        finish_events = [e for e in events if e != "[DONE]" and "finish_reason" in e]
        assert finish_events, (
            f"No finish_reason event found in truncated stream; events: {events}"
        )
        assert any("length" in e for e in finish_events), (
            f"Expected finish_reason='length' in graceful_close truncation; "
            f"finish events: {finish_events}"
        )