diff --git a/.gitignore b/.gitignore index 7ab7037..8226ebf 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ Cargo.lock # Temporary VI files *.swp + +# MacOS Specific Files +**/.DS_Store diff --git a/Cargo.toml b/Cargo.toml index 41313f2..9f51141 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] -name = "rusty_junctions" +name = "rusty-junctions" version = "0.1.0" -authors = ["Sebastian Müksch "] -edition = "2018" +authors = ["Sebastian Müksch ", "Kyle Cotton "] +edition = "2021" description = "Join Pattern implementation in Rust." readme = "README.md" repository = "https://github.com/smueksch/rusty_junctions" @@ -11,6 +11,12 @@ keywords = ["join-pattern", "join-calculus", "concurrency"] categories = ["concurrency"] [dependencies] +bag = { path = "bag" } +inverted-index = { path = "inverted-index" } +counter = { path = "counter" } +rusty-junctions-macro = "0.1.0" +log = "0.4.14" [dev-dependencies] rand = "0.7.3" +pretty_env_logger = "0.4.0" diff --git a/bag/Cargo.toml b/bag/Cargo.toml new file mode 100644 index 0000000..dea5c48 --- /dev/null +++ b/bag/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "bag" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/src/bag.rs b/bag/src/lib.rs similarity index 95% rename from src/bag.rs rename to bag/src/lib.rs index 6b01da8..939d2c5 100644 --- a/src/bag.rs +++ b/bag/src/lib.rs @@ -11,7 +11,7 @@ use std::hash::Hash; /// Stores a `HashMap` of keys to `VecDeque` of values, which allows it to /// store and arbitrary amount of values for any given key and retrieve /// them in FIFO order. -pub(crate) struct Bag { +pub struct Bag { items: HashMap>, } @@ -19,7 +19,7 @@ impl Bag where K: Hash + Eq, { - pub(crate) fn new() -> Bag { + pub fn new() -> Bag { Bag { items: HashMap::new(), } @@ -30,7 +30,7 @@ where /// If the given key already exists, the given value is added last in /// a `VecDeque` for the given key. Otherwise, the new key and value /// are inserted. - pub(crate) fn add(&mut self, key: K, item: V) { + pub fn add(&mut self, key: K, item: V) { match self.items.get_mut(&key) { Some(queue) => { queue.push_back(item); @@ -48,17 +48,17 @@ where /// /// Retrieve `Some` of the least recently added value for the given key /// if there is at least one available, otherwise return `None`. - pub(crate) fn retrieve(&mut self, key: &K) -> Option { + pub fn retrieve(&mut self, key: &K) -> Option { self.items.get_mut(key)?.pop_front() } /// Return true if there are values for the given key. - pub(crate) fn contains_items(&self, key: &K) -> bool { + pub fn contains_items(&self, key: &K) -> bool { self.items.get(key).map_or(false, |q| !q.is_empty()) } /// Return the number of values stored for the given key. - pub(crate) fn count_items(&self, key: &K) -> usize { + pub fn count_items(&self, key: &K) -> usize { self.items.get(key).map_or(0, |q| q.len()) } } diff --git a/counter/Cargo.toml b/counter/Cargo.toml new file mode 100644 index 0000000..21e5c80 --- /dev/null +++ b/counter/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "counter" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/src/counter.rs b/counter/src/lib.rs similarity index 99% rename from src/counter.rs rename to counter/src/lib.rs index 21bba56..3b6ad97 100644 --- a/src/counter.rs +++ b/counter/src/lib.rs @@ -40,7 +40,7 @@ const CARRY: Uint = 1; /// the internal representation using `Vec` small. Pushing a value at the back of /// a `Vec` may require resizing, but never moving existing elements. #[derive(Clone, Eq, Debug)] -pub(crate) struct Counter { +pub struct Counter { digits: Vec, } @@ -49,7 +49,7 @@ impl Counter { /// /// Increments the `Counter` and dynamically grows it if all of its digits /// have reached their maximum values. - pub(crate) fn increment(&mut self) { + pub fn increment(&mut self) { // Increment the lowest digit by a normal carry. let mut carry = CARRY; diff --git a/examples/buffer.rs b/examples/buffer.rs index 6cfad4e..5abf43c 100644 --- a/examples/buffer.rs +++ b/examples/buffer.rs @@ -4,13 +4,11 @@ /// most importantly demonstrates error handling related to channels not being /// able to send messages after their controller has shut down. Failing to /// correctly handle cases like that will cause threads to panic. - -use std::any::Any; -use std::thread; - -use rusty_junctions::channels::{RecvChannel, SendChannel}; -use rusty_junctions::types::ControllerHandle; -use rusty_junctions::Junction; +use rusty_junctions::{ + channels::{RecvChannel, SendChannel}, + ControllerHandle, Junction, +}; +use std::{any::Any, thread}; // Create a new private buffer Junction and return all required channels. // diff --git a/examples/collatz.rs b/examples/collatz.rs index caa58c6..e170ea7 100644 --- a/examples/collatz.rs +++ b/examples/collatz.rs @@ -13,10 +13,8 @@ /// Refer to the following Wikipedia article for the mathematical background: /// /// https://en.wikipedia.org/wiki/Collatz_conjecture - -use std::env; - use rusty_junctions::Junction; +use std::env; // Define a result type to be sent over channels. // Note that the type *must* implement the clone trait. diff --git a/examples/junction-macro.rs b/examples/junction-macro.rs new file mode 100644 index 0000000..b34b894 --- /dev/null +++ b/examples/junction-macro.rs @@ -0,0 +1,36 @@ +use rusty_junctions_macro::client::junction; + +fn main() { + pretty_env_logger::init(); + + junction! { + message as Send::String, + id as Send::i32, + | message, id | { + println!("Single Junction Procedural Macro API: {message} {id}"); + }, + }; + message + .send(String::from("Secret Concurrent Message")) + .unwrap(); + id.send(1960).unwrap(); + + junction! { + main_junction as Junction, + name as Send::String, + value as Send::i32, + | name, value | { + std::thread::sleep(std::time::Duration::from_secs(5)); + println!("Single Junction Procedural Macro API: {name} {value}"); + }, + }; + + value.send(2).unwrap(); + name.send(String::from("Hello, World!")).unwrap(); + + let channel = main_junction.send_channel::(); + main_junction.when(&channel).then_do(|c| { + println!("Newly Installed Pattern: {c}"); + }); + channel.send("Lots of opportunities".to_string()).unwrap(); +} diff --git a/examples/macro-api.rs b/examples/macro-api.rs new file mode 100644 index 0000000..834b5dc --- /dev/null +++ b/examples/macro-api.rs @@ -0,0 +1,66 @@ +use rusty_junctions_macro::client::{channel, junction, junction_dec, when}; + +fn main() { + pretty_env_logger::init(); + + // Standard API + let junction = rusty_junctions::Junction::new(); + let name = junction.send_channel::(); + let value = junction.send_channel::(); + junction.when(&name).and(&value).then_do(|name, value| { + println!("Standard API: {name} {value}"); + }); + value.send(0).unwrap(); + name.send(String::from("Hello, World!")).unwrap(); + + // Single Junction Declarative Macro API + let (name, value, mut handle) = junction_dec! { + name as Send::String, + value as Send::i32, + |name, value| { + println!("Single Junction Declarative Macro API: {name} {value}"); + }, + }; + value.send(1).unwrap(); + name.send(String::from("Hello, World!")).unwrap(); + // Needs to have the Controller explicitly stopped, if we allowed it to + // be dropped from the inner scope there would be no guarantee it would + // have time for the pattern to fire. + handle.stop(); + + // Single Junction Procedural Macro API + // junction as ControllerHandle, // Bring the cotnroller handle into scope with this name + junction! { + // some_junction as Junction, + get as Recv::i32, + set as Send::i32, + value as Send::i32, + | get, value | { + println!("Getting value: {value}"); + value_super.send(value).unwrap(); + value + }, + | set, value | { + println!("Setting value: {value} --> {set}"); + value_super.send(set).unwrap(); + }, + }; + // let _handle = some_junction.controller_handle(); + + value.send(1809124).unwrap(); + let _v = get.recv().unwrap(); + set.send(2022).unwrap(); + + // let value = value.recv(); + // println!("Got value {value:?}"); + + // When! Macro API + let junction = rusty_junctions::Junction::new(); + let name = junction.send_channel::(); + let value = junction.send_channel::(); + when!(junction; name, value).then_do(|name, value| { + println!("when! Macro API: {name} {value}"); + }); + value.send(3).unwrap(); + name.send(String::from("Hello, World!")).unwrap(); +} diff --git a/examples/mutex.rs b/examples/mutex.rs index 7c199ab..b7af3f8 100644 --- a/examples/mutex.rs +++ b/examples/mutex.rs @@ -2,19 +2,15 @@ /// /// This code provides evidence that concurrent code using locks can equally /// be implemented using junctions and join patterns. - +use rusty_junctions::{ + channels::{RecvChannel, SendChannel}, + ControllerHandle, Junction, +}; use std::thread; -use rusty_junctions::Junction; -use rusty_junctions::types::ControllerHandle; -use rusty_junctions::channels::{SendChannel, RecvChannel}; - // Create a new mutex using a private junction and return channels to acquire // and release it. -fn new_mutex() -> ( - ControllerHandle, RecvChannel<()>, SendChannel<()> -) -{ +fn new_mutex() -> (ControllerHandle, RecvChannel<()>, SendChannel<()>) { // Private junction to set up the mutex. let mut mutex = Junction::new(); @@ -25,7 +21,7 @@ fn new_mutex() -> ( // Asynchronous state channel to represent a lock that can be consumed // and released. let lock = mutex.send_channel::<()>(); - + // When there is a lock available and a thread wants to acquire it, // unblock that thread which is equivalent to acquiring the lock. // It is mutually exclusive as no new lock message is sent out. diff --git a/examples/rendezvous.rs b/examples/rendezvous.rs index 717d304..dbea7fc 100644 --- a/examples/rendezvous.rs +++ b/examples/rendezvous.rs @@ -6,13 +6,12 @@ /// /// Link to the paper: /// https://www.researchgate.net/profile/Nick_Benton2/publication/2569067_Jingle_Bells_Solving_the_Santa_Claus_Problem_in/links/0c9605264f92520a08000000/Jingle-Bells-Solving-the-Santa-Claus-Problem-in.pdf - +use rusty_junctions::{ + channels::{BidirChannel, RecvChannel}, + ControllerHandle, Junction, +}; use std::thread; -use rusty_junctions::channels::{BidirChannel, RecvChannel}; -use rusty_junctions::types::ControllerHandle; -use rusty_junctions::Junction; - // Set up a private Junction for a rendezvous and return the public channels. pub fn rendezvous() -> (ControllerHandle, BidirChannel, RecvChannel<()>) { let mut j = Junction::new(); diff --git a/examples/santa-claus.rs b/examples/santa-claus.rs index cfa7eb5..5dc3261 100644 --- a/examples/santa-claus.rs +++ b/examples/santa-claus.rs @@ -4,15 +4,13 @@ /// Problem in Polyphonic C#" by Nick Benton /// /// The paper: https://www.researchgate.net/profile/Nick_Benton2/publication/2569067_Jingle_Bells_Solving_the_Santa_Claus_Problem_in/links/0c9605264f92520a08000000/Jingle-Bells-Solving-the-Santa-Claus-Problem-in.pdf - +use rusty_junctions::{ + channels::{BidirChannel, RecvChannel}, + ControllerHandle, Junction, +}; use rand::Rng; - use std::{thread, time::Duration}; -use rusty_junctions::channels::{BidirChannel, RecvChannel}; -use rusty_junctions::types::ControllerHandle; -use rusty_junctions::Junction; - fn main() { /***************************** * Elves Junction & Channels * diff --git a/examples/storage-cell.rs b/examples/storage-cell.rs index e3c6ec7..d21fc88 100644 --- a/examples/storage-cell.rs +++ b/examples/storage-cell.rs @@ -1,6 +1,5 @@ /// Simple storage cell implementation to demonstrate every available /// channel as well as join patterns with repeated channels. - use rusty_junctions::Junction; fn main() { @@ -18,7 +17,7 @@ fn main() { // New channel to swap the value of the storage cell for a new one and // retrieve the value that was just replaced. - let swap = cell.bidir_channel::(); + let swap = cell.bidir_channel::(); // New channel that will actually carry the value so that at no point // any given thread will have possession over it so concurrency issues @@ -67,14 +66,14 @@ fn main() { // a multithreaded environment with many users accessing the storage // cell, the value retrieved is exactly the value that has been // updated. - cell.when(&val).and_bidir(&swap).then_do(move |old, new| { + cell.when(&val).and_bidir(&swap).then_do(move |key, value| { println!( ">> val-swap pattern fired with old={} and new={}!", - old, new + key, value ); - swap_val.send(new).unwrap(); + swap_val.send(key).unwrap(); - old + "Something".to_string() }); // Declare a new Join Pattern that mentions the same channel multiple @@ -108,7 +107,8 @@ fn main() { // Request a swap of the current value of the storage cell with a new // one and print the old value that is retrieved as a result. - println!("swap.send_recv()={}", swap.send_recv(16).unwrap()); + let thing: String = swap.send_recv(16).expect("faield to get value"); + println!("swap.send_recv()={thing}"); // Request the current value of the storage cell again and print it. println!("get.recv()={}", get.recv().unwrap()); diff --git a/examples/taxi-cab.rs b/examples/taxi-cab.rs index 284f928..ab2e525 100644 --- a/examples/taxi-cab.rs +++ b/examples/taxi-cab.rs @@ -1,5 +1,4 @@ /// Simple toy example to demonstrate the basic API of the library. - // The only struct that needs to be brought into score is the Junction itself. use rusty_junctions::Junction; @@ -10,11 +9,17 @@ fn main() { // Create new channels on the Junction j. let name = j.send_channel::(); let value = j.send_channel::(); + let get = j.recv_channel::(); // Declare a new Join Pattern on the Junction using the channels above. - j.when(&name).and(&value).then_do(|n, v| { - println!("{} {}", n, v); - }); + // j.when(&name).and(&value).then_do(|n, v| { + // println!("{} {}", n, v); + // }); + + j.when_recv(&get).then_do(|| 10); + + let val = get.recv(); + println!("{val:?}"); // Send all the required messages for the Join Pattern above to fire. value.send(1729).unwrap(); diff --git a/inverted-index/Cargo.toml b/inverted-index/Cargo.toml new file mode 100644 index 0000000..7203a74 --- /dev/null +++ b/inverted-index/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "inverted-index" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/src/inverted_index.rs b/inverted-index/src/lib.rs similarity index 93% rename from src/inverted_index.rs rename to inverted-index/src/lib.rs index b12dfc3..0cd2f60 100644 --- a/src/inverted_index.rs +++ b/inverted-index/src/lib.rs @@ -9,7 +9,7 @@ use std::hash::Hash; /// For each key inserted, an arbitrary number of values is stored as a /// `LinkedList`, making this collection appropritate for iterating over /// all values associated to a single key. -pub(crate) struct InvertedIndex { +pub struct InvertedIndex { look_up_table: HashMap>, } @@ -17,7 +17,7 @@ impl InvertedIndex where K: Hash + Eq, { - pub(crate) fn new() -> InvertedIndex { + pub fn new() -> InvertedIndex { InvertedIndex { look_up_table: HashMap::new(), } @@ -28,7 +28,7 @@ where /// If the key does not yet exist in the collection, it is added with /// the value. Otherwise, the given value is added as the last value /// in the list of values associated with the given key. - pub(crate) fn insert_single(&mut self, key: K, value: V) { + pub fn insert_single(&mut self, key: K, value: V) { match self.look_up_table.get_mut(&key) { Some(values) => values.push_back(value), None => { @@ -46,7 +46,7 @@ where /// the values, in order. Otherwise, the given values are added in /// order at the end of the list of values already associated with the /// given key. - pub(crate) fn insert_multiple(&mut self, key: K, values: impl IntoIterator) { + pub fn insert_multiple(&mut self, key: K, values: impl IntoIterator) { match self.look_up_table.get_mut(&key) { Some(stored_values) => { values.into_iter().for_each(|v| stored_values.push_back(v)); @@ -62,7 +62,7 @@ where /// Retrieve an immutable reference to the first value added for the /// given key, if the key is available in the collection. Otherwise, /// return `None`. - pub(crate) fn peek_first(&self, key: &K) -> Option<&V> { + pub fn peek_first(&self, key: &K) -> Option<&V> { self.look_up_table.get(key)?.front() } @@ -71,7 +71,7 @@ where /// Retrieve an immutable reference to all values, in order of /// insertion, for the given key, if the key is available in the /// collection. Otherwise, return `None`. - pub(crate) fn peek_all(&self, key: &K) -> Option<&LinkedList> { + pub fn peek_all(&self, key: &K) -> Option<&LinkedList> { self.look_up_table.get(key) } } diff --git a/src/channels.rs b/src/channels.rs index 3ae1052..260e044 100644 --- a/src/channels.rs +++ b/src/channels.rs @@ -5,11 +5,13 @@ //! a `RecvChannel` is used to get the value generated by a Join Pattern firing //! asynchronously. -use std::marker::PhantomData; -use std::sync::mpsc::{channel, RecvError, SendError, Sender}; -use std::{any::Any, marker::Send}; - -use super::types::{ids, Message, Packet}; +use crate::types::{ids, Message, Packet}; +use std::{ + any::Any, + marker::PhantomData, + marker::Send, + sync::mpsc::{channel, RecvError, SendError, Sender}, +}; /*************************** * Sending Channel Structs * @@ -31,12 +33,7 @@ pub struct SendChannel { send_type: PhantomData, } -impl SendChannel { - /// Return the channel's ID. - pub(crate) fn id(&self) -> ids::ChannelId { - self.id - } - +impl SendChannel { /// Return the ID of the `Junction` this channel is associated to. pub(crate) fn junction_id(&self) -> ids::JunctionId { self.junction_id @@ -46,12 +43,7 @@ impl SendChannel { pub(crate) fn strip(&self) -> StrippedSendChannel { StrippedSendChannel::new(self.id) } -} -impl SendChannel -where - T: Any + Send, -{ pub(crate) fn new( id: ids::ChannelId, junction_id: ids::JunctionId, @@ -122,12 +114,7 @@ pub struct RecvChannel { recv_type: PhantomData, } -impl RecvChannel { - /// Return the channel's ID. - pub(crate) fn id(&self) -> ids::ChannelId { - self.id - } - +impl RecvChannel { /// Return the ID of the `Junction` this channel is associated to. pub(crate) fn junction_id(&self) -> ids::JunctionId { self.junction_id @@ -137,12 +124,7 @@ impl RecvChannel { pub(crate) fn strip(&self) -> StrippedRecvChannel { StrippedRecvChannel::new(self.id) } -} -impl RecvChannel -where - R: Any + Send, -{ pub(crate) fn new( id: ids::ChannelId, junction_id: ids::JunctionId, @@ -169,6 +151,7 @@ where channel_id: self.id, msg: Message::new(tx), }) + .map_err(|e| log::error!("Failed to send Recv Message: {e:?}")) .unwrap(); rx.recv() @@ -233,12 +216,7 @@ pub struct BidirChannel { recv_type: PhantomData, } -impl BidirChannel { - /// Return the channel's ID. - pub(crate) fn id(&self) -> ids::ChannelId { - self.id - } - +impl BidirChannel { /// Return the ID of the `Junction` this channel is associated to. pub(crate) fn junction_id(&self) -> ids::JunctionId { self.junction_id @@ -248,13 +226,7 @@ impl BidirChannel { pub(crate) fn strip(&self) -> StrippedBidirChannel { StrippedBidirChannel::new(self.id) } -} -impl BidirChannel -where - T: Any + Send, - R: Any + Send, -{ pub(crate) fn new( id: ids::ChannelId, junction_id: ids::JunctionId, @@ -283,6 +255,7 @@ where channel_id: self.id, msg: Message::new((msg, tx)), }) + .map_err(|e| log::error!("Failed to send Bidir Message: {e:?}")) .unwrap(); rx.recv() diff --git a/src/controller.rs b/src/controller.rs deleted file mode 100644 index efb9b68..0000000 --- a/src/controller.rs +++ /dev/null @@ -1,514 +0,0 @@ -//! Control structure started by any new `Junction`, running in a background thread -//! to handle the coordination of Join Pattern creation and execution. - -use std::sync::mpsc::{Receiver, Sender}; -use std::thread; -use std::{cmp::Ordering, collections::HashMap, collections::LinkedList, vec::Vec}; - -use super::bag::Bag; -use super::counter::Counter; -use super::inverted_index::InvertedIndex; -use super::types::ids::{ChannelId, JoinPatternId}; -use super::types::{ControllerHandle, JoinPattern, Message, Packet}; - -/// Struct to handle `Packet`s sent from the user in the background. -/// -/// This struct holds all the information required to store and fire -/// `JoinPattern`s once all requirements have been met. It is created by a -/// `Junction` in a separate control thread, where it continuously listens -/// for `Packet`s sent by user code and reacts accordingly. -pub(crate) struct Controller { - latest_channel_id: ChannelId, - latest_join_pattern_id: JoinPatternId, - /// Counter for how many messages have arrived since creation. - message_counter: Counter, - /// Collection of all currently available messages. - messages: Bag, - /// Collection of all available Join Patterns for the `Junction` associated with - /// this `Controller`. - join_patterns: HashMap, - /// Map of `JoinPatternId`s to the message count at which they were last - /// fired, `None` if the Join Pattern has never been fired. Used to - /// determine precedence of Join Patterns that have not been fired in a - /// while when needing to choose which of the alive Join Patterns to fire. - join_pattern_last_fired: HashMap>, - /// `InvertedIndex` matching `ChannelId`s to all Join Patterns they appear in. - /// Used to easily determine which Join Patterns are relevant any time a new - /// message comes in. - join_pattern_index: InvertedIndex, -} - -impl Controller { - pub(crate) fn new() -> Controller { - Controller { - latest_channel_id: ChannelId::default(), - latest_join_pattern_id: JoinPatternId::default(), - message_counter: Counter::default(), - messages: Bag::new(), - join_patterns: HashMap::new(), - join_pattern_last_fired: HashMap::new(), - join_pattern_index: InvertedIndex::new(), - } - } - - /// Start thread to handle incoming `Packet`s from `Junction` user. - /// - /// Start new thread in the background to handle incoming `Packet`s sent from - /// the user of the `Junction` that created this `Controller`. Return a - /// `ControlThreadHandle` so that this control thread can be joint at any future - /// point. - pub(crate) fn start( - mut self, - sender: Sender, - receiver: Receiver, - ) -> ControllerHandle { - ControllerHandle::new(sender, thread::spawn(move || self.handle_packets(receiver))) - } - - /// Handle incoming `Packet` from associated `Junction`. - /// - /// This function will continuously receive `Packet`s sent from structs - /// associated with the `Junction` that created and started this `Controller` - /// until a `Packet::ShutDownRequest` has been sent. - fn handle_packets(&mut self, receiver: Receiver) { - use Packet::*; - - while let Ok(packet) = receiver.recv() { - match packet { - Message { channel_id, msg } => self.handle_message(channel_id, msg), - NewChannelIdRequest { return_sender } => { - self.handle_new_channel_id_request(return_sender) - } - AddJoinPatternRequest { join_pattern } => { - self.handle_add_join_pattern_request(join_pattern) - } - ShutDownRequest => break, - } - } - } - - /// Handle a received `Message` from a given channel. - /// - /// The first action taken in handling a `Message` is storing the received - /// message in the `Message` bag of the `Controller`. - /// - /// The second action is to start determining if any of the Join Patterns stored - /// with the `Controller` are alive and if so, which of these to fire. - fn handle_message(&mut self, channel_id: ChannelId, msg: Message) { - self.messages.add(channel_id, msg); - self.message_counter.increment(); - - self.handle_join_pattern_firing(channel_id); - } - - /// Handle the firing of a `JoinPattern`, if possible. - /// - /// Determine which `JoinPattern`s contain the channel with the given - /// `ChannelId`. For these, check which ones have at least one `Message` - /// available for each of their channels, i.e. are alive, then select - /// one `JoinPattern` to be fired. If at any point during this process - /// no more `JoinPattern`s remain, nothing will be done. - fn handle_join_pattern_firing(&mut self, channel_id: ChannelId) { - let mut alive_join_patterns: Vec = Vec::new(); - - if let Some(jp_ids) = self.relevant_join_patterns(channel_id) { - alive_join_patterns = self.alive_join_patterns(jp_ids); - } - - if let Some(jp_id_to_fire) = self.select_to_fire(&mut alive_join_patterns) { - self.fire_join_pattern(*jp_id_to_fire); - self.reset_last_fired(*jp_id_to_fire); - } - } - - /// Return the `JoinPatternId`s of relevant Join Patterns for given `ChannelId`. - /// - /// A Join Pattern is considered relevant for a given `ChannelId` if at least - /// one of its channels has the `ChannelId`. - fn relevant_join_patterns(&self, channel_id: ChannelId) -> Option<&LinkedList> { - self.join_pattern_index.peek_all(&channel_id) - } - - /// Return the `JoinPatternId`s of all alive `JoinPattern`s. - /// - /// A `JoinPattern` is considered alive if for each of the channels - /// involved in it, there is at least one `Message` available. - fn alive_join_patterns( - &self, - join_pattern_ids: &LinkedList, - ) -> Vec { - // We need clone the `JoinPatternId`s at this point to avoid - // because need to avoid the issue of `peek_all` borrowing mutably, - // but then needing to mutably borrow again later to update the - // latest fired `JoinPatternId`. - join_pattern_ids - .iter() - .filter(|&jp_id| self.is_alive(*jp_id)) - .cloned() - .collect() - } - - /// Select which `JoinPattern` should be fired. - /// - /// In order to avoid certain scenarious in which one `JoinPattern` would - /// block the execution of another, because it for instance has a subset of - /// the other's channels, we need to ensure that from the `JoinPattern`s - /// that are alive simultaneously, we select the one to be fired that has - /// been waiting for the longest time. - /// - /// Specifically, we record the `Counter` of the `Message` that each - /// `JoinPattern` has last been fired at. We use this as a pseudo-time and - /// simply order the `JoinPattern`s by their `Counter` values, then take - /// the one with the smallest. - /// - /// Note that this procedure should ensure a certain form of *fairness*, - /// by which if a `JoinPattern` has been alive an infinite amount of times, - /// it will fire at least once. In practice, this should amount to each - /// `JoinPattern` being incapable of getting deadlocked by others. - fn select_to_fire<'a>( - &self, - alive_jp_ids: &'a mut Vec, - ) -> Option<&'a JoinPatternId> { - alive_jp_ids - .sort_unstable_by(|&jp_id_1, &jp_id_2| self.compare_last_fired(jp_id_1, jp_id_2)); - - alive_jp_ids.first() - } - - /// Return `true` if Join Pattern with given `JoinPatternId` is alive. - /// - /// A Join Pattern is considered alive if there is at least one `Message` for - /// each of the channels involved in it. - fn is_alive(&self, join_pattern_id: JoinPatternId) -> bool { - use JoinPattern::*; - - if let Some(join_pattern) = self.join_patterns.get(&join_pattern_id) { - match join_pattern { - UnarySend(jp) => self.is_unary_alive(jp.channel_id()), - UnaryRecv(jp) => self.is_unary_alive(jp.channel_id()), - UnaryBidir(jp) => self.is_unary_alive(jp.channel_id()), - BinarySend(jp) => { - self.is_binary_alive(jp.first_send_channel_id(), jp.second_send_channel_id()) - } - BinaryRecv(jp) => self.is_binary_alive(jp.send_channel_id(), jp.recv_channel_id()), - BinaryBidir(jp) => { - self.is_binary_alive(jp.send_channel_id(), jp.bidir_channel_id()) - } - TernarySend(jp) => self.is_ternary_alive( - jp.first_send_channel_id(), - jp.second_send_channel_id(), - jp.third_send_channel_id(), - ), - TernaryRecv(jp) => self.is_ternary_alive( - jp.first_send_channel_id(), - jp.second_send_channel_id(), - jp.recv_channel_id(), - ), - TernaryBidir(jp) => self.is_ternary_alive( - jp.first_send_channel_id(), - jp.second_send_channel_id(), - jp.bidir_channel_id(), - ), - } - } else { - false - } - } - - /// Return `true` if *unary* Join Pattern with given `JoinPatternId` is alive. - /// - /// A Join Pattern is considered alive if there is at least one `Message` for - /// each of the channels involved in it. - fn is_unary_alive(&self, channel_id: ChannelId) -> bool { - self.messages.count_items(&channel_id) >= 1 - } - - /// Return `true` if *binary* Join Pattern with given `JoinPatternId` is alive. - /// - /// A Join Pattern is considered alive if there is at least one `Message` for - /// each of the channels involved in it. - /// - /// For binary Join Patterns, we need to ensure that should both channels - /// involved have the same `ChannelId`, we actually have at least two - /// `Message`s available. - fn is_binary_alive(&self, first_ch_id: ChannelId, second_ch_id: ChannelId) -> bool { - if first_ch_id == second_ch_id { - self.messages.count_items(&first_ch_id) >= 2 - } else { - self.is_unary_alive(first_ch_id) && self.is_unary_alive(second_ch_id) - } - } - - /// Return `true` if *ternary* Join Pattern with given `JoinPatternId` is alive. - /// - /// A Join Pattern is considered alive if there is at least one `Message` for - /// each of the channels involved in it. - /// - /// For ternary Join Patterns, we need to ensure that should more than one - /// channel involved have the same `ChannelId`, we actually have enough - /// `Message`s available. - fn is_ternary_alive( - &self, - first_ch_id: ChannelId, - second_ch_id: ChannelId, - third_ch_id: ChannelId, - ) -> bool { - if first_ch_id == second_ch_id && second_ch_id == third_ch_id { - self.messages.count_items(&first_ch_id) >= 3 - } else { - self.is_binary_alive(first_ch_id, second_ch_id) - && self.is_binary_alive(first_ch_id, third_ch_id) - && self.is_binary_alive(second_ch_id, third_ch_id) - } - } - - /// Compare when the Join Patterns with given `JoinPatternId`s were last alive at. - /// - /// Rules for Order: - /// 1. If neither `JoinPatternId` has a last alive `Counter`, then neither - /// has been fired yet, so they can be viewed as equal in this ordering. - /// 2. If only one `JoinPatternId` has no last alive `Counter`, then that - /// one has to be ordered as less than the other since having been fired - /// at least once will always be a later point of firing than not having - /// been fired yet. - /// 3. If both `JoinPatternId`s have last alive `Counter`s, use the ordering - /// of these. - /// - /// # Panics - /// - /// For simplicity, this function panics if the either of the given - /// `JoinPatternId`s is not registered in the internal map of `JoinPatternId`s - /// to `Instant`s that describe the last `Instant` at which a particular Join - /// Pattern was alive. That is to say, this function should only be called on - /// `JoinPatternId`s which are definitely stored in the calling `Controller`. - fn compare_last_fired(&self, jp_id_1: JoinPatternId, jp_id_2: JoinPatternId) -> Ordering { - // TODO: Can we sensibly use `Option::flatten` here? - let last_fired_1 = self.join_pattern_last_fired.get(&jp_id_1).unwrap(); - let last_fired_2 = self.join_pattern_last_fired.get(&jp_id_2).unwrap(); - - if last_fired_1.is_none() && last_fired_2.is_none() { - Ordering::Equal - } else if last_fired_1.is_none() && last_fired_2.is_some() { - Ordering::Less - } else if last_fired_1.is_some() && last_fired_2.is_none() { - Ordering::Greater - } else { - last_fired_1.cmp(last_fired_2) - } - } - - /// Fire the `JoinPattern` corresponding to the given `JoinPatternId`. - /// - /// The processs of firing a `JoinPattern` consists of first retrieving - /// a `Message` for each of the channels involved in the `JoinPattern`, - /// then passing these `Messages`s to the `JoinPattern` to handle the - /// firing. - /// - /// # Panics - /// - /// Panics when there is no `JoinPattern` stored for the given - /// `JoinPatternId`. - fn fire_join_pattern(&mut self, join_pattern_id: JoinPatternId) { - use JoinPattern::*; - - let join_pattern = self.join_patterns.get(&join_pattern_id).unwrap(); - - match join_pattern { - UnarySend(jp) => { - let arg = self.messages.retrieve(&jp.channel_id()).unwrap(); - - jp.fire(arg); - } - UnaryRecv(jp) => { - let return_sender = self.messages.retrieve(&jp.channel_id()).unwrap(); - - jp.fire(return_sender); - } - UnaryBidir(jp) => { - let arg_and_sender = self.messages.retrieve(&jp.channel_id()).unwrap(); - - jp.fire(arg_and_sender); - } - BinarySend(jp) => { - let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap(); - let arg_2 = self - .messages - .retrieve(&jp.second_send_channel_id()) - .unwrap(); - - jp.fire(arg_1, arg_2); - } - BinaryRecv(jp) => { - let arg = self.messages.retrieve(&jp.send_channel_id()).unwrap(); - let return_sender = self.messages.retrieve(&jp.recv_channel_id()).unwrap(); - - jp.fire(arg, return_sender); - } - BinaryBidir(jp) => { - let arg_1 = self.messages.retrieve(&jp.send_channel_id()).unwrap(); - let arg_2_and_sender = self.messages.retrieve(&jp.bidir_channel_id()).unwrap(); - - jp.fire(arg_1, arg_2_and_sender); - } - TernarySend(jp) => { - let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap(); - let arg_2 = self - .messages - .retrieve(&jp.second_send_channel_id()) - .unwrap(); - let arg_3 = self.messages.retrieve(&jp.third_send_channel_id()).unwrap(); - - jp.fire(arg_1, arg_2, arg_3); - } - TernaryRecv(jp) => { - let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap(); - let arg_2 = self - .messages - .retrieve(&jp.second_send_channel_id()) - .unwrap(); - let return_sender = self.messages.retrieve(&jp.recv_channel_id()).unwrap(); - - jp.fire(arg_1, arg_2, return_sender); - } - TernaryBidir(jp) => { - let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap(); - let arg_2 = self - .messages - .retrieve(&jp.second_send_channel_id()) - .unwrap(); - let arg_3_and_sender = self.messages.retrieve(&jp.bidir_channel_id()).unwrap(); - - jp.fire(arg_1, arg_2, arg_3_and_sender); - } - } - } - - /// Reset the `Counter` at which the given Join Pattern has last been fired. - fn reset_last_fired(&mut self, join_pattern_id: JoinPatternId) { - self.join_pattern_last_fired - .insert(join_pattern_id, Some(self.message_counter.clone())); - } - - /// Send new, *unique* `ChannelId` back to the requesting `Junction`. - /// - /// # Panics - /// - /// Panics if the new `ChannelId` could not be sent to the requesting `Junction`. - fn handle_new_channel_id_request(&mut self, return_sender: Sender) { - return_sender.send(self.new_channel_id()).unwrap(); - } - - /// Add new Join Pattern to `Controller` storage. - fn handle_add_join_pattern_request(&mut self, join_pattern: JoinPattern) { - let jp_id = self.new_join_pattern_id(); - - self.initialize_last_fired(jp_id); - - self.insert_join_pattern(jp_id, join_pattern); - } - - /// Initialize the `Instant` at which Join Pattern was last alive. - fn initialize_last_fired(&mut self, join_pattern_id: JoinPatternId) { - self.join_pattern_last_fired.insert(join_pattern_id, None); - } - - /// Insert Join Pattern into relevant internal storage. - /// - /// The given Join Pattern needs to be registered within the internal - /// `InvertedIndex` for future look-up operations and then stored in the - /// Join Pattern collection. - fn insert_join_pattern(&mut self, join_pattern_id: JoinPatternId, join_pattern: JoinPattern) { - use JoinPattern::*; - - match join_pattern { - UnarySend(jp) => { - self.join_pattern_index - .insert_single(jp.channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, UnarySend(jp)); - } - UnaryRecv(jp) => { - self.join_pattern_index - .insert_single(jp.channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, UnaryRecv(jp)); - } - UnaryBidir(jp) => { - self.join_pattern_index - .insert_single(jp.channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, UnaryBidir(jp)); - } - BinarySend(jp) => { - self.join_pattern_index - .insert_single(jp.first_send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.second_send_channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, BinarySend(jp)); - } - BinaryRecv(jp) => { - self.join_pattern_index - .insert_single(jp.send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.recv_channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, BinaryRecv(jp)); - } - BinaryBidir(jp) => { - self.join_pattern_index - .insert_single(jp.send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.bidir_channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, BinaryBidir(jp)); - } - TernarySend(jp) => { - self.join_pattern_index - .insert_single(jp.first_send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.second_send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.third_send_channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, TernarySend(jp)); - } - TernaryRecv(jp) => { - self.join_pattern_index - .insert_single(jp.first_send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.second_send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.recv_channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, TernaryRecv(jp)); - } - TernaryBidir(jp) => { - self.join_pattern_index - .insert_single(jp.first_send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.second_send_channel_id(), join_pattern_id); - self.join_pattern_index - .insert_single(jp.bidir_channel_id(), join_pattern_id); - - self.join_patterns.insert(join_pattern_id, TernaryBidir(jp)); - } - } - } - - /// Generate new, *unique* `ChannelId`. - fn new_channel_id(&mut self) -> ChannelId { - let ch_id = self.latest_channel_id; - self.latest_channel_id.increment(); - - ch_id - } - - /// Generate new, *unique* `JoinPatternId`. - fn new_join_pattern_id(&mut self) -> JoinPatternId { - let jp_id = self.latest_join_pattern_id; - self.latest_join_pattern_id.increment(); - - jp_id - } -} diff --git a/src/controller/alive.rs b/src/controller/alive.rs new file mode 100644 index 0000000..c9e5457 --- /dev/null +++ b/src/controller/alive.rs @@ -0,0 +1,47 @@ +use std::collections::LinkedList; + +use crate::{ + controller::Controller, + // join_pattern::JoinPattern, + types::ids::JoinPatternId, +}; + +impl Controller { + /// Return the `JoinPatternId`s of all alive `JoinPattern`s. + /// + /// A `JoinPattern` is considered alive if for each of the channels + /// involved in it, there is at least one `Message` available. + pub(in crate::controller) fn alive_join_patterns( + &self, + join_pattern_ids: &LinkedList, + ) -> Vec { + log::debug!("Checking for alive JoinPatterns"); + // We need clone the `JoinPatternId`s at this point to avoid + // because need to avoid the issue of `peek_all` borrowing mutably, + // but then needing to mutably borrow again later to update the + // latest fired `JoinPatternId`. + let alive_join_patterns = join_pattern_ids + .iter() + .filter(|&jp_id| self.is_alive(*jp_id)) + .cloned() + .collect(); + + log::debug!("Retriving all of the alive JoinPatterns: {alive_join_patterns:?}"); + + alive_join_patterns + } + + /// Return `true` if Join Pattern with given `JoinPatternId` is alive. + /// + /// A Join Pattern is considered alive if there is at least one `Message` for + /// each of the channels involved in it. + fn is_alive(&self, join_pattern_id: JoinPatternId) -> bool { + let is_alive = self + .join_patterns + .get(&join_pattern_id) + .map_or(false, |jp| jp.is_alive(&self.messages)); + log::debug!("Checking if JoinPattern: {join_pattern_id:?} is alive: {is_alive}"); + + is_alive + } +} diff --git a/src/controller/fire.rs b/src/controller/fire.rs new file mode 100644 index 0000000..3d10ad2 --- /dev/null +++ b/src/controller/fire.rs @@ -0,0 +1,126 @@ +use std::cmp::Ordering; + +use crate::{ + controller::Controller, + types::{ids::JoinPatternId, Message}, +}; + +impl Controller { + /// Select which `JoinPattern` should be fired. + /// + /// In order to avoid certain scenarious in which one `JoinPattern` would + /// block the execution of another, because it for instance has a subset of + /// the other's channels, we need to ensure that from the `JoinPattern`s + /// that are alive simultaneously, we select the one to be fired that has + /// been waiting for the longest time. + /// + /// Specifically, we record the `Counter` of the `Message` that each + /// `JoinPattern` has last been fired at. We use this as a pseudo-time and + /// simply order the `JoinPattern`s by their `Counter` values, then take + /// the one with the smallest. + /// + /// Note that this procedure should ensure a certain form of *fairness*, + /// by which if a `JoinPattern` has been alive an infinite amount of times, + /// it will fire at least once. In practice, this should amount to each + /// `JoinPattern` being incapable of getting deadlocked by others. + pub(in crate::controller) fn select_to_fire<'a>( + &self, + alive_jp_ids: &'a mut Vec, + ) -> Option<&'a JoinPatternId> { + alive_jp_ids + .sort_unstable_by(|&jp_id_1, &jp_id_2| self.compare_last_fired(jp_id_1, jp_id_2)); + + alive_jp_ids.first() + } + + /// Compare when the Join Patterns with given `JoinPatternId`s were last alive at. + /// + /// Rules for Order: + /// 1. If neither `JoinPatternId` has a last alive `Counter`, then neither + /// has been fired yet, so they can be viewed as equal in this ordering. + /// 2. If only one `JoinPatternId` has no last alive `Counter`, then that + /// one has to be ordered as less than the other since having been fired + /// at least once will always be a later point of firing than not having + /// been fired yet. + /// 3. If both `JoinPatternId`s have last alive `Counter`s, use the ordering + /// of these. + /// + /// # Panics + /// + /// For simplicity, this function panics if the either of the given + /// `JoinPatternId`s is not registered in the internal map of `JoinPatternId`s + /// to `Instant`s that describe the last `Instant` at which a particular Join + /// Pattern was alive. That is to say, this function should only be called on + /// `JoinPatternId`s which are definitely stored in the calling `Controller`. + pub(in crate::controller) fn compare_last_fired( + &self, + jp_id_1: JoinPatternId, + jp_id_2: JoinPatternId, + ) -> Ordering { + // TODO: Can we sensibly use `Option::flatten` here? + let last_fired_1 = self.join_pattern_last_fired.get(&jp_id_1).unwrap(); + let last_fired_2 = self.join_pattern_last_fired.get(&jp_id_2).unwrap(); + + if last_fired_1.is_none() && last_fired_2.is_none() { + Ordering::Equal + } else if last_fired_1.is_none() && last_fired_2.is_some() { + Ordering::Less + } else if last_fired_1.is_some() && last_fired_2.is_none() { + Ordering::Greater + } else { + last_fired_1.cmp(last_fired_2) + } + } + + /// Fire the `JoinPattern` corresponding to the given `JoinPatternId`. + /// + /// The processs of firing a `JoinPattern` consists of first retrieving + /// a `Message` for each of the channels involved in the `JoinPattern`, + /// then passing these `Messages`s to the `JoinPattern` to handle the + /// firing. + /// + /// # Panics + /// + /// Panics when there is no `JoinPattern` stored for the given + /// `JoinPatternId`. + pub(in crate::controller) fn fire_join_pattern(&mut self, join_pattern_id: JoinPatternId) { + let join_pattern = self.join_patterns.get(&join_pattern_id).unwrap(); + + let mut messages_for_channels: Vec = Vec::new(); + for chan in join_pattern.channels() { + let message = self.messages.retrieve(&chan).unwrap(); + messages_for_channels.push(message); + } + + // Get a handle to the firing Join Pattern + log::debug!("Firing JoinPattern: {join_pattern_id:?}"); + let thread_handle = join_pattern.fire(messages_for_channels); + + // Add the pattern to set of patterns that are firing + self.firing_join_patterns.push(thread_handle); + + // Prune the threads that have completed firing + // i.e. Keep all of the JoinHandle that are still running + log::debug!( + "Current Firing Join Patterns: {:?}", + self.firing_join_patterns + ); + self.firing_join_patterns + .retain(|handle| handle.is_running()); + log::debug!( + "Purged Firing Join Patterns: {:?}", + self.firing_join_patterns + ); + } + + /// Reset the `Counter` at which the given Join Pattern has last been fired. + pub(in crate::controller) fn reset_last_fired(&mut self, join_pattern_id: JoinPatternId) { + self.join_pattern_last_fired + .insert(join_pattern_id, Some(self.message_counter.clone())); + } + + /// Initialize the `Instant` at which Join Pattern was last alive. + pub(in crate::controller) fn initialize_last_fired(&mut self, join_pattern_id: JoinPatternId) { + self.join_pattern_last_fired.insert(join_pattern_id, None); + } +} diff --git a/src/controller/handle.rs b/src/controller/handle.rs new file mode 100644 index 0000000..1444cc0 --- /dev/null +++ b/src/controller/handle.rs @@ -0,0 +1,60 @@ +use std::{ + sync::mpsc::Sender, + thread::{JoinHandle, Thread}, +}; + +use crate::types::Packet; + +/// Handle to a `Junction`'s underlying `Controller`. +/// +/// This struct carries a `JoinHandle` to the thread that the `Controller` of +/// a `Junction` is running in. It allows for the `Controller` and its thread +/// to be stopped gracefully at any point. +pub struct ControllerHandle { + sender: Sender, + control_thread_handle: Option>, +} + +impl ControllerHandle { + pub(crate) fn new(sender: Sender, handle: JoinHandle<()>) -> ControllerHandle { + ControllerHandle { + sender, + control_thread_handle: Some(handle), + } + } + + /// Extracts a handle to the underlying thread. + pub fn thread(&self) -> Option<&Thread> { + match &self.control_thread_handle { + Some(h) => Some(h.thread()), + None => None, + } + } + + /// Request the `Controller` to stop gracefully, then join its thread. + /// + /// # Panics + /// + /// Panics if it was unable to send shut-down request to the control thread. + pub fn stop(&mut self) { + log::debug!("Controller asked to shutdown"); + self.sender + .send(Packet::ShutDownRequest) + .map_err(|e| log::error!("Failed to send ShutDownRequest: {e:?}")) + .unwrap(); + + let controller_handle = self.control_thread_handle.take(); + + if controller_handle.is_none() { + log::error!("Failed to take the ControllerHandle"); + } + + controller_handle + .unwrap() + .join() + .map_err(|_| log::error!("Failed to join the Controller thread to the main thread")) + .unwrap(); + + log::debug!("Controller shutdown"); + } +} diff --git a/src/controller/handlers.rs b/src/controller/handlers.rs new file mode 100644 index 0000000..b094404 --- /dev/null +++ b/src/controller/handlers.rs @@ -0,0 +1,148 @@ +use std::{ + collections::LinkedList, + sync::mpsc::{Receiver, Sender}, +}; + +use crate::{ + controller::Controller, + join_pattern::JoinPattern, + types::{ + ids::{ChannelId, JoinPatternId}, + Message, Packet, + }, +}; + +impl Controller { + /// Handle incoming `Packet` from associated `Junction`. + /// + /// This function will continuously receive `Packet`s sent from structs + /// associated with the `Junction` that created and started this `Controller` + /// until a `Packet::ShutDownRequest` has been sent. + pub(in crate::controller) fn handle_packets(mut self, receiver: Receiver) { + use Packet::*; + + while let Ok(packet) = receiver.recv() { + match packet { + Message { channel_id, msg } => { + log::debug!("Handling a Packet::Message to: {channel_id:?}"); + self.handle_message(channel_id, msg); + } + NewChannelIdRequest { return_sender } => { + log::debug!("Handling a Packet::NewChannelIdRequest"); + self.handle_new_channel_id_request(return_sender) + } + AddJoinPatternRequest { join_pattern } => { + log::debug!("Handling a Packet::AddJoinPatternRequest"); + self.handle_add_join_pattern_request(join_pattern) + } + ShutDownRequest => { + log::debug!("Handling a Packet::ShutDownRequest"); + break; + } + } + } + + // Join all of the `JoinHandle`s of the firing `JoinPattern` + log::debug!("Starting to join all of the firing threads"); + self.firing_join_patterns.into_iter().for_each(|handle| { + handle.join().ok(); + }); + log::debug!("Finished joining all of the firing threads"); + } + + /// Handle a received `Message` from a given channel. + /// + /// The first action taken in handling a `Message` is storing the received + /// message in the `Message` bag of the `Controller`. + /// + /// The second action is to start determining if any of the Join Patterns stored + /// with the `Controller` are alive and if so, which of these to fire. + fn handle_message(&mut self, channel_id: ChannelId, msg: Message) { + self.messages.add(channel_id, msg); + self.message_counter.increment(); + + self.handle_join_pattern_firing(channel_id); + } + + /// Handle the firing of a `JoinPattern`, if possible. + /// + /// Determine which `JoinPattern`s contain the channel with the given + /// `ChannelId`. For these, check which ones have at least one `Message` + /// available for each of their channels, i.e. are alive, then select + /// one `JoinPattern` to be fired. If at any point during this process + /// no more `JoinPattern`s remain, nothing will be done. + fn handle_join_pattern_firing(&mut self, channel_id: ChannelId) { + let mut alive_join_patterns: Vec = Vec::new(); + + if let Some(jp_ids) = self.relevant_join_patterns(channel_id) { + alive_join_patterns = self.alive_join_patterns(jp_ids); + } + + if let Some(jp_id_to_fire) = self.select_to_fire(&mut alive_join_patterns) { + self.fire_join_pattern(*jp_id_to_fire); + self.reset_last_fired(*jp_id_to_fire); + } + } + + /// Send new, *unique* `ChannelId` back to the requesting `Junction`. + /// + /// # Panics + /// + /// Panics if the new `ChannelId` could not be sent to the requesting `Junction`. + fn handle_new_channel_id_request(&mut self, return_sender: Sender) { + return_sender + .send(self.new_channel_id()) + .map_err(|e| log::error!("Failed to send NewChannelIdRequest: {e:?}")) + .unwrap(); + } + + /// Add new Join Pattern to `Controller` storage. + fn handle_add_join_pattern_request(&mut self, join_pattern: Box) { + let jp_id = self.new_join_pattern_id(); + + self.initialize_last_fired(jp_id); + + self.insert_join_pattern(jp_id, join_pattern); + } + + /// Return the `JoinPatternId`s of relevant Join Patterns for given `ChannelId`. + /// + /// A Join Pattern is considered relevant for a given `ChannelId` if at least + /// one of its channels has the `ChannelId`. + fn relevant_join_patterns(&self, channel_id: ChannelId) -> Option<&LinkedList> { + self.join_pattern_index.peek_all(&channel_id) + } + + /// Insert Join Pattern into relevant internal storage. + /// + /// The given Join Pattern needs to be registered within the internal + /// `InvertedIndex` for future look-up operations and then stored in the + /// Join Pattern collection. + fn insert_join_pattern( + &mut self, + join_pattern_id: JoinPatternId, + join_pattern: Box, + ) { + join_pattern.channels().iter().for_each(|chan| { + self.join_pattern_index + .insert_single(*chan, join_pattern_id) + }); + self.join_patterns.insert(join_pattern_id, join_pattern); + } + + /// Generate new, *unique* `ChannelId`. + fn new_channel_id(&mut self) -> ChannelId { + let ch_id = self.latest_channel_id; + self.latest_channel_id.increment(); + + ch_id + } + + /// Generate new, *unique* `JoinPatternId`. + fn new_join_pattern_id(&mut self) -> JoinPatternId { + let jp_id = self.latest_join_pattern_id; + self.latest_join_pattern_id.increment(); + + jp_id + } +} diff --git a/src/controller/mod.rs b/src/controller/mod.rs new file mode 100644 index 0000000..d1ca386 --- /dev/null +++ b/src/controller/mod.rs @@ -0,0 +1,87 @@ +//! Control structure started by any new `Junction`, running in a background thread +//! to handle the coordination of Join Pattern creation and execution. +use std::{ + collections::HashMap, + sync::mpsc::{Receiver, Sender}, + thread::{self, JoinHandle}, +}; + +use crate::{ + join_pattern::JoinPattern, + types::{ + ids::{ChannelId, JoinPatternId}, + Message, Packet, + }, +}; + +use bag::Bag; +use counter::Counter; +use inverted_index::InvertedIndex; + +mod alive; +mod fire; +mod handle; +mod handlers; + +pub use handle::ControllerHandle; + +/// Struct to handle `Packet`s sent from the user in the background. +/// +/// This struct holds all the information required to store and fire +/// `JoinPattern`s once all requirements have been met. It is created by a +/// `Junction` in a separate control thread, where it continuously listens +/// for `Packet`s sent by user code and reacts accordingly. +pub(crate) struct Controller { + latest_channel_id: ChannelId, + latest_join_pattern_id: JoinPatternId, + /// Counter for how many messages have arrived since creation. + message_counter: Counter, + /// Collection of all currently available messages. + messages: Bag, + /// Collection of all available Join Patterns for the `Junction` associated with + /// this `Controller`. + join_patterns: HashMap>, + /// Map of `JoinPatternId`s to the message count at which they were last + /// fired, `None` if the Join Pattern has never been fired. Used to + /// determine precedence of Join Patterns that have not been fired in a + /// while when needing to choose which of the alive Join Patterns to fire. + join_pattern_last_fired: HashMap>, + /// `InvertedIndex` matching `ChannelId`s to all Join Patterns they appear in. + /// Used to easily determine which Join Patterns are relevant any time a new + /// message comes in. + join_pattern_index: InvertedIndex, + /// A store of all of the `JoinPattern` that are currently firing. When + /// the `stop` directive is given to the controller, we can join all of + /// the `JoinHandle`s to ensure the computation being performed by each + /// thread is given time to complete. + firing_join_patterns: Vec>, +} + +impl Controller { + pub(crate) fn new() -> Controller { + Controller { + latest_channel_id: ChannelId::default(), + latest_join_pattern_id: JoinPatternId::default(), + message_counter: Counter::default(), + messages: Bag::new(), + join_patterns: HashMap::new(), + join_pattern_last_fired: HashMap::new(), + join_pattern_index: InvertedIndex::new(), + firing_join_patterns: Vec::new(), + } + } + + /// Start thread to handle incoming `Packet`s from `Junction` user. + /// + /// Start new thread in the background to handle incoming `Packet`s sent from + /// the user of the `Junction` that created this `Controller`. Return a + /// `ControlThreadHandle` so that this control thread can be joint at any future + /// point. + pub(crate) fn start( + self, + sender: Sender, + receiver: Receiver, + ) -> ControllerHandle { + ControllerHandle::new(sender, thread::spawn(move || self.handle_packets(receiver))) + } +} diff --git a/src/function_transforms.rs b/src/function_transforms.rs deleted file mode 100644 index 9fb180d..0000000 --- a/src/function_transforms.rs +++ /dev/null @@ -1,163 +0,0 @@ -//! Function transformers used to hide actual type signatures of functions stored -//! with a Join Pattern and instead expose a generic interface that is easily stored. - -use std::any::Any; -use std::sync::mpsc::Sender; - -use crate::types::{functions, Message}; - -/// Function transformers for functions stored with unary Join Patterns. -pub(crate) mod unary { - use super::*; - - /// Transform function of `SendJoinPattern` to use `Message` arguments. - pub(crate) fn transform_send(f: F) -> Box - where - F: Fn(T) -> () + Send + Clone + 'static, - T: Any + Send + 'static, - { - Box::new(move |arg: Message| { - f(*arg.downcast::().unwrap()); - }) - } - - /// Transform function of `RecvJoinPattern` to use `Message` arguments. - pub(crate) fn transform_recv(f: F) -> Box - where - F: Fn() -> R + Send + Clone + 'static, - R: Any + Send + 'static, - { - Box::new(move |return_sender: Message| { - let return_sender = *return_sender.downcast::>().unwrap(); - - return_sender.send(f()).unwrap(); - }) - } - - /// Transform function of `BidirJoinPattern` to use `Message` arguments. - pub(crate) fn transform_bidir(f: F) -> Box - where - F: Fn(T) -> R + Send + Clone + 'static, - T: Any + Send + 'static, - R: Any + Send + 'static, - { - Box::new(move |arg_and_sender: Message| { - let (arg, return_sender) = *arg_and_sender.downcast::<(T, Sender)>().unwrap(); - - return_sender.send(f(arg)).unwrap(); - }) - } -} - -/// Function transformers for functions stored with binary Join Patterns. -pub(crate) mod binary { - use super::*; - - /// Transform function of `SendJoinPattern` to use `Message` arguments. - pub(crate) fn transform_send(f: F) -> Box - where - F: Fn(T, U) -> () + Send + Clone + 'static, - T: Any + Send + 'static, - U: Any + Send + 'static, - { - Box::new(move |arg_1: Message, arg_2: Message| { - f( - *arg_1.downcast::().unwrap(), - *arg_2.downcast::().unwrap(), - ); - }) - } - - /// Transform function of `RecvJoinPattern` to use `Message` arguments. - pub(crate) fn transform_recv(f: F) -> Box - where - F: Fn(T) -> R + Send + Clone + 'static, - T: Any + Send + 'static, - R: Any + Send + 'static, - { - Box::new(move |arg: Message, return_sender: Message| { - let return_sender = *return_sender.downcast::>().unwrap(); - let arg = *arg.downcast::().unwrap(); - - return_sender.send(f(arg)).unwrap(); - }) - } - - /// Transform function of `BidirJoinPattern` to use `Message` arguments. - pub(crate) fn transform_bidir(f: F) -> Box - where - F: Fn(T, U) -> R + Send + Clone + 'static, - T: Any + Send + 'static, - U: Any + Send + 'static, - R: Any + Send + 'static, - { - Box::new(move |arg_1: Message, arg_2_and_sender: Message| { - let arg_1 = *arg_1.downcast::().unwrap(); - let (arg_2, return_sender) = *arg_2_and_sender.downcast::<(U, Sender)>().unwrap(); - - return_sender.send(f(arg_1, arg_2)).unwrap(); - }) - } -} - -/// Function transformers for functions stored with ternary `JoinPattern`s. -pub(crate) mod ternary { - use super::*; - - /// Transform function of `SendJoinPattern` to use `Message` arguments. - pub(crate) fn transform_send(f: F) -> Box - where - F: Fn(T, U, V) -> () + Send + Clone + 'static, - T: Any + Send + 'static, - U: Any + Send + 'static, - V: Any + Send + 'static, - { - Box::new(move |arg_1: Message, arg_2: Message, arg_3: Message| { - f( - *arg_1.downcast::().unwrap(), - *arg_2.downcast::().unwrap(), - *arg_3.downcast::().unwrap(), - ); - }) - } - - /// Transform function of `RecvJoinPattern` to use `Message` arguments. - pub(crate) fn transform_recv(f: F) -> Box - where - F: Fn(T, U) -> R + Send + Clone + 'static, - T: Any + Send + 'static, - U: Any + Send + 'static, - R: Any + Send + 'static, - { - Box::new( - move |arg_1: Message, arg_2: Message, return_sender: Message| { - let return_sender = *return_sender.downcast::>().unwrap(); - let arg_1 = *arg_1.downcast::().unwrap(); - let arg_2 = *arg_2.downcast::().unwrap(); - - return_sender.send(f(arg_1, arg_2)).unwrap(); - }, - ) - } - - /// Transform function of `BidirJoinPattern` to use `Message` arguments. - pub(crate) fn transform_bidir(f: F) -> Box - where - F: Fn(T, U, V) -> R + Send + Clone + 'static, - T: Any + Send + 'static, - U: Any + Send + 'static, - V: Any + Send + 'static, - R: Any + Send + 'static, - { - Box::new( - move |arg_1: Message, arg_2: Message, arg_3_and_sender: Message| { - let arg_1 = *arg_1.downcast::().unwrap(); - let arg_2 = *arg_2.downcast::().unwrap(); - let (arg_3, return_sender) = - *arg_3_and_sender.downcast::<(V, Sender)>().unwrap(); - - return_sender.send(f(arg_1, arg_2, arg_3)).unwrap(); - }, - ) - } -} diff --git a/src/join_pattern.rs b/src/join_pattern.rs new file mode 100644 index 0000000..2980584 --- /dev/null +++ b/src/join_pattern.rs @@ -0,0 +1,52 @@ +use crate::types::{ids::ChannelId, Message, Packet}; +use bag::Bag; +use std::{ + marker::{Send, Sized}, + sync::mpsc::Sender, +}; + +pub trait JoinPattern: Send { + /// Return `true` if the Join Pattern with given `JoinPatternId` is alive. + /// + /// A Join Pattern is considered alive if there is at least one `Message` for + /// each of the channels involved in it. + // TODO: Ensure this is a valid implementation of `is_valid` for any number of channels + // TODO: The hashmap might be able to be precomputed in the macro + fn is_alive(&self, messages: &Bag) -> bool { + // Create a hashmap associating each `ChannelId` with its number of + // occurrences + let mut threshold_channels = std::collections::HashMap::new(); + self.channels().into_iter().for_each(|chan| { + let counter = threshold_channels.entry(chan).or_insert(0); + *counter += 1; + }); + + // Check if there is a sufficient number of messages for each channel + for (channel, num) in threshold_channels.into_iter() { + let messages_for_channel = messages.count_items(&channel); + if messages_for_channel < num { + return false; + } + } + + true + } + + fn add(self, sender: Sender) + where + Self: Sized + Send + 'static, + { + sender + .send(Packet::AddJoinPatternRequest { + join_pattern: Box::new(self), + }) + .map_err(|e| log::error!("Failed to send AddJoinPatternRequest: {e:?}")) + .unwrap(); + } + + /// Return a `Vec Vec; + + /// Given the `Message` for each of the channels in the pattern - fire. + fn fire(&self, messages: Vec) -> std::thread::JoinHandle<()>; +} diff --git a/src/junction.rs b/src/junction.rs index aac162d..c865059 100644 --- a/src/junction.rs +++ b/src/junction.rs @@ -2,14 +2,19 @@ //! together. Main structure for the public interface, used to create new //! channels and construct `JoinPattern`s based on them. -use std::any::Any; -use std::ops::Drop; -use std::sync::mpsc::{channel, RecvError, Sender}; - -use super::channels::{BidirChannel, RecvChannel, SendChannel}; -use super::controller::Controller; -use super::patterns::unary::{BidirPartialPattern, RecvPartialPattern, SendPartialPattern}; -use super::types::{ids, ControllerHandle, Packet}; +use std::{ + any::Any, + ops::Drop, + sync::mpsc::{channel, RecvError, Sender}, +}; + +use crate::{ + channels::{BidirChannel, RecvChannel, SendChannel}, + controller::{Controller, ControllerHandle}, + // join_pattern::JoinPattern, + patterns::unary::{BidirPartialPattern, RecvPartialPattern, SendPartialPattern}, + types::{ids, Packet}, +}; /// Struct managing the creation of new channels and Join Patterns. /// @@ -124,11 +129,14 @@ impl Junction { .send(Packet::NewChannelIdRequest { return_sender: id_sender, }) + .map_err(|e| log::error!("Failed to send NewChannelIdRequest: {e:?}")) .unwrap(); id_receiver.recv() } +} +impl Junction { /// Create new partial Join Pattern starting with a `SendChannel`. /// /// # Panics @@ -204,8 +212,12 @@ impl Drop for Junction { /// associated `Controller` and join the control thread. Otherwise, no /// action is needed. fn drop(&mut self) { + log::debug!("Dropping Junction - Attempting to shutdown Controller"); if self.controller_handle.is_some() { + log::debug!("Controller has a ControllerHandle"); self.controller_handle.take().unwrap().stop(); + } else { + log::debug!("Controller didn't have a ControllerHandle"); } } } diff --git a/src/lib.rs b/src/lib.rs index 8f14c2b..27e339f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(thread_is_running)] //! Crate implementing Join Patterns from the [Join Calculus](https://www.microsoft.com/en-us/research/wp-content/uploads/2017/01/join-tutorial.pdf) developed by //! Cédric Fournet and Georges Gonthier. //! @@ -132,14 +133,15 @@ //! For more examples, visit the [`examples`](https://github.com/smueksch/rusty_junctions/tree/master/examples) folder in the [Rusty Junctions GitHub //! repository](https://github.com/smueksch/rusty_junctions). -mod bag; pub mod channels; mod controller; -mod counter; -mod function_transforms; -mod inverted_index; +mod join_pattern; mod junction; -pub mod patterns; -pub mod types; +mod types; +pub use controller::ControllerHandle; pub use junction::Junction; +pub use rusty_junctions_macro::client::junction; + +// Generate the library, upto an order of 32. +rusty_junctions_macro::library::generate!(32); diff --git a/src/patterns.rs b/src/patterns.rs deleted file mode 100644 index da72d03..0000000 --- a/src/patterns.rs +++ /dev/null @@ -1,1093 +0,0 @@ -//! Structs to implement different types of `JoinPattern`s. - -use std::any::Any; -use std::sync::mpsc::Sender; -use std::thread; - -use super::channels::{ - BidirChannel, RecvChannel, SendChannel, StrippedBidirChannel, StrippedRecvChannel, - StrippedSendChannel, -}; -use super::function_transforms; -use super::types::{functions, ids, JoinPattern, Message, Packet}; - -/// Structs for Join Patterns with one channel. -pub mod unary { - use super::*; - - /********************************** - * Send Join Pattern Construction * - **********************************/ - - /// `SendChannel` partial Join Pattern. - pub struct SendPartialPattern { - junction_id: ids::JunctionId, - send_channel: StrippedSendChannel, - sender: Sender, - } - - impl SendPartialPattern - where - T: Any + Send, - { - pub(crate) fn new( - junction_id: ids::JunctionId, - send_channel: StrippedSendChannel, - sender: Sender, - ) -> SendPartialPattern { - SendPartialPattern { - junction_id, - send_channel, - sender, - } - } - - /// Create a binary partial Join Pattern with two send channels. - /// - /// Create a new binary partial Join Pattern that starts with the current - /// pattern and includes a new `SendChannel` after that. - /// - /// # Panics - /// - /// Panics if the supplied `SendChannel` does not carry the same - /// `JunctionID` as this `SendPartialPattern`, i.e. has not been created by - /// and is associated with the same `Junction`. - pub fn and(self, send_channel: &SendChannel) -> binary::SendPartialPattern - where - U: Any + Send, - { - if send_channel.junction_id() == self.junction_id { - binary::SendPartialPattern::new( - self.junction_id, - self.send_channel, - send_channel.strip(), - self.sender, - ) - } else { - panic!( - "SendChannel and SendPartialPattern not associated \ - with same Junction! Please use a SendChannel created \ - using the same Junction as this partially complete Join \ - Pattern" - ); - } - } - - /// Create a binary partial Join Pattern with a send and receive channel. - /// - /// Create a new binary partial Join Pattern that starts with the current - /// pattern and includes a new `RecvChannel` after that. - /// - /// # Panics - /// - /// Panics if the supplied `RecvChannel` does not carry the same - /// `JunctionID` as this `SendPartialPattern`, i.e. has not been created by - /// and is associated with the same `Junction`. - pub fn and_recv(self, recv_channel: &RecvChannel) -> binary::RecvPartialPattern - where - R: Any + Send, - { - if recv_channel.junction_id() == self.junction_id { - binary::RecvPartialPattern::new( - self.send_channel, - recv_channel.strip(), - self.sender, - ) - } else { - panic!( - "RecvChannel and SendPartialPattern not associated \ - with same Junction! Please use a RecvChannel created \ - using the same Junction as this partially complete Join \ - Pattern" - ); - } - } - - /// Create a binary partial Join Pattern with a send and bidirectional channel. - /// - /// Create a new binary partial Join Pattern that starts with the current - /// pattern and includes a new `BidirChannel` after that. - /// - /// # Panics - /// - /// Panics if the supplied `BidirChannel` does not carry the same - /// `JunctionID` as this `SendPartialPattern`, i.e. has not been created by - /// and is associated with the same `Junction`. - pub fn and_bidir( - self, - bidir_channel: &BidirChannel, - ) -> binary::BidirPartialPattern - where - U: Any + Send, - R: Any + Send, - { - if bidir_channel.junction_id() == self.junction_id { - binary::BidirPartialPattern::new( - self.send_channel, - bidir_channel.strip(), - self.sender, - ) - } else { - panic!( - "BidirChannel and SendPartialPattern not associated \ - with same Junction! Please use a BidirChannel created \ - using the same Junction as this partially complete Join \ - Pattern" - ); - } - } - - /// Create full Join Pattern and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn(T) -> () + Send + Clone + 'static, - { - let join_pattern = JoinPattern::UnarySend(SendJoinPattern::new( - self.send_channel.id(), - function_transforms::unary::transform_send(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// `SendChannel` full Join Pattern. - pub struct SendJoinPattern { - channel_id: ids::ChannelId, - f: functions::unary::FnBox, - } - - impl SendJoinPattern { - pub(crate) fn new( - channel_id: ids::ChannelId, - f: functions::unary::FnBox, - ) -> SendJoinPattern { - SendJoinPattern { channel_id, f } - } - - /// Return the ID of the channel in this Join Pattern. - pub(crate) fn channel_id(&self) -> ids::ChannelId { - self.channel_id - } - - /// Fire Join Pattern by running associated function in separate thread. - pub(crate) fn fire(&self, arg: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(arg); - }); - } - } - - /************************************* - * Receive Join Pattern Construction * - *************************************/ - - /// `RecvChannel` partial Join Pattern. - pub struct RecvPartialPattern { - recv_channel: StrippedRecvChannel, - sender: Sender, - } - - impl RecvPartialPattern - where - R: Any + Send, - { - pub(crate) fn new( - recv_channel: StrippedRecvChannel, - sender: Sender, - ) -> RecvPartialPattern { - RecvPartialPattern { - recv_channel, - sender, - } - } - - /// Create full Join Pattern and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn() -> R + Send + Clone + 'static, - { - let join_pattern = JoinPattern::UnaryRecv(RecvJoinPattern::new( - self.recv_channel.id(), - function_transforms::unary::transform_recv(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// `RecvChannel` full Join Pattern. - /// - /// N.B.: While this struct appears to be a duplicate of `SendJoinPattern` - /// in terms of code, it is used to distinguish the capability of the - /// Join Pattern within the `Junction` through its type. - pub struct RecvJoinPattern { - channel_id: ids::ChannelId, - f: functions::unary::FnBox, - } - - impl RecvJoinPattern { - pub(crate) fn new( - channel_id: ids::ChannelId, - f: functions::unary::FnBox, - ) -> RecvJoinPattern { - RecvJoinPattern { channel_id, f } - } - - /// Return the ID of the channel in this Join Pattern. - pub(crate) fn channel_id(&self) -> ids::ChannelId { - self.channel_id - } - - /// Fire Join Pattern by running associated function in separate thread. - pub(crate) fn fire(&self, return_sender: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(return_sender); - }); - } - } - - /******************************************* - * Bidirectional Join Pattern Construction * - *******************************************/ - - /// Bidirectional channel partial Join Pattern. - pub struct BidirPartialPattern { - bidir_channel: StrippedBidirChannel, - sender: Sender, - } - - impl BidirPartialPattern - where - T: Any + Send, - R: Any + Send, - { - pub(crate) fn new( - bidir_channel: StrippedBidirChannel, - sender: Sender, - ) -> BidirPartialPattern { - BidirPartialPattern { - bidir_channel, - sender, - } - } - - /// Create full Join Pattern and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn(T) -> R + Send + Clone + 'static, - { - let join_pattern = JoinPattern::UnaryBidir(BidirJoinPattern::new( - self.bidir_channel.id(), - function_transforms::unary::transform_bidir(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// `BidirChannel` full Join Pattern. - pub struct BidirJoinPattern { - channel_id: ids::ChannelId, - f: functions::unary::FnBox, - } - - impl BidirJoinPattern { - pub(crate) fn new( - channel_id: ids::ChannelId, - f: functions::unary::FnBox, - ) -> BidirJoinPattern { - BidirJoinPattern { channel_id, f } - } - - /// Return the ID of the channel in this Join Pattern. - pub(crate) fn channel_id(&self) -> ids::ChannelId { - self.channel_id - } - - /// Fire Join Pattern by running associated function in separate thread. - pub(crate) fn fire(&self, arg_and_sender: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(arg_and_sender); - }); - } - } -} - -/// Structs for Join Patterns with two channels. -pub mod binary { - use super::*; - - /***************************************** - * Send & Send Join Pattern Construction * - *****************************************/ - - /// Two `SendChannel` partial Join Pattern. - pub struct SendPartialPattern { - junction_id: ids::JunctionId, - first_send_channel: StrippedSendChannel, - second_send_channel: StrippedSendChannel, - sender: Sender, - } - - impl SendPartialPattern - where - T: Any + Send, - U: Any + Send, - { - pub(crate) fn new( - junction_id: ids::JunctionId, - first_send_channel: StrippedSendChannel, - second_send_channel: StrippedSendChannel, - sender: Sender, - ) -> SendPartialPattern { - SendPartialPattern { - junction_id, - first_send_channel, - second_send_channel, - sender, - } - } - - /// Create a ternary partial `JoinPattern` with three send channels. - /// - /// Create a new ternary partial `JoinPattern` that starts with the current - /// pattern and includes a new `SendChannel` after that. - /// - /// # Panics - /// - /// Panics if the supplied `SendChannel` does not carry the same - /// `JunctionID` as this `SendPartialPattern`, i.e. has not been created by - /// and is associated with the same `Junction`. - pub fn and(self, send_channel: &SendChannel) -> ternary::SendPartialPattern - where - V: Any + Send, - { - if send_channel.junction_id() == self.junction_id { - ternary::SendPartialPattern::new( - self.junction_id, - self.first_send_channel, - self.second_send_channel, - send_channel.strip(), - self.sender, - ) - } else { - panic!( - "SendChannel and SendPartialPattern not associated \ - with same Junction! Please use a SendChannel created \ - using the same Junction as this partially complete Join \ - Pattern" - ); - } - } - - /// Create a ternary partial `JoinPattern` with two send and receive channel. - /// - /// Create a new ternary partial `JoinPattern` that starts with the current - /// pattern and includes a new `RecvChannel` after that. - /// - /// # Panics - /// - /// Panics if the supplied `RecvChannel` does not carry the same - /// `JunctionID` as this `SendPartialPattern`, i.e. has not been created by - /// and is associated with the same `Junction`. - pub fn and_recv( - self, - recv_channel: &RecvChannel, - ) -> ternary::RecvPartialPattern - where - R: Any + Send, - { - if recv_channel.junction_id() == self.junction_id { - ternary::RecvPartialPattern::new( - self.first_send_channel, - self.second_send_channel, - recv_channel.strip(), - self.sender, - ) - } else { - panic!( - "RecvChannel and SendPartialPattern not associated \ - with same Junction! Please use a RecvChannel created \ - using the same Junction as this partially complete Join \ - Pattern" - ); - } - } - - /// Create a ternary partial `JoinPattern` with two send and bidirectional channel. - /// - /// Create a new ternary partial Join Pattern that starts with the current - /// pattern and includes a new `BidirChannel` after that. - /// - /// # Panics - /// - /// Panics if the supplied `BidirChannel` does not carry the same - /// `JunctionID` as this `SendPartialPattern`, i.e. has not been created by - /// and is associated with the same `Junction`. - pub fn and_bidir( - self, - bidir_channel: &BidirChannel, - ) -> ternary::BidirPartialPattern - where - V: Any + Send, - R: Any + Send, - { - if bidir_channel.junction_id() == self.junction_id { - ternary::BidirPartialPattern::new( - self.first_send_channel, - self.second_send_channel, - bidir_channel.strip(), - self.sender, - ) - } else { - panic!( - "BidirChannel and SendPartialPattern not associated \ - with same Junction! Please use a BidirChannel created \ - using the same Junction as this partially complete Join \ - Pattern" - ); - } - } - - /// Create full Join Pattern and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn(T, U) -> () + Send + Clone + 'static, - { - let join_pattern = JoinPattern::BinarySend(SendJoinPattern::new( - self.first_send_channel.id(), - self.second_send_channel.id(), - function_transforms::binary::transform_send(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// `SendChannel` & `SendChannel` full Join Pattern. - pub struct SendJoinPattern { - first_send_channel_id: ids::ChannelId, - second_send_channel_id: ids::ChannelId, - f: functions::binary::FnBox, - } - - impl SendJoinPattern { - pub(crate) fn new( - first_send_channel_id: ids::ChannelId, - second_send_channel_id: ids::ChannelId, - f: functions::binary::FnBox, - ) -> SendJoinPattern { - SendJoinPattern { - first_send_channel_id, - second_send_channel_id, - f, - } - } - - /// Return the ID of the first `SendChannel` in this Join Pattern. - pub(crate) fn first_send_channel_id(&self) -> ids::ChannelId { - self.first_send_channel_id - } - - /// Return the ID of the second `SendChannel` in this Join Pattern. - pub(crate) fn second_send_channel_id(&self) -> ids::ChannelId { - self.second_send_channel_id - } - - /// Fire Join Pattern by running associated function in separate thread. - pub(crate) fn fire(&self, arg_1: Message, arg_2: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(arg_1, arg_2); - }); - } - } - - /******************************************** - * Send & Receive Join Pattern Construction * - ********************************************/ - - /// `SendChannel` & `RecvChannel` partial Join Pattern. - pub struct RecvPartialPattern { - send_channel: StrippedSendChannel, - recv_channel: StrippedRecvChannel, - sender: Sender, - } - - impl RecvPartialPattern - where - T: Any + Send, - R: Any + Send, - { - pub(crate) fn new( - send_channel: StrippedSendChannel, - recv_channel: StrippedRecvChannel, - sender: Sender, - ) -> RecvPartialPattern { - RecvPartialPattern { - send_channel, - recv_channel, - sender, - } - } - - /// Create full Join Pattern and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn(T) -> R + Send + Clone + 'static, - { - let join_pattern = JoinPattern::BinaryRecv(RecvJoinPattern::new( - self.send_channel.id(), - self.recv_channel.id(), - function_transforms::binary::transform_recv(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// `SendChannel` & `RecvChannel` full Join Pattern. - /// - /// N.B.: While this struct appears to be a duplicate of `SendJoinPattern` - /// in terms of code, it is used to distinguish the capability of the - /// Join Pattern within the `Junction` through its type. - pub struct RecvJoinPattern { - send_channel_id: ids::ChannelId, - recv_channel_id: ids::ChannelId, - f: functions::binary::FnBox, - } - - impl RecvJoinPattern { - pub(crate) fn new( - send_channel_id: ids::ChannelId, - recv_channel_id: ids::ChannelId, - f: functions::binary::FnBox, - ) -> RecvJoinPattern { - RecvJoinPattern { - send_channel_id, - recv_channel_id, - f, - } - } - - /// Return the ID of the `SendChannel` in this Join Pattern. - pub(crate) fn send_channel_id(&self) -> ids::ChannelId { - self.send_channel_id - } - - /// Return the ID of the `RecvChannel` in this Join Pattern. - pub(crate) fn recv_channel_id(&self) -> ids::ChannelId { - self.recv_channel_id - } - - /// Fire Join Pattern by running associated function in separate thread. - pub(crate) fn fire(&self, msg: Message, return_sender: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(msg, return_sender); - }); - } - } - - /************************************************** - * Send & Bidirectional Join Pattern Construction * - **************************************************/ - - /// `SendChannel` & `BidirChannel` partial Join Pattern. - pub struct BidirPartialPattern { - send_channel: StrippedSendChannel, - bidir_channel: StrippedBidirChannel, - sender: Sender, - } - - impl BidirPartialPattern - where - T: Any + Send, - U: Any + Send, - R: Any + Send, - { - pub(crate) fn new( - send_channel: StrippedSendChannel, - bidir_channel: StrippedBidirChannel, - sender: Sender, - ) -> BidirPartialPattern { - BidirPartialPattern { - send_channel, - bidir_channel, - sender, - } - } - - /// Create full Join Pattern and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn(T, U) -> R + Send + Clone + 'static, - { - let join_pattern = JoinPattern::BinaryBidir(BidirJoinPattern::new( - self.send_channel.id(), - self.bidir_channel.id(), - function_transforms::binary::transform_bidir(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// `SendChannel` & `BidirChannel` full Join Pattern. - pub struct BidirJoinPattern { - send_channel_id: ids::ChannelId, - bidir_channel_id: ids::ChannelId, - f: functions::binary::FnBox, - } - - impl BidirJoinPattern { - pub(crate) fn new( - send_channel_id: ids::ChannelId, - bidir_channel_id: ids::ChannelId, - f: functions::binary::FnBox, - ) -> BidirJoinPattern { - BidirJoinPattern { - send_channel_id, - bidir_channel_id, - f, - } - } - - /// Return the ID of the `SendChannel` in this Join Pattern. - pub(crate) fn send_channel_id(&self) -> ids::ChannelId { - self.send_channel_id - } - - /// Return the ID of the `BidirChannel` in this Join Pattern. - pub(crate) fn bidir_channel_id(&self) -> ids::ChannelId { - self.bidir_channel_id - } - - /// Fire Join Pattern by running associated function in separate thread. - pub(crate) fn fire(&self, arg_1: Message, arg_2_and_sender: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(arg_1, arg_2_and_sender); - }); - } - } -} - -/// Structs for Join Patterns with two channels. -pub mod ternary { - use super::*; - - /**************************************** - * Three Send Join Pattern Construction * - ****************************************/ - - /// Three `SendChannel` partial Join Pattern. - pub struct SendPartialPattern { - junction_id: ids::JunctionId, - first_send_channel: StrippedSendChannel, - second_send_channel: StrippedSendChannel, - third_send_channel: StrippedSendChannel, - sender: Sender, - } - - impl SendPartialPattern - where - T: Any + Send, - U: Any + Send, - V: Any + Send, - { - pub(crate) fn new( - junction_id: ids::JunctionId, - first_send_channel: StrippedSendChannel, - second_send_channel: StrippedSendChannel, - third_send_channel: StrippedSendChannel, - sender: Sender, - ) -> SendPartialPattern { - SendPartialPattern { - junction_id, - first_send_channel, - second_send_channel, - third_send_channel, - sender, - } - } - - /// Create full Join Pattern and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn(T, U, V) -> () + Send + Clone + 'static, - { - let join_pattern = JoinPattern::TernarySend(SendJoinPattern::new( - self.first_send_channel.id(), - self.second_send_channel.id(), - self.third_send_channel.id(), - function_transforms::ternary::transform_send(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// Three `SendChannel` full `JoinPattern`. - pub struct SendJoinPattern { - first_send_channel_id: ids::ChannelId, - second_send_channel_id: ids::ChannelId, - third_send_channel_id: ids::ChannelId, - f: functions::ternary::FnBox, - } - - impl SendJoinPattern { - pub(crate) fn new( - first_send_channel_id: ids::ChannelId, - second_send_channel_id: ids::ChannelId, - third_send_channel_id: ids::ChannelId, - f: functions::ternary::FnBox, - ) -> SendJoinPattern { - SendJoinPattern { - first_send_channel_id, - second_send_channel_id, - third_send_channel_id, - f, - } - } - - /// Return the ID of the first `SendChannel` in this Join Pattern. - pub(crate) fn first_send_channel_id(&self) -> ids::ChannelId { - self.first_send_channel_id - } - - /// Return the ID of the second `SendChannel` in this Join Pattern. - pub(crate) fn second_send_channel_id(&self) -> ids::ChannelId { - self.second_send_channel_id - } - - /// Return the ID of the third `SendChannel` in this Join Pattern. - pub(crate) fn third_send_channel_id(&self) -> ids::ChannelId { - self.third_send_channel_id - } - - /// Fire Join Pattern by running associated function in separate thread. - pub(crate) fn fire(&self, arg_1: Message, arg_2: Message, arg_3: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(arg_1, arg_2, arg_3); - }); - } - } - - /******************************************** - * Send & Receive Join Pattern Construction * - ********************************************/ - - /// Two `SendChannel` & `RecvChannel` partial Join Pattern. - pub struct RecvPartialPattern { - first_send_channel: StrippedSendChannel, - second_send_channel: StrippedSendChannel, - recv_channel: StrippedRecvChannel, - sender: Sender, - } - - impl RecvPartialPattern - where - T: Any + Send, - U: Any + Send, - R: Any + Send, - { - pub(crate) fn new( - first_send_channel: StrippedSendChannel, - second_send_channel: StrippedSendChannel, - recv_channel: StrippedRecvChannel, - sender: Sender, - ) -> RecvPartialPattern { - RecvPartialPattern { - first_send_channel, - second_send_channel, - recv_channel, - sender, - } - } - - /// Create full Join Pattern and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn(T, U) -> R + Send + Clone + 'static, - { - let join_pattern = JoinPattern::TernaryRecv(RecvJoinPattern::new( - self.first_send_channel.id(), - self.second_send_channel.id(), - self.recv_channel.id(), - function_transforms::ternary::transform_recv(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// Two `SendChannel` & `RecvChannel` full `JoinPattern`. - /// - /// N.B.: While this struct appears to be a duplicate of `SendJoinPattern` - /// in terms of code, it is used to distinguish the capability of the - /// Join Pattern within the `Junction` through its type. - pub struct RecvJoinPattern { - first_send_channel_id: ids::ChannelId, - second_send_channel_id: ids::ChannelId, - recv_channel_id: ids::ChannelId, - f: functions::ternary::FnBox, - } - - impl RecvJoinPattern { - pub(crate) fn new( - first_send_channel_id: ids::ChannelId, - second_send_channel_id: ids::ChannelId, - recv_channel_id: ids::ChannelId, - f: functions::ternary::FnBox, - ) -> RecvJoinPattern { - RecvJoinPattern { - first_send_channel_id, - second_send_channel_id, - recv_channel_id, - f, - } - } - - /// Return the ID of first `SendChannel` in this `JoinPattern`. - pub(crate) fn first_send_channel_id(&self) -> ids::ChannelId { - self.first_send_channel_id - } - - /// Return the ID of second `SendChannel` in this `JoinPattern`. - pub(crate) fn second_send_channel_id(&self) -> ids::ChannelId { - self.second_send_channel_id - } - - /// Return the ID of the `RecvChannel` in this `JoinPattern`. - pub(crate) fn recv_channel_id(&self) -> ids::ChannelId { - self.recv_channel_id - } - - /// Fire `JoinPattern` by running associated function in separate thread. - pub(crate) fn fire(&self, arg_1: Message, arg_2: Message, return_sender: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(arg_1, arg_2, return_sender); - }); - } - } - - /************************************************** - * Send & Bidirectional Join Pattern Construction * - **************************************************/ - - /// `SendChannel` & `BidirChannel` partial Join Pattern. - pub struct BidirPartialPattern { - first_send_channel: StrippedSendChannel, - second_send_channel: StrippedSendChannel, - bidir_channel: StrippedBidirChannel, - sender: Sender, - } - - impl BidirPartialPattern - where - T: Any + Send, - U: Any + Send, - V: Any + Send, - R: Any + Send, - { - pub(crate) fn new( - first_send_channel: StrippedSendChannel, - second_send_channel: StrippedSendChannel, - bidir_channel: StrippedBidirChannel, - sender: Sender, - ) -> BidirPartialPattern { - BidirPartialPattern { - first_send_channel, - second_send_channel, - bidir_channel, - sender, - } - } - - /// Create full `JoinPattern` and send request to add it to `Junction`. - /// - /// Create a full Join Pattern by taking the channels that are part of - /// the partial pattern and adding a function to be executed when there - /// is at least one message sent on each channel. Attempt to add the - /// Join Pattern to the `Junction` after creation. - /// - /// # Panics - /// - /// Panics if it was not possible to send the request to add the newly - /// create Join Pattern to the `Junction`. - pub fn then_do(self, f: F) - where - F: Fn(T, U, V) -> R + Send + Clone + 'static, - { - let join_pattern = JoinPattern::TernaryBidir(BidirJoinPattern::new( - self.first_send_channel.id(), - self.second_send_channel.id(), - self.bidir_channel.id(), - function_transforms::ternary::transform_bidir(f), - )); - - self.sender - .send(Packet::AddJoinPatternRequest { join_pattern }) - .unwrap(); - } - } - - /// Two `SendChannel` & `BidirChannel` full `JoinPattern`. - pub struct BidirJoinPattern { - first_send_channel_id: ids::ChannelId, - second_send_channel_id: ids::ChannelId, - bidir_channel_id: ids::ChannelId, - f: functions::ternary::FnBox, - } - - impl BidirJoinPattern { - pub(crate) fn new( - first_send_channel_id: ids::ChannelId, - second_send_channel_id: ids::ChannelId, - bidir_channel_id: ids::ChannelId, - f: functions::ternary::FnBox, - ) -> BidirJoinPattern { - BidirJoinPattern { - first_send_channel_id, - second_send_channel_id, - bidir_channel_id, - f, - } - } - - /// Return the ID of first `SendChannel` in this Join Pattern. - pub(crate) fn first_send_channel_id(&self) -> ids::ChannelId { - self.first_send_channel_id - } - - /// Return the ID of second `SendChannel` in this Join Pattern. - pub(crate) fn second_send_channel_id(&self) -> ids::ChannelId { - self.second_send_channel_id - } - - /// Return the ID of the `BidirChannel` in this Join Pattern. - pub(crate) fn bidir_channel_id(&self) -> ids::ChannelId { - self.bidir_channel_id - } - - /// Fire Join Pattern by running associated function in separate thread. - pub(crate) fn fire(&self, arg_1: Message, arg_2: Message, arg_3_and_sender: Message) { - let f_clone = self.f.clone(); - - thread::spawn(move || { - (*f_clone)(arg_1, arg_2, arg_3_and_sender); - }); - } - } -} diff --git a/src/types.rs b/src/types.rs index bc5df72..455fd1b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,11 +1,8 @@ //! Collection of types to increase readability and maintainability of the //! crate. -use std::any::Any; -use std::sync::mpsc::Sender; -use std::thread::{JoinHandle, Thread}; - -use crate::patterns; +use crate::join_pattern::JoinPattern; +use std::{any::Any, marker::Send, sync::mpsc::Sender}; /// Shallow wrapper for a trait object using `Box` that can pass through thread /// boundaries. @@ -42,186 +39,26 @@ pub enum Packet { return_sender: Sender, }, /// Request adding a new Join Pattern to the Junction. - AddJoinPatternRequest { join_pattern: JoinPattern }, + // TODO: Currently dynamic dispatch is being used + AddJoinPatternRequest { + join_pattern: Box, + }, /// Request the internal control thread managing the `Message`s to shut down. ShutDownRequest, } -/// Enum defining all Join Patterns that can be added to a Junction using the -/// `AddJoinPatternRequest` in a `Packet`. -pub enum JoinPattern { - /// Single channel Join Pattern. - UnarySend(patterns::unary::SendJoinPattern), - /// Single `RecvChannel` Join Pattern. - UnaryRecv(patterns::unary::RecvJoinPattern), - /// Single `BidirChannel` Join Pattern. - UnaryBidir(patterns::unary::BidirJoinPattern), - /// Two `SendChannel` Join Pattern. - BinarySend(patterns::binary::SendJoinPattern), - /// `SendChannel` and `RecvChannel` Join Pattern. - BinaryRecv(patterns::binary::RecvJoinPattern), - /// `SendChannel` and `BidirChannel` Join Pattern. - BinaryBidir(patterns::binary::BidirJoinPattern), - /// Three `SendChannel` Join Pattern. - TernarySend(patterns::ternary::SendJoinPattern), - /// Two `SendChannel` and `RecvChannel` Join Pattern. - TernaryRecv(patterns::ternary::RecvJoinPattern), - /// Two `SendChannel` and `BidirChannel` Join Pattern. - TernaryBidir(patterns::ternary::BidirJoinPattern), -} - -/// Handle to a `Junction`'s underlying `Controller`. -/// -/// This struct carries a `JoinHandle` to the thread that the `Controller` of -/// a `Junction` is running in. It allows for the `Controller` and its thread -/// to be stopped gracefully at any point. -pub struct ControllerHandle { - sender: Sender, - control_thread_handle: Option>, -} - -impl ControllerHandle { - pub(crate) fn new(sender: Sender, handle: JoinHandle<()>) -> ControllerHandle { - ControllerHandle { - sender, - control_thread_handle: Some(handle), - } - } - - /// Extracts a handle to the underlying thread. - pub fn thread(&self) -> Option<&Thread> { - match &self.control_thread_handle { - Some(h) => Some(h.thread()), - None => None, - } - } - - /// Request the `Controller` to stop gracefully, then join its thread. - /// - /// # Panics - /// - /// Panics if it was unable to send shut-down request to the control thread. - pub fn stop(&mut self) { - self.sender.send(Packet::ShutDownRequest).unwrap(); - - self.control_thread_handle.take().unwrap().join().unwrap(); - } -} - -/// Function types related to various kind of functions that can be stored and -/// executed with Join Patterns. -pub mod functions { - use super::*; - - /// Types and Traits for functions which take one argument. - pub mod unary { - use super::*; - - /// Trait to allow boxed up functions that take one `Message` and return - /// nothing to be cloned. - pub trait FnBoxClone: Fn(Message) -> () + Send { - fn clone_box(&self) -> Box; - } - - impl FnBoxClone for F - where - F: Fn(Message) -> () + Send + Clone + 'static, - { - /// Proxy function to be able to implement the `Clone` trait on - /// boxed up functions that take one `Message` and return nothing. - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } - } - - impl Clone for Box { - fn clone(&self) -> Box { - (**self).clone_box() - } - } - - /// Type alias for boxed up cloneable functions that take one `Message` and - /// return nothing. Mainly meant to increase readability of code. - pub type FnBox = Box; - } - - /// Types and Traits for functions which take two arguments. - pub mod binary { - use super::*; - - /// Trait to allow boxed up functions that take two `Message`s and return - /// nothing to be cloned. - pub trait FnBoxClone: Fn(Message, Message) -> () + Send { - fn clone_box(&self) -> Box; - } - - impl FnBoxClone for F - where - F: Fn(Message, Message) -> () + Send + Clone + 'static, - { - /// Proxy function to be able to implement the `Clone` trait on - /// boxed up functions that take two `Message`s and return nothing. - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } - } - - impl Clone for Box { - fn clone(&self) -> Box { - (**self).clone_box() - } - } - - /// Type alias for boxed up cloneable functions that take two `Message`s and - /// return nothing. Mainly meant to increase readability of code. - pub type FnBox = Box; - } - - /// Types and Traits for functions which take three arguments. - pub mod ternary { - use super::*; - - /// Trait to allow boxed up functions that take three `Message`s and return - /// nothing to be cloned. - pub trait FnBoxClone: Fn(Message, Message, Message) -> () + Send { - fn clone_box(&self) -> Box; - } - - impl FnBoxClone for F - where - F: Fn(Message, Message, Message) -> () + Send + Clone + 'static, - { - /// Proxy function to be able to implement the `Clone` trait on - /// boxed up functions that take three `Message`s and return nothing. - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } - } - - impl Clone for Box { - fn clone(&self) -> Box { - (**self).clone_box() - } - } - - /// Type alias for boxed up cloneable functions that take three `Message`s and - /// return nothing. Mainly meant to increase readability of code. - pub type FnBox = Box; - } -} - /// Adds specific ID types for the various IDs that are used in the crate. pub mod ids { use std::sync::atomic::{AtomicUsize, Ordering}; /// ID to identify a channel within a Join Pattern. - #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)] + #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Ord, PartialOrd)] pub struct ChannelId(usize); impl ChannelId { - pub(crate) fn new(value: usize) -> ChannelId { - ChannelId(value) - } + // pub(crate) fn new(value: usize) -> ChannelId { + // ChannelId(value) + // } /// Increment the internal value of the channel ID. pub(crate) fn increment(&mut self) {