Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/binary_protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

Wire protocol types and codec for the [Apache Iggy](https://iggy.apache.org) binary protocol, shared between server and SDK. This crate is an internal building block. Most users should depend on the [`iggy`](https://crates.io/crates/iggy) SDK crate instead.

The language-neutral wire spec for the login protocol-version handshake (packed version layout, `ClientVersionInfo` prefix, gate semantics, rejection frame offsets) lives in the [`version` module docs](src/version.rs).

> Apache Iggy (Incubating) is an effort undergoing incubation at the Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC.
>
> Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects.
Expand Down
92 changes: 87 additions & 5 deletions core/binary_protocol/src/consensus/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ pub enum EvictionReason {
InvalidToken = 10,
UserInactive = 11,
SessionError = 12,
/// Client protocol version outside the server's accepted range; the
/// header carries `server_protocol_version{,_min}` so the SDK can
/// report the exact window.
IncompatibleProtocol = 13,
/// Login body without a decodable `ClientVersionInfo` prefix.
MalformedLogin = 14,
}

// EvictionHeader - primary -> client (session-terminal, no body)
Expand All @@ -390,7 +396,10 @@ pub struct EvictionHeader {
pub reserved_frame: [u8; 66],

pub client: u128,
pub reserved: [u8; 111],
/// Accepted protocol window on `IncompatibleProtocol`; zero otherwise.
pub server_protocol_version: u32,
pub server_protocol_version_min: u32,
pub reserved: [u8; 103],
pub reason: EvictionReason,
}
const _: () = {
Expand All @@ -399,6 +408,8 @@ const _: () = {
offset_of!(EvictionHeader, client)
== offset_of!(EvictionHeader, reserved_frame) + size_of::<[u8; 66]>()
);
assert!(offset_of!(EvictionHeader, server_protocol_version) == 144);
assert!(offset_of!(EvictionHeader, server_protocol_version_min) == 148);
assert!(offset_of!(EvictionHeader, reason) + size_of::<EvictionReason>() == HEADER_SIZE);
};

Expand All @@ -411,7 +422,8 @@ impl EvictionHeader {
/// # Panics (debug)
/// On `ClientReleaseTooLow`/`ClientReleaseTooHigh`: `release` hardcoded
/// to 0, those reasons need real bounds. Add `release_min`/`release_max`
/// params before emitting them.
/// params before emitting them. On `IncompatibleProtocol`: needs the
/// accepted protocol window, use [`Self::incompatible_protocol`] instead.
///
/// # Safety
/// `client` must be non-zero , `validate` rejects zero so SDKs can
Expand All @@ -428,9 +440,11 @@ impl EvictionHeader {
debug_assert!(
!matches!(
reason,
EvictionReason::ClientReleaseTooLow | EvictionReason::ClientReleaseTooHigh,
EvictionReason::ClientReleaseTooLow
| EvictionReason::ClientReleaseTooHigh
| EvictionReason::IncompatibleProtocol,
),
"EvictionHeader::new: ClientRelease* needs release_min/release_max",
"EvictionHeader::new: ClientRelease*/IncompatibleProtocol need extra fields",
);
// Cap from consensus REPLICAS_MAX=32; literal here to avoid
// wire-proto crate depending on consensus crate.
Expand All @@ -449,10 +463,30 @@ impl EvictionHeader {
replica,
reserved_frame: [0; 66],
client,
reserved: [0; 111],
server_protocol_version: 0,
server_protocol_version_min: 0,
reserved: [0; 103],
reason,
}
}

/// Protocol-version rejection carrying the accepted window so the SDK
/// can report `client X, server accepts [min, max]`.
#[must_use]
pub const fn incompatible_protocol(
cluster: u128,
view: u32,
replica: u8,
client: u128,
server_protocol_version: u32,
server_protocol_version_min: u32,
) -> Self {
let mut header = Self::new(cluster, view, replica, client, EvictionReason::NoSession);
header.reason = EvictionReason::IncompatibleProtocol;
header.server_protocol_version = server_protocol_version;
header.server_protocol_version_min = server_protocol_version_min;
header
}
}

impl ConsensusHeader for EvictionHeader {
Expand Down Expand Up @@ -508,6 +542,22 @@ impl ConsensusHeader for EvictionHeader {
"eviction: reason must not be Reserved".to_string(),
));
}
// Protocol window only travels on IncompatibleProtocol; anywhere
// else a nonzero value is smuggling (same rule as reserved bytes).
if self.reason == EvictionReason::IncompatibleProtocol {
if self.server_protocol_version_min == 0
|| self.server_protocol_version < self.server_protocol_version_min
{
return Err(ConsensusError::InvalidField(
"eviction: incompatible-protocol window must satisfy 1 <= min <= max"
.to_string(),
));
}
} else if self.server_protocol_version != 0 || self.server_protocol_version_min != 0 {
return Err(ConsensusError::InvalidField(
"eviction: protocol window bytes must be zero".to_string(),
));
}
Ok(())
}
}
Expand Down Expand Up @@ -1075,6 +1125,38 @@ mod tests {
assert_eq!(EvictionReason::InvalidToken as u8, 10);
assert_eq!(EvictionReason::UserInactive as u8, 11);
assert_eq!(EvictionReason::SessionError as u8, 12);
assert_eq!(EvictionReason::IncompatibleProtocol as u8, 13);
assert_eq!(EvictionReason::MalformedLogin as u8, 14);
}

#[test]
fn eviction_incompatible_protocol_accepts_valid_window() {
let header = EvictionHeader::incompatible_protocol(0, 0, 0, 0xCAFE, 2, 1);
assert!(header.validate().is_ok());
assert_eq!(header.reason, EvictionReason::IncompatibleProtocol);
assert_eq!(header.server_protocol_version, 2);
assert_eq!(header.server_protocol_version_min, 1);
}

#[test]
fn eviction_incompatible_protocol_rejects_inverted_window() {
let header = EvictionHeader::incompatible_protocol(0, 0, 0, 0xCAFE, 1, 2);
assert!(header.validate().is_err());
}

#[test]
fn eviction_incompatible_protocol_rejects_zero_min() {
let header = EvictionHeader::incompatible_protocol(0, 0, 0, 0xCAFE, 1, 0);
assert!(header.validate().is_err());
}

// Protocol window is IncompatibleProtocol-only; nonzero elsewhere is
// smuggling, same rule as the reserved-byte guards.
#[test]
fn eviction_validate_rejects_window_on_other_reason() {
let mut header = EvictionHeader::new(0, 0, 0, 0xCAFE, EvictionReason::NoSession);
header.server_protocol_version = 1;
assert!(header.validate().is_err());
}

#[test]
Expand Down
5 changes: 5 additions & 0 deletions core/binary_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub mod namespace;
pub mod primitives;
pub mod requests;
pub mod responses;
pub mod version;

pub use codec::{WireDecode, WireEncode};
pub use consensus::{
Expand Down Expand Up @@ -94,6 +95,10 @@ pub use primitives::user_headers::{
WireHeaderKind, WireUserHeaderEntry, WireUserHeaderIterator, WireUserHeaders,
encode_user_headers, user_headers_encoded_size, validate_user_headers,
};
pub use version::{
ClientVersionInfo, IGGY_PROTOCOL_VERSION, IGGY_PROTOCOL_VERSION_MIN, ProtocolVersion,
is_protocol_compatible,
};

/// Maximum number of partitions allowed in a single create/delete request.
pub const MAX_PARTITIONS_PER_REQUEST: u32 = 1000;
5 changes: 5 additions & 0 deletions core/binary_protocol/src/requests/users/change_password.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ impl WireEncode for ChangePasswordRequest {

fn encode(&self, buf: &mut BytesMut) {
self.user_id.encode(buf);
debug_assert!(
u8::try_from(self.current_password.len()).is_ok()
&& u8::try_from(self.new_password.len()).is_ok(),
"password exceeds u8 length prefix; callers must validate before encoding"
);
#[allow(clippy::cast_possible_truncation)]
buf.put_u8(self.current_password.len() as u8);
buf.put_slice(self.current_password.as_bytes());
Expand Down
4 changes: 4 additions & 0 deletions core/binary_protocol/src/requests/users/create_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ impl WireEncode for CreateUserRequest {

fn encode(&self, buf: &mut BytesMut) {
self.username.encode(buf);
debug_assert!(
u8::try_from(self.password.len()).is_ok(),
"password exceeds u8 length prefix; callers must validate before encoding"
);
#[allow(clippy::cast_possible_truncation)]
buf.put_u8(self.password.len() as u8);
buf.put_slice(self.password.as_bytes());
Expand Down
Loading
Loading