From 246da8b5b6ad63333cef8dc4a45dad329b35ed2d Mon Sep 17 00:00:00 2001 From: Hubert Gruszecki Date: Thu, 11 Jun 2026 09:00:55 +0200 Subject: [PATCH] feat(server-ng): reject incompatible clients at login via protocol version Version mismatches between SDK and server surfaced as runtime decode failures instead of a clear error at login. The VSR login-register body now starts with a required ClientVersionInfo prefix: the packed iggy_binary_protocol crate semver plus sdk name and version. The server gates on a [min, current-minor] range before touching credentials (patch never changes the wire) and rejects with a typed Eviction frame carrying the accepted window; a body without a decodable prefix is rejected with a dedicated MalformedLogin reason instead of the version window. The response advertises the server's protocol and build version; the language-neutral wire spec lives in the version module docs. The SDK decodes eviction and reply headers by byte offset instead of struct casts: response read buffers are not 16-aligned, and offset reads let the client sanity-check the protocol window. The new EvictionReason variants and the EvictionHeader protocol window are appended into reserved space, leaving existing consensus frames and discriminants byte-compatible. The SDK validates password and PAT token lengths before encoding, with the same bounds and errors the servers already enforce, so an oversized secret cannot desync the u8 length prefix; server-ng mirrors the legacy username/password bounds before lookup or hashing. SDK identity is recorded per connection in the SessionManager; get_clients wire exposure is a follow-up. The legacy server is untouched. --- Cargo.lock | 1 + core/binary_protocol/README.md | 2 + core/binary_protocol/src/consensus/header.rs | 92 ++++- core/binary_protocol/src/lib.rs | 5 + .../src/requests/users/change_password.rs | 5 + .../src/requests/users/create_user.rs | 4 + .../src/requests/users/login_register.rs | 139 +++++--- .../requests/users/login_register_with_pat.rs | 108 +++--- .../src/requests/users/login_user.rs | 4 + .../src/responses/users/login_register.rs | 66 +++- core/binary_protocol/src/version.rs | 336 ++++++++++++++++++ core/common/src/error/iggy_error.rs | 9 + core/common/src/traits/binary_impls/mod.rs | 43 +++ .../binary_impls/personal_access_tokens.rs | 17 +- core/common/src/traits/binary_impls/users.rs | 13 +- core/common/src/traits/binary_transport.rs | 4 + core/consensus/src/metadata_helpers.rs | 43 ++- core/integration/Cargo.toml | 1 + core/integration/tests/sdk/mod.rs | 2 + .../integration/tests/sdk/protocol_version.rs | 122 +++++++ core/sdk/src/lib.rs | 5 + core/sdk/src/quic/quic_client.rs | 4 + core/sdk/src/tcp/tcp_client.rs | 4 + core/sdk/src/vsr.rs | 162 ++++++--- core/sdk/src/websocket/websocket_client.rs | 4 + core/server-ng/src/auth.rs | 28 +- core/server-ng/src/dispatch.rs | 100 +++++- core/server-ng/src/responses.rs | 20 +- core/server-ng/src/session_manager.rs | 72 +++- core/shard/src/lib.rs | 7 + 30 files changed, 1224 insertions(+), 198 deletions(-) create mode 100644 core/binary_protocol/src/version.rs create mode 100644 core/integration/tests/sdk/protocol_version.rs diff --git a/Cargo.lock b/Cargo.lock index 5dcfa64639..12abef58dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7246,6 +7246,7 @@ dependencies = [ "async-trait", "base64", "bon", + "bytemuck", "bytes", "cfg_aliases", "compio", diff --git a/core/binary_protocol/README.md b/core/binary_protocol/README.md index dcffb06760..46000f702e 100644 --- a/core/binary_protocol/README.md +++ b/core/binary_protocol/README.md @@ -12,6 +12,8 @@ Wire protocol types and codec for the [Apache Iggy](https://iggy.apache.org) binary protocol, shared between server and SDK. This crate is an internal building block. Most users should depend on the [`iggy`](https://crates.io/crates/iggy) SDK crate instead. +The language-neutral wire spec for the login protocol-version handshake (packed version layout, `ClientVersionInfo` prefix, gate semantics, rejection frame offsets) lives in the [`version` module docs](src/version.rs). + > Apache Iggy (Incubating) is an effort undergoing incubation at the Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC. > > Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. diff --git a/core/binary_protocol/src/consensus/header.rs b/core/binary_protocol/src/consensus/header.rs index 773a2bb178..e46385cd9c 100644 --- a/core/binary_protocol/src/consensus/header.rs +++ b/core/binary_protocol/src/consensus/header.rs @@ -369,6 +369,12 @@ pub enum EvictionReason { InvalidToken = 10, UserInactive = 11, SessionError = 12, + /// Client protocol version outside the server's accepted range; the + /// header carries `server_protocol_version{,_min}` so the SDK can + /// report the exact window. + IncompatibleProtocol = 13, + /// Login body without a decodable `ClientVersionInfo` prefix. + MalformedLogin = 14, } // EvictionHeader - primary -> client (session-terminal, no body) @@ -390,7 +396,10 @@ pub struct EvictionHeader { pub reserved_frame: [u8; 66], pub client: u128, - pub reserved: [u8; 111], + /// Accepted protocol window on `IncompatibleProtocol`; zero otherwise. + pub server_protocol_version: u32, + pub server_protocol_version_min: u32, + pub reserved: [u8; 103], pub reason: EvictionReason, } const _: () = { @@ -399,6 +408,8 @@ const _: () = { offset_of!(EvictionHeader, client) == offset_of!(EvictionHeader, reserved_frame) + size_of::<[u8; 66]>() ); + assert!(offset_of!(EvictionHeader, server_protocol_version) == 144); + assert!(offset_of!(EvictionHeader, server_protocol_version_min) == 148); assert!(offset_of!(EvictionHeader, reason) + size_of::() == HEADER_SIZE); }; @@ -411,7 +422,8 @@ impl EvictionHeader { /// # Panics (debug) /// On `ClientReleaseTooLow`/`ClientReleaseTooHigh`: `release` hardcoded /// to 0, those reasons need real bounds. Add `release_min`/`release_max` - /// params before emitting them. + /// params before emitting them. On `IncompatibleProtocol`: needs the + /// accepted protocol window, use [`Self::incompatible_protocol`] instead. /// /// # Safety /// `client` must be non-zero , `validate` rejects zero so SDKs can @@ -428,9 +440,11 @@ impl EvictionHeader { debug_assert!( !matches!( reason, - EvictionReason::ClientReleaseTooLow | EvictionReason::ClientReleaseTooHigh, + EvictionReason::ClientReleaseTooLow + | EvictionReason::ClientReleaseTooHigh + | EvictionReason::IncompatibleProtocol, ), - "EvictionHeader::new: ClientRelease* needs release_min/release_max", + "EvictionHeader::new: ClientRelease*/IncompatibleProtocol need extra fields", ); // Cap from consensus REPLICAS_MAX=32; literal here to avoid // wire-proto crate depending on consensus crate. @@ -449,10 +463,30 @@ impl EvictionHeader { replica, reserved_frame: [0; 66], client, - reserved: [0; 111], + server_protocol_version: 0, + server_protocol_version_min: 0, + reserved: [0; 103], reason, } } + + /// Protocol-version rejection carrying the accepted window so the SDK + /// can report `client X, server accepts [min, max]`. + #[must_use] + pub const fn incompatible_protocol( + cluster: u128, + view: u32, + replica: u8, + client: u128, + server_protocol_version: u32, + server_protocol_version_min: u32, + ) -> Self { + let mut header = Self::new(cluster, view, replica, client, EvictionReason::NoSession); + header.reason = EvictionReason::IncompatibleProtocol; + header.server_protocol_version = server_protocol_version; + header.server_protocol_version_min = server_protocol_version_min; + header + } } impl ConsensusHeader for EvictionHeader { @@ -508,6 +542,22 @@ impl ConsensusHeader for EvictionHeader { "eviction: reason must not be Reserved".to_string(), )); } + // Protocol window only travels on IncompatibleProtocol; anywhere + // else a nonzero value is smuggling (same rule as reserved bytes). + if self.reason == EvictionReason::IncompatibleProtocol { + if self.server_protocol_version_min == 0 + || self.server_protocol_version < self.server_protocol_version_min + { + return Err(ConsensusError::InvalidField( + "eviction: incompatible-protocol window must satisfy 1 <= min <= max" + .to_string(), + )); + } + } else if self.server_protocol_version != 0 || self.server_protocol_version_min != 0 { + return Err(ConsensusError::InvalidField( + "eviction: protocol window bytes must be zero".to_string(), + )); + } Ok(()) } } @@ -1075,6 +1125,38 @@ mod tests { assert_eq!(EvictionReason::InvalidToken as u8, 10); assert_eq!(EvictionReason::UserInactive as u8, 11); assert_eq!(EvictionReason::SessionError as u8, 12); + assert_eq!(EvictionReason::IncompatibleProtocol as u8, 13); + assert_eq!(EvictionReason::MalformedLogin as u8, 14); + } + + #[test] + fn eviction_incompatible_protocol_accepts_valid_window() { + let header = EvictionHeader::incompatible_protocol(0, 0, 0, 0xCAFE, 2, 1); + assert!(header.validate().is_ok()); + assert_eq!(header.reason, EvictionReason::IncompatibleProtocol); + assert_eq!(header.server_protocol_version, 2); + assert_eq!(header.server_protocol_version_min, 1); + } + + #[test] + fn eviction_incompatible_protocol_rejects_inverted_window() { + let header = EvictionHeader::incompatible_protocol(0, 0, 0, 0xCAFE, 1, 2); + assert!(header.validate().is_err()); + } + + #[test] + fn eviction_incompatible_protocol_rejects_zero_min() { + let header = EvictionHeader::incompatible_protocol(0, 0, 0, 0xCAFE, 1, 0); + assert!(header.validate().is_err()); + } + + // Protocol window is IncompatibleProtocol-only; nonzero elsewhere is + // smuggling, same rule as the reserved-byte guards. + #[test] + fn eviction_validate_rejects_window_on_other_reason() { + let mut header = EvictionHeader::new(0, 0, 0, 0xCAFE, EvictionReason::NoSession); + header.server_protocol_version = 1; + assert!(header.validate().is_err()); } #[test] diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs index 00e2e5653c..91f845f272 100644 --- a/core/binary_protocol/src/lib.rs +++ b/core/binary_protocol/src/lib.rs @@ -67,6 +67,7 @@ pub mod namespace; pub mod primitives; pub mod requests; pub mod responses; +pub mod version; pub use codec::{WireDecode, WireEncode}; pub use consensus::{ @@ -94,6 +95,10 @@ pub use primitives::user_headers::{ WireHeaderKind, WireUserHeaderEntry, WireUserHeaderIterator, WireUserHeaders, encode_user_headers, user_headers_encoded_size, validate_user_headers, }; +pub use version::{ + ClientVersionInfo, IGGY_PROTOCOL_VERSION, IGGY_PROTOCOL_VERSION_MIN, ProtocolVersion, + is_protocol_compatible, +}; /// Maximum number of partitions allowed in a single create/delete request. pub const MAX_PARTITIONS_PER_REQUEST: u32 = 1000; diff --git a/core/binary_protocol/src/requests/users/change_password.rs b/core/binary_protocol/src/requests/users/change_password.rs index ef0a26179c..82503c8b57 100644 --- a/core/binary_protocol/src/requests/users/change_password.rs +++ b/core/binary_protocol/src/requests/users/change_password.rs @@ -39,6 +39,11 @@ impl WireEncode for ChangePasswordRequest { fn encode(&self, buf: &mut BytesMut) { self.user_id.encode(buf); + debug_assert!( + u8::try_from(self.current_password.len()).is_ok() + && u8::try_from(self.new_password.len()).is_ok(), + "password exceeds u8 length prefix; callers must validate before encoding" + ); #[allow(clippy::cast_possible_truncation)] buf.put_u8(self.current_password.len() as u8); buf.put_slice(self.current_password.as_bytes()); diff --git a/core/binary_protocol/src/requests/users/create_user.rs b/core/binary_protocol/src/requests/users/create_user.rs index 8f17710c75..2cb6634ed8 100644 --- a/core/binary_protocol/src/requests/users/create_user.rs +++ b/core/binary_protocol/src/requests/users/create_user.rs @@ -50,6 +50,10 @@ impl WireEncode for CreateUserRequest { fn encode(&self, buf: &mut BytesMut) { self.username.encode(buf); + debug_assert!( + u8::try_from(self.password.len()).is_ok(), + "password exceeds u8 length prefix; callers must validate before encoding" + ); #[allow(clippy::cast_possible_truncation)] buf.put_u8(self.password.len() as u8); buf.put_slice(self.password.as_bytes()); diff --git a/core/binary_protocol/src/requests/users/login_register.rs b/core/binary_protocol/src/requests/users/login_register.rs index e91beb3840..b3c3fc3234 100644 --- a/core/binary_protocol/src/requests/users/login_register.rs +++ b/core/binary_protocol/src/requests/users/login_register.rs @@ -18,72 +18,71 @@ use crate::WireError; use crate::codec::{WireDecode, WireEncode, read_str, read_u8, read_u32_le}; use crate::primitives::identifier::WireName; +use crate::version::ClientVersionInfo; use bytes::{BufMut, BytesMut}; use secrecy::{ExposeSecret, SecretString}; /// Combined login + register request for server-ng. /// -/// The server verifies credentials locally, then submits `Operation::Register` -/// through consensus. The response carries `user_id` + `session` (commit op -/// number). The `client_id` is carried in the VSR `RequestHeader.client` field -/// (populated by the SDK at encode time); the body no longer duplicates it. +/// The server gates on `version_info.protocol_version` (see +/// [`crate::version::is_protocol_compatible`]), verifies credentials +/// locally, then submits `Operation::Register` through consensus. The +/// response carries `user_id` + `session` (commit op number) plus the +/// server's own protocol version and build version. The `client_id` is +/// carried in the VSR `RequestHeader.client` field (populated by the SDK +/// at encode time); the body no longer duplicates it. /// /// Wire format: /// ```text +/// [ClientVersionInfo] /// [username_len:u8][username:N][password_len:u8][password:N] -/// [version_len:u32_le][version:N?][context_len:u32_le][context:N?] +/// [context_len:u32_le][context:N?] /// ``` /// +/// `ClientVersionInfo` is encoded first so the server can always parse the +/// version prefix and reject incompatible clients (with an +/// `EvictionReason::IncompatibleProtocol` frame carrying the accepted +/// range) before touching the rest of the body. +/// /// # Cross-version compatibility /// /// This wire shape is gated by the `vsr` cargo feature and lives under /// `LOGIN_REGISTER_CODE`. The legacy `LOGIN_USER_CODE` shape (still in use -/// by non-`vsr` builds) is untouched. -/// -/// | Client | Server | Behavior | -/// |-----------------|-----------------|------------------------------------------------| -/// | `vsr` SDK | `vsr` server-ng | Works | -/// | non-`vsr` SDK | `vsr` server-ng | `LOGIN_USER_CODE` -- handled by legacy path | -/// | `vsr` SDK | non-`vsr` server| Server returns `IggyError::InvalidCommand` | -/// | non-`vsr` SDK | non-`vsr` server| Works | -/// -/// Foreign-language SDKs (C++, C#, Python, Go, Java) currently speak only -/// the legacy shape; they will silently pick the working leg above until -/// they wire VSR framing. Bump `IGGY_PROTOCOL_VERSION` (or the equivalent -/// when the project tracks one) when this changes. +/// by non-`vsr` builds against the legacy `iggy-server`) is untouched. +/// server-ng speaks VSR framing only; a non-`vsr` SDK cannot log in to +/// server-ng. Foreign-language SDKs (C++, C#, Python, Go, Java) adopt this +/// shape, with their own `sdk_name`, when they wire VSR framing. Bump +/// [`crate::version::IGGY_PROTOCOL_VERSION`] on any wire-incompatible +/// change. #[derive(Debug, Clone)] pub struct LoginRegisterRequest { + pub version_info: ClientVersionInfo, pub username: WireName, pub password: SecretString, - pub version: Option, pub client_context: Option, } impl WireEncode for LoginRegisterRequest { fn encoded_size(&self) -> usize { - self.username.encoded_size() + self.version_info.encoded_size() + + self.username.encoded_size() + 1 + self.password.expose_secret().len() + 4 - + self.version.as_ref().map_or(0, String::len) - + 4 + self.client_context.as_ref().map_or(0, String::len) } fn encode(&self, buf: &mut BytesMut) { + self.version_info.encode(buf); self.username.encode(buf); let password = self.password.expose_secret(); + debug_assert!( + u8::try_from(password.len()).is_ok(), + "password exceeds u8 length prefix; callers must validate before encoding" + ); #[allow(clippy::cast_possible_truncation)] buf.put_u8(password.len() as u8); buf.put_slice(password.as_bytes()); - match &self.version { - Some(v) => { - #[allow(clippy::cast_possible_truncation)] - buf.put_u32_le(v.len() as u32); - buf.put_slice(v.as_bytes()); - } - None => buf.put_u32_le(0), - } match &self.client_context { Some(c) => { #[allow(clippy::cast_possible_truncation)] @@ -95,30 +94,30 @@ impl WireEncode for LoginRegisterRequest { } } -impl WireDecode for LoginRegisterRequest { - fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { - let (username, name_len) = WireName::decode(buf)?; +impl LoginRegisterRequest { + /// Decode the body after the [`ClientVersionInfo`] prefix has already + /// been consumed; the server-side login gate decodes the prefix once + /// for both login-register shapes. Returns the bytes consumed from + /// `tail`. + /// + /// # Errors + /// [`WireError`] when `tail` is truncated or a field is malformed. + pub fn decode_after_prefix( + version_info: ClientVersionInfo, + tail: &[u8], + ) -> Result<(Self, usize), WireError> { + let (username, name_len) = WireName::decode(tail)?; let mut pos = name_len; - let password_len = read_u8(buf, pos)? as usize; + let password_len = read_u8(tail, pos)? as usize; pos += 1; - let password = SecretString::from(read_str(buf, pos, password_len)?); + let password = SecretString::from(read_str(tail, pos, password_len)?); pos += password_len; - let version_len = read_u32_le(buf, pos)? as usize; - pos += 4; - let version = if version_len > 0 { - let v = read_str(buf, pos, version_len)?; - pos += version_len; - Some(v) - } else { - None - }; - - let client_context_len = read_u32_le(buf, pos)? as usize; + let client_context_len = read_u32_le(tail, pos)? as usize; pos += 4; let client_context = if client_context_len > 0 { - let c = read_str(buf, pos, client_context_len)?; + let c = read_str(tail, pos, client_context_len)?; pos += client_context_len; Some(c) } else { @@ -127,9 +126,9 @@ impl WireDecode for LoginRegisterRequest { Ok(( Self { + version_info, username, password, - version, client_context, }, pos, @@ -137,23 +136,39 @@ impl WireDecode for LoginRegisterRequest { } } +impl WireDecode for LoginRegisterRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let (version_info, prefix_len) = ClientVersionInfo::decode(buf)?; + let (request, body_len) = Self::decode_after_prefix(version_info, &buf[prefix_len..])?; + Ok((request, prefix_len + body_len)) + } +} + #[cfg(test)] mod tests { use super::*; + fn version_info() -> ClientVersionInfo { + ClientVersionInfo { + protocol_version: 1, + sdk_name: WireName::new("rust-sdk").unwrap(), + sdk_version: WireName::new("1.0.0").unwrap(), + } + } + fn assert_req_eq(a: &LoginRegisterRequest, b: &LoginRegisterRequest) { + assert_eq!(a.version_info, b.version_info); assert_eq!(a.username, b.username); assert_eq!(a.password.expose_secret(), b.password.expose_secret()); - assert_eq!(a.version, b.version); assert_eq!(a.client_context, b.client_context); } #[test] fn roundtrip_full() { let req = LoginRegisterRequest { + version_info: version_info(), username: WireName::new("admin").unwrap(), password: SecretString::from("secret"), - version: Some("1.0.0".to_string()), client_context: Some("rust-sdk".to_string()), }; let bytes = req.to_bytes(); @@ -163,11 +178,11 @@ mod tests { } #[test] - fn roundtrip_no_optionals() { + fn roundtrip_no_context() { let req = LoginRegisterRequest { + version_info: version_info(), username: WireName::new("user").unwrap(), password: SecretString::from("pass"), - version: None, client_context: None, }; let bytes = req.to_bytes(); @@ -179,9 +194,9 @@ mod tests { #[test] fn encoded_size_matches_output() { let req = LoginRegisterRequest { + version_info: version_info(), username: WireName::new("admin").unwrap(), password: SecretString::from("p"), - version: Some("v1".to_string()), client_context: Some("ctx".to_string()), }; assert_eq!(req.encoded_size(), req.to_bytes().len()); @@ -190,9 +205,9 @@ mod tests { #[test] fn truncated_returns_error() { let req = LoginRegisterRequest { + version_info: version_info(), username: WireName::new("u").unwrap(), password: SecretString::from("p"), - version: Some("v".to_string()), client_context: Some("c".to_string()), }; let bytes = req.to_bytes(); @@ -205,16 +220,22 @@ mod tests { } #[test] - fn wire_layout_username_first() { + fn wire_layout_version_info_first() { let req = LoginRegisterRequest { + version_info: version_info(), username: WireName::new("u").unwrap(), password: SecretString::from("p"), - version: None, client_context: None, }; let bytes = req.to_bytes(); - // Username: [1, b'u'], password: [1, b'p'], version: [0,0,0,0], client_context: [0,0,0,0] - assert_eq!(bytes[0], 1); // username len - assert_eq!(bytes[1], b'u'); + // protocol_version u32 LE, then sdk_name [8, b"rust-sdk"]. + assert_eq!(u32::from_le_bytes(bytes[..4].try_into().unwrap()), 1); + assert_eq!(bytes[4], 8); + assert_eq!(&bytes[5..13], b"rust-sdk"); + // The version prefix alone must decode even when the rest is garbage. + let prefix_len = req.version_info.encoded_size(); + let (prefix, consumed) = ClientVersionInfo::decode(&bytes).unwrap(); + assert_eq!(consumed, prefix_len); + assert_eq!(prefix, req.version_info); } } diff --git a/core/binary_protocol/src/requests/users/login_register_with_pat.rs b/core/binary_protocol/src/requests/users/login_register_with_pat.rs index 275b0e96b2..5e2ce8f860 100644 --- a/core/binary_protocol/src/requests/users/login_register_with_pat.rs +++ b/core/binary_protocol/src/requests/users/login_register_with_pat.rs @@ -17,50 +17,51 @@ use crate::WireError; use crate::codec::{WireDecode, WireEncode, read_str, read_u8, read_u32_le}; +use crate::version::ClientVersionInfo; use bytes::{BufMut, BytesMut}; use secrecy::{ExposeSecret, SecretString}; /// Combined login-with-PAT + register request for server-ng. /// -/// The server verifies the token locally, then submits `Operation::Register` -/// through consensus. The response carries `user_id` + `session` (commit op -/// number). The `client_id` is carried in the VSR `RequestHeader.client` field -/// (populated by the SDK at encode time); the body no longer duplicates it. +/// Shares the `ClientVersionInfo` prefix with `LoginRegisterRequest` so the +/// server gates on the protocol version once before attempting either body +/// shape. The server verifies the token locally, then submits +/// `Operation::Register` through consensus. The `client_id` is carried in +/// the VSR `RequestHeader.client` field (populated by the SDK at encode +/// time); the body no longer duplicates it. /// /// Wire format: /// ```text +/// [ClientVersionInfo] /// [token_len:u8][token:N] -/// [version_len:u32_le][version:N?][context_len:u32_le][context:N?] +/// [context_len:u32_le][context:N?] /// ``` #[derive(Debug, Clone)] pub struct LoginRegisterWithPatRequest { + pub version_info: ClientVersionInfo, pub token: SecretString, - pub version: Option, pub client_context: Option, } impl WireEncode for LoginRegisterWithPatRequest { fn encoded_size(&self) -> usize { - 1 + self.token.expose_secret().len() - + 4 - + self.version.as_ref().map_or(0, String::len) + self.version_info.encoded_size() + + 1 + + self.token.expose_secret().len() + 4 + self.client_context.as_ref().map_or(0, String::len) } fn encode(&self, buf: &mut BytesMut) { + self.version_info.encode(buf); let token = self.token.expose_secret(); + debug_assert!( + u8::try_from(token.len()).is_ok(), + "token exceeds u8 length prefix; callers must validate before encoding" + ); #[allow(clippy::cast_possible_truncation)] buf.put_u8(token.len() as u8); buf.put_slice(token.as_bytes()); - match &self.version { - Some(v) => { - #[allow(clippy::cast_possible_truncation)] - buf.put_u32_le(v.len() as u32); - buf.put_slice(v.as_bytes()); - } - None => buf.put_u32_le(0), - } match &self.client_context { Some(c) => { #[allow(clippy::cast_possible_truncation)] @@ -72,27 +73,27 @@ impl WireEncode for LoginRegisterWithPatRequest { } } -impl WireDecode for LoginRegisterWithPatRequest { - fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { - let token_len = read_u8(buf, 0)? as usize; +impl LoginRegisterWithPatRequest { + /// Decode the body after the [`ClientVersionInfo`] prefix has already + /// been consumed; the server-side login gate decodes the prefix once + /// for both login-register shapes. Returns the bytes consumed from + /// `tail`. + /// + /// # Errors + /// [`WireError`] when `tail` is truncated or a field is malformed. + pub fn decode_after_prefix( + version_info: ClientVersionInfo, + tail: &[u8], + ) -> Result<(Self, usize), WireError> { + let token_len = read_u8(tail, 0)? as usize; let mut pos = 1; - let token = SecretString::from(read_str(buf, pos, token_len)?); + let token = SecretString::from(read_str(tail, pos, token_len)?); pos += token_len; - let version_len = read_u32_le(buf, pos)? as usize; - pos += 4; - let version = if version_len > 0 { - let v = read_str(buf, pos, version_len)?; - pos += version_len; - Some(v) - } else { - None - }; - - let client_context_len = read_u32_le(buf, pos)? as usize; + let client_context_len = read_u32_le(tail, pos)? as usize; pos += 4; let client_context = if client_context_len > 0 { - let c = read_str(buf, pos, client_context_len)?; + let c = read_str(tail, pos, client_context_len)?; pos += client_context_len; Some(c) } else { @@ -101,8 +102,8 @@ impl WireDecode for LoginRegisterWithPatRequest { Ok(( Self { + version_info, token, - version, client_context, }, pos, @@ -110,21 +111,38 @@ impl WireDecode for LoginRegisterWithPatRequest { } } +impl WireDecode for LoginRegisterWithPatRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let (version_info, prefix_len) = ClientVersionInfo::decode(buf)?; + let (request, body_len) = Self::decode_after_prefix(version_info, &buf[prefix_len..])?; + Ok((request, prefix_len + body_len)) + } +} + #[cfg(test)] mod tests { use super::*; + use crate::primitives::identifier::WireName; + + fn version_info() -> ClientVersionInfo { + ClientVersionInfo { + protocol_version: 1, + sdk_name: WireName::new("rust-sdk").unwrap(), + sdk_version: WireName::new("1.0.0").unwrap(), + } + } fn assert_req_eq(a: &LoginRegisterWithPatRequest, b: &LoginRegisterWithPatRequest) { + assert_eq!(a.version_info, b.version_info); assert_eq!(a.token.expose_secret(), b.token.expose_secret()); - assert_eq!(a.version, b.version); assert_eq!(a.client_context, b.client_context); } #[test] fn roundtrip_full() { let req = LoginRegisterWithPatRequest { + version_info: version_info(), token: SecretString::from("pat-abc123def456"), - version: Some("1.0.0".to_string()), client_context: Some("rust-sdk".to_string()), }; let bytes = req.to_bytes(); @@ -134,10 +152,10 @@ mod tests { } #[test] - fn roundtrip_no_optionals() { + fn roundtrip_no_context() { let req = LoginRegisterWithPatRequest { + version_info: version_info(), token: SecretString::from("tok"), - version: None, client_context: None, }; let bytes = req.to_bytes(); @@ -149,8 +167,8 @@ mod tests { #[test] fn encoded_size_matches_output() { let req = LoginRegisterWithPatRequest { + version_info: version_info(), token: SecretString::from("t"), - version: Some("v1".to_string()), client_context: Some("ctx".to_string()), }; assert_eq!(req.encoded_size(), req.to_bytes().len()); @@ -159,8 +177,8 @@ mod tests { #[test] fn truncated_returns_error() { let req = LoginRegisterWithPatRequest { + version_info: version_info(), token: SecretString::from("t"), - version: Some("v".to_string()), client_context: Some("c".to_string()), }; let bytes = req.to_bytes(); @@ -173,15 +191,15 @@ mod tests { } #[test] - fn wire_layout_token_first() { + fn wire_layout_version_info_first() { let req = LoginRegisterWithPatRequest { + version_info: version_info(), token: SecretString::from("t"), - version: None, client_context: None, }; let bytes = req.to_bytes(); - // Token: [1, b't'], version: [0,0,0,0], client_context: [0,0,0,0] - assert_eq!(bytes[0], 1); // token len - assert_eq!(bytes[1], b't'); + assert_eq!(u32::from_le_bytes(bytes[..4].try_into().unwrap()), 1); + assert_eq!(bytes[4], 8); + assert_eq!(&bytes[5..13], b"rust-sdk"); } } diff --git a/core/binary_protocol/src/requests/users/login_user.rs b/core/binary_protocol/src/requests/users/login_user.rs index ef12a9dc1e..7b9ce0327e 100644 --- a/core/binary_protocol/src/requests/users/login_user.rs +++ b/core/binary_protocol/src/requests/users/login_user.rs @@ -49,6 +49,10 @@ impl WireEncode for LoginUserRequest { fn encode(&self, buf: &mut BytesMut) { self.username.encode(buf); + debug_assert!( + u8::try_from(self.password.len()).is_ok(), + "password exceeds u8 length prefix; callers must validate before encoding" + ); #[allow(clippy::cast_possible_truncation)] buf.put_u8(self.password.len() as u8); buf.put_slice(self.password.as_bytes()); diff --git a/core/binary_protocol/src/responses/users/login_register.rs b/core/binary_protocol/src/responses/users/login_register.rs index 59bdd578d2..dd9714a938 100644 --- a/core/binary_protocol/src/responses/users/login_register.rs +++ b/core/binary_protocol/src/responses/users/login_register.rs @@ -17,31 +17,38 @@ use crate::WireError; use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le}; +use crate::primitives::identifier::WireName; use bytes::{BufMut, BytesMut}; /// Combined login + register response for server-ng. /// -/// Returns the authenticated user's ID and the consensus session number -/// (commit op number from the Register operation). +/// Returns the authenticated user's ID, the consensus session number +/// (commit op number from the Register operation), and the server's +/// protocol version + build version so both sides know what they talk to. /// -/// Wire format (12 bytes): +/// Wire format: /// ```text -/// [user_id:4 LE][session:8 LE] +/// [user_id:u32 LE][session:u64 LE][server_protocol_version:u32 LE] +/// [server_version_len:u8][server_version:N] /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct LoginRegisterResponse { pub user_id: u32, pub session: u64, + pub server_protocol_version: u32, + pub server_version: WireName, } impl WireEncode for LoginRegisterResponse { fn encoded_size(&self) -> usize { - 12 + 16 + self.server_version.encoded_size() } fn encode(&self, buf: &mut BytesMut) { buf.put_u32_le(self.user_id); buf.put_u64_le(self.session); + buf.put_u32_le(self.server_protocol_version); + self.server_version.encode(buf); } } @@ -49,7 +56,17 @@ impl WireDecode for LoginRegisterResponse { fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { let user_id = read_u32_le(buf, 0)?; let session = read_u64_le(buf, 4)?; - Ok((Self { user_id, session }, 12)) + let server_protocol_version = read_u32_le(buf, 12)?; + let (server_version, server_version_len) = WireName::decode(&buf[16..])?; + Ok(( + Self { + user_id, + session, + server_protocol_version, + server_version, + }, + 16 + server_version_len, + )) } } @@ -57,26 +74,33 @@ impl WireDecode for LoginRegisterResponse { mod tests { use super::*; - #[test] - fn roundtrip() { - let resp = LoginRegisterResponse { + fn sample() -> LoginRegisterResponse { + LoginRegisterResponse { user_id: 42, session: 100, - }; + server_protocol_version: 1, + server_version: WireName::new("0.8.0").unwrap(), + } + } + + #[test] + fn roundtrip() { + let resp = sample(); let bytes = resp.to_bytes(); - assert_eq!(bytes.len(), 12); let (decoded, consumed) = LoginRegisterResponse::decode(&bytes).unwrap(); - assert_eq!(consumed, 12); + assert_eq!(consumed, bytes.len()); assert_eq!(decoded, resp); } + #[test] + fn encoded_size_matches_output() { + let resp = sample(); + assert_eq!(resp.encoded_size(), resp.to_bytes().len()); + } + #[test] fn truncated_returns_error() { - let resp = LoginRegisterResponse { - user_id: 1, - session: 1, - }; - let bytes = resp.to_bytes(); + let bytes = sample().to_bytes(); for i in 0..bytes.len() { assert!( LoginRegisterResponse::decode(&bytes[..i]).is_err(), @@ -90,6 +114,8 @@ mod tests { let resp = LoginRegisterResponse { user_id: 0x0102_0304, session: 0x0506_0708_090A_0B0C, + server_protocol_version: 0x0D0E_0F10, + server_version: WireName::new("v").unwrap(), }; let bytes = resp.to_bytes(); assert_eq!( @@ -100,5 +126,11 @@ mod tests { u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0x0506_0708_090A_0B0C ); + assert_eq!( + u32::from_le_bytes(bytes[12..16].try_into().unwrap()), + 0x0D0E_0F10 + ); + assert_eq!(bytes[16], 1); + assert_eq!(bytes[17], b'v'); } } diff --git a/core/binary_protocol/src/version.rs b/core/binary_protocol/src/version.rs new file mode 100644 index 0000000000..5dcb25ea5a --- /dev/null +++ b/core/binary_protocol/src/version.rs @@ -0,0 +1,336 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary protocol versioning. +//! +//! The protocol version is this crate's own semver, const-parsed from +//! `CARGO_PKG_VERSION` into a packed `u32` so it auto-bumps with releases +//! and stays cheaply comparable. It is exchanged during the login/register +//! handshake: clients send [`ClientVersionInfo`] as the body prefix of both +//! login-register request shapes, the server gates on +//! [`is_protocol_compatible`] before touching credentials and advertises +//! its own version in the response. Incompatible clients are rejected with +//! an `EvictionReason::IncompatibleProtocol` frame carrying the accepted +//! range. +//! +//! # Wire spec (language-neutral) +//! +//! Reference for SDKs that do not consume this crate. All integers are +//! little-endian on the wire. +//! +//! ## Packed protocol version +//! +//! A semver `major.minor.patch` packs into one `u32`, 10 bits per +//! component (each must be < 1024): +//! +//! ```text +//! bits 31..30 reserved (zero) +//! bits 29..20 major +//! bits 19..10 minor +//! bits 9..0 patch +//! value = major << 20 | minor << 10 | patch +//! ``` +//! +//! Integer order equals semver order. The value tracks the +//! `iggy_binary_protocol` crate release; under 0.x a minor bump may break +//! the wire, so the gate is minor-scoped. Past 1.0.0 the gate follows +//! strict semver: major bump = incompatible, minor/patch = compatible. +//! +//! ## `ClientVersionInfo` body prefix +//! +//! ```text +//! [protocol_version: u32] +//! [sdk_name_len: u8][sdk_name: UTF-8, 1-255 bytes] +//! [sdk_version_len: u8][sdk_version: UTF-8, 1-255 bytes] +//! ``` +//! +//! ## Login gate +//! +//! The server accepts a client when its packed version is +//! `>= IGGY_PROTOCOL_VERSION_MIN` and its `major.minor` is `<=` the +//! server's. Patch releases never change the wire, so the upper bound +//! ignores patch. `IGGY_PROTOCOL_VERSION_MIN` defaults to the current +//! version with patch zeroed and is widened deliberately when a minor +//! bump stays wire-compatible. +//! +//! ## Rejection frame +//! +//! An incompatible login is answered with a header-only 256-byte +//! `Eviction` frame (`EvictionHeader` in `consensus::header`): `reason` +//! at byte 255 is `IncompatibleProtocol` (10), and the accepted window +//! sits at fixed offsets: `server_protocol_version` (max) at byte 144, +//! `server_protocol_version_min` at byte 148, both packed `u32`. +//! +//! A login body without a decodable `ClientVersionInfo` prefix is +//! rejected with reason `MalformedLogin` (11) instead; the window bytes +//! are zero. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode, read_u32_le}; +use crate::primitives::identifier::WireName; +use bytes::{BufMut, BytesMut}; + +/// Bits per packed semver component (each must be < 1024). +const COMPONENT_BITS: u32 = 10; +const COMPONENT_MAX: u32 = (1 << COMPONENT_BITS) - 1; +const PATCH_MASK: u32 = COMPONENT_MAX; + +/// Current binary protocol version: this crate's semver, packed. +/// Pre-release tags (`-edge.N`) are ignored. +pub const IGGY_PROTOCOL_VERSION: u32 = parse_packed_semver(env!("CARGO_PKG_VERSION")); + +/// Oldest protocol version this build still accepts at login: the current +/// version with patch zeroed (patch releases never change the wire). +/// Widen deliberately when a minor bump stays wire-compatible. +// TODO(hubcio): past 1.0.0 follow strict semver: major bump = incompatible, +// minor/patch = compatible, so MIN derives from the current major instead +// of the current minor. Under 0.x a minor bump may break the wire, so the +// minor-scoped window is correct. +pub const IGGY_PROTOCOL_VERSION_MIN: u32 = IGGY_PROTOCOL_VERSION & !PATCH_MASK; +// A 0.0.x crate version would pack MIN to 0, which `EvictionHeader::validate` +// rejects (window requires min >= 1) -- the server would emit eviction frames +// that fail its own validation. +const _: () = assert!(IGGY_PROTOCOL_VERSION_MIN > 0); + +/// Range check used by the server-side login gate. +/// +/// Packed component order preserves semver ordering, so plain integer +/// comparisons work. The upper bound ignores patch: patch releases never +/// change the wire, so a newer patch of an accepted minor is compatible. +#[must_use] +pub const fn is_protocol_compatible(client: u32) -> bool { + client >= IGGY_PROTOCOL_VERSION_MIN + && (client >> COMPONENT_BITS) <= (IGGY_PROTOCOL_VERSION >> COMPONENT_BITS) +} + +/// Pack semver components: `major << 20 | minor << 10 | patch`. +/// +/// # Panics +/// At compile time (const context) when any component exceeds 1023. +#[must_use] +pub const fn pack_protocol_version(major: u32, minor: u32, patch: u32) -> u32 { + assert!( + major <= COMPONENT_MAX && minor <= COMPONENT_MAX && patch <= COMPONENT_MAX, + "semver component exceeds 10-bit packing range" + ); + (major << (2 * COMPONENT_BITS)) | (minor << COMPONENT_BITS) | patch +} + +/// Display adapter for a packed protocol version (`major.minor.patch`). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ProtocolVersion(pub u32); + +impl std::fmt::Display for ProtocolVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}.{}.{}", + self.0 >> (2 * COMPONENT_BITS), + (self.0 >> COMPONENT_BITS) & COMPONENT_MAX, + self.0 & PATCH_MASK + ) + } +} + +/// Const-parse `major.minor.patch[-pre]` into a packed `u32`. +/// Malformed input is a compile error in const context. +const fn parse_packed_semver(version: &str) -> u32 { + let bytes = version.as_bytes(); + let (major, i) = parse_component(bytes, 0); + assert!( + i < bytes.len() && bytes[i] == b'.', + "expected '.' after major" + ); + let (minor, j) = parse_component(bytes, i + 1); + assert!( + j < bytes.len() && bytes[j] == b'.', + "expected '.' after minor" + ); + let (patch, k) = parse_component(bytes, j + 1); + assert!( + k == bytes.len() || bytes[k] == b'-' || bytes[k] == b'+', + "unexpected trailing bytes after patch" + ); + pack_protocol_version(major, minor, patch) +} + +/// Parse a decimal run starting at `start`; returns (value, index past digits). +const fn parse_component(bytes: &[u8], start: usize) -> (u32, usize) { + assert!( + start < bytes.len() && bytes[start].is_ascii_digit(), + "expected digit" + ); + let mut value: u32 = 0; + let mut i = start; + while i < bytes.len() && bytes[i].is_ascii_digit() { + value = value * 10 + (bytes[i] - b'0') as u32; + i += 1; + } + (value, i) +} + +/// Client identity sent as the prefix of every login-register request body. +/// +/// Wire format: +/// ```text +/// [protocol_version:u32 LE][sdk_name_len:u8][sdk_name:N][sdk_version_len:u8][sdk_version:N] +/// ``` +/// +/// `protocol_version` is the packed `iggy_binary_protocol` crate version the +/// client was built against; `sdk_version` is the client crate's own version +/// (e.g. the `iggy` crate for the Rust SDK). Encoded first so the server can +/// parse and gate on it regardless of how the rest of the body evolves. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ClientVersionInfo { + pub protocol_version: u32, + /// SDK identifier, e.g. `rust-sdk`, `go-sdk`. + pub sdk_name: WireName, + /// SDK build version, e.g. `0.10.1`. + pub sdk_version: WireName, +} + +impl WireEncode for ClientVersionInfo { + fn encoded_size(&self) -> usize { + 4 + self.sdk_name.encoded_size() + self.sdk_version.encoded_size() + } + + fn encode(&self, buf: &mut BytesMut) { + buf.put_u32_le(self.protocol_version); + self.sdk_name.encode(buf); + self.sdk_version.encode(buf); + } +} + +impl WireDecode for ClientVersionInfo { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let protocol_version = read_u32_le(buf, 0)?; + let mut pos = 4; + let (sdk_name, sdk_name_len) = WireName::decode(&buf[pos..])?; + pos += sdk_name_len; + let (sdk_version, sdk_version_len) = WireName::decode(&buf[pos..])?; + pos += sdk_version_len; + Ok(( + Self { + protocol_version, + sdk_name, + sdk_version, + }, + pos, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample() -> ClientVersionInfo { + ClientVersionInfo { + protocol_version: IGGY_PROTOCOL_VERSION, + sdk_name: WireName::new("rust-sdk").unwrap(), + sdk_version: WireName::new("0.10.1").unwrap(), + } + } + + #[test] + fn roundtrip() { + let info = sample(); + let bytes = info.to_bytes(); + let (decoded, consumed) = ClientVersionInfo::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, info); + } + + #[test] + fn encoded_size_matches_output() { + let info = sample(); + assert_eq!(info.encoded_size(), info.to_bytes().len()); + } + + #[test] + fn truncated_returns_error() { + let bytes = sample().to_bytes(); + for i in 0..bytes.len() { + assert!( + ClientVersionInfo::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn wire_layout_protocol_version_first() { + let bytes = sample().to_bytes(); + assert_eq!( + u32::from_le_bytes(bytes[..4].try_into().unwrap()), + IGGY_PROTOCOL_VERSION + ); + assert_eq!(bytes[4], 8); // sdk_name len + assert_eq!(&bytes[5..13], b"rust-sdk"); + } + + #[test] + fn parses_crate_version_with_prerelease() { + assert_eq!( + parse_packed_semver("0.10.1-edge.2"), + pack_protocol_version(0, 10, 1) + ); + assert_eq!(parse_packed_semver("1.2.3"), pack_protocol_version(1, 2, 3)); + assert_eq!( + parse_packed_semver("10.0.0+build.5"), + pack_protocol_version(10, 0, 0) + ); + } + + #[test] + fn packing_preserves_semver_order() { + assert!(pack_protocol_version(0, 9, 999) < pack_protocol_version(0, 10, 0)); + assert!(pack_protocol_version(0, 10, 1) < pack_protocol_version(1, 0, 0)); + } + + #[test] + fn min_is_current_with_patch_zeroed() { + assert_eq!(IGGY_PROTOCOL_VERSION_MIN & PATCH_MASK, 0); + assert_eq!( + IGGY_PROTOCOL_VERSION >> COMPONENT_BITS, + IGGY_PROTOCOL_VERSION_MIN >> COMPONENT_BITS + ); + } + + #[test] + fn compatibility_range_boundaries() { + assert!(is_protocol_compatible(IGGY_PROTOCOL_VERSION_MIN)); + assert!(is_protocol_compatible(IGGY_PROTOCOL_VERSION)); + // Patch never changes the wire: any patch of the current minor passes. + assert!(is_protocol_compatible(IGGY_PROTOCOL_VERSION | PATCH_MASK)); + // Next minor is outside the window. + assert!(!is_protocol_compatible( + ((IGGY_PROTOCOL_VERSION >> COMPONENT_BITS) + 1) << COMPONENT_BITS + )); + if IGGY_PROTOCOL_VERSION_MIN > 0 { + assert!(!is_protocol_compatible(IGGY_PROTOCOL_VERSION_MIN - 1)); + } + } + + #[test] + fn protocol_version_display() { + assert_eq!( + ProtocolVersion(pack_protocol_version(0, 10, 1)).to_string(), + "0.10.1" + ); + } +} diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index 816d8fac57..d13f0bcfbf 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -512,6 +512,15 @@ pub enum IggyError { InvalidSession(u64) = 14001, #[error("Replicated command with unknown code {0}")] UnknownReplicatedCommand(u32) = 14002, + /// Packed protocol versions, see `iggy_binary_protocol::ProtocolVersion`. + /// Field order: `(client_version, server_min, server_max)`. + #[error( + "Incompatible binary protocol version: client {}, server accepts [{}, {}]", + iggy_binary_protocol::ProtocolVersion(*.0), + iggy_binary_protocol::ProtocolVersion(*.1), + iggy_binary_protocol::ProtocolVersion(*.2) + )] + IncompatibleProtocolVersion(u32, u32, u32) = 14003, } impl IggyError { diff --git a/core/common/src/traits/binary_impls/mod.rs b/core/common/src/traits/binary_impls/mod.rs index cbdd3edb9e..42245cff83 100644 --- a/core/common/src/traits/binary_impls/mod.rs +++ b/core/common/src/traits/binary_impls/mod.rs @@ -28,7 +28,37 @@ mod topics; mod users; use crate::IggyError; +use crate::http::users::defaults::{MAX_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH}; use iggy_binary_protocol::WireDecode; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::{ClientVersionInfo, IGGY_PROTOCOL_VERSION, WireName}; + +/// SDK identifier sent in the login-register version prefix. Foreign SDKs +/// send their own (e.g. `go-sdk`) once they adopt VSR framing. +#[cfg(feature = "vsr")] +pub(crate) const RUST_SDK_NAME: &str = "rust-sdk"; + +/// Version prefix for both login-register request shapes. `sdk_version` +/// comes from [`crate::VsrSessionControl::sdk_version`] so it is the SDK +/// crate's version, not this crate's. +#[cfg(feature = "vsr")] +pub(crate) fn rust_sdk_version_info(sdk_version: &str) -> Result { + Ok(ClientVersionInfo { + protocol_version: IGGY_PROTOCOL_VERSION, + sdk_name: WireName::new(RUST_SDK_NAME).expect("RUST_SDK_NAME is 1-255 bytes"), + sdk_version: WireName::new(sdk_version).map_err(|_| IggyError::InvalidFormat)?, + }) +} + +/// Same bounds and error every server (HTTP and binary) enforces, applied +/// before encoding so an oversized password can never desync the u8 length +/// prefix on the wire. +pub(crate) fn validate_password(password: &str) -> Result<(), IggyError> { + if !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&password.len()) { + return Err(IggyError::InvalidPassword); + } + Ok(()) +} /// Decode a wire response, logging the error details before converting to `IggyError`. pub(crate) fn decode_response(response: &[u8]) -> Result { @@ -37,3 +67,16 @@ pub(crate) fn decode_response(response: &[u8]) -> Result PersonalAccessTokenClient for B { ) -> Result { #[cfg(feature = "vsr")] { + // Same bounds the non-vsr branch gets from `WireName::new(token)`; + // the request stores a `SecretString`, so enforce them here to keep + // the u8 length prefix consistent with the realized bytes. + if token.is_empty() || token.len() > MAX_WIRE_NAME_LENGTH { + return Err(IggyError::InvalidFormat); + } let response = match self .send_raw_with_response( LOGIN_REGISTER_WITH_PAT_CODE, LoginRegisterWithPatRequest { + version_info: super::rust_sdk_version_info(self.sdk_version())?, token: SecretString::from(token.to_string()), - version: Some(env!("CARGO_PKG_VERSION").to_string()), - client_context: Some(String::new()), + client_context: None, } .to_bytes(), ) @@ -132,6 +140,11 @@ impl PersonalAccessTokenClient for B { self.reset_vsr_session().await?; return Err(error); } + tracing::debug!( + server_version = %wire_resp.server_version, + server_protocol_version = wire_resp.server_protocol_version, + "authenticated against iggy server" + ); self.set_state(ClientState::Authenticated).await; self.publish_event(DiagnosticEvent::SignedIn).await; return Ok(IdentityInfo { diff --git a/core/common/src/traits/binary_impls/users.rs b/core/common/src/traits/binary_impls/users.rs index fa7cc64112..2b9d9fe390 100644 --- a/core/common/src/traits/binary_impls/users.rs +++ b/core/common/src/traits/binary_impls/users.rs @@ -85,6 +85,7 @@ impl UserClient for B { permissions: Option, ) -> Result { fail_if_not_authenticated(self).await?; + super::validate_password(password)?; let wire_name = WireName::new(username).map_err(|_| IggyError::InvalidFormat)?; let wire_perms = permissions.as_ref().map(permissions_to_wire); let response = self @@ -166,6 +167,8 @@ impl UserClient for B { new_password: &str, ) -> Result<(), IggyError> { fail_if_not_authenticated(self).await?; + super::validate_password(current_password)?; + super::validate_password(new_password)?; let wire_id = identifier_to_wire(user_id)?; self.send_raw_with_response( CHANGE_PASSWORD_CODE, @@ -181,6 +184,7 @@ impl UserClient for B { } async fn login_user(&self, username: &str, password: &str) -> Result { + super::validate_password(password)?; #[cfg(feature = "vsr")] { let wire_name = WireName::new(username).map_err(|_| IggyError::InvalidFormat)?; @@ -188,10 +192,10 @@ impl UserClient for B { .send_raw_with_response( LOGIN_REGISTER_CODE, LoginRegisterRequest { + version_info: super::rust_sdk_version_info(self.sdk_version())?, username: wire_name, password: SecretString::from(password.to_string()), - version: Some(env!("CARGO_PKG_VERSION").to_string()), - client_context: Some(String::new()), + client_context: None, } .to_bytes(), ) @@ -214,6 +218,11 @@ impl UserClient for B { self.reset_vsr_session().await?; return Err(error); } + tracing::debug!( + server_version = %wire_resp.server_version, + server_protocol_version = wire_resp.server_protocol_version, + "authenticated against iggy server" + ); self.set_state(ClientState::Authenticated).await; self.publish_event(DiagnosticEvent::SignedIn).await; return Ok(IdentityInfo { diff --git a/core/common/src/traits/binary_transport.rs b/core/common/src/traits/binary_transport.rs index 6eff73f4d4..2efc51dd0a 100644 --- a/core/common/src/traits/binary_transport.rs +++ b/core/common/src/traits/binary_transport.rs @@ -47,6 +47,10 @@ mod vsr_session_sealed { pub trait VsrSessionControl: vsr_session_sealed::Sealed + BinaryTransport { async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError>; async fn reset_vsr_session(&self) -> Result<(), IggyError>; + /// SDK crate version sent in the login-register version prefix. + /// Implemented by the transports so the value is the SDK crate's own + /// `CARGO_PKG_VERSION` (`iggy` for Rust), not `iggy_common`'s. + fn sdk_version(&self) -> &'static str; } #[cfg(feature = "vsr")] diff --git a/core/consensus/src/metadata_helpers.rs b/core/consensus/src/metadata_helpers.rs index 53061aa812..8471e47e77 100644 --- a/core/consensus/src/metadata_helpers.rs +++ b/core/consensus/src/metadata_helpers.rs @@ -21,7 +21,9 @@ use crate::client_table::{ClientTable, REGISTER_REQUEST_ID, RequestStatus}; use crate::{Consensus, Pipeline, PipelineEntry, VsrConsensus}; -use iggy_binary_protocol::{EvictionHeader, EvictionReason, HEADER_SIZE}; +use iggy_binary_protocol::{ + EvictionHeader, EvictionReason, HEADER_SIZE, IGGY_PROTOCOL_VERSION, IGGY_PROTOCOL_VERSION_MIN, +}; use message_bus::MessageBus; use server_common::iobuf::Frozen; use server_common::{MESSAGE_ALIGN, Message}; @@ -302,13 +304,48 @@ pub fn build_eviction_message( reason != EvictionReason::Reserved, "build_eviction_message: Reserved is sentinel; pick a real variant" ); + build_eviction_from_header(EvictionHeader::new( + ctx.cluster, + ctx.view, + ctx.replica, + client_id, + reason, + )) +} +/// `IncompatibleProtocol` eviction carrying the server's accepted protocol +/// window, see [`EvictionHeader::incompatible_protocol`]. +/// +/// # Panics +/// Unreachable: zeroed `HEADER_SIZE` buffer is always a valid `EvictionHeader`. +#[must_use] +pub fn build_incompatible_protocol_eviction_message( + ctx: EvictionContext, + client_id: u128, +) -> Message { + debug_assert!( + client_id != 0, + "build_incompatible_protocol_eviction_message: client_id != 0" + ); + build_eviction_from_header(EvictionHeader::incompatible_protocol( + ctx.cluster, + ctx.view, + ctx.replica, + client_id, + IGGY_PROTOCOL_VERSION, + IGGY_PROTOCOL_VERSION_MIN, + )) +} + +/// # Panics +/// Unreachable: zeroed `HEADER_SIZE` buffer is always a valid `EvictionHeader`. +fn build_eviction_from_header(header: EvictionHeader) -> Message { let mut msg = Message::::new(HEADER_SIZE); - let header = bytemuck::checked::try_from_bytes_mut::( + let slot = bytemuck::checked::try_from_bytes_mut::( &mut msg.as_mut_slice()[..HEADER_SIZE], ) .expect("zeroed bytes are valid"); - *header = EvictionHeader::new(ctx.cluster, ctx.view, ctx.replica, client_id, reason); + *slot = header; msg } diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 81ba70ed95..cc68a09a04 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -38,6 +38,7 @@ assert_cmd = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } bon = { workspace = true } +bytemuck = { workspace = true } bytes = { workspace = true } compio = { workspace = true } configs = { workspace = true } diff --git a/core/integration/tests/sdk/mod.rs b/core/integration/tests/sdk/mod.rs index 25358cba49..2802a9f35a 100644 --- a/core/integration/tests/sdk/mod.rs +++ b/core/integration/tests/sdk/mod.rs @@ -20,4 +20,6 @@ mod hello_world; mod messages; #[cfg(not(feature = "vsr"))] mod producer; +#[cfg(feature = "vsr")] +mod protocol_version; mod raw; diff --git a/core/integration/tests/sdk/protocol_version.rs b/core/integration/tests/sdk/protocol_version.rs new file mode 100644 index 0000000000..5aa595d4dc --- /dev/null +++ b/core/integration/tests/sdk/protocol_version.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Negative tests for the login protocol-version gate. The Rust SDK always +//! sends a well-formed prefix, so the rejections are hand-crafted on a raw +//! TCP socket: `[256-byte RequestHeader][LoginRegisterRequest body]`. An +//! out-of-window version must be answered with a header-only `Eviction` +//! frame carrying `IncompatibleProtocol` plus the accepted window; a body +//! without a decodable prefix with `MalformedLogin` and a zero window. + +#![cfg(feature = "vsr")] + +use iggy::prelude::*; +use iggy_binary_protocol::codec::WireEncode; +use iggy_binary_protocol::consensus::{Command2, Operation, RequestHeader}; +use iggy_binary_protocol::namespace::METADATA_CONSENSUS_NAMESPACE; +use iggy_binary_protocol::requests::users::LoginRegisterRequest; +use iggy_binary_protocol::{ + ClientVersionInfo, HEADER_SIZE, IGGY_PROTOCOL_VERSION, IGGY_PROTOCOL_VERSION_MIN, WireName, +}; +use integration::harness::TestHarness; +use integration::iggy_harness; +use secrecy::SecretString; +use std::mem::offset_of; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; + +const EVICTION_REASON_INCOMPATIBLE_PROTOCOL: u8 = 10; +const EVICTION_REASON_MALFORMED_LOGIN: u8 = 11; + +#[iggy_harness] +async fn given_incompatible_protocol_version_when_logging_in_should_receive_eviction( + harness: &TestHarness, +) { + let body = LoginRegisterRequest { + version_info: ClientVersionInfo { + protocol_version: 0, + sdk_name: WireName::new("rust-sdk").unwrap(), + sdk_version: WireName::new("0.0.1").unwrap(), + }, + username: WireName::new(DEFAULT_ROOT_USERNAME).unwrap(), + password: SecretString::from(DEFAULT_ROOT_PASSWORD), + client_context: None, + } + .to_bytes(); + + assert_login_evicted( + harness, + &body, + EVICTION_REASON_INCOMPATIBLE_PROTOCOL, + (IGGY_PROTOCOL_VERSION, IGGY_PROTOCOL_VERSION_MIN), + ) + .await; +} + +#[iggy_harness] +async fn given_no_version_prefix_when_logging_in_should_receive_eviction(harness: &TestHarness) { + // Empty body cannot hold a ClientVersionInfo prefix; the gate rejects it + // as malformed, window bytes zero. + assert_login_evicted(harness, &[], EVICTION_REASON_MALFORMED_LOGIN, (0, 0)).await; +} + +async fn assert_login_evicted( + harness: &TestHarness, + body: &[u8], + expected_reason: u8, + expected_window: (u32, u32), +) { + let header = RequestHeader { + command: Command2::Request, + operation: Operation::Register, + size: u32::try_from(HEADER_SIZE + body.len()).unwrap(), + client: 0xC0FFEE, + session: 0, + request: 0, + namespace: METADATA_CONSENSUS_NAMESPACE, + ..Default::default() + }; + + let addr = harness + .server() + .tcp_addr() + .expect("server must expose a TCP address"); + let mut stream = TcpStream::connect(addr).await.unwrap(); + stream.write_all(bytemuck::bytes_of(&header)).await.unwrap(); + stream.write_all(body).await.unwrap(); + + // Eviction is header-only: exactly 256 bytes. + let mut reply = [0u8; HEADER_SIZE]; + stream.read_exact(&mut reply).await.unwrap(); + + let command_offset = offset_of!(RequestHeader, command); + assert_eq!( + reply[command_offset], + Command2::Eviction as u8, + "expected an Eviction frame" + ); + assert_eq!( + reply[HEADER_SIZE - 1], + expected_reason, + "unexpected eviction reason" + ); + // Window carved at fixed offsets 144 / 148; zero outside + // IncompatibleProtocol. + let server_version = u32::from_le_bytes(reply[144..148].try_into().unwrap()); + let server_version_min = u32::from_le_bytes(reply[148..152].try_into().unwrap()); + assert_eq!((server_version, server_version_min), expected_window); +} diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index c1ce700154..50deca0e51 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -30,3 +30,8 @@ pub mod tcp; #[cfg(feature = "vsr")] mod vsr; pub mod websocket; + +/// Rust SDK version sent in the login-register version prefix; must be this +/// crate's version, see `VsrSessionControl::sdk_version`. +#[cfg(feature = "vsr")] +pub(crate) const SDK_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 2aacab7185..96f3962612 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -202,6 +202,10 @@ impl iggy_common::VsrSessionControl for QuicClient { .expect("consensus session mutex poisoned") = ConsensusSession::new(); Ok(()) } + + fn sdk_version(&self) -> &'static str { + crate::SDK_VERSION + } } impl BinaryClient for QuicClient {} diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index f6e6f4980e..dc38863676 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -231,6 +231,10 @@ impl iggy_common::VsrSessionControl for TcpClient { .expect("consensus session mutex poisoned") = ConsensusSession::new(); Ok(()) } + + fn sdk_version(&self) -> &'static str { + crate::SDK_VERSION + } } impl BinaryClient for TcpClient {} diff --git a/core/sdk/src/vsr.rs b/core/sdk/src/vsr.rs index 0dbbf81828..b58d206b51 100644 --- a/core/sdk/src/vsr.rs +++ b/core/sdk/src/vsr.rs @@ -24,7 +24,7 @@ use iggy_binary_protocol::codes::{ STORE_CONSUMER_OFFSET_2_CODE, STORE_CONSUMER_OFFSET_CODE, }; use iggy_binary_protocol::consensus::{ - Command2, EvictionHeader, EvictionReason, HEADER_SIZE, Operation, ReplyHeader, RequestHeader, + Command2, EvictionHeader, EvictionReason, GenericHeader, HEADER_SIZE, Operation, RequestHeader, read_size_field, }; use iggy_binary_protocol::namespace::{ @@ -37,6 +37,7 @@ use iggy_binary_protocol::requests::consumer_offsets::{ }; use iggy_binary_protocol::requests::messages::SendMessagesHeader; use iggy_binary_protocol::requests::segments::DeleteSegmentsRequest; +use iggy_binary_protocol::version::IGGY_PROTOCOL_VERSION; use iggy_binary_protocol::{WireIdentifier, WirePartitioning}; use iggy_common::IggyError; @@ -168,31 +169,20 @@ pub(crate) fn decode_response(response: Bytes) -> Result { return Err(IggyError::EmptyResponse); } - // Session-terminal Eviction frames map to typed errors, mirroring - // `decode_response_split` (the TCP/WS path) so every transport - // surfaces e.g. `Unauthenticated` instead of `InvalidCommand`. let header_bytes: &[u8; HEADER_SIZE] = response[..HEADER_SIZE] .try_into() .map_err(|_| IggyError::InvalidCommand)?; - if peek_command(header_bytes) == Command2::Eviction { - if let Ok(header) = bytemuck::checked::try_from_bytes::(header_bytes) { - return Err(map_eviction_reason(header.reason)); + match peek_command(header_bytes) { + Command2::Eviction => Err(decode_eviction(header_bytes)), + Command2::Reply => { + let total_size = response_size(header_bytes)?; + if response.len() < total_size { + return Err(IggyError::InvalidCommand); + } + Ok(response.slice(HEADER_SIZE..total_size)) } - return Err(IggyError::Unauthenticated); - } - - let header = bytemuck::checked::try_from_bytes::(&response[..HEADER_SIZE]) - .map_err(|_| IggyError::InvalidCommand)?; - if header.command != Command2::Reply { - return Err(IggyError::InvalidCommand); - } - - let total_size = header.size as usize; - if total_size < HEADER_SIZE || response.len() < total_size { - return Err(IggyError::InvalidCommand); + _ => Err(IggyError::InvalidCommand), } - - Ok(response.slice(HEADER_SIZE..total_size)) } /// Decode a reply when the header and body have been read into separate @@ -208,36 +198,24 @@ pub(crate) fn decode_response_split( header_bytes: &[u8; HEADER_SIZE], body: Bytes, ) -> Result { - let command = peek_command(header_bytes); - if command == Command2::Eviction { - if let Ok(header) = bytemuck::checked::try_from_bytes::(header_bytes) { - return Err(map_eviction_reason(header.reason)); + match peek_command(header_bytes) { + Command2::Eviction => Err(decode_eviction(header_bytes)), + Command2::Reply => { + let expected_body = response_size(header_bytes)? - HEADER_SIZE; + if body.len() < expected_body { + return Err(IggyError::InvalidCommand); + } + Ok(body.slice(..expected_body)) } - return Err(IggyError::Unauthenticated); - } - - let header = bytemuck::checked::try_from_bytes::(header_bytes) - .map_err(|_| IggyError::InvalidCommand)?; - if header.command != Command2::Reply { - return Err(IggyError::InvalidCommand); - } - - let total_size = header.size as usize; - if total_size < HEADER_SIZE { - return Err(IggyError::InvalidCommand); + _ => Err(IggyError::InvalidCommand), } - let expected_body = total_size - HEADER_SIZE; - if body.len() < expected_body { - return Err(IggyError::InvalidCommand); - } - Ok(body.slice(..expected_body)) } /// `Command2` lives at a fixed offset shared by every consensus header /// (Reply, Eviction, Prepare, ...), so a byte read is enough to discriminate -/// the frame before paying for full `bytemuck` validation. +/// the frame. fn peek_command(header_bytes: &[u8; HEADER_SIZE]) -> Command2 { - const COMMAND_OFFSET: usize = 60; + const COMMAND_OFFSET: usize = std::mem::offset_of!(GenericHeader, command); match header_bytes[COMMAND_OFFSET] { x if x == Command2::Reply as u8 => Command2::Reply, x if x == Command2::Eviction as u8 => Command2::Eviction, @@ -245,19 +223,49 @@ fn peek_command(header_bytes: &[u8; HEADER_SIZE]) -> Command2 { } } -fn map_eviction_reason(reason: EvictionReason) -> IggyError { +/// Map a session-terminal Eviction frame to a typed error. Fields are read +/// by wire offset instead of an `EvictionHeader` struct cast: the response +/// buffers are not 16-aligned (the header holds `u128`s, so the cast would +/// fail on an unlucky buffer placement), and reading raw lets the SDK apply +/// the same window sanity check as `EvictionHeader::validate` rather than +/// trusting the remote frame. +fn decode_eviction(header_bytes: &[u8; HEADER_SIZE]) -> IggyError { + const REASON_OFFSET: usize = std::mem::offset_of!(EvictionHeader, reason); + const VERSION_OFFSET: usize = std::mem::offset_of!(EvictionHeader, server_protocol_version); + const VERSION_MIN_OFFSET: usize = + std::mem::offset_of!(EvictionHeader, server_protocol_version_min); + + let Ok(&reason) = bytemuck::checked::try_from_bytes::( + &header_bytes[REASON_OFFSET..=REASON_OFFSET], + ) else { + return IggyError::Unauthenticated; + }; match reason { EvictionReason::InvalidCredentials => IggyError::InvalidCredentials, EvictionReason::InvalidToken => IggyError::InvalidPersonalAccessToken, - EvictionReason::UserInactive => IggyError::Unauthenticated, - EvictionReason::SessionError + EvictionReason::UserInactive + | EvictionReason::SessionError | EvictionReason::NoSession - | EvictionReason::SessionTooLow - | EvictionReason::SessionReleaseMismatch => IggyError::Unauthenticated, + | EvictionReason::SessionTooLow => IggyError::Unauthenticated, + EvictionReason::IncompatibleProtocol => { + let server_max = read_window_field(header_bytes, VERSION_OFFSET); + let server_min = read_window_field(header_bytes, VERSION_MIN_OFFSET); + if server_min == 0 || server_max < server_min { + return IggyError::Unauthenticated; + } + IggyError::IncompatibleProtocolVersion(IGGY_PROTOCOL_VERSION, server_min, server_max) + } + EvictionReason::MalformedLogin => IggyError::InvalidFormat, _ => IggyError::InvalidCommand, } } +fn read_window_field(header_bytes: &[u8; HEADER_SIZE], offset: usize) -> u32 { + let mut value = [0u8; 4]; + value.copy_from_slice(&header_bytes[offset..offset + 4]); + u32::from_le_bytes(value) +} + fn namespace_for_request( code: u32, payload: &Bytes, @@ -385,7 +393,7 @@ mod tests { use iggy_binary_protocol::requests::messages::SendMessagesHeader; use iggy_binary_protocol::requests::streams::CreateStreamRequest; use iggy_binary_protocol::requests::users::LoginRegisterRequest; - use iggy_binary_protocol::{WireEncode, WireName}; + use iggy_binary_protocol::{ClientVersionInfo, WireEncode, WireName}; use secrecy::SecretString; fn decode_request_header(bytes: &Bytes) -> RequestHeader { @@ -396,9 +404,13 @@ mod tests { fn register_request_uses_zero_request_and_session() { let mut session = ConsensusSession::with_client_id(7); let request = LoginRegisterRequest { + version_info: ClientVersionInfo { + protocol_version: IGGY_PROTOCOL_VERSION, + sdk_name: WireName::new("rust-sdk").unwrap(), + sdk_version: WireName::new("1.0.0").unwrap(), + }, username: WireName::new("admin").unwrap(), password: SecretString::from("secret"), - version: None, client_context: None, }; @@ -416,6 +428,54 @@ mod tests { assert_eq!(header.namespace, METADATA_CONSENSUS_NAMESPACE); } + #[test] + fn eviction_incompatible_protocol_decodes_to_typed_error() { + use iggy_binary_protocol::version::IGGY_PROTOCOL_VERSION_MIN; + + // Header lands at a guaranteed-misaligned address (aligned start +1): + // eviction decode reads fields by offset and must not care. + #[repr(C, align(16))] + struct Misaligner([u8; HEADER_SIZE + 1]); + + let header = EvictionHeader::incompatible_protocol( + 0, + 0, + 0, + 0xCAFE, + IGGY_PROTOCOL_VERSION, + IGGY_PROTOCOL_VERSION_MIN, + ); + let mut raw = Misaligner([0; HEADER_SIZE + 1]); + raw.0[1..].copy_from_slice(bytemuck::bytes_of(&header)); + let shifted: &[u8; HEADER_SIZE] = raw.0[1..].try_into().unwrap(); + + let result = decode_response_split(shifted, Bytes::new()); + assert!(matches!( + result, + Err(IggyError::IncompatibleProtocolVersion(client, min, max)) + if client == IGGY_PROTOCOL_VERSION + && min == IGGY_PROTOCOL_VERSION_MIN + && max == IGGY_PROTOCOL_VERSION + )); + } + + #[test] + fn eviction_with_invalid_window_degrades_to_unauthenticated() { + for (server_max, server_min) in [(1, 0), (1, 2)] { + let mut header = EvictionHeader::incompatible_protocol(0, 0, 0, 0xCAFE, 1, 1); + header.server_protocol_version = server_max; + header.server_protocol_version_min = server_min; + let mut buf = [0u8; HEADER_SIZE]; + buf.copy_from_slice(bytemuck::bytes_of(&header)); + + let result = decode_response_split(&buf, Bytes::new()); + assert!( + matches!(result, Err(IggyError::Unauthenticated)), + "window [{server_min}, {server_max}] must not surface as typed error" + ); + } + } + #[test] fn replicated_request_increments_request_counter() { let mut session = ConsensusSession::with_client_id(42); diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index e1a9838398..72f9292f68 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -218,6 +218,10 @@ impl iggy_common::VsrSessionControl for WebSocketClient { .expect("consensus session mutex poisoned") = ConsensusSession::new(); Ok(()) } + + fn sdk_version(&self) -> &'static str { + crate::SDK_VERSION + } } impl BinaryClient for WebSocketClient {} diff --git a/core/server-ng/src/auth.rs b/core/server-ng/src/auth.rs index b4c5b66ebc..b8aa393f68 100644 --- a/core/server-ng/src/auth.rs +++ b/core/server-ng/src/auth.rs @@ -25,9 +25,12 @@ use crate::bootstrap::ServerNgShard; use crate::dispatch::submit_register_on_owner; use crate::login_register::LoginRegisterError; use crate::responses::{build_empty_reply, build_login_register_reply, current_metadata_commit}; -use crate::session_manager::SessionManager; +use crate::session_manager::{ClientSdkInfo, SessionManager}; use consensus::MetadataHandle; -use iggy_binary_protocol::RequestHeader; +use iggy_binary_protocol::{ClientVersionInfo, RequestHeader}; +use iggy_common::defaults::{ + MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, +}; use iggy_common::{IggyTimestamp, PersonalAccessToken, UserStatus}; use message_bus::MessageBus; use metadata::impls::metadata::StreamsFrontend; @@ -41,6 +44,15 @@ pub(crate) fn verify_login_credentials( username: &str, password: &str, ) -> Result { + // Same bounds the legacy server enforces before any lookup or hashing; + // also keeps arbitrary-length input out of the password hash. Collapsed + // to InvalidCredentials on purpose (legacy: InvalidUsername / + // InvalidPassword): don't leak which field failed. + if !(MIN_USERNAME_LENGTH..=MAX_USERNAME_LENGTH).contains(&username.len()) + || !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&password.len()) + { + return Err(LoginRegisterError::InvalidCredentials); + } shard.plane.metadata().mux_stm.users().read(|users| { let Some(user_id) = users.index.get(username).copied() else { return Err(LoginRegisterError::InvalidCredentials); @@ -98,7 +110,13 @@ pub(crate) async fn complete_login_register( vsr_client_id: u128, request_header: &RequestHeader, user_id: u32, + client_version: &ClientVersionInfo, ) -> Result<(), LoginRegisterError> { + let sdk_info = ClientSdkInfo { + sdk_name: client_version.sdk_name.as_str().to_owned(), + sdk_version: client_version.sdk_version.as_str().to_owned(), + protocol_version: client_version.protocol_version, + }; let existing_session = { let sessions = sessions.borrow(); sessions @@ -106,6 +124,11 @@ pub(crate) async fn complete_login_register( .map(|(_, session)| session) }; if let Some(session) = existing_session { + // Re-login on a bound connection: refresh the recorded SDK info + // (a reconnecting client may have been upgraded) and replay. + sessions + .borrow_mut() + .record_sdk_info(transport_client_id, sdk_info); let commit = current_metadata_commit(shard); let reply = build_login_register_reply(request_header, vsr_client_id, session, commit, user_id); @@ -136,6 +159,7 @@ pub(crate) async fn complete_login_register( sessions .login(transport_client_id, user_id) .map_err(LoginRegisterError::Session)?; + sessions.record_sdk_info(transport_client_id, sdk_info); if let Err(error) = sessions.bind_session(transport_client_id, vsr_client_id, session) { // No local rollback: `submit_register_in_process` above has // already committed cluster-wide. A local-only diff --git a/core/server-ng/src/dispatch.rs b/core/server-ng/src/dispatch.rs index e8b3e1099e..f52ed42ac6 100644 --- a/core/server-ng/src/dispatch.rs +++ b/core/server-ng/src/dispatch.rs @@ -39,7 +39,10 @@ use crate::session_manager::SessionManager; use crate::users::maybe_rewrite_user_password_request; use crate::wire::request_body; use bytes::Bytes; -use consensus::{MetadataHandle, PartitionsHandle}; +use consensus::{ + EvictionContext, MetadataHandle, PartitionsHandle, build_eviction_message, + build_incompatible_protocol_eviction_message, +}; use iggy_binary_protocol::codes::{ GET_CLIENT_CODE, GET_CLIENTS_CODE, GET_CLUSTER_METADATA_CODE, GET_CONSUMER_OFFSET_CODE, GET_ME_CODE, PING_CODE, POLL_MESSAGES_CODE, @@ -52,7 +55,10 @@ use iggy_binary_protocol::requests::system::get_client::GetClientRequest; use iggy_binary_protocol::requests::users::{LoginRegisterRequest, LoginRegisterWithPatRequest}; use iggy_binary_protocol::responses::clients::get_client::ClientDetailsResponse; use iggy_binary_protocol::responses::clients::get_clients::GetClientsResponse; -use iggy_binary_protocol::{GenericHeader, Operation, RequestHeader, WireDecode, WireEncode}; +use iggy_binary_protocol::{ + ClientVersionInfo, EvictionReason, GenericHeader, Operation, ProtocolVersion, RequestHeader, + WireDecode, WireEncode, is_protocol_compatible, +}; use iggy_common::{IggyError, PollingStrategy}; use message_bus::client_listener::RequestHandler; use message_bus::replica::listener::MessageHandler; @@ -1230,7 +1236,7 @@ fn ensure_transport_connection( .ensure_connection(transport_client_id, meta.peer_addr, meta.transport); } -#[allow(clippy::future_not_send)] +#[allow(clippy::future_not_send, clippy::too_many_lines)] async fn handle_login_register_request( shard: &Rc, sessions: &Rc>, @@ -1240,7 +1246,47 @@ async fn handle_login_register_request( let body = request_body(&request); let vsr_client_id = request.header().client; - if let Ok(wire_request) = LoginRegisterRequest::decode_from(body) { + // Both login-register shapes share the ClientVersionInfo prefix, so the + // protocol gate decodes it once and runs before any credential work; the + // body shapes below parse from past the prefix. Clients from before the + // header relayout never reach this gate: their command byte sits at the + // old offset, so the typed-header decode drops the frame earlier. + let Ok((version_info, prefix_len)) = ClientVersionInfo::decode(body) else { + warn!( + transport_client_id, + "rejecting login: body has no decodable version prefix" + ); + send_login_eviction( + shard, + transport_client_id, + vsr_client_id, + EvictionReason::MalformedLogin, + ) + .await; + return; + }; + if !is_protocol_compatible(version_info.protocol_version) { + warn!( + transport_client_id, + client_protocol_version = %ProtocolVersion(version_info.protocol_version), + sdk_name = %version_info.sdk_name, + sdk_version = %version_info.sdk_version, + "rejecting login: incompatible protocol version" + ); + send_login_eviction( + shard, + transport_client_id, + vsr_client_id, + EvictionReason::IncompatibleProtocol, + ) + .await; + return; + } + + let body_tail = &body[prefix_len..]; + if let Ok((wire_request, _)) = + LoginRegisterRequest::decode_after_prefix(version_info.clone(), body_tail) + { match verify_login_credentials( shard, wire_request.username.as_str(), @@ -1254,6 +1300,7 @@ async fn handle_login_register_request( vsr_client_id, request.header(), user_id, + &wire_request.version_info, ) .await { @@ -1277,7 +1324,9 @@ async fn handle_login_register_request( } } - if let Ok(wire_request) = LoginRegisterWithPatRequest::decode_from(body) { + if let Ok((wire_request, _)) = + LoginRegisterWithPatRequest::decode_after_prefix(version_info, body_tail) + { match verify_pat_credentials(shard, wire_request.token.expose_secret()) { Ok(user_id) => { if let Err(error) = complete_login_register( @@ -1287,6 +1336,7 @@ async fn handle_login_register_request( vsr_client_id, request.header(), user_id, + &wire_request.version_info, ) .await { @@ -1319,6 +1369,46 @@ async fn handle_login_register_request( send_login_failure_reply(shard, transport_client_id, request.header()).await; } +/// Best-effort login-rejection eviction. Terminal one-way frame; a gone +/// connection has nothing to recover, so the send error is logged and +/// dropped. Consensus context (cluster/view/replica) is stamped on the +/// metadata shard and zeroed elsewhere -- the SDK only reads the reason, +/// plus the protocol window on `IncompatibleProtocol`. +#[allow(clippy::future_not_send)] +async fn send_login_eviction( + shard: &Rc, + transport_client_id: u128, + vsr_client_id: u128, + reason: EvictionReason, +) { + let ctx = shard.plane.metadata().consensus.as_ref().map_or( + EvictionContext { + cluster: 0, + view: 0, + replica: 0, + }, + EvictionContext::from_consensus, + ); + let eviction = match reason { + EvictionReason::IncompatibleProtocol => { + build_incompatible_protocol_eviction_message(ctx, vsr_client_id) + } + _ => build_eviction_message(ctx, vsr_client_id, reason), + }; + if let Err(error) = shard + .bus + .send_to_client(transport_client_id, eviction.into_generic().into_frozen()) + .await + { + warn!( + transport_client_id, + error = %error, + reason = ?reason, + "failed to send login eviction" + ); + } +} + pub(crate) fn upgrade_shard_handle( shard_handle: &ServerNgShardHandle, ) -> Option> { diff --git a/core/server-ng/src/responses.rs b/core/server-ng/src/responses.rs index e1102ad49a..3f184028ef 100644 --- a/core/server-ng/src/responses.rs +++ b/core/server-ng/src/responses.rs @@ -55,12 +55,13 @@ use iggy_binary_protocol::responses::system::get_cluster_metadata::{ use iggy_binary_protocol::responses::system::get_stats::StatsResponse; use iggy_binary_protocol::responses::topics::get_topic::{GetTopicResponse, PartitionResponse}; use iggy_binary_protocol::responses::topics::get_topics::GetTopicsResponse; +use iggy_binary_protocol::responses::users::LoginRegisterResponse; use iggy_binary_protocol::responses::users::get_user::UserDetailsResponse; use iggy_binary_protocol::responses::users::get_users::GetUsersResponse; use iggy_binary_protocol::responses::users::user_response::UserResponse; use iggy_binary_protocol::{ - Command2, GenericHeader, Operation, ReplyHeader, RequestHeader, WireDecode, WireEncode, - WireIdentifier, WireName, WirePartitioning, + Command2, GenericHeader, IGGY_PROTOCOL_VERSION, Operation, ReplyHeader, RequestHeader, + WireDecode, WireEncode, WireIdentifier, WireName, WirePartitioning, }; use iggy_common::IggyError; use metadata::impls::metadata::StreamsFrontend; @@ -616,6 +617,9 @@ pub(crate) fn build_empty_reply( build_reply_with_body(request_header, client_id, session, commit, 0, |_| {}) } +/// Server build version advertised in the login-register response. +const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION"); + pub(crate) fn build_login_register_reply( request_header: &RequestHeader, client_id: u128, @@ -623,10 +627,14 @@ pub(crate) fn build_login_register_reply( commit: u64, user_id: u32, ) -> Message { - build_reply_with_body(request_header, client_id, session, commit, 12, |body| { - body[..4].copy_from_slice(&user_id.to_le_bytes()); - body[4..12].copy_from_slice(&session.to_le_bytes()); - }) + let body = LoginRegisterResponse { + user_id, + session, + server_protocol_version: IGGY_PROTOCOL_VERSION, + server_version: WireName::new(SERVER_VERSION).expect("SERVER_VERSION is 1-255 bytes"), + } + .to_bytes(); + build_reply_from_bytes(request_header, client_id, session, commit, &body) } pub(crate) fn build_reply_from_bytes( diff --git a/core/server-ng/src/session_manager.rs b/core/server-ng/src/session_manager.rs index e7fceb96fa..db6221a48b 100644 --- a/core/server-ng/src/session_manager.rs +++ b/core/server-ng/src/session_manager.rs @@ -56,12 +56,23 @@ pub enum ConnectionState { }, } +/// SDK identity reported in the login-register version prefix. +#[derive(Debug, Clone)] +pub struct ClientSdkInfo { + pub sdk_name: String, + pub sdk_version: String, + /// Packed protocol version, see `iggy_binary_protocol::ProtocolVersion`. + pub protocol_version: u32, +} + /// Per-connection metadata tracked by the session manager. #[derive(Debug, Clone)] pub struct Connection { pub address: SocketAddr, pub transport: ClientTransportKind, pub state: ConnectionState, + /// Recorded at login; `None` until the connection authenticates. + pub sdk: Option, } /// Bridges transport connections to consensus sessions. @@ -103,9 +114,19 @@ impl SessionManager { address, transport, state: ConnectionState::Connected, + sdk: None, }); } + /// Record (or refresh on re-login) the SDK identity for a connection. + /// No state-machine constraint: the gate already validated the version + /// and a missing connection just drops the record. + pub fn record_sdk_info(&mut self, connection_id: u128, info: ClientSdkInfo) { + if let Some(conn) = self.connections.get_mut(&connection_id) { + conn.sdk = Some(info); + } + } + /// Remove a connection (disconnect). Cleans up the reverse index if bound. /// /// Returns the bound `(client_id, session)` when the removed connection had @@ -280,7 +301,7 @@ impl std::fmt::Display for SessionError { impl std::error::Error for SessionError {} /// Flatten a connection + its id into a [`ConnectedClientInfo`]. -const fn record_from(connection_id: u128, conn: &Connection) -> ConnectedClientInfo { +fn record_from(connection_id: u128, conn: &Connection) -> ConnectedClientInfo { let user_id = match conn.state { ConnectionState::Authenticated { user_id } | ConnectionState::Bound { user_id, .. } => { Some(user_id) @@ -292,6 +313,9 @@ const fn record_from(connection_id: u128, conn: &Connection) -> ConnectedClientI user_id, transport: conn.transport, address: conn.address, + sdk_name: conn.sdk.as_ref().map(|sdk| sdk.sdk_name.clone()), + sdk_version: conn.sdk.as_ref().map(|sdk| sdk.sdk_version.clone()), + protocol_version: conn.sdk.as_ref().map(|sdk| sdk.protocol_version), } } @@ -312,6 +336,52 @@ mod tests { SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port) } + #[test] + fn record_sdk_info_exposed_via_iter_clients() { + let mut mgr = SessionManager::new(); + mgr.ensure_connection(1, addr(5000), ClientTransportKind::Tcp); + + // Pre-login: no SDK identity. + let info = mgr.iter_clients().next().unwrap(); + assert!(info.sdk_name.is_none()); + + mgr.record_sdk_info( + 1, + ClientSdkInfo { + sdk_name: "rust-sdk".to_string(), + sdk_version: "1.0.0".to_string(), + protocol_version: 42, + }, + ); + let info = mgr.iter_clients().next().unwrap(); + assert_eq!(info.sdk_name.as_deref(), Some("rust-sdk")); + assert_eq!(info.sdk_version.as_deref(), Some("1.0.0")); + assert_eq!(info.protocol_version, Some(42)); + + // Re-login overwrites (client may reconnect after an upgrade). + mgr.record_sdk_info( + 1, + ClientSdkInfo { + sdk_name: "rust-sdk".to_string(), + sdk_version: "2.0.0".to_string(), + protocol_version: 43, + }, + ); + let info = mgr.iter_clients().next().unwrap(); + assert_eq!(info.sdk_version.as_deref(), Some("2.0.0")); + + // Unknown connection: record is dropped, no panic. + mgr.record_sdk_info( + 999, + ClientSdkInfo { + sdk_name: "go-sdk".to_string(), + sdk_version: "0.1.0".to_string(), + protocol_version: 1, + }, + ); + assert_eq!(mgr.iter_clients().count(), 1); + } + #[test] fn full_lifecycle() { let mut mgr = SessionManager::new(); diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs index 0ecd7b76be..02930ae042 100644 --- a/core/shard/src/lib.rs +++ b/core/shard/src/lib.rs @@ -183,6 +183,13 @@ pub struct ConnectedClientInfo { pub user_id: Option, pub transport: ClientTransportKind, pub address: std::net::SocketAddr, + /// SDK identity from the login version prefix; `None` pre-login. + /// In-memory only: the `get_clients` wire response is shared with the + /// legacy server, so exposing these on the wire is a follow-up. + pub sdk_name: Option, + pub sdk_version: Option, + /// Packed protocol version, see `iggy_binary_protocol::ProtocolVersion`. + pub protocol_version: Option, } /// Handler each shard runs for an inbound [`LifecycleFrame::ListClients`].