From 6ab9f669a48cf7b101f7fff4b8235095800fc471 Mon Sep 17 00:00:00 2001 From: Divyanshu110011 Date: Mon, 2 Feb 2026 16:14:36 +0530 Subject: [PATCH 1/4] fix(ci) : resolve clippy and rustfmt errors --- src/cli/run.rs | 6 +++--- src/cli/user.rs | 6 +++--- src/config/permissions.rs | 6 +++--- src/mqtt/broker/connection.rs | 5 ++--- src/mqtt/broker/websocket.rs | 3 +-- src/mqtt/client_id.rs | 2 -- src/mqtt/trie/node.rs | 4 ++-- 7 files changed, 14 insertions(+), 18 deletions(-) diff --git a/src/cli/run.rs b/src/cli/run.rs index 68c8ba7..9191a23 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -1,6 +1,3 @@ -use std::net::SocketAddr; -use std::path::{Path, PathBuf}; -use std::sync::Arc; use crate::cli::LogFormat; use crate::collections::HashMap; use crate::config; @@ -12,6 +9,9 @@ use crate::mqtt::{KeepAlive, TceState}; use crate::transaction::AddNodeTransaction; use color_eyre::eyre; use color_eyre::eyre::Context; +use std::net::SocketAddr; +use std::path::{Path, PathBuf}; +use std::sync::Arc; use tashi_consensus_engine::quic::QuicSocket; use tashi_consensus_engine::{ Certificate, Platform, RootCertificates, SecretKey, UnknownConnectionAction, diff --git a/src/cli/user.rs b/src/cli/user.rs index 83120bb..1a90505 100644 --- a/src/cli/user.rs +++ b/src/cli/user.rs @@ -3,12 +3,12 @@ use std::fs::{File, OpenOptions}; use std::io::{BufRead, Write}; use std::path::{Path, PathBuf}; -use color_eyre::eyre::WrapErr; -use rand::distributions::Uniform; -use rand::Rng; use crate::cli::LogFormat; use crate::collections::HashMap; use crate::config::users::{AuthConfig, User, UsersConfig}; +use color_eyre::eyre::WrapErr; +use rand::distributions::Uniform; +use rand::Rng; const DEFAULT_PASSWORD_LEN: usize = 12; diff --git a/src/config/permissions.rs b/src/config/permissions.rs index 15b9775..04cae1d 100644 --- a/src/config/permissions.rs +++ b/src/config/permissions.rs @@ -55,13 +55,13 @@ impl PermissionsConfig { transaction_type: TransactionType, ) -> bool { // Allows everything if no topics config was found. - topics_config.map_or(true, |perms| { + topics_config.is_none_or(|perms| { perms .topic .iter() .find(|k| k.filter.matches_topic(topic_name)) - .map_or(true, |k| { - k.allowed.iter().any(|k| *k == transaction_type) + .is_none_or(|k| { + k.allowed.contains(&transaction_type) || !k.denied.iter().all(|k| *k == transaction_type) }) }) diff --git a/src/mqtt/broker/connection.rs b/src/mqtt/broker/connection.rs index bb4cf70..8e4d265 100644 --- a/src/mqtt/broker/connection.rs +++ b/src/mqtt/broker/connection.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use std::{cmp, iter}; +use std::cmp; use bytes::BytesMut; use color_eyre::eyre; @@ -888,8 +888,7 @@ impl Connection { self.send(Packet::UnsubAck( UnsubAck { pkid: unsub.pkid, - reasons: iter::repeat(UnsubAckReason::PacketIdentifierInUse) - .take(unsub.filters.len()) + reasons: std::iter::repeat_n(UnsubAckReason::PacketIdentifierInUse, unsub.filters.len()) .collect(), }, None, diff --git a/src/mqtt/broker/websocket.rs b/src/mqtt/broker/websocket.rs index 2b6c734..fb7e161 100644 --- a/src/mqtt/broker/websocket.rs +++ b/src/mqtt/broker/websocket.rs @@ -250,6 +250,5 @@ fn find_client_ip(connected_addr: IpAddr, x_forwarded_for: &HeaderValue) -> Opti }) .ok() }) - .filter(|ip| ip != &connected_addr) - .next_back() + .rfind(|ip| ip != &connected_addr) } diff --git a/src/mqtt/client_id.rs b/src/mqtt/client_id.rs index 62c607a..bfbc9d8 100644 --- a/src/mqtt/client_id.rs +++ b/src/mqtt/client_id.rs @@ -192,8 +192,6 @@ impl PartialEq for str { } } - - impl From for String { fn from(value: ClientId) -> Self { value.as_str().into() diff --git a/src/mqtt/trie/node.rs b/src/mqtt/trie/node.rs index a6dc7d0..0c8c60c 100644 --- a/src/mqtt/trie/node.rs +++ b/src/mqtt/trie/node.rs @@ -19,8 +19,8 @@ pub(super) struct NodeLeaf { impl NodeLeaf { pub(super) fn all(&self, f: &mut impl FnMut(&T) -> bool) -> bool { - self.exact_val.as_ref().map_or(true, &mut *f) - && self.descendant_val.as_ref().map_or(true, f) + self.exact_val.as_ref().is_none_or(&mut *f) + && self.descendant_val.as_ref().is_none_or(f) } fn is_empty(&self) -> bool { From e4449d51a58a409144d9e49b99b93e00b927516a Mon Sep 17 00:00:00 2001 From: Divyanshu110011 Date: Mon, 2 Feb 2026 12:45:59 +0000 Subject: [PATCH 2/4] updated message-queue ref with fox-mq --- README.md | 2 +- tests/Dockerfile | 4 ++-- tests/docker-compose.yaml | 12 ++++++------ tests/foxmq-root-ca.d/docker-compose.yaml | 24 +++++++++++------------ 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 47faa99..8452d5e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# message-queue +# foxmq Message queue MeshApp utilizing Tashi Consensus Engine ### Checking Out diff --git a/tests/Dockerfile b/tests/Dockerfile index 40a93e3..3a07fd7 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -1,6 +1,6 @@ FROM rust:1-bookworm -WORKDIR /home/message-queue +WORKDIR /home/foxmq # The minimum number of files required to allow `cargo fetch` to cache all dependencies. # We do this separately from the `COPY ./ .` so most code changes don't invalidate the `cargo fetch` layer. @@ -35,6 +35,6 @@ RUN cargo build --release -p tokio -p tracing -p tashi-consensus-engine COPY ./ . -WORKDIR /home/message-queue +WORKDIR /home/foxmq RUN cargo build --release diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index c4bef21..de579ee 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -9,9 +9,9 @@ services: dockerfile: tests/Dockerfile ssh: - default - image: message-queue-test + image: foxmq-test volumes: - - ./foxmq.d:/home/message-queue/foxmq.d + - ./foxmq.d:/home/foxmq/foxmq.d ports: - "1883:1883" - "8080:8080" @@ -30,15 +30,15 @@ services: --secret-key-file foxmq.d/key_0.pem --tls-cert-file=foxmq.d/key_0.crt --server-name broker1.example.com - /home/message-queue/foxmq.d + /home/foxmq/foxmq.d broker2: container_name: broker2 depends_on: - broker1 - image: message-queue-test + image: foxmq-test volumes: - - ./foxmq.d:/home/message-queue/foxmq.d + - ./foxmq.d:/home/foxmq/foxmq.d ports: - "1884:1883" - "8081:8080" @@ -56,7 +56,7 @@ services: --secret-key-file foxmq.d/key_1.pem --tls-cert-file=foxmq.d/key_1.crt --server-name broker2.example.com - /home/message-queue/foxmq.d + /home/foxmq/foxmq.d networks: mesh-app-bridge: diff --git a/tests/foxmq-root-ca.d/docker-compose.yaml b/tests/foxmq-root-ca.d/docker-compose.yaml index 859af5d..40c3237 100644 --- a/tests/foxmq-root-ca.d/docker-compose.yaml +++ b/tests/foxmq-root-ca.d/docker-compose.yaml @@ -9,9 +9,9 @@ services: dockerfile: tests/Dockerfile ssh: - default - image: message-queue-test + image: foxmq-test volumes: - - .:/home/message-queue/foxmq.d + - .:/home/foxmq/foxmq.d ports: - "1883:1883" networks: @@ -28,16 +28,16 @@ services: --cluster-root-cert=foxmq.d/ca.crt --cluster-accept-peer-with-cert --server-name broker1.example.com - /home/message-queue/foxmq.d + /home/foxmq/foxmq.d broker2: container_name: broker2 depends_on: # Ensure we start after `broker1` for proper IP address assignment - broker1 - image: message-queue-test + image: foxmq-test volumes: - - .:/home/message-queue/foxmq.d + - .:/home/foxmq/foxmq.d ports: - "1884:1883" networks: @@ -53,16 +53,16 @@ services: --cluster-root-cert=foxmq.d/ca.crt --cluster-accept-peer-with-cert --server-name broker2.example.com - /home/message-queue/foxmq.d + /home/foxmq/foxmq.d broker3: container_name: broker3 depends_on: # Ensure we start after `broker2` for proper IP address assignment - broker2 - image: message-queue-test + image: foxmq-test volumes: - - .:/home/message-queue/foxmq.d + - .:/home/foxmq/foxmq.d ports: - "1885:1883" networks: @@ -78,16 +78,16 @@ services: --cluster-root-cert=foxmq.d/ca.crt --cluster-accept-peer-with-cert --server-name broker2.example.com - /home/message-queue/foxmq.d + /home/foxmq/foxmq.d broker4: container_name: broker4 depends_on: # Ensure we start after `broker3` for proper IP address assignment\ - broker3 - image: message-queue-test + image: foxmq-test volumes: - - .:/home/message-queue/foxmq.d + - .:/home/foxmq/foxmq.d ports: - "1886:1883" networks: @@ -104,7 +104,7 @@ services: --cluster-root-cert=foxmq.d/ca.crt --cluster-accept-peer-with-cert --server-name broker4.example.com - /home/message-queue/foxmq.d + /home/foxmq/foxmq.d networks: mesh-app-bridge: From 8ee4ff11a02810dcbdea45b4a04f8998d8cc69f7 Mon Sep 17 00:00:00 2001 From: Divyanshu110011 Date: Wed, 4 Feb 2026 17:30:05 +0530 Subject: [PATCH 3/4] feat: add consensus timestamps and verification tests --- src/mqtt/publish.rs | 2 +- tests/.gitignore | 1 + tests/mqtt/consensus-timestamps.test.js | 78 +++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 tests/mqtt/consensus-timestamps.test.js diff --git a/src/mqtt/publish.rs b/src/mqtt/publish.rs index 30f1f55..25e6de3 100644 --- a/src/mqtt/publish.rs +++ b/src/mqtt/publish.rs @@ -363,7 +363,7 @@ pub fn txn_to_packet( Bytes::copy_from_slice(txn.topic.as_bytes()), txn.payload.0.clone(), ), - (!sub_ids.is_empty() || txn.properties.is_some()).then(|| { + (!sub_ids.is_empty() || txn.properties.is_some() || include_broker_timestamps).then(|| { macro_rules! clone_prop { ($prop:ident) => { txn.properties diff --git a/tests/.gitignore b/tests/.gitignore index 07e6e47..988514e 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -1 +1,2 @@ /node_modules +/foxmq-single.d/ diff --git a/tests/mqtt/consensus-timestamps.test.js b/tests/mqtt/consensus-timestamps.test.js new file mode 100644 index 0000000..4863d26 --- /dev/null +++ b/tests/mqtt/consensus-timestamps.test.js @@ -0,0 +1,78 @@ +const mqtt = require("mqtt"); +const events = require("node:events"); + +describe("Consensus Timestamps", () => { + test("receive timestamp_received user property when requested", async () => { + // Client 1: Subscriber + console.log("Connecting Subscriber..."); + const subClient = await mqtt.connectAsync("mqtt://127.0.0.1:1883", { protocolVersion: 5 }); + + // Client 2: Publisher + console.log("Connecting Publisher..."); + const pubClient = await mqtt.connectAsync("mqtt://127.0.0.1:1883", { protocolVersion: 5 }); + + // Setup message collector on Subscriber BEFORE verifying subscription or publishing + const messagesReceived = []; + const completionPromise = new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error("Timeout waiting for messages")); + }, 10000); + + subClient.on('message', (topic, message, packet) => { + console.log(`Received: ${topic} ${message.toString()}`); + messagesReceived.push({ + msg: message.toString(), + props: packet.properties + }); + + if (messagesReceived.length >= 3) { + clearTimeout(timeout); + resolve(); + } + }); + }); + + // Request timestamps via subscription properties + console.log("Subscribing..."); + await subClient.subscribeAsync("timestamp/test", { + qos: 1, + properties: { + userProperties: { + include_broker_timestamps: "true" + } + } + }); + console.log("Subscribed!"); + + // Publish messages from Client 2 + console.log("Publishing..."); + await pubClient.publishAsync("timestamp/test", "message 1", { qos: 1 }); + await pubClient.publishAsync("timestamp/test", "message 2", { qos: 1 }); + await pubClient.publishAsync("timestamp/test", "message 3", { qos: 1 }); + console.log("Published!"); + + // Wait for subscriber to get them + await completionPromise; + + // Verify content and timestamps + expect(messagesReceived.length).toBe(3); + + let lastTimestamp = ""; + for (const m of messagesReceived) { + expect(m.props).toBeDefined(); + expect(m.props.userProperties).toBeDefined(); + expect(m.props.userProperties.timestamp_received).toBeDefined(); + + const currentTs = m.props.userProperties.timestamp_received; + if (lastTimestamp !== "") { + expect(currentTs >= lastTimestamp).toBe(true); + } + lastTimestamp = currentTs; + } + + console.log("Timestamps verified:", messagesReceived.map(m => m.props.userProperties.timestamp_received)); + + await subClient.endAsync(); + await pubClient.endAsync(); + }, 30000); +}); From 0a7b8f4041e5e64c401f221f622a214bbd89d4fb Mon Sep 17 00:00:00 2001 From: Divyanshu110011 Date: Wed, 4 Feb 2026 19:11:09 +0530 Subject: [PATCH 4/4] fixed fmt issues --- src/mqtt/broker/connection.rs | 9 ++++++--- src/mqtt/trie/node.rs | 3 +-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/mqtt/broker/connection.rs b/src/mqtt/broker/connection.rs index 8e4d265..239218b 100644 --- a/src/mqtt/broker/connection.rs +++ b/src/mqtt/broker/connection.rs @@ -1,9 +1,9 @@ +use std::cmp; use std::fmt::{Debug, Display}; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use std::cmp; use bytes::BytesMut; use color_eyre::eyre; @@ -888,8 +888,11 @@ impl Connection { self.send(Packet::UnsubAck( UnsubAck { pkid: unsub.pkid, - reasons: std::iter::repeat_n(UnsubAckReason::PacketIdentifierInUse, unsub.filters.len()) - .collect(), + reasons: std::iter::repeat_n( + UnsubAckReason::PacketIdentifierInUse, + unsub.filters.len(), + ) + .collect(), }, None, )) diff --git a/src/mqtt/trie/node.rs b/src/mqtt/trie/node.rs index 0c8c60c..bebff36 100644 --- a/src/mqtt/trie/node.rs +++ b/src/mqtt/trie/node.rs @@ -19,8 +19,7 @@ pub(super) struct NodeLeaf { impl NodeLeaf { pub(super) fn all(&self, f: &mut impl FnMut(&T) -> bool) -> bool { - self.exact_val.as_ref().is_none_or(&mut *f) - && self.descendant_val.as_ref().is_none_or(f) + self.exact_val.as_ref().is_none_or(&mut *f) && self.descendant_val.as_ref().is_none_or(f) } fn is_empty(&self) -> bool {