Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# message-queue
# foxmq
Message queue MeshApp utilizing Tashi Consensus Engine

### Checking Out
Expand Down
6 changes: 3 additions & 3 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::cli::LogFormat;
use crate::collections::HashMap;
use crate::config;
Expand All @@ -12,6 +9,9 @@ use crate::mqtt::{KeepAlive, TceState};
use crate::transaction::AddNodeTransaction;
use color_eyre::eyre;
use color_eyre::eyre::Context;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tashi_consensus_engine::quic::QuicSocket;
use tashi_consensus_engine::{
Certificate, Platform, RootCertificates, SecretKey, UnknownConnectionAction,
Expand Down
6 changes: 3 additions & 3 deletions src/cli/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::fs::{File, OpenOptions};
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};

use color_eyre::eyre::WrapErr;
use rand::distributions::Uniform;
use rand::Rng;
use crate::cli::LogFormat;
use crate::collections::HashMap;
use crate::config::users::{AuthConfig, User, UsersConfig};
use color_eyre::eyre::WrapErr;
use rand::distributions::Uniform;
use rand::Rng;

const DEFAULT_PASSWORD_LEN: usize = 12;

Expand Down
6 changes: 3 additions & 3 deletions src/config/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ impl PermissionsConfig {
transaction_type: TransactionType,
) -> bool {
// Allows everything if no topics config was found.
topics_config.map_or(true, |perms| {
topics_config.is_none_or(|perms| {
perms
.topic
.iter()
.find(|k| k.filter.matches_topic(topic_name))
.map_or(true, |k| {
k.allowed.iter().any(|k| *k == transaction_type)
.is_none_or(|k| {
k.allowed.contains(&transaction_type)
|| !k.denied.iter().all(|k| *k == transaction_type)
})
})
Expand Down
10 changes: 6 additions & 4 deletions src/mqtt/broker/connection.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::cmp;
use std::fmt::{Debug, Display};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, iter};

use bytes::BytesMut;
use color_eyre::eyre;
Expand Down Expand Up @@ -888,9 +888,11 @@ impl<S: MqttSocket> Connection<S> {
self.send(Packet::UnsubAck(
UnsubAck {
pkid: unsub.pkid,
reasons: iter::repeat(UnsubAckReason::PacketIdentifierInUse)
.take(unsub.filters.len())
.collect(),
reasons: std::iter::repeat_n(
UnsubAckReason::PacketIdentifierInUse,
unsub.filters.len(),
)
.collect(),
},
None,
))
Expand Down
3 changes: 1 addition & 2 deletions src/mqtt/broker/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,5 @@ fn find_client_ip(connected_addr: IpAddr, x_forwarded_for: &HeaderValue) -> Opti
})
.ok()
})
.filter(|ip| ip != &connected_addr)
.next_back()
.rfind(|ip| ip != &connected_addr)
}
2 changes: 0 additions & 2 deletions src/mqtt/client_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ impl PartialEq<ClientId> for str {
}
}



impl From<ClientId> for String {
fn from(value: ClientId) -> Self {
value.as_str().into()
Expand Down
2 changes: 1 addition & 1 deletion src/mqtt/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ pub fn txn_to_packet(
Bytes::copy_from_slice(txn.topic.as_bytes()),
txn.payload.0.clone(),
),
(!sub_ids.is_empty() || txn.properties.is_some()).then(|| {
(!sub_ids.is_empty() || txn.properties.is_some() || include_broker_timestamps).then(|| {
macro_rules! clone_prop {
($prop:ident) => {
txn.properties
Expand Down
3 changes: 1 addition & 2 deletions src/mqtt/trie/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ pub(super) struct NodeLeaf<T> {

impl<T> NodeLeaf<T> {
pub(super) fn all(&self, f: &mut impl FnMut(&T) -> bool) -> bool {
self.exact_val.as_ref().map_or(true, &mut *f)
&& self.descendant_val.as_ref().map_or(true, f)
self.exact_val.as_ref().is_none_or(&mut *f) && self.descendant_val.as_ref().is_none_or(f)
}

fn is_empty(&self) -> bool {
Expand Down
1 change: 1 addition & 0 deletions tests/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/node_modules
/foxmq-single.d/
4 changes: 2 additions & 2 deletions tests/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM rust:1-bookworm

WORKDIR /home/message-queue
WORKDIR /home/foxmq

# The minimum number of files required to allow `cargo fetch` to cache all dependencies.
# We do this separately from the `COPY ./ .` so most code changes don't invalidate the `cargo fetch` layer.
Expand Down Expand Up @@ -35,6 +35,6 @@ RUN cargo build --release -p tokio -p tracing -p tashi-consensus-engine

COPY ./ .

WORKDIR /home/message-queue
WORKDIR /home/foxmq

RUN cargo build --release
12 changes: 6 additions & 6 deletions tests/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ services:
dockerfile: tests/Dockerfile
ssh:
- default
image: message-queue-test
image: foxmq-test
volumes:
- ./foxmq.d:/home/message-queue/foxmq.d
- ./foxmq.d:/home/foxmq/foxmq.d
ports:
- "1883:1883"
- "8080:8080"
Expand All @@ -30,15 +30,15 @@ services:
--secret-key-file foxmq.d/key_0.pem
--tls-cert-file=foxmq.d/key_0.crt
--server-name broker1.example.com
/home/message-queue/foxmq.d
/home/foxmq/foxmq.d

broker2:
container_name: broker2
depends_on:
- broker1
image: message-queue-test
image: foxmq-test
volumes:
- ./foxmq.d:/home/message-queue/foxmq.d
- ./foxmq.d:/home/foxmq/foxmq.d
ports:
- "1884:1883"
- "8081:8080"
Expand All @@ -56,7 +56,7 @@ services:
--secret-key-file foxmq.d/key_1.pem
--tls-cert-file=foxmq.d/key_1.crt
--server-name broker2.example.com
/home/message-queue/foxmq.d
/home/foxmq/foxmq.d

networks:
mesh-app-bridge:
Expand Down
24 changes: 12 additions & 12 deletions tests/foxmq-root-ca.d/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ services:
dockerfile: tests/Dockerfile
ssh:
- default
image: message-queue-test
image: foxmq-test
volumes:
- .:/home/message-queue/foxmq.d
- .:/home/foxmq/foxmq.d
ports:
- "1883:1883"
networks:
Expand All @@ -28,16 +28,16 @@ services:
--cluster-root-cert=foxmq.d/ca.crt
--cluster-accept-peer-with-cert
--server-name broker1.example.com
/home/message-queue/foxmq.d
/home/foxmq/foxmq.d

broker2:
container_name: broker2
depends_on:
# Ensure we start after `broker1` for proper IP address assignment
- broker1
image: message-queue-test
image: foxmq-test
volumes:
- .:/home/message-queue/foxmq.d
- .:/home/foxmq/foxmq.d
ports:
- "1884:1883"
networks:
Expand All @@ -53,16 +53,16 @@ services:
--cluster-root-cert=foxmq.d/ca.crt
--cluster-accept-peer-with-cert
--server-name broker2.example.com
/home/message-queue/foxmq.d
/home/foxmq/foxmq.d

broker3:
container_name: broker3
depends_on:
# Ensure we start after `broker2` for proper IP address assignment
- broker2
image: message-queue-test
image: foxmq-test
volumes:
- .:/home/message-queue/foxmq.d
- .:/home/foxmq/foxmq.d
ports:
- "1885:1883"
networks:
Expand All @@ -78,16 +78,16 @@ services:
--cluster-root-cert=foxmq.d/ca.crt
--cluster-accept-peer-with-cert
--server-name broker2.example.com
/home/message-queue/foxmq.d
/home/foxmq/foxmq.d

broker4:
container_name: broker4
depends_on:
# Ensure we start after `broker3` for proper IP address assignment\
- broker3
image: message-queue-test
image: foxmq-test
volumes:
- .:/home/message-queue/foxmq.d
- .:/home/foxmq/foxmq.d
ports:
- "1886:1883"
networks:
Expand All @@ -104,7 +104,7 @@ services:
--cluster-root-cert=foxmq.d/ca.crt
--cluster-accept-peer-with-cert
--server-name broker4.example.com
/home/message-queue/foxmq.d
/home/foxmq/foxmq.d

networks:
mesh-app-bridge:
Expand Down
78 changes: 78 additions & 0 deletions tests/mqtt/consensus-timestamps.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
const mqtt = require("mqtt");
const events = require("node:events");

describe("Consensus Timestamps", () => {
test("receive timestamp_received user property when requested", async () => {
// Client 1: Subscriber
console.log("Connecting Subscriber...");
const subClient = await mqtt.connectAsync("mqtt://127.0.0.1:1883", { protocolVersion: 5 });

// Client 2: Publisher
console.log("Connecting Publisher...");
const pubClient = await mqtt.connectAsync("mqtt://127.0.0.1:1883", { protocolVersion: 5 });

// Setup message collector on Subscriber BEFORE verifying subscription or publishing
const messagesReceived = [];
const completionPromise = new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Timeout waiting for messages"));
}, 10000);

subClient.on('message', (topic, message, packet) => {
console.log(`Received: ${topic} ${message.toString()}`);
messagesReceived.push({
msg: message.toString(),
props: packet.properties
});

if (messagesReceived.length >= 3) {
clearTimeout(timeout);
resolve();
}
});
});

// Request timestamps via subscription properties
console.log("Subscribing...");
await subClient.subscribeAsync("timestamp/test", {
qos: 1,
properties: {
userProperties: {
include_broker_timestamps: "true"
}
}
});
console.log("Subscribed!");

// Publish messages from Client 2
console.log("Publishing...");
await pubClient.publishAsync("timestamp/test", "message 1", { qos: 1 });
await pubClient.publishAsync("timestamp/test", "message 2", { qos: 1 });
await pubClient.publishAsync("timestamp/test", "message 3", { qos: 1 });
console.log("Published!");

// Wait for subscriber to get them
await completionPromise;

// Verify content and timestamps
expect(messagesReceived.length).toBe(3);

let lastTimestamp = "";
for (const m of messagesReceived) {
expect(m.props).toBeDefined();
expect(m.props.userProperties).toBeDefined();
expect(m.props.userProperties.timestamp_received).toBeDefined();

const currentTs = m.props.userProperties.timestamp_received;
if (lastTimestamp !== "") {
expect(currentTs >= lastTimestamp).toBe(true);
}
lastTimestamp = currentTs;
}

console.log("Timestamps verified:", messagesReceived.map(m => m.props.userProperties.timestamp_received));

await subClient.endAsync();
await pubClient.endAsync();
}, 30000);
});
Loading