diff --git a/docs/zap-wire-spec-v1.0.md b/docs/zap-wire-spec-v1.0.md new file mode 100644 index 0000000..3372124 --- /dev/null +++ b/docs/zap-wire-spec-v1.0.md @@ -0,0 +1,379 @@ +# ZAP Wire Protocol v1.0 — Session Layer + +Status: Draft for magicians / LP review +Issue: kcolbchain/switchboard#85 +Scope: connection-level **session** semantics that sit *under* the existing +`PaymentOffer` / `PaymentProof` payload codecs in `switchboard/zap_transport.py`. + +This document specifies the framing, handshake, sequencing, acknowledgement, +retry, idempotency, and teardown rules for a ZAP wire session. It is +deliberately payload-agnostic: a session carries opaque application payloads +(typically an encoded `PaymentOffer`/`PaymentProof`, or a generic Switchboard +frame from #76). The session layer never inspects payload bytes. + +The session framing here is **pure Python with no external dependency** +(`zap_py` is NOT required). It is a self-describing big-endian binary format so +a Go or Rust peer can implement the same bytes from this document alone. + +--- + +## 1. Design goals + +1. **Implementable from bytes.** Every field has a fixed offset or an explicit + length prefix. No reliance on a schema-compiler at the session layer. +2. **Cross-language.** All multi-byte integers are unsigned big-endian + ("network byte order"). No floats on the wire. +3. **Deterministic.** Encoding a frame is a pure function of its fields, so + conformance vectors can pin exact hex. +4. **Forward-compatible.** A version + capability handshake lets peers refuse + or downgrade gracefully instead of misparsing. +5. **At-least-once with dedup.** Retries are safe because every request carries + an idempotency key the receiver deduplicates on. + +--- + +## 2. Byte conventions + +- All integers are unsigned, big-endian, fixed width as stated. +- `u8`, `u16`, `u32`, `u64` denote 1/2/4/8-byte unsigned integers. 
+- A frame's `PAYLOAD` is explicitly length-prefixed by the `PAYLOAD_LEN` header + field, so frames are self-delimiting on a byte stream. +- Maximum payload length is `2^32 - 1`; v1.0 receivers MAY reject payloads + larger than `MAX_PAYLOAD` (default 16 MiB) with `ERR_TOO_LARGE`. +- The `RESERVED` byte and reserved bits MUST be written as zero and MUST be + ignored on read (so a later minor version can assign them). + +--- + +## 3. Frame format + +Every byte exchanged in a session is a sequence of self-delimiting **frames**. +A frame is: + +``` +offset size field +------ ---- ----------------------------------------------------------- +0 2 MAGIC = 0x5A50 ("ZP", ZAP) +2 1 WIRE_VERSION = 0x01 (session-layer version; see §4) +3 1 FRAME_TYPE (see §3.1) +4 1 FLAGS (bitfield; see §3.2) +5 1 RESERVED (MUST be 0) +6 4 SEQ u32 — per-direction sequence number (§5) +10 4 ACK u32 — cumulative ack of peer's SEQ (§6) +14 8 REQUEST_ID u64 — idempotency key (§8); 0 = "not a request" +22 4 PAYLOAD_LEN u32 — length of PAYLOAD that follows +26 N PAYLOAD N = PAYLOAD_LEN bytes (opaque to session layer) +``` + +Fixed header size is **26 bytes**. Total frame size is `26 + PAYLOAD_LEN`. + +A frame with a bad `MAGIC` or an unknown `WIRE_VERSION` MUST be rejected before +any other field is interpreted (see §4.3). 
+ +### 3.1 Frame types + +| Value | Name | Direction | Carries payload | Purpose | +|-------|-----------|------------------|-----------------|--------------------------------------| +| 0x01 | `HELLO` | initiator → peer | yes (caps) | Open: offered version + capabilities | +| 0x02 | `WELCOME` | peer → initiator | yes (caps) | Accept: negotiated version + caps | +| 0x03 | `DATA` | either | yes | Application payload | +| 0x04 | `ACK` | either | no | Standalone cumulative ack | +| 0x05 | `FIN` | either | no | Graceful half-close (§9) | +| 0x06 | `RST` | either | optional (err) | Abort session with error code (§10) | + +Unknown frame types received on an established session MUST be answered with +`RST(ERR_PROTOCOL)` and the session torn down. + +### 3.2 FLAGS bitfield + +``` +bit 0 (0x01) ACK_PRESENT — the ACK field is meaningful (cumulative ack) +bit 1 (0x02) REQ — this frame is an idempotent request (REQUEST_ID set) +bit 2 (0x04) RETRANSMIT — this is a retransmission of a previously sent SEQ +bits 3..7 RESERVED — MUST be 0 +``` + +`ACK` is only read when `ACK_PRESENT` is set. `HELLO`/`WELCOME`/`DATA`/`FIN`/ +`RST` MAY set `ACK_PRESENT` to piggyback an ack. A standalone `ACK` frame MUST +set `ACK_PRESENT`. + +--- + +## 4. Handshake and capability negotiation + +### 4.1 Capability bitmask (DECISION) + +The capability field is a **`u32` bitmask split as 8 bits version + 24 bits +feature flags**: + +``` + 31 24 23 0 ++------------------------+-------------------------------------------------+ +| VERSION (8 bits) | FEATURE FLAGS (24 bits) | ++------------------------+-------------------------------------------------+ +``` + +- **VERSION** (`bits 24..31`, 8 bits): highest session-protocol version the + sender supports. v1.0 sends `0x01`. Value `0` is invalid. +- **FEATURE FLAGS** (`bits 0..23`, 24 bits): independently negotiable features. 
+ +Defined v1.0 feature flags: + +| Bit | Mask | Name | Meaning | +|------|------------|-------------------|--------------------------------------------------| +| 0 | 0x000001 | `CAP_ACK` | Sender honours cumulative ACK + retransmit (§6) | +| 1 | 0x000002 | `CAP_RETRY` | Sender will retransmit unacked frames (§7) | +| 2 | 0x000004 | `CAP_IDEMPOTENT` | Sender dedups on REQUEST_ID (§8) | +| 3 | 0x000008 | `CAP_FIN` | Sender performs graceful FIN teardown (§9) | +| 4 | 0x000010 | `CAP_NESTED` | Sender understands nested payload tags (#76) | +| 5..23| — | RESERVED | MUST be 0 in v1.0; ignored on read | + +The full v1.0 baseline mask is therefore +`0x0100001F` (version 1, all five flags set). + +Rationale for 8+24: the protocol version is a single small monotonic integer, +so 8 bits (255 versions) is ample and keeps it byte-aligned in the high octet +for easy human inspection of the hex. 24 flag bits leaves generous headroom for +post-quantum / streaming / multiplexing features without a v2 wire bump. + +### 4.2 HELLO / WELCOME payload + +The `HELLO` and `WELCOME` payloads are exactly: + +``` +offset size field +0 4 CAPABILITIES u32 (§4.1) +4 8 SESSION_ID u64 (initiator-chosen random; echoed in WELCOME) +``` + +Payload length is 12. The handshake completes when the initiator has sent +`HELLO` and received a `WELCOME` echoing the same `SESSION_ID`. + +### 4.3 Negotiation algorithm + +1. Initiator sends `HELLO` with `SEQ = 0`, its full capability mask, and a + random non-zero `SESSION_ID`. +2. Responder: + - If `MAGIC`/`WIRE_VERSION` invalid → drop / `RST(ERR_PROTOCOL)`. + - Compute `negotiated_version = min(my_version, peer_version)`. + If `negotiated_version == 0` (no common version) → `RST(ERR_VERSION)`. + - Compute `negotiated_flags = my_flags & peer_flags` (intersection: a + feature is active only if **both** peers advertise it). + - Reply `WELCOME` with `SEQ = 0`, `CAPABILITIES = + (negotiated_version << 24) | negotiated_flags`, echoing `SESSION_ID`. +3. 
Initiator validates the echoed `SESSION_ID`; on mismatch → `RST(ERR_PROTOCOL)`. + The negotiated capability mask in the `WELCOME` is authoritative for **both** + sides for the life of the session. + +A session is **ESTABLISHED** once the initiator processes a valid `WELCOME` and +the responder has sent it. `DATA`/`ACK`/`FIN` before establishment → `RST(ERR_PROTOCOL)`. + +--- + +## 5. Sequence numbers + +- Each direction has its own `u32` SEQ space starting at **0** (the handshake + frame in that direction). +- Every frame that occupies sequence space increments the sender's SEQ by 1: + `HELLO`, `WELCOME`, `DATA`, and `FIN` consume one SEQ each. +- `ACK` and `RST` do **not** consume sequence space (they carry the *current* + SEQ as a non-advancing marker; receivers ignore the SEQ of an `ACK`/`RST` + for ordering purposes). +- SEQ is monotonic and wraps modulo `2^32`. v1.0 sessions are not expected to + exceed `2^31` frames; comparisons use unsigned-mod-2^32 "is-after" semantics + (`a` is after `b` iff `0 < (a - b) mod 2^32 < 2^31`). + +### 5.1 In-order delivery and reordering + +The receiver tracks `expected_seq` (next SEQ it will accept in order), +initialized to `0` and advanced past the handshake frame. + +- **In-order** (`SEQ == expected_seq`): deliver payload to the application, + advance `expected_seq`, then drain any buffered contiguous successors. +- **Future / out-of-order** (`SEQ` is after `expected_seq`): buffer the frame + in a reorder map keyed by SEQ (bounded by `MAX_REORDER`, default 256 frames; + overflow → `RST(ERR_FLOW)`), and re-ack the last in-order SEQ to prompt fast + retransmit of the gap. +- **Duplicate / past** (`SEQ` is before `expected_seq`): the frame has already + been delivered. Drop the payload but still send an `ACK` (the peer's prior + ack was likely lost). + +--- + +## 6. ACK semantics + +ACK is **cumulative**: `ACK = N` means "I have received, in order, every frame +with SEQ ≤ N in your stream." 
The first legal ack value acknowledges SEQ 0 +(the handshake), so a peer that has only processed the handshake acks `0`. + +- A receiver SHOULD ack the highest *contiguous* SEQ it has delivered, never a + SEQ beyond a gap. +- ACK MAY be piggybacked on any outbound frame (set `ACK_PRESENT`) or sent + standalone as an `ACK` frame. +- Receiving `ACK = N` retires every unacked sent frame with SEQ ≤ N from the + retransmit queue (§7) and is idempotent (a repeated or stale ack is harmless; + an ack for a SEQ already retired is ignored). +- A peer without `CAP_ACK` negotiated MUST NOT be sent retransmits; the session + degrades to fire-and-forget ordered delivery. + +--- + +## 7. Retry / timeout policy + +When `CAP_RETRY` is negotiated, every SEQ-consuming frame is placed on a +**retransmit queue** with a send timestamp until it is cumulatively acked. + +Configurable parameters (defaults chosen for LAN port-9999 agent traffic): + +| Param | Default | Meaning | +|------------------|---------|----------------------------------------------------------| +| `rtt` | 200 ms | Estimated round-trip time; base retransmit timeout (RTO) | +| `rto_multiplier` | 2.0 | Exponential backoff factor per attempt | +| `max_retries` | 5 | Attempts before the frame is declared lost | +| `max_rto` | 30 s | Ceiling on the backed-off timeout | + +- The RTO for attempt `k` (0-indexed) is + `min(rtt * rto_multiplier^k, max_rto)`. +- A frame whose oldest unacked send is older than its current RTO is + **retransmitted** with the `RETRANSMIT` flag set and `attempt` incremented. + Its SEQ and REQUEST_ID are unchanged (so the receiver dedups, §8). +- After `max_retries` retransmits without an ack, the frame is declared lost; + the session raises `ERR_TIMEOUT` to the application and SHOULD `RST`. +- Time is injected (a `now()` callable) so retransmit logic is deterministic and + testable without real clocks. + +Implementations MAY refine `rtt` with an RTT estimator (e.g. 
EWMA over observed +ack latencies); v1.0 mandates only the static-RTO behaviour above. + +--- + +## 8. REQUEST_ID idempotency and dedup + +- A frame with the `REQ` flag carries a non-zero `u64 REQUEST_ID` chosen by the + sender to uniquely identify a logical request within the session. +- The receiver maintains a **seen-request set**. On a `REQ` frame: + - If `REQUEST_ID` is already in the set → this is a duplicate (e.g. a retried + request whose original was processed but whose response/ack was lost). The + receiver MUST NOT re-execute the application side effect; it re-acks and, if + it cached a response, replays the cached response. + - Otherwise → record the id, deliver to the application exactly once. +- `REQUEST_ID = 0` means "not an idempotent request"; such frames are delivered + by SEQ ordering only and are not deduplicated by id. +- The seen-request set is per-session and discarded on session close. Bounded by + `MAX_REQUEST_IDS` (default 4096, FIFO eviction); eviction only affects + long-lived sessions and never re-executes within the window. + +This makes the at-least-once retransmit of §7 safe: a retransmitted request +arrives with the same REQUEST_ID and is deduplicated. + +--- + +## 9. Session close (FIN) and orphaned sequences + +Close is a **graceful half-close** like TCP: + +1. A side that is done sending sends `FIN` (consuming one SEQ). It MAY piggyback + a final ACK. After sending FIN it MUST NOT originate new `DATA`/`REQ` frames, + but it MUST keep acking inbound frames and MAY still retransmit its own + in-flight (unacked) frames until they are acked or declared lost. +2. The peer, on receiving `FIN`, delivers any buffered in-order payloads up to + the FIN's SEQ, then enters `CLOSING`. It sends its own `FIN` when it too has + no more to send. +3. The session is `CLOSED` once **both** FINs have been sent and all SEQ ≤ each + FIN are cumulatively acked. 
+ +### 9.1 Orphaned-sequence handling + +An **orphaned sequence** is a SEQ that was sent before FIN but is still unacked, +or a buffered out-of-order frame whose predecessors never arrived, at the moment +of close: + +- **Unacked-but-sent (in-flight) on the FIN-sender side:** continue to be + retransmitted per §7 until acked or `max_retries` is hit. FIN does not cancel + the retransmit queue. Only when every SEQ ≤ FIN is acked is the half-stream + truly drained. +- **Buffered out-of-order on the receiver side:** if a gap below the FIN's SEQ + is never filled (predecessor declared lost), the receiver MUST discard the + orphaned buffered frames at close and surface `ERR_INCOMPLETE` for that + session rather than deliver out of order. A clean FIN therefore requires the + full contiguous SEQ range `[0, FIN_SEQ]` to have been delivered. +- A `RST` (§10) is the hard alternative: it abandons all queues immediately + (orphans are dropped, no further retransmit, application sees `ERR_RESET`). + +--- + +## 10. Error codes (RST payload) + +`RST` MAY carry a 4-byte payload: `u32 ERROR_CODE`. + +| Code | Name | Cause | +|------|------------------|----------------------------------------------------| +| 0x01 | `ERR_PROTOCOL` | Malformed frame, illegal state transition | +| 0x02 | `ERR_VERSION` | No common protocol version in handshake | +| 0x03 | `ERR_TIMEOUT` | A frame exceeded `max_retries` without ack | +| 0x04 | `ERR_FLOW` | Reorder/flow bound exceeded (`MAX_REORDER`) | +| 0x05 | `ERR_TOO_LARGE` | Payload exceeded `MAX_PAYLOAD` | +| 0x06 | `ERR_INCOMPLETE` | FIN reached with an unfillable SEQ gap (§9.1) | +| 0x07 | `ERR_RESET` | Peer reset; surfaced to application | + +--- + +## 11. 
State machine + +``` + send HELLO recv WELCOME (id ok) + CLOSED ───────────────▶ HELLO_SENT ──────────────────────▶ ESTABLISHED + │ │ + │ recv HELLO │ send/recv FIN + └──────────▶ WELCOME_SENT ───── recv DATA/ack ──▶ ESTABLISHED│ + ▼ + CLOSING ──(both FIN + │ + fully + │ acked)──▶ CLOSED + any error / RST at any state ────────────────────────────▶ CLOSED +``` + +- `CLOSED → HELLO_SENT`: initiator sends HELLO. +- `CLOSED → WELCOME_SENT`: responder receives HELLO, sends WELCOME. +- `HELLO_SENT → ESTABLISHED`: initiator receives matching WELCOME. +- `WELCOME_SENT → ESTABLISHED`: responder treats WELCOME-sent as established for + sending DATA / receiving the first post-handshake frame. +- `ESTABLISHED → CLOSING`: either side sends or receives FIN. +- `CLOSING → CLOSED`: both FINs exchanged and all SEQ ≤ FIN acked. +- `* → CLOSED`: any `RST` or fatal error. + +--- + +## 12. Relationship to existing codecs + +- The session `PAYLOAD` of a `DATA` frame is normally an encoded + `PaymentOffer` / `PaymentProof` (see `encode_offer`/`encode_proof`) or a + generic Switchboard frame with nested tags (#76). The session layer treats it + as opaque bytes. +- `CAP_NESTED` advertises that the sender understands the #76 nested-tag frame + inside `DATA` payloads; it does not change session framing. +- The session layer does not sign payloads; payload signing remains the + responsibility of the `signing_transcript` path. + +--- + +## 13. Open questions for magicians / LP review + +1. **Cumulative vs. selective ack.** v1.0 is cumulative-only. Do we want + SACK ranges for lossy WAN links, or is LAN-cumulative enough for v1.0? +2. **SEQ width.** `u32` per direction. Is wrap (2^32 frames) ever a concern for + long-lived streaming sessions, or should streaming use a fresh session? +3. **REQUEST_ID allocation.** Sender-chosen `u64`. Should we mandate a structure + (e.g. high 32 bits = sender id, low 32 = counter) to avoid cross-peer + collisions when sessions are pooled? +4. 
**Response caching on dedup.** §8 says a deduped request "MAY replay a cached + response." Should v1.0 mandate response caching, or is re-ack-only + sufficient (application owns its own idempotency beyond the ack)? +5. **FIN vs. orphan policy.** §9.1 surfaces `ERR_INCOMPLETE` on an unfillable + gap at FIN. Alternative: allow the application to opt into "deliver what + arrived, gaps reported" — do LP flows ever want partial delivery? +6. **Capability bit budget.** 24 feature bits. Are PQ-signing / streaming / + multiplexing each a single bit, or do some need sub-fields (pushing us toward + a TLV capability list instead of a flat bitmask)? +7. **MAGIC collision.** `0x5A50` ("ZP"). Confirm no clash with the existing + raw ZAP struct stream on port 9999 when both share a socket. diff --git a/switchboard/zap_transport.py b/switchboard/zap_transport.py index e0d3e20..354d3bd 100644 --- a/switchboard/zap_transport.py +++ b/switchboard/zap_transport.py @@ -31,7 +31,10 @@ from __future__ import annotations -from dataclasses import replace +import time +from collections.abc import Callable +from dataclasses import dataclass, field, replace +from enum import Enum from .x402_middleware import PaymentOffer, PaymentProof, PaymentScheme @@ -63,6 +66,51 @@ "encode_proof", "decode_proof", "signing_transcript", + # ── ZAP wire v1.0 session layer (issue #85) ── + "WIRE_MAGIC", + "WIRE_VERSION", + "SESSION_HEADER_SIZE", + "FRAME_HELLO", + "FRAME_WELCOME", + "FRAME_DATA", + "FRAME_ACK", + "FRAME_FIN", + "FRAME_RST", + "FLAG_ACK_PRESENT", + "FLAG_REQ", + "FLAG_RETRANSMIT", + "CAP_VERSION_SHIFT", + "CAP_ACK", + "CAP_RETRY", + "CAP_IDEMPOTENT", + "CAP_FIN", + "CAP_NESTED", + "CAP_BASELINE", + "ERR_PROTOCOL", + "ERR_VERSION", + "ERR_TIMEOUT", + "ERR_FLOW", + "ERR_TOO_LARGE", + "ERR_INCOMPLETE", + "ERR_RESET", + "SessionFrame", + "SessionState", + "ReceiveResult", + "SessionError", + "SessionProtocolError", + "SessionVersionError", + "SessionTimeout", + "SessionIncomplete", + 
"SessionReset", + "encode_session_frame", + "decode_session_frame", + "encode_capabilities_payload", + "decode_capabilities_payload", + "make_capabilities", + "capability_version", + "capability_flags", + "negotiate_capabilities", + "ZapSession", ] @@ -441,3 +489,517 @@ def decode_proof(wire: bytes) -> PaymentProof: signature=_sig_from_bytes(signature_bytes), timestamp=float(root.uint64(f["timestamp"])), ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# ZAP wire v1.0 session layer (issue #85) +# +# A pure-Python, dependency-free session/framing layer that sits UNDER the +# zap_py payload codecs above. A session carries opaque application payloads +# (typically an encoded PaymentOffer/PaymentProof). The session layer never +# inspects payload bytes. See docs/zap-wire-spec-v1.0.md for the byte format, +# handshake, sequencing, ACK, retry, idempotency, and FIN semantics. +# +# Everything below uses only the standard library so the state machine works +# whether or not zap_py is installed. +# ═══════════════════════════════════════════════════════════════════════════ + +# ─── Wire constants (spec §3) ──────────────────────────────────────────────── + +WIRE_MAGIC = 0x5A50 # "ZP" +WIRE_VERSION = 0x01 +SESSION_HEADER_SIZE = 26 + +# Frame types (spec §3.1) +FRAME_HELLO = 0x01 +FRAME_WELCOME = 0x02 +FRAME_DATA = 0x03 +FRAME_ACK = 0x04 +FRAME_FIN = 0x05 +FRAME_RST = 0x06 +_KNOWN_FRAME_TYPES = { + FRAME_HELLO, + FRAME_WELCOME, + FRAME_DATA, + FRAME_ACK, + FRAME_FIN, + FRAME_RST, +} +# Frame types that consume sequence space (spec §5). 
+_SEQ_CONSUMING = {FRAME_HELLO, FRAME_WELCOME, FRAME_DATA, FRAME_FIN} + +# FLAGS bitfield (spec §3.2) +FLAG_ACK_PRESENT = 0x01 +FLAG_REQ = 0x02 +FLAG_RETRANSMIT = 0x04 + +# Capability bitmask: 8 bits version (high octet) + 24 bits feature flags (spec §4.1) +CAP_VERSION_SHIFT = 24 +_CAP_FLAGS_MASK = 0x00FFFFFF +CAP_ACK = 0x000001 +CAP_RETRY = 0x000002 +CAP_IDEMPOTENT = 0x000004 +CAP_FIN = 0x000008 +CAP_NESTED = 0x000010 +# v1.0 baseline: version 1 + all five defined flags = 0x0100001F. +CAP_BASELINE = (WIRE_VERSION << CAP_VERSION_SHIFT) | ( + CAP_ACK | CAP_RETRY | CAP_IDEMPOTENT | CAP_FIN | CAP_NESTED +) + +# RST error codes (spec §10) +ERR_PROTOCOL = 0x01 +ERR_VERSION = 0x02 +ERR_TIMEOUT = 0x03 +ERR_FLOW = 0x04 +ERR_TOO_LARGE = 0x05 +ERR_INCOMPLETE = 0x06 +ERR_RESET = 0x07 + +# Tunable bounds (spec §2/§5.1/§8) +MAX_PAYLOAD = 16 * 1024 * 1024 +MAX_REORDER = 256 +MAX_REQUEST_IDS = 4096 + + +class SessionError(RuntimeError): + """Base class for ZAP session-layer errors.""" + + +class SessionProtocolError(SessionError): + """Malformed frame or illegal state transition (ERR_PROTOCOL).""" + + +class SessionVersionError(SessionError): + """No common protocol version during handshake (ERR_VERSION).""" + + +class SessionTimeout(SessionError): + """A frame exceeded max_retries without an ack (ERR_TIMEOUT).""" + + +class SessionIncomplete(SessionError): + """FIN reached with an unfillable sequence gap (ERR_INCOMPLETE).""" + + +class SessionReset(SessionError): + """Peer reset the session (ERR_RESET).""" + + +class SessionState(Enum): + CLOSED = "closed" + HELLO_SENT = "hello_sent" + WELCOME_SENT = "welcome_sent" + ESTABLISHED = "established" + CLOSING = "closing" + + +# ─── Frame encode/decode (spec §3) ────────────────────────────────────────── + + +@dataclass +class SessionFrame: + """A single ZAP session frame (26-byte header + opaque payload).""" + + frame_type: int + seq: int = 0 + ack: int = 0 + flags: int = 0 + request_id: int = 0 + payload: bytes = b"" + + +def 
def decode_session_frame(wire: bytes) -> SessionFrame:
    """Parse one canonical wire frame into a SessionFrame.

    Validation follows spec §3/§4.3: the header length, then MAGIC, then
    WIRE_VERSION are checked before any other field is interpreted.

    Args:
        wire: Buffer that begins with one complete frame. Bytes beyond
            ``SESSION_HEADER_SIZE + PAYLOAD_LEN`` are ignored, so splitting
            a byte stream into frames is the caller's job.

    Returns:
        The decoded frame. ``payload`` is always an immutable ``bytes``
        object, even when ``wire`` is a ``bytearray`` or ``memoryview``.

    Raises:
        SessionProtocolError: On a short header, bad MAGIC, unknown
            WIRE_VERSION or FRAME_TYPE, a declared payload length above
            MAX_PAYLOAD, or a truncated payload.
    """
    if len(wire) < SESSION_HEADER_SIZE:
        raise SessionProtocolError("frame shorter than 26-byte header")
    if int.from_bytes(wire[0:2], "big") != WIRE_MAGIC:
        raise SessionProtocolError("bad MAGIC")
    if wire[2] != WIRE_VERSION:
        raise SessionProtocolError(f"unknown WIRE_VERSION 0x{wire[2]:02x}")

    frame_type = wire[3]
    if frame_type not in _KNOWN_FRAME_TYPES:
        raise SessionProtocolError(f"unknown frame_type 0x{frame_type:02x}")

    flags = wire[4]
    # wire[5] is RESERVED: written as zero, ignored on read (spec §2).
    seq = int.from_bytes(wire[6:10], "big")
    ack = int.from_bytes(wire[10:14], "big")
    request_id = int.from_bytes(wire[14:22], "big")
    payload_len = int.from_bytes(wire[22:26], "big")

    # Mirror the encoder's bound so an oversize frame is refused on receive
    # as well as on send (spec §2, MAX_PAYLOAD).
    if payload_len > MAX_PAYLOAD:
        raise SessionProtocolError(
            f"payload exceeds MAX_PAYLOAD ({payload_len} bytes)"
        )

    end = SESSION_HEADER_SIZE + payload_len
    # bytes() detaches the payload from a mutable/aliased input buffer.
    payload = bytes(wire[SESSION_HEADER_SIZE:end])
    if len(payload) != payload_len:
        raise SessionProtocolError("truncated payload")

    return SessionFrame(
        frame_type=frame_type,
        seq=seq,
        ack=ack,
        flags=flags,
        request_id=request_id,
        payload=payload,
    )
def _seq_is_after(a: int, b: int) -> bool:
    """Return True when SEQ ``a`` is strictly after SEQ ``b`` (spec §5).

    Sequence numbers live in a wrapping 32-bit space, so ``a`` counts as
    "after" ``b`` when the forward distance from ``b`` to ``a`` is non-zero
    and less than half the space.
    """
    forward_distance = (a - b) % (1 << 32)
    if forward_distance == 0:
        return False
    return forward_distance < (1 << 31)
_Outstanding: + """An unacked sent frame tracked on the retransmit queue (spec §7).""" + + seq: int + wire: bytes + sent_at: float + attempts: int = 0 + + +# ─── Session state machine ─────────────────────────────────────────────────── + + +class ZapSession: + """A pure-Python ZAP wire v1.0 session endpoint. + + Drive it by feeding inbound wire bytes to ``process()`` and transmitting + the wire bytes returned from ``connect()`` / ``accept()`` / ``send_data()`` + / ``make_ack()`` / ``close()`` / ``due_retransmissions()``. The session is + transport-agnostic: it never touches a socket. + """ + + def __init__( + self, + capabilities: int = CAP_BASELINE, + *, + rtt: float = 0.2, + rto_multiplier: float = 2.0, + max_retries: int = 5, + max_rto: float = 30.0, + now: Callable[[], float] | None = None, + ): + self.capabilities = capabilities + self.negotiated = capabilities + self.rtt = rtt + self.rto_multiplier = rto_multiplier + self.max_retries = max_retries + self.max_rto = max_rto + self._now = now or time.monotonic + self.state = SessionState.CLOSED + self.session_id = 0 + + # Send side. + self._send_seq = 0 + self._outstanding: dict[int, _Outstanding] = {} + + # Receive side. 
+ self.expected_seq = 0 # next in-order SEQ we will accept + self.ack_seq = 0 # highest contiguous SEQ delivered (cumulative ack) + self._reorder: dict[int, SessionFrame] = {} + self._seen_requests: dict[int, None] = {} + self._peer_fin_seq: int | None = None + self._sent_fin = False + + # ── helpers ── + + def _next_seq(self) -> int: + seq = self._send_seq + self._send_seq = (self._send_seq + 1) & 0xFFFFFFFF + return seq + + @property + def _retry_enabled(self) -> bool: + return bool(capability_flags(self.negotiated) & CAP_RETRY) + + def _emit(self, frame: SessionFrame, *, track: bool) -> bytes: + """Encode a frame, piggyback the current ack, and (optionally) track it.""" + frame.flags |= FLAG_ACK_PRESENT + frame.ack = self.ack_seq + wire = encode_session_frame(frame) + if track and self._retry_enabled: + self._outstanding[frame.seq] = _Outstanding( + seq=frame.seq, wire=wire, sent_at=self._now() + ) + return wire + + # ── handshake (spec §4) ── + + def connect(self, session_id: int = 1) -> bytes: + """Initiator: build the HELLO frame (SEQ 0) and enter HELLO_SENT.""" + self.session_id = session_id & 0xFFFFFFFFFFFFFFFF + self.state = SessionState.HELLO_SENT + self._send_seq = 0 + frame = SessionFrame( + frame_type=FRAME_HELLO, + seq=self._next_seq(), + payload=encode_capabilities_payload(self.capabilities, self.session_id), + ) + return self._emit(frame, track=True) + + def accept(self, hello_wire: bytes) -> bytes: + """Responder: consume a HELLO, negotiate, build WELCOME, enter ESTABLISHED.""" + hello = decode_session_frame(hello_wire) + if hello.frame_type != FRAME_HELLO: + raise SessionProtocolError("expected HELLO") + peer_caps, session_id = decode_capabilities_payload(hello.payload) + self.negotiated = negotiate_capabilities(self.capabilities, peer_caps) + self.session_id = session_id + # Receive side: the HELLO occupied SEQ 0; we have it in order. 
+        self.expected_seq = (hello.seq + 1) & 0xFFFFFFFF
+        self.ack_seq = hello.seq
+        self._send_seq = 0
+        welcome = SessionFrame(
+            frame_type=FRAME_WELCOME,
+            seq=self._next_seq(),
+            payload=encode_capabilities_payload(self.negotiated, self.session_id),
+        )
+        self.state = SessionState.ESTABLISHED
+        return self._emit(welcome, track=True)
+
+    # ── sending (spec §3/§5/§8) ──
+
+    def send_data(self, payload: bytes, *, request_id: int = 0) -> bytes:
+        """Send a DATA frame; set request_id != 0 for an idempotent request.
+
+        Args:
+            payload: Opaque application bytes; copied with ``bytes()``.
+            request_id: Idempotency key. A non-zero value sets ``FLAG_REQ``
+                so the receiver can dedup on it (spec §8); 0 means "not a
+                request".
+
+        Returns:
+            The encoded frame produced by ``_emit`` (emitted with
+            ``track=True``).
+
+        Raises:
+            SessionProtocolError: if the session is neither ESTABLISHED nor
+                WELCOME_SENT.
+        """
+        if self.state not in (SessionState.ESTABLISHED, SessionState.WELCOME_SENT):
+            raise SessionProtocolError(f"cannot send DATA in state {self.state}")
+        flags = FLAG_REQ if request_id else 0
+        frame = SessionFrame(
+            frame_type=FRAME_DATA,
+            seq=self._next_seq(),
+            flags=flags,
+            request_id=request_id,
+            payload=bytes(payload),
+        )
+        return self._emit(frame, track=True)
+
+    def make_ack(self) -> bytes:
+        """Build a standalone cumulative ACK frame (does not consume SEQ).
+
+        The frame carries the current ``_send_seq`` without advancing it and
+        is emitted with ``track=False``.
+        """
+        frame = SessionFrame(frame_type=FRAME_ACK, seq=self._send_seq)
+        return self._emit(frame, track=False)
+
+    def close(self) -> bytes:
+        """Send a FIN (consumes one SEQ) and enter CLOSING (spec §9).
+
+        Returns:
+            The encoded FIN frame (emitted with ``track=True``).
+
+        Raises:
+            SessionProtocolError: if the session is already CLOSED, or if a
+                FIN has already been sent on this session.
+        """
+        if self.state == SessionState.CLOSED:
+            raise SessionProtocolError("session already closed")
+        # A second FIN would burn a fresh SEQ after we already promised to
+        # originate no new frames: a peer that is CLOSED rejects it, and the
+        # extra outstanding frame keeps this side from ever draining. A lost
+        # FIN is recovered through due_retransmissions(), not a new close().
+        if self._sent_fin:
+            raise SessionProtocolError("FIN already sent")
+        frame = SessionFrame(frame_type=FRAME_FIN, seq=self._next_seq())
+        wire = self._emit(frame, track=True)
+        self._sent_fin = True
+        # If the peer's FIN was already seen and nothing is outstanding this
+        # goes straight to CLOSED; otherwise we wait in CLOSING.
+        self._maybe_close()
+        if self.state != SessionState.CLOSED:
+            self.state = SessionState.CLOSING
+        return wire
+
+    # ── receiving (spec §5.1/§6/§8/§9) ──
+
+    def process(self, wire: bytes) -> ReceiveResult:
+        """Feed one inbound frame; returns delivered payloads + ack signal.
+
+        A piggybacked ACK (``FLAG_ACK_PRESENT``) is applied first, before the
+        frame is dispatched on its type.
+
+        Raises:
+            SessionReset: on RST, after the session is marked CLOSED.
+            SessionProtocolError: on HELLO, on an unknown frame type, or from
+                the WELCOME / DATA / FIN handlers.
+            SessionIncomplete: from the DATA / FIN handler (FIN above a gap).
+        """
+        frame = decode_session_frame(wire)
+
+        if frame.flags & FLAG_ACK_PRESENT:
+            self._apply_ack(frame.ack)
+
+        if frame.frame_type == FRAME_WELCOME:
+            return self._on_welcome(frame)
+        if frame.frame_type == FRAME_ACK:
+            return ReceiveResult()
+        if frame.frame_type == FRAME_RST:
+            self.state = SessionState.CLOSED
+            raise SessionReset("peer reset the session")
+        if frame.frame_type in (FRAME_DATA, FRAME_FIN):
+            return self._on_sequenced(frame)
+        if frame.frame_type == FRAME_HELLO:
+            raise SessionProtocolError("unexpected HELLO on established session")
+        raise SessionProtocolError(f"unexpected frame_type 0x{frame.frame_type:02x}")
+
+    def _on_welcome(self, frame: SessionFrame) -> ReceiveResult:
+        """Complete the initiator side of the handshake from a WELCOME.
+
+        Adopts the capabilities carried in the payload, seeds the receive
+        sequence from the WELCOME's SEQ and moves to ESTABLISHED.
+
+        Raises:
+            SessionProtocolError: if not in HELLO_SENT, or if the echoed
+                session id differs from ours.
+        """
+        if self.state != SessionState.HELLO_SENT:
+            raise SessionProtocolError("WELCOME outside handshake")
+        caps, session_id = decode_capabilities_payload(frame.payload)
+        if session_id != self.session_id:
+            raise SessionProtocolError("WELCOME session_id mismatch")
+        self.negotiated = caps
+        self.expected_seq = (frame.seq + 1) & 0xFFFFFFFF
+        self.ack_seq = frame.seq
+        self.state = SessionState.ESTABLISHED
+        return ReceiveResult(should_ack=True)
+
+    def _on_sequenced(self, frame: SessionFrame) -> ReceiveResult:
+        """Handle a SEQ-consuming frame (DATA or FIN).
+
+        In-order frames are accepted and any contiguous buffered successors
+        are drained after them. Past frames are reported as duplicates.
+        Future DATA is parked in the reorder buffer; a future FIN is an error.
+        Every non-raising path asks the caller to (re-)ack.
+
+        Raises:
+            SessionProtocolError: if the state is neither ESTABLISHED nor
+                CLOSING, or if a new SEQ would overflow the reorder buffer.
+            SessionIncomplete: if a FIN arrives above an unfilled gap.
+        """
+        if self.state not in (SessionState.ESTABLISHED, SessionState.CLOSING):
+            raise SessionProtocolError(f"data/fin in state {self.state}")
+
+        # Duplicate / already-delivered past frame: drop payload, still re-ack.
+        if frame.seq == self.expected_seq:
+            pass  # in order, handled below
+        elif _seq_is_after(self.expected_seq, frame.seq):
+            return ReceiveResult(duplicate=True, should_ack=True)
+        elif _seq_is_after(frame.seq, self.expected_seq):
+            # A FIN signals the peer will originate no new frames, so a gap
+            # below it can never be filled — the session is incomplete (§9.1).
+            if frame.frame_type == FRAME_FIN:
+                self._peer_fin_seq = frame.seq
+                raise SessionIncomplete(
+                    f"FIN at seq {frame.seq} with unfilled gap at {self.expected_seq}"
+                )
+            # Future / out-of-order DATA: buffer and re-ack the last in-order SEQ.
+            # A retransmit of an already-buffered SEQ only overwrites its slot,
+            # so just a genuinely new SEQ can overflow the buffer.
+            if frame.seq not in self._reorder and len(self._reorder) >= MAX_REORDER:
+                raise SessionProtocolError("reorder buffer overflow")
+            self._reorder[frame.seq] = frame
+            return ReceiveResult(should_ack=True)
+
+        result = ReceiveResult(should_ack=True)
+        self._accept_in_order(frame, result)
+        # Drain any buffered contiguous successors.
+        while self.expected_seq in self._reorder:
+            nxt = self._reorder.pop(self.expected_seq)
+            self._accept_in_order(nxt, result)
+        return result
+
+    def _accept_in_order(self, frame: SessionFrame, result: ReceiveResult) -> None:
+        """Consume one in-order frame, advancing ``ack_seq`` / ``expected_seq``.
+
+        Mutates ``result`` in place: a FIN sets ``result.fin``; a DATA frame
+        appends its payload to ``result.delivered`` unless its request_id was
+        already seen, in which case only ``result.duplicate`` is set.
+        """
+        if frame.frame_type == FRAME_FIN:
+            self._peer_fin_seq = frame.seq
+            self.ack_seq = frame.seq
+            self.expected_seq = (frame.seq + 1) & 0xFFFFFFFF
+            result.fin = True
+            if self.state == SessionState.ESTABLISHED:
+                self.state = SessionState.CLOSING
+            self._maybe_close()
+            return
+
+        # DATA frame: idempotent dedup on request_id (spec §8).
+        if frame.flags & FLAG_REQ and frame.request_id:
+            if frame.request_id in self._seen_requests:
+                # Same request under a new SEQ: consume the SEQ so the
+                # stream keeps moving, but do not deliver the payload again.
+                result.duplicate = True
+                self.ack_seq = frame.seq
+                self.expected_seq = (frame.seq + 1) & 0xFFFFFFFF
+                return
+            self._seen_requests[frame.request_id] = None
+            if len(self._seen_requests) > MAX_REQUEST_IDS:
+                # FIFO eviction (dict preserves insertion order).
+                oldest = next(iter(self._seen_requests))
+                del self._seen_requests[oldest]
+
+        result.delivered.append(frame.payload)
+        self.ack_seq = frame.seq
+        self.expected_seq = (frame.seq + 1) & 0xFFFFFFFF
+
+    def _apply_ack(self, ack: int) -> None:
+        """Retire every outstanding frame with SEQ <= ack (cumulative)."""
+        # Iterate over a snapshot of the keys: entries are deleted in the loop.
+        for seq in list(self._outstanding):
+            if seq == ack or _seq_is_after(ack, seq):
+                del self._outstanding[seq]
+        # Draining the queue may be the last condition needed to close.
+        self._maybe_close()
+
+    def _maybe_close(self) -> None:
+        """CLOSED once both FINs are exchanged and all our frames are acked."""
+        if (
+            self._sent_fin
+            and self._peer_fin_seq is not None
+            and not self._outstanding
+        ):
+            self.state = SessionState.CLOSED
+
+    # ── retransmit (spec §7) ──
+
+    def unacked_seqs(self) -> list[int]:
+        """SEQs still awaiting a cumulative ack, in order."""
+        return sorted(self._outstanding)
+
+    def due_retransmissions(self) -> list[bytes]:
+        """Retransmit frames whose RTO has elapsed; raise on exhausted retries.
+
+        The per-frame timeout is
+        ``min(rtt * rto_multiplier ** attempts, max_rto)``.
+
+        Returns:
+            Re-encoded wire bytes, in SEQ order, for every due frame. Each
+            carries ``FLAG_RETRANSMIT`` and the current ``ack_seq``. Empty
+            when retry is disabled or nothing is due.
+
+        Raises:
+            SessionTimeout: if a due frame has already used ``max_retries``
+                attempts. Raised before any frame is modified.
+        """
+        if not self._retry_enabled:
+            return []
+        now = self._now()
+        # Pass 1: find what is due and fail fast. Raising from inside the
+        # mutation loop would bump attempts/sent_at on earlier frames whose
+        # retransmit bytes the caller then never receives.
+        due = []
+        for seq in sorted(self._outstanding):
+            item = self._outstanding[seq]
+            rto = min(self.rtt * (self.rto_multiplier**item.attempts), self.max_rto)
+            if now - item.sent_at < rto:
+                continue
+            if item.attempts >= self.max_retries:
+                raise SessionTimeout(f"seq {seq} exceeded max_retries")
+            due.append(item)
+        # Pass 2: nothing can raise on retry exhaustion any more, so state
+        # and returned bytes stay in step.
+        out: list[bytes] = []
+        for item in due:
+            # Re-mark the wire bytes with the RETRANSMIT flag.
+            frame = decode_session_frame(item.wire)
+            frame.flags |= FLAG_RETRANSMIT
+            frame.ack = self.ack_seq
+            retx = encode_session_frame(frame)
+            item.wire = retx
+            item.attempts += 1
+            item.sent_at = now
+            out.append(retx)
+        return out
diff --git a/tests/protocol_vectors/zap_session.v1.json b/tests/protocol_vectors/zap_session.v1.json
new file mode 100644
index 0000000..d01a6e6
--- /dev/null
+++ b/tests/protocol_vectors/zap_session.v1.json
@@ -0,0 +1,78 @@
+{
+  "schema": "switchboard/zap-session/v1",
+  "version": 1,
+  "description": "Conformance vectors for the ZAP wire v1.0 session layer: frame header byte layout, handshake payloads, flags, and capability negotiation. See docs/zap-wire-spec-v1.0.md.",
+  "magic": "0x5a50",
+  "wire_version": 1,
+  "header_size": 26,
+  "capability_baseline": "0x0100001f",
+  "capability_negotiation": {
+    "local": "0x0200000b",
+    "remote": "0x0100000d",
+    "negotiated": "0x01000009",
+    "note": "min(version)=1, flags = local & remote = CAP_ACK|CAP_FIN"
+  },
+  "cases": [
+    {
+      "name": "hello",
+      "frame_type": 1,
+      "seq": 0,
+      "flags": 1,
+      "wire_hex": "5a5001010100000000000000000000000000000000000000000c0100001f0000000000001234",
+      "payload_hex": "0100001f0000000000001234",
+      "capabilities": "0x0100001f",
+      "session_id": 4660
+    },
+    {
+      "name": "welcome",
+      "frame_type": 2,
+      "seq": 0,
+      "flags": 1,
+      "wire_hex": "5a5001020100000000000000000000000000000000000000000c0100001f0000000000001234",
+      "payload_hex": "0100001f0000000000001234",
+      "capabilities": "0x0100001f",
+      "session_id": 4660
+    },
+    {
+      "name": "data-request",
+      "frame_type": 3,
+      "seq": 1,
+      "flags": 3,
+      "wire_hex": "5a500103030000000001000000000000deadbeefcafe00000006636861726765",
+      "request_id": 244837814094590,
+      "payload_hex": "636861726765"
+    },
+    {
+      "name": "ack",
+      "frame_type": 4,
+      "seq": 2,
+      "flags": 1,
+      "wire_hex": "5a50010401000000000200000002000000000000000000000000"
+    },
+    {
+      "name": "fin",
+      "frame_type": 5,
+      "seq": 2,
+      "flags": 1,
+      "wire_hex": "5a50010501000000000200000001000000000000000000000000"
+    },
+    {
+      "name": "rst",
+      "frame_type": 6,
+      "seq": 0,
+      "flags": 0,
+      "wire_hex": "5a5001060000000000000000000000000000000000000000000400000003",
+      "payload_hex": "00000003",
+      "error_code": 3
+    },
+    {
+      "name": "data-retransmit",
+      "frame_type": 3,
+      "seq": 1,
+      "flags": 7,
+      "wire_hex": "5a500103070000000001000000000000deadbeefcafe00000006636861726765",
+      "request_id": 244837814094590,
+      "payload_hex": "636861726765"
+    }
+  ]
+}
diff --git a/tests/test_zap_transport.py b/tests/test_zap_transport.py
index 0a837eb..42dc78a 100644
--- a/tests/test_zap_transport.py
+++ b/tests/test_zap_transport.py
@@ -528,3 +528,370 @@ def test_offer_wire_smaller_than_json():
     # Not an absolute guarantee (small offers may bloat with the ZAP header),
     # but for any realistic offer the binary form should win.
     assert len(wire) <= len(json_blob) + 64  # tolerance for fixed ZAP header
+
+
+# ───────────────────────────────────────────────────────────────────────────
+# ZAP wire v1.0 session layer (issue #85)
+#
+# Pure-Python, no zap_py dependency. These tests are NOT gated by ZAP_REQUIRED:
+# the session framing and state machine must work with the standard library.
+# See docs/zap-wire-spec-v1.0.md.
+# ───────────────────────────────────────────────────────────────────────────
+
+
+class FakeClock:
+    """Deterministic injectable clock for retransmit-timeout tests."""
+
+    def __init__(self, start: float = 0.0):
+        self.t = start
+
+    def __call__(self) -> float:
+        return self.t
+
+    def advance(self, dt: float) -> None:
+        self.t += dt
+
+
+# ─── Frame encode/decode ─────────────────────────────────────────────────────
+
+
+def test_session_frame_header_is_26_bytes():
+    wire = zt.encode_session_frame(zt.SessionFrame(frame_type=zt.FRAME_DATA, seq=0))
+    assert len(wire) == zt.SESSION_HEADER_SIZE == 26
+
+
+def test_session_frame_roundtrip_all_fields():
+    frame = zt.SessionFrame(
+        frame_type=zt.FRAME_DATA,
+        flags=zt.FLAG_ACK_PRESENT | zt.FLAG_REQ,
+        seq=7,
+        ack=3,
+        request_id=0xDEADBEEFCAFE,
+        payload=b"hello-zap",
+    )
+    wire = zt.encode_session_frame(frame)
+    out = zt.decode_session_frame(wire)
+    assert out.frame_type == zt.FRAME_DATA
+    assert out.flags == zt.FLAG_ACK_PRESENT | zt.FLAG_REQ
+    assert out.seq == 7
+    assert out.ack == 3
+    assert out.request_id == 0xDEADBEEFCAFE
+    assert out.payload == b"hello-zap"
+
+
+def test_session_frame_starts_with_magic_and_version():
+    wire = zt.encode_session_frame(zt.SessionFrame(frame_type=zt.FRAME_HELLO, seq=0))
+    assert wire[0:2] == zt.WIRE_MAGIC.to_bytes(2, "big")
+    assert wire[2] == zt.WIRE_VERSION
+
+
+def test_session_frame_decode_rejects_bad_magic():
+    wire = bytearray(zt.encode_session_frame(zt.SessionFrame(frame_type=zt.FRAME_DATA, seq=0)))
+    wire[0] ^= 0xFF
+    with pytest.raises(zt.SessionProtocolError):
+        zt.decode_session_frame(bytes(wire))
+
+
+def test_session_frame_decode_rejects_unknown_version():
+    wire = bytearray(zt.encode_session_frame(zt.SessionFrame(frame_type=zt.FRAME_DATA, seq=0)))
+    wire[2] = 0x99
+    with pytest.raises(zt.SessionProtocolError):
+        zt.decode_session_frame(bytes(wire))
+
+
+# ─── Capability bitmask + negotiation ───────────────────────────────────────
+
+
+def test_capability_bitmask_layout_is_8_version_24_flags():
+    caps = zt.make_capabilities(zt.WIRE_VERSION, zt.CAP_ACK | zt.CAP_FIN)
+    assert zt.capability_version(caps) == zt.WIRE_VERSION
+    assert zt.capability_flags(caps) == (zt.CAP_ACK | zt.CAP_FIN)
+    # version lives in the high 8 bits
+    assert caps >> 24 == zt.WIRE_VERSION
+    # flags live in the low 24 bits
+    assert caps & 0x00FFFFFF == (zt.CAP_ACK | zt.CAP_FIN)
+
+
+def test_capability_baseline_mask_is_0x0100001f():
+    assert zt.CAP_BASELINE == 0x0100001F
+
+
+def test_negotiate_intersects_flags_and_min_version():
+    local = zt.make_capabilities(2, zt.CAP_ACK | zt.CAP_RETRY | zt.CAP_FIN)
+    remote = zt.make_capabilities(1, zt.CAP_ACK | zt.CAP_IDEMPOTENT | zt.CAP_FIN)
+    negotiated = zt.negotiate_capabilities(local, remote)
+    assert zt.capability_version(negotiated) == 1  # min(2, 1)
+    assert zt.capability_flags(negotiated) == (zt.CAP_ACK | zt.CAP_FIN)  # intersection
+
+
+def test_negotiate_no_common_version_raises():
+    local = zt.make_capabilities(0, zt.CAP_ACK)
+    remote = zt.make_capabilities(1, zt.CAP_ACK)
+    with pytest.raises(zt.SessionVersionError):
+        zt.negotiate_capabilities(local, remote)
+
+
+# ─── Handshake roundtrip + capability negotiation ───────────────────────────
+
+
+def test_handshake_roundtrip_and_capability_negotiation():
+    initiator = zt.ZapSession(capabilities=zt.make_capabilities(zt.WIRE_VERSION, zt.CAP_ACK | zt.CAP_FIN))
+    responder = zt.ZapSession(capabilities=zt.make_capabilities(zt.WIRE_VERSION, zt.CAP_ACK | zt.CAP_RETRY))
+
+    hello = initiator.connect(session_id=0x1234)
+    decoded_hello = zt.decode_session_frame(hello)
+    assert decoded_hello.frame_type == zt.FRAME_HELLO
+    assert decoded_hello.seq == 0
+
+    welcome = responder.accept(hello)
+    decoded_welcome = zt.decode_session_frame(welcome)
+    assert decoded_welcome.frame_type == zt.FRAME_WELCOME
+
+    initiator.process(welcome)
+
+    assert initiator.state == zt.SessionState.ESTABLISHED
+    assert responder.state == zt.SessionState.ESTABLISHED
+    # negotiated = intersection of the two flag sets
+    assert zt.capability_flags(initiator.negotiated) == zt.CAP_ACK
+    assert zt.capability_flags(responder.negotiated) == zt.CAP_ACK
+
+
+def test_accept_rejects_data_before_handshake():
+    responder = zt.ZapSession()
+    data = zt.encode_session_frame(zt.SessionFrame(frame_type=zt.FRAME_DATA, seq=0, payload=b"x"))
+    with pytest.raises(zt.SessionProtocolError):
+        responder.process(data)
+
+
+def test_welcome_session_id_mismatch_raises():
+    initiator = zt.ZapSession()
+    initiator.connect(session_id=0xAAAA)
+    # A WELCOME echoing the wrong session id must be rejected (spec §4.3).
+    bad = zt.encode_session_frame(
+        zt.SessionFrame(
+            frame_type=zt.FRAME_WELCOME,
+            seq=0,
+            flags=zt.FLAG_ACK_PRESENT,
+            payload=zt.encode_capabilities_payload(zt.CAP_BASELINE, session_id=0x9999),
+        )
+    )
+    with pytest.raises(zt.SessionProtocolError):
+        initiator.process(bad)
+
+
+# ─── In-order / out-of-order sequence handling + ACK ─────────────────────────
+
+
+def _establish_pair():
+    """Return an (initiator, responder) pair that has completed the handshake."""
+    initiator = zt.ZapSession(capabilities=zt.CAP_BASELINE)
+    responder = zt.ZapSession(capabilities=zt.CAP_BASELINE)
+    hello = initiator.connect(session_id=0x55)
+    welcome = responder.accept(hello)
+    initiator.process(welcome)
+    return initiator, responder
+
+
+def test_in_order_data_delivered_and_acked():
+    initiator, responder = _establish_pair()
+    f1 = initiator.send_data(b"one")
+    f2 = initiator.send_data(b"two")
+
+    r1 = responder.process(f1)
+    r2 = responder.process(f2)
+
+    assert r1.delivered == [b"one"]
+    assert r2.delivered == [b"two"]
+    # cumulative ack: responder has delivered up to seq 2 (hello=0, one=1, two=2)
+    assert responder.ack_seq == 2
+
+
+def test_out_of_order_data_buffered_then_drained():
+    initiator, responder = _establish_pair()
+    f1 = initiator.send_data(b"one")  # seq 1
+    f2 = initiator.send_data(b"two")  # seq 2
+
+    # deliver f2 before f1
+    r_future = responder.process(f2)
+    assert r_future.delivered == []  # buffered, nothing delivered yet
+    assert responder.ack_seq == 0  # still only handshake is contiguous
+
+    r_gap = responder.process(f1)
+    # now both drain in order
+    assert r_gap.delivered == [b"one", b"two"]
+    assert responder.ack_seq == 2
+
+
+def test_duplicate_past_frame_dropped_but_reacked():
+    initiator, responder = _establish_pair()
+    f1 = initiator.send_data(b"one")
+    responder.process(f1)
+    r_dup = responder.process(f1)  # same seq again
+    assert r_dup.delivered == []  # not redelivered
+    assert r_dup.should_ack is True  # but we re-ack
+    assert responder.ack_seq == 1
+
+
+def test_ack_retires_retransmit_queue():
+    initiator, responder = _establish_pair()
+    f1 = initiator.send_data(b"one")  # seq 1
+    f2 = initiator.send_data(b"two")  # seq 2
+    assert initiator.unacked_seqs() == [1, 2]
+
+    # Deliver both in order so the responder can cumulatively ack up to seq 2.
+    responder.process(f1)
+    ack_frame = responder.process(f2)
+    assert ack_frame.should_ack
+    assert responder.ack_seq == 2
+
+    initiator.process(responder.make_ack())  # ACK = 2 retires seq 1 and 2
+    assert initiator.unacked_seqs() == []
+
+
+# ─── Retry after timeout ─────────────────────────────────────────────────────
+
+
+def test_no_retransmit_before_rto():
+    clock = FakeClock()
+    initiator = zt.ZapSession(capabilities=zt.CAP_BASELINE, rtt=0.2, now=clock)
+    responder = zt.ZapSession(capabilities=zt.CAP_BASELINE, now=clock)
+    initiator.process(responder.accept(initiator.connect(session_id=1)))
+
+    initiator.send_data(b"lossy")  # seq 1, sent at t=0
+    clock.advance(0.1)  # < rtt
+    assert initiator.due_retransmissions() == []
+
+
+def test_retransmit_after_rto_sets_retransmit_flag():
+    clock = FakeClock()
+    initiator = zt.ZapSession(capabilities=zt.CAP_BASELINE, rtt=0.2, now=clock)
+    responder = zt.ZapSession(capabilities=zt.CAP_BASELINE, now=clock)
+    initiator.process(responder.accept(initiator.connect(session_id=1)))
+
+    initiator.send_data(b"lossy")  # seq 1
+    clock.advance(0.25)  # > rtt
+    due = initiator.due_retransmissions()
+    assert len(due) == 1
+    rt = zt.decode_session_frame(due[0])
+    assert rt.seq == 1
+    assert rt.flags & zt.FLAG_RETRANSMIT
+    assert rt.payload == b"lossy"
+
+
+def test_retransmit_backoff_then_declared_lost():
+    clock = FakeClock()
+    initiator = zt.ZapSession(
+        capabilities=zt.CAP_BASELINE, rtt=0.2, rto_multiplier=2.0, max_retries=2, now=clock
+    )
+    responder = zt.ZapSession(capabilities=zt.CAP_BASELINE, now=clock)
+    initiator.process(responder.accept(initiator.connect(session_id=1)))
+
+    initiator.send_data(b"lossy")  # seq 1
+    # attempt 0 RTO = 0.2; attempt 1 RTO = 0.4
+    clock.advance(0.25)
+    assert len(initiator.due_retransmissions()) == 1  # retransmit #1
+    clock.advance(0.45)
+    assert len(initiator.due_retransmissions()) == 1  # retransmit #2 (== max_retries)
+    clock.advance(1.0)
+    # max_retries exhausted -> frame declared lost
+    with pytest.raises(zt.SessionTimeout):
+        initiator.due_retransmissions()
+
+
+# ─── Idempotent request_id dedup ─────────────────────────────────────────────
+
+
+def test_idempotent_request_deduped_on_retransmit():
+    initiator, responder = _establish_pair()
+    req = initiator.send_data(b"charge", request_id=0xABCD)
+    r1 = responder.process(req)
+    assert r1.delivered == [b"charge"]
+
+    # retransmit of the same request (same seq + request_id)
+    r2 = responder.process(req)
+    assert r2.delivered == []  # NOT re-executed
+    assert r2.duplicate is True
+    assert r2.should_ack is True
+
+
+def test_distinct_request_ids_both_delivered():
+    initiator, responder = _establish_pair()
+    r1 = responder.process(initiator.send_data(b"a", request_id=1))
+    r2 = responder.process(initiator.send_data(b"b", request_id=2))
+    assert r1.delivered == [b"a"]
+    assert r2.delivered == [b"b"]
+
+
+# ─── Clean FIN with in-flight frames ─────────────────────────────────────────
+
+
+def test_clean_fin_with_in_flight_frames():
+    initiator, responder = _establish_pair()
+    data = initiator.send_data(b"last")  # seq 1, in-flight (unacked)
+    fin = initiator.close()  # seq 2 FIN
+    assert zt.decode_session_frame(fin).frame_type == zt.FRAME_FIN
+    assert initiator.state == zt.SessionState.CLOSING
+    # in-flight data is NOT cancelled by FIN
+    assert 1 in initiator.unacked_seqs()
+
+    # responder receives the in-flight data, then the FIN, in order
+    responder.process(data)
+    fin_result = responder.process(fin)
+    assert fin_result.fin is True
+    assert responder.state == zt.SessionState.CLOSING
+
+    # responder half-closes back; its FIN piggybacks an ack covering the
+    # initiator's in-flight DATA + FIN, so the initiator drains and CLOSES.
+    responder_fin = responder.close()
+    initiator.process(responder_fin)
+    assert initiator.unacked_seqs() == []  # in-flight DATA+FIN now acked
+    assert initiator.state == zt.SessionState.CLOSED
+
+
+def test_fin_with_unfillable_gap_is_incomplete():
+    initiator, responder = _establish_pair()
+    initiator.send_data(b"one")  # seq 1 — will be "lost" (never delivered)
+    initiator.send_data(b"two")  # seq 2
+    fin = initiator.close()  # seq 3 FIN
+
+    # responder only ever sees seq 2 and the FIN, never seq 1
+    responder.process(zt.encode_session_frame(
+        zt.SessionFrame(frame_type=zt.FRAME_DATA, seq=2, payload=b"two")
+    ))
+    with pytest.raises(zt.SessionIncomplete):
+        responder.process(fin)
+
+
+# ─── Conformance vectors ─────────────────────────────────────────────────────
+
+
+def _load_session_vectors() -> dict:
+    """Load the session conformance vectors that sit next to this module.
+
+    The path is resolved from ``__file__`` rather than the working directory,
+    so the suite passes wherever pytest is invoked from.
+    """
+    path = Path(__file__).resolve().parent / "protocol_vectors" / "zap_session.v1.json"
+    return json.loads(path.read_text(encoding="utf-8"))
+
+
+def test_session_conformance_vectors_decode_to_expected_fields():
+    vectors = _load_session_vectors()
+    assert vectors["schema"] == "switchboard/zap-session/v1"
+    # The file-level constants must agree with the codec under test.
+    assert int(vectors["magic"], 16) == zt.WIRE_MAGIC
+    assert vectors["wire_version"] == zt.WIRE_VERSION
+    assert vectors["header_size"] == zt.SESSION_HEADER_SIZE
+    assert int(vectors["capability_baseline"], 16) == zt.CAP_BASELINE
+
+    cases = {c["name"]: c for c in vectors["cases"]}
+    required = {"hello", "welcome", "data-request", "ack", "fin", "rst", "data-retransmit"}
+    assert required <= set(cases)
+
+    for case in vectors["cases"]:
+        wire = bytes.fromhex(case["wire_hex"])
+        frame = zt.decode_session_frame(wire)
+        assert frame.frame_type == case["frame_type"]
+        assert frame.seq == case["seq"]
+        assert frame.flags == case["flags"]
+        if "request_id" in case:
+            assert frame.request_id == case["request_id"]
+        if "payload_hex" in case:
+            assert frame.payload == bytes.fromhex(case["payload_hex"])
+        if "capabilities" in case:
+            # handshake payload: capability bitmask followed by the session id
+            caps, session_id = zt.decode_capabilities_payload(frame.payload)
+            assert caps == int(case["capabilities"], 16)
+            assert session_id == case["session_id"]
+        if "error_code" in case:
+            # the vector's RST payload is a big-endian u32 error code
+            assert int.from_bytes(frame.payload[:4], "big") == case["error_code"]
+        # re-encoding must reproduce the exact bytes (deterministic codec)
+        assert zt.encode_session_frame(frame) == wire
+
+
+def test_session_conformance_capability_vector_matches_negotiation():
+    vectors = _load_session_vectors()
+    cap = vectors["capability_negotiation"]
+    local = int(cap["local"], 16)
+    remote = int(cap["remote"], 16)
+    expected = int(cap["negotiated"], 16)
+    assert zt.negotiate_capabilities(local, remote) == expected