Merged
Changes from all commits
21 commits
30249b2  fix(proxy): narrow previous_response recovery to not_found semantics … (Kazet111, Apr 15, 2026)
f733420  fix(ws): transparently replay pre-created responses on quota/rate-lim… (Kazet111, Apr 15, 2026)
a6d6efa  fix(proxy): harden shutdown and reconnect lifecycle (Kazet111, Apr 16, 2026)
fd7e73f  test(proxy): fix typing in bridge shutdown regression coverage (Kazet111, Apr 16, 2026)
d83df7f  style: apply ruff formatting for bridge continuity changes (Kazet111, Apr 16, 2026)
0e8e05f  fix(proxy): preserve scoped previous-response ownership across bridge… (Kazet111, Apr 16, 2026)
157ab94  test(proxy): add regression coverage for bridged previous-response re… (Kazet111, Apr 16, 2026)
0fee562  fix(proxy): harden continuity fail-closed flows (Kazet111, Apr 16, 2026)
7ba531b  fix(proxy): resolve ty diagnostics in continuity tests (Kazet111, Apr 16, 2026)
e4b55e1  fix(proxy): persist non-bridge continuity anchors (Kazet111, Apr 16, 2026)
86d9afe  fix(proxy): mask previous_response_not_found without breaking infligh… (Kazet111, Apr 20, 2026)
c2bec85  style(proxy): fix ruff line length (Kazet111, Apr 20, 2026)
4320056  style(proxy): format service.py with ruff (Kazet111, Apr 20, 2026)
e25922d  fix(proxy): harden previous_response anchor matching for multiplexed … (Kazet111, Apr 20, 2026)
090da75  fix(proxy): fail-closed previous_response_not_found and keep WS/HTTP … (Kazet111, Apr 21, 2026)
ed08a8e  Merge origin/main into fix/ws-http-bridge-previous-response-recovery-… (Kazet111, Apr 21, 2026)
53e2438  fix(db): linearize request_logs migration chain after main merge (Kazet111, Apr 21, 2026)
5b1367c  fix(db): add alembic merge revision for request_logs heads (Kazet111, Apr 21, 2026)
2c0736d  Merge origin/main and fix stream reservation leak on owner lookup fai… (Kazet111, Apr 21, 2026)
712cb57  test(proxy): make owner-lookup reservation-release regression test ty… (Kazet111, Apr 21, 2026)
8af088a  Merge remote-tracking branch 'origin/main' into fix/ws-http-bridge-pr… (Kazet111, Apr 21, 2026)
197 changes: 196 additions & 1 deletion app/core/clients/proxy.py
@@ -12,6 +12,7 @@
 import socket
 import time
 from contextlib import asynccontextmanager
+from copy import deepcopy
 from dataclasses import dataclass
 from typing import (
     AsyncContextManager,
@@ -56,6 +57,7 @@
     get_circuit_breaker_for_account,
 )
 from app.core.types import JsonObject, JsonValue
+from app.core.utils.json_guards import is_json_mapping
 from app.core.utils.request_id import get_request_id
 from app.core.utils.sse import format_sse_event
 
@@ -87,6 +89,12 @@
 _IMAGE_INLINE_CHUNK_SIZE = 64 * 1024
 _IMAGE_INLINE_TIMEOUT_SECONDS = 8.0
 _BLOCKED_LITERAL_HOSTS = {"localhost", "localhost.localdomain"}
+_UPSTREAM_RESPONSE_CREATE_WARN_BYTES = 12 * 1024 * 1024
+_UPSTREAM_RESPONSE_CREATE_MAX_BYTES = 15 * 1024 * 1024
+_RESPONSE_CREATE_TOOL_OUTPUT_OMISSION_NOTICE = (
+    "[codex-lb omitted historical tool output ({bytes} bytes) to fit upstream websocket budget]"
+)
+_RESPONSE_CREATE_IMAGE_OMISSION_NOTICE = "[codex-lb omitted historical inline image to fit upstream websocket budget]"
 _UPSTREAM_TRACE_HEADER_ALLOWLIST = frozenset(
     {
         "accept",
Expand Down Expand Up @@ -1211,7 +1219,7 @@ async def _stream_responses_via_websocket(
) -> AsyncIterator[str]:
websocket_url = _to_websocket_upstream_url(url)
request_started_at = time.monotonic()
request_payload = _build_websocket_response_create_payload(payload_dict)
request_payload = _prepare_websocket_response_create_payload(payload_dict)
websocket_cm: AsyncContextManager[aiohttp.ClientWebSocketResponse] | None = None
websocket: aiohttp.ClientWebSocketResponse | None = None
circuit_breaker = None
@@ -1304,6 +1312,193 @@ def _build_websocket_response_create_payload(payload_dict: JsonObject) -> JsonObject:
     return request_payload
 
 
+def _prepare_websocket_response_create_payload(payload_dict: JsonObject) -> JsonObject:
+    request_payload = _build_websocket_response_create_payload(payload_dict)
+    payload_text = json.dumps(request_payload, ensure_ascii=True, separators=(",", ":"))
+    payload_size = len(payload_text.encode("utf-8"))
+    if payload_size > _UPSTREAM_RESPONSE_CREATE_MAX_BYTES:
+        slimmed_payload, slim_summary = _slim_response_create_payload_for_upstream(
+            request_payload,
+            max_bytes=_UPSTREAM_RESPONSE_CREATE_MAX_BYTES,
+        )
+        if slim_summary is not None:
+            request_payload = cast(JsonObject, slimmed_payload)
+            slimmed_text = json.dumps(request_payload, ensure_ascii=True, separators=(",", ":"))
+            logger.warning(
+                (
+                    "Slimmed response.create before upstream websocket connect request_id=%s "
+                    "original_bytes=%s slimmed_bytes=%s historical_tool_outputs_slimmed=%s "
+                    "historical_images_slimmed=%s"
+                ),
+                get_request_id(),
+                payload_size,
+                len(slimmed_text.encode("utf-8")),
+                slim_summary["historical_tool_outputs_slimmed"],
+                slim_summary["historical_images_slimmed"],
+            )
+            payload_text = slimmed_text
+            payload_size = len(payload_text.encode("utf-8"))
+    if payload_size > _UPSTREAM_RESPONSE_CREATE_WARN_BYTES:
+        previous_response_id = request_payload.get("previous_response_id")
+        logger.warning(
+            "Large response.create prepared request_id=%s bytes=%s previous_response_id=%s",
+            get_request_id(),
+            payload_size,
+            previous_response_id if isinstance(previous_response_id, str) else None,
+        )
+    if payload_size <= _UPSTREAM_RESPONSE_CREATE_MAX_BYTES:
+        return request_payload
+    raise ProxyResponseError(
+        413,
+        _response_create_too_large_error_envelope(payload_size, _UPSTREAM_RESPONSE_CREATE_MAX_BYTES),
+        failure_phase="validation",
+        failure_detail=f"response.create_bytes={payload_size}",
+    )
+
+
+def _response_create_too_large_error_envelope(actual_bytes: int, max_bytes: int) -> OpenAIErrorEnvelope:
+    payload = openai_error(
+        "payload_too_large",
+        (
+            "response.create is too large for upstream websocket "
+            f"({actual_bytes} bytes > {max_bytes} bytes). "
+            "Reduce historical images/screenshots or compact the thread."
+        ),
+        error_type="invalid_request_error",
+    )
+    payload["error"]["param"] = "input"
+    return payload
+
+
+def _slim_response_create_payload_for_upstream(
+    payload: JsonObject,
+    *,
+    max_bytes: int,
+) -> tuple[JsonObject, dict[str, int] | None]:
+    del max_bytes
+    input_value = payload.get("input")
+    if not isinstance(input_value, list) or not input_value:
+        return payload, None
+
+    input_items = cast(list[JsonValue], deepcopy(input_value))
+    preserve_from = _response_create_recent_suffix_start(input_items)
+    historical = input_items[:preserve_from]
+    recent = input_items[preserve_from:]
+
+    tool_outputs_slimmed = 0
+    images_slimmed = 0
+
+    slimmed_historical: list[JsonValue] = []
+    for item in historical:
+        slimmed_item, item_tool_outputs_slimmed, item_images_slimmed = _slim_historical_response_input_item(item)
+        tool_outputs_slimmed += item_tool_outputs_slimmed
+        images_slimmed += item_images_slimmed
+        slimmed_historical.append(slimmed_item)
+
+    if tool_outputs_slimmed == 0 and images_slimmed == 0:
+        return payload, None
+
+    candidate_payload = dict(payload)
+    candidate_payload["input"] = slimmed_historical + recent
+    return candidate_payload, {
+        "historical_tool_outputs_slimmed": tool_outputs_slimmed,
+        "historical_images_slimmed": images_slimmed,
+    }
+
+
+def _response_create_recent_suffix_start(input_items: list[JsonValue]) -> int:
+    last_user_index: int | None = None
+    for index, item in enumerate(input_items):
+        if not is_json_mapping(item):
+            continue
+        if item.get("role") == "user":
+            last_user_index = index
+    if last_user_index is not None:
+        return last_user_index
+    return 0
+
+
+def _slim_historical_response_input_item(item: JsonValue) -> tuple[JsonValue, int, int]:
+    if not is_json_mapping(item):
+        return item, 0, 0
+
+    item_mapping = dict(cast(dict[str, JsonValue], deepcopy(item)))
+    tool_outputs_slimmed = 0
+    images_slimmed = 0
+
+    if item_mapping.get("type") == "function_call_output":
+        output = item_mapping.get("output")
+        output_text = output if isinstance(output, str) else None
+        if output_text is not None and _should_slim_historical_tool_output(output_text):
+            item_mapping["output"] = _RESPONSE_CREATE_TOOL_OUTPUT_OMISSION_NOTICE.format(
+                bytes=len(output_text.encode("utf-8"))
+            )
+            tool_outputs_slimmed += 1
+
+    content = item_mapping.get("content")
+    slimmed_content, content_images_slimmed = _slim_historical_response_content(content)
+    if content_images_slimmed > 0:
+        item_mapping["content"] = slimmed_content
+        images_slimmed += content_images_slimmed
+
+    if item_mapping.get("type") == "input_image" and _is_inline_image_reference(item_mapping.get("image_url")):
+        return _response_create_inline_image_notice_item(), tool_outputs_slimmed, images_slimmed + 1
+
+    return item_mapping, tool_outputs_slimmed, images_slimmed
+
+
+def _slim_historical_response_content(content: JsonValue) -> tuple[JsonValue, int]:
+    if is_json_mapping(content):
+        return _slim_historical_response_content_part(content)
+    if not isinstance(content, list):
+        return content, 0
+
+    slimmed_parts: list[JsonValue] = []
+    images_slimmed = 0
+    for part in content:
+        slimmed_part, part_images_slimmed = _slim_historical_response_content_part(part)
+        slimmed_parts.append(slimmed_part)
+        images_slimmed += part_images_slimmed
+    return slimmed_parts, images_slimmed
+
+
+def _slim_historical_response_content_part(part: JsonValue) -> tuple[JsonValue, int]:
+    if not is_json_mapping(part):
+        return part, 0
+
+    part_mapping = dict(cast(dict[str, JsonValue], deepcopy(part)))
+    part_type = part_mapping.get("type")
+    if part_type == "input_image" and _is_inline_image_reference(part_mapping.get("image_url")):
+        return _response_create_inline_image_notice_part(), 1
+
+    if part_type == "image_url":
+        image_url_value = part_mapping.get("image_url")
+        if is_json_mapping(image_url_value):
+            image_url = image_url_value.get("url")
+        else:
+            image_url = image_url_value
+        if _is_inline_image_reference(image_url):
+            return _response_create_inline_image_notice_part(), 1
+
+    return part_mapping, 0
+
+
+def _response_create_inline_image_notice_part() -> JsonObject:
+    return {"type": "input_text", "text": _RESPONSE_CREATE_IMAGE_OMISSION_NOTICE}
+
+
+def _response_create_inline_image_notice_item() -> JsonObject:
+    return {"role": "user", "content": [_response_create_inline_image_notice_part()]}
+
+
+def _is_inline_image_reference(value: JsonValue) -> bool:
+    return isinstance(value, str) and value.startswith("data:image/")
+
+
+def _should_slim_historical_tool_output(output: str) -> bool:
+    return "data:image/" in output or len(output.encode("utf-8")) > 32 * 1024
+
+
 async def _inline_input_image_urls(
     payload: JsonObject,
     session: "ImageFetchSession",
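The slimming pass only rewrites items before the last user turn, so the active request is never mutated. A minimal sketch (not part of the PR) of the behavior on a toy payload; it assumes a dev checkout where app.core.clients.proxy imports cleanly, and the response id and message text are made up:

from app.core.clients.proxy import _slim_response_create_payload_for_upstream

payload = {
    "previous_response_id": "resp_hypothetical",  # made-up id for illustration
    "input": [
        # Historical user turn: the inline data-URL image is swapped for a notice.
        {
            "role": "user",
            "content": [{"type": "input_image", "image_url": "data:image/png;base64,AAAA"}],
        },
        # Last user turn onward is preserved verbatim.
        {"role": "user", "content": [{"type": "input_text", "text": "continue"}]},
    ],
}

slimmed, summary = _slim_response_create_payload_for_upstream(payload, max_bytes=15 * 1024 * 1024)
assert summary == {"historical_tool_outputs_slimmed": 0, "historical_images_slimmed": 1}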
16 changes: 16 additions & 0 deletions app/core/metrics/prometheus.py
@@ -171,6 +171,18 @@ def labels(self, *args: str, **kwargs: str) -> "HistogramLike": ...
     ["kind"],
     registry=REGISTRY,
 )
+continuity_owner_resolution_total = Counter(
+    "codex_lb_continuity_owner_resolution_total",
+    "Total continuity owner resolution outcomes by surface and source",
+    ["surface", "source", "outcome"],
+    registry=REGISTRY,
+)
+continuity_fail_closed_total = Counter(
+    "codex_lb_continuity_fail_closed_total",
+    "Total continuity fail-closed or masked retryable outcomes by surface and reason",
+    ["surface", "reason"],
+    registry=REGISTRY,
+)
 
 def make_scrape_registry() -> CollectorRegistryLike:
     if MULTIPROCESS_MODE:
@@ -211,6 +223,8 @@ def mark_process_dead() -> None:
     bridge_local_rebind_total: CounterLike | None = None
     bridge_forward_latency_seconds: HistogramLike | None = None
     bridge_public_contract_error_total: CounterLike | None = None
+    continuity_owner_resolution_total: CounterLike | None = None
+    continuity_fail_closed_total: CounterLike | None = None
 
     def make_scrape_registry() -> None:
         return None
@@ -239,6 +253,8 @@ def mark_process_dead() -> None:
     "bridge_same_account_takeover_total",
     "bridge_soft_local_rebind_total",
    "circuit_breaker_state",
+    "continuity_fail_closed_total",
+    "continuity_owner_resolution_total",
     "make_scrape_registry",
     "mark_process_dead",
     "prometheus_client",
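The new counters follow the module's existing pattern: real prometheus_client metrics when the dependency is available, module-level None stubs otherwise, so call sites guard before recording. A hypothetical call site (not in this diff; the label values "ws", "request_log", "hit", "http", and "previous_response_not_found" are illustrative, only the label names come from the definitions above):

from app.core.metrics.prometheus import (
    continuity_fail_closed_total,
    continuity_owner_resolution_total,
)

# Guard against the stubbed-None fallback before labeling.
if continuity_owner_resolution_total is not None:
    continuity_owner_resolution_total.labels(
        surface="ws", source="request_log", outcome="hit"  # illustrative values
    ).inc()
if continuity_fail_closed_total is not None:
    continuity_fail_closed_total.labels(
        surface="http", reason="previous_response_not_found"  # illustrative values
    ).inc()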
13 changes: 7 additions & 6 deletions app/core/usage/refresh_scheduler.py
@@ -12,6 +12,7 @@
 from app.modules.accounts.repository import AccountsRepository
 from app.modules.proxy.account_cache import get_account_selection_cache
 from app.modules.proxy.rate_limit_cache import get_rate_limit_headers_cache
+from app.modules.usage import updater as usage_updater_module
 from app.modules.usage.repository import AdditionalUsageRepository, UsageRepository
 from app.modules.usage.updater import UsageUpdater
 
@@ -44,13 +45,13 @@ async def start(self) -> None:
         self._task = asyncio.create_task(self._run_loop())
 
     async def stop(self) -> None:
-        if not self._task:
-            return
         self._stop.set()
-        self._task.cancel()
-        with contextlib.suppress(asyncio.CancelledError):
-            await self._task
-        self._task = None
+        if self._task is not None:
+            self._task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await self._task
+            self._task = None
+        await usage_updater_module._USAGE_REFRESH_SINGLEFLIGHT.cancel_all()
 
     async def _run_loop(self) -> None:
         while not self._stop.is_set():
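The reordering matters because the old early return skipped both the stop event and the single-flight cancellation whenever stop() ran before a task existed. A self-contained sketch of the race that setting the event first closes (names here are illustrative, not the module's):

import asyncio


async def demo() -> None:
    stop = asyncio.Event()

    async def run_loop() -> None:
        # Stand-in for _run_loop: exits once the stop event is set.
        while not stop.is_set():
            await asyncio.sleep(0.01)

    stop.set()  # new ordering: signal shutdown even though no task exists yet
    task = asyncio.create_task(run_loop())
    # Returns immediately; with the old early return the event was never set
    # and this loop would spin until the timeout.
    await asyncio.wait_for(task, timeout=1.0)


asyncio.run(demo())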
@@ -0,0 +1,67 @@
+"""add request_logs response lookup index
+
+Revision ID: 20260415_160000_add_request_logs_response_lookup_index
+Revises: 20260413_000000_add_accounts_blocked_at
+Create Date: 2026-04-15
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+revision = "20260415_160000_add_request_logs_response_lookup_index"
+down_revision = "20260413_000000_add_accounts_blocked_at"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    bind = op.get_bind()
+    inspector = sa.inspect(bind)
+
+    existing_columns = {column["name"] for column in inspector.get_columns("request_logs")}
+    if "session_id" not in existing_columns:
+        op.add_column("request_logs", sa.Column("session_id", sa.String(), nullable=True))
+
+    op.create_index(
+        "idx_logs_request_status_api_key_time",
+        "request_logs",
+        [
+            "request_id",
+            "status",
+            "api_key_id",
+            sa.text("requested_at DESC"),
+            sa.text("id DESC"),
+        ],
+        unique=False,
+        if_not_exists=True,
+    )
+    op.create_index(
+        "idx_logs_request_status_api_key_session_time",
+        "request_logs",
+        [
+            "request_id",
+            "status",
+            "api_key_id",
+            "session_id",
+            sa.text("requested_at DESC"),
+            sa.text("id DESC"),
+        ],
+        unique=False,
+        if_not_exists=True,
+    )
+
+
+def downgrade() -> None:
+    op.drop_index(
+        "idx_logs_request_status_api_key_session_time",
+        table_name="request_logs",
+        if_exists=True,
+    )
+    op.drop_index(
+        "idx_logs_request_status_api_key_time",
+        table_name="request_logs",
+        if_exists=True,
+    )
+    op.drop_column("request_logs", "session_id")
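One way to exercise the revision locally, using Alembic's standard Python API (the "alembic.ini" path is an assumption about this repo's layout):

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumed config location
command.upgrade(cfg, "head")  # adds session_id if missing, then the two indexes
command.downgrade(cfg, "-1")  # drops both indexes and the session_id column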
@@ -0,0 +1,25 @@
+"""merge request log lookup and plan type heads
+
+Revision ID: 20260421_120000_merge_request_log_lookup_and_plan_type_heads
+Revises: 20260415_160000_add_request_logs_response_lookup_index,
+    20260417_000000_add_request_log_plan_type
+Create Date: 2026-04-21
+"""
+
+from __future__ import annotations
+
+revision = "20260421_120000_merge_request_log_lookup_and_plan_type_heads"
+down_revision = (
+    "20260415_160000_add_request_logs_response_lookup_index",
+    "20260417_000000_add_request_log_plan_type",
+)
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    return
+
+
+def downgrade() -> None:
+    return
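A no-op merge revision like this is what `alembic merge` emits when two heads coexist. A sketch of the equivalent Python invocation, using the same revisions and message as above (the "alembic.ini" path is again an assumption, and passing the revisions as a list reflects my reading of alembic.command.merge):

from alembic import command
from alembic.config import Config

command.merge(
    Config("alembic.ini"),  # assumed config location
    revisions=[
        "20260415_160000_add_request_logs_response_lookup_index",
        "20260417_000000_add_request_log_plan_type",
    ],
    message="merge request log lookup and plan type heads",
)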