Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
903 changes: 102 additions & 801 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ edition = "2021"
publish = false
default-run = "foxmq"

[dependencies.tashi-consensus-engine]
git = "ssh://git@github.com/tashigg/tashi-consensus-engine"
rev = "aaf0206f2a71c710bafe6c5c89f172e5fae2b277"
[dependencies.tashi-vertex]
git = "ssh://git@github.com/tashigg/tashi-vertex-rs.git"
branch = "main"

[dependencies]
fnv = "1.0.7"
Expand All @@ -36,8 +36,8 @@ serde = { version = "1.0.196", features = ["derive", "std"] }
thiserror = "1.0.56"
toml = { version = "0.8.10", features = ["parse"] }
tracing = { version = "0.1.40", features = ["attributes"] }
tokio-util = "0.7.10"
der = { version = "0.7.9", features = ["derive"] }
tokio-util = { version = "0.7.10", features = ["time"] }
der = { version = "0.7.9", features = ["derive", "alloc", "std"] }
futures = { version = "0.3.30", features = ["std", "alloc"] }
slotmap = "1.0.7"
arbitrary = { version = "1", optional = true, features = ["derive"] }
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# foxmq
Message queue MeshApp utilizing Tashi Consensus Engine

### Quick Start
See [example.md](example.md) for a step-by-step guide on running and testing the message queue in WSL.

### Checking Out

When checking out, be sure to initialize and update submodules:
Expand Down
39 changes: 39 additions & 0 deletions example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# FoxMQ Usage Example

This example demonstrates how to run the FoxMQ broker and test it using standard MQTT clients in a WSL environment.

## Prerequisites

Ensure you are in a **WSL** terminal, as the Tashi Vertex native libraries require Linux for linking and execution.

## Step-by-Step Demo

### 1. Terminal 1: Start the Broker
Start the FoxMQ broker with anonymous login enabled for testing:

```bash
cd /mnt/e/projects/tashi/message-queue
cargo run -- run foxmq.d --allow-anonymous-login
```

### 2. Terminal 2: Subscribe to a Topic
Open a second WSL terminal and subscribe to a test topic. This client will wait for messages ordered by the Tashi Consensus Engine.

```bash
mosquitto_sub -h 127.0.0.1 -p 1883 -v -t "tashi/mesh/demo"
```

### 3. Terminal 3: Publish a Message
Open a third WSL terminal and send a message. This message is wrapped in a Tashi transaction, ordered by the consensus layer, and then delivered to all subscribers.

```bash
mosquitto_pub -h 127.0.0.1 -p 1883 -t "tashi/mesh/demo" -m "Message ordered by Tashi Consensus!"
```

## What's Happening?

1. **MQTT Publish**: The `mosquitto_pub` command sends the message to FoxMQ.
2. **Consensus Layer**: FoxMQ submits the message to the `tashi-vertex`, which assigns it a global order and a consensus timestamp.
3. **MQTT Delivery**: FoxMQ receives the ordered event back and delivers it to `mosquitto_sub`.

This ensures that in a multi-node cluster, every node sees the exact same sequence of messages at the exact same time.
12 changes: 6 additions & 6 deletions src/cli/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{fs, io};
use color_eyre::eyre;
use color_eyre::eyre::{eyre, WrapErr};
use serde::Serialize;
use tashi_consensus_engine::SecretKey;
use tashi_vertex::KeySecret;

use crate::cli::LogFormat;
use crate::collections::HashSet;
Expand Down Expand Up @@ -150,15 +150,15 @@ fn generate_address_book(
for (i, address) in addresses.enumerate() {
eyre::ensure!(address_set.insert(address), "Duplicate address: {address}");

let key = SecretKey::generate();
let pubkey = key.public_key();
let key = KeySecret::generate();
let pubkey = key.public();

let pem = key.to_pem();
let key_str = key.to_string();

let pem_filename = format!("key_{i}.pem");
let pem_filename = format!("key_{i}.key");
let pem_path = output_dir.join(&pem_filename);

write_new_file(&pem_path, pem, force)?;
write_new_file(&pem_path, key_str, force)?;

// Mark each entry with a comment referencing its key file.
writeln!(address_book, "# {pem_filename}").expect("writing to a String cannot fail");
Expand Down
164 changes: 102 additions & 62 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
use crate::cli::LogFormat;
use crate::collections::HashMap;
use crate::config;
use crate::config::addresses::Addresses;
use crate::config::permissions::PermissionsConfig;
use crate::config::users::{AuthConfig, UsersConfig};
use crate::mqtt::broker::{self, MqttBroker};
use crate::mqtt::{KeepAlive, TceState};
use crate::transaction::AddNodeTransaction;
// use crate::transaction::AddNodeTransaction;
use color_eyre::eyre;
use color_eyre::eyre::Context;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use tashi_consensus_engine::quic::QuicSocket;
use tashi_consensus_engine::{
Certificate, Platform, RootCertificates, SecretKey, UnknownConnectionAction,
};
use tokio::sync::mpsc;
use tashi_vertex::{KeyPublic, KeySecret, Options};
use tokio_rustls::rustls;

#[derive(clap::Args, Clone, Debug)]
pub struct RunArgs {
Expand Down Expand Up @@ -155,27 +152,46 @@ pub struct ClusterConfig {
pub cluster_accept_peer_with_cert: bool,
}

trait KeySecretToRustls {
fn to_rustls(&self) -> crate::Result<rustls::pki_types::PrivateKeyDer<'static>>;
}

impl KeySecretToRustls for KeySecret {
fn to_rustls(&self) -> crate::Result<rustls::pki_types::PrivateKeyDer<'static>> {
let der = self.to_der_vec().wrap_err("failed to encode key to DER")?;
Ok(rustls::pki_types::PrivateKeyDer::Pkcs8(der.into()))
}
}

struct TceConfig {
config: tashi_consensus_engine::Config,
roots: Option<Arc<RootCertificates>>,
add_nodes: mpsc::UnboundedReceiver<AddNodeTransaction>,
joining_running_session: bool,
options: Options,
pub secret_key: KeySecret,
pub _joining_running_session: bool,
pub initial_peers: Vec<(KeyPublic, SocketAddr)>,
}

impl SecretKeyOpt {
/// NOTE: uses blocking I/O internally if the secret key was specified as a file.
pub fn read_key(&self) -> crate::Result<SecretKey> {
if let Some(der) = &self.secret_key {
let der_bytes = hex::decode(der).wrap_err("error decoding hex-encoded secret key")?;
return SecretKey::from_der(&der_bytes)
.wrap_err("error decoding P-256 secret key from DER");
pub fn read_key(&self) -> crate::Result<KeySecret> {
if let Some(key_input) = &self.secret_key {
// First, try to parse directly as a Base58 secret key
if let Ok(key) = KeySecret::from_str(key_input) {
return Ok(key);
}

// If that fails, treat it as a file path
let content = std::fs::read_to_string(key_input)
.wrap_err_with(|| format!("failed to read secret key file: {key_input}"))?;

return KeySecret::from_str(content.trim())
.wrap_err("failed to parse secret key from file");
}

if let Some(path) = &self.secret_key_file {
return read_secret_key(path);
}

Ok(SecretKey::generate())
Ok(KeySecret::generate())
}
}

Expand Down Expand Up @@ -243,13 +259,9 @@ pub fn main(args: RunArgs) -> crate::Result<()> {
)
})?
} else {
vec![Certificate::generate_self_signed(
&key,
tls_socket_addr,
&args.tls_config.server_name,
None,
)?
.into_rustls()]
// Certificate::generate_self_signed was from TCE.
// We need to implement using rcgen or similar if needed, or require cert file.
eyre::bail!("Self-signed certificate generation is temporarily disabled during migration. Please provide a certificate file.");
};

eyre::Ok(broker::TlsConfig {
Expand Down Expand Up @@ -277,23 +289,41 @@ async fn main_async(
) -> crate::Result<()> {
let tce = match tce_config {
Some(tce_config) => {
let (platform, messages) = Platform::start(
tce_config.config,
QuicSocket::bind_udp(args.cluster_addr).await?,
tce_config.joining_running_session,
)?;
let context =
Arc::new(tashi_vertex::Context::new().wrap_err("failed to create context")?);
let socket = tashi_vertex::Socket::bind(&context, &args.cluster_addr.to_string())
.await
.wrap_err("failed to bind cluster socket")?;
let mut peers = tashi_vertex::Peers::new().wrap_err("failed to create peers")?;

for (key, addr) in tce_config.initial_peers {
peers
.insert(
&addr.to_string(),
&key,
tashi_vertex::PeerCapabilities::default(),
)
.wrap_err("failed to add initial peer")?;
}

let engine = tashi_vertex::Engine::start(
&context,
socket,
tce_config.options,
&tce_config.secret_key,
peers,
)
.wrap_err("failed to start engine")?;

Some(TceState {
platform: Arc::new(platform),
messages,
roots: tce_config.roots,
add_nodes: tce_config.add_nodes,
engine: Arc::new(engine),
context,
})
}
None => None,
};

let tce_platform = tce.as_ref().map(|tce| tce.platform.clone());
let tce_platform = tce.as_ref().map(|tce| tce.engine.clone());

let mut broker = MqttBroker::bind(
args.mqtt_addr,
Expand All @@ -315,8 +345,8 @@ async fn main_async(
res = tokio::signal::ctrl_c() => {
res.wrap_err("error from ctrl_c() handler")?;

if let Some(platform) = tce_platform {
platform.shutdown().await;
if let Some(_platform) = tce_platform {
// platform.shutdown().await;
}
break;
}
Expand All @@ -332,42 +362,51 @@ async fn main_async(
}

fn create_tce_config(
secret_key: SecretKey,
secret_key: KeySecret,
addresses: &Addresses,
config: &ClusterConfig,
) -> crate::Result<TceConfig> {
let nodes: HashMap<_, _> = addresses
.addresses
.iter()
.map(|address| (address.key.clone(), address.addr))
.collect();
// let nodes: HashMap<_, _> = addresses
// .addresses
// .iter()
// .map(|address| (address.key.clone(), address.addr))
// .collect();

// The address book is only required to contain the existing nodes.
let joining_running_session = !nodes.contains_key(&secret_key.public_key());

let mut tce_config = tashi_consensus_engine::Config::new(secret_key);

tce_config
.initial_nodes(nodes.into_iter().collect())
.enable_hole_punching(false)
// TODO: we can dispatch messages before they come to consensus
// but we need to make sure we don't duplicate PUBLISHes.
// .report_events_before_consensus(true)
// Since a FoxMQ cluster is permissioned, don't kick failed nodes which may later recover.
.fallen_behind_kick_seconds(None);
// let joining_running_session = !nodes.contains_key(&secret_key.public());
// For now assume we are always joining or starting based on some other logic, or just let vertex handle it?
// tashi-vertex handle this via initial peers.
let _joining_running_session = false; // TODO: restore logic

let options = Options::new();

// Add initial peers
let mut initial_peers = Vec::new();
for address in &addresses.addresses {
let addr: SocketAddr = address
.addr
.to_string()
.parse()
.expect("invalid socket address");
initial_peers.push((address.key.clone(), addr));
}

if let Some(cert_path) = &config.cluster_cert {
tce_config.tls_cert_chain(Certificate::load_chain_from(cert_path)?);
if let Some(_cert_path) = &config.cluster_cert {
// TODO: handle certs in tashi-vertex
// options.tls_cert_chain(Certificate::load_chain_from(cert_path)?);
}

/*
let roots = if let Some(root_cert_path) = &config.cluster_root_cert {
let roots = Arc::new(RootCertificates::read_from(root_cert_path)?);
tce_config.tls_roots(roots.clone());
Some(roots)
} else {
None
};
*/

/*
let (add_nodes_tx, add_nodes_rx) = mpsc::unbounded_channel();

if config.cluster_accept_peer_with_cert {
Expand All @@ -385,20 +424,21 @@ fn create_tce_config(
Ok(UnknownConnectionAction::VoteToAddPeer)
});
}
*/

Ok(TceConfig {
config: tce_config,
roots,
add_nodes: add_nodes_rx,
joining_running_session,
options,
secret_key: secret_key.clone(),
_joining_running_session,
initial_peers,
})
}

/// NOTE: uses blocking I/O internally.
fn read_secret_key(path: &Path) -> crate::Result<SecretKey> {
fn read_secret_key(path: &Path) -> crate::Result<KeySecret> {
// There's no benefit to using `tokio::fs` because it just does the blocking work on a background thread.
let pem = std::fs::read(path).wrap_err_with(|| format!("error reading {}", path.display()))?;

SecretKey::from_pem(&pem)
KeySecret::from_der(&pem)
.wrap_err_with(|| format!("error reading P-256 secret key from {}", path.display()))
}
4 changes: 2 additions & 2 deletions src/config/addresses.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::net::SocketAddr;
use std::path::Path;

use tashi_consensus_engine::PublicKey;
use tashi_vertex::KeyPublic;

#[derive(serde::Deserialize, serde::Serialize)]
pub struct Addresses {
Expand All @@ -10,7 +10,7 @@ pub struct Addresses {

#[derive(serde::Deserialize, serde::Serialize)]
pub struct Address {
pub key: PublicKey,
pub key: KeyPublic,
pub addr: SocketAddr,
}

Expand Down
Loading
Loading