diff --git a/src/lib.rs b/src/lib.rs index c587b50..adaf95a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ pub mod types; -use crate::types::Message; +use crate::types::{BroadcastMessage, Message, TopologyMessage}; use rand::{rngs::SmallRng, Rng, SeedableRng}; use std::{ cell::RefCell, @@ -23,6 +23,8 @@ pub struct Malestorm<'a> { struct Node { id: String, nodes: Vec, + broadcasts: Vec, + topology: HashMap>, rng: SmallRng, } @@ -41,6 +43,8 @@ impl<'a> Default for Malestorm<'a> { node: RefCell::new(Node { id: String::new(), nodes: Vec::new(), + broadcasts: Vec::new(), + topology: HashMap::new(), rng: SmallRng::seed_from_u64(seed.as_secs()), }), io: MalestormIo { @@ -125,6 +129,23 @@ impl<'a> Malestorm<'a> { node.nodes = nodes; } + pub fn record_broadcast(&self, broadcast: BroadcastMessage) { + let mut node = self.node.borrow_mut(); + node.broadcasts.push(broadcast.message); + } + + pub fn read_broadcasts(&self) -> Vec { + let node = self.node.borrow(); + node.broadcasts.clone() + } + + // TODO: this'll probably be more complex later on if we have ops to update toplogy + pub fn save_toplogy(&self, msg: TopologyMessage) { + let mut node = self.node.borrow_mut(); + + node.topology = msg.topology; + } + pub fn generate_client_id(&self) -> String { let mut node = self.node.borrow_mut(); diff --git a/src/main.rs b/src/main.rs index 618b687..bb39d1f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,8 @@ use distributed_systems_flyio::{ - types::{EchoMessage, GenerateMessage, InitMessage, Message}, + types::{ + BroadcastMessage, EchoMessage, GenerateMessage, InitMessage, Message, ReadMessage, + TopologyMessage, + }, Malestorm, }; use std::rc::Rc; @@ -54,5 +57,52 @@ fn main() { .map_err(|e| format!("generate: error in writing response: {}", e)) }, ); + + program.register( + "broadcast", + |mut msg: Message, program: Rc| -> Result<(), String> { + let body: BroadcastMessage = serde_json::from_value(msg.body.message_body.take()) + .map_err(|e| format!("error in parsing response body: {}", e))?; + msg.body.message_type = "broadcast_ok".into(); + + program.record_broadcast(body); + + program + .reply(msg) + .map_err(|e| format!("broadcast: error in writing response: {}", e)) + }, + ); + + program.register( + "read", + |mut msg: Message, program: Rc| -> Result<(), String> { + msg.body.message_type = "read_ok".into(); + + let body = serde_json::to_value(ReadMessage { + messages: program.read_broadcasts(), + }) + .map_err(|e| format!("error in marshalling response: {}", e))?; + msg.body.message_body = body; + + program + .reply(msg) + .map_err(|e| format!("broadcast: error in writing response: {}", e)) + }, + ); + + program.register( + "topology", + |mut msg: Message, program: Rc| -> Result<(), String> { + let topology: TopologyMessage = serde_json::from_value(msg.body.message_body.take()) + .map_err(|e| format!("error in parsing response: {}", e))?; + + msg.body.message_type = "topology_ok".into(); + program.save_toplogy(topology); + + program + .reply(msg) + .map_err(|e| format!("broadcast: error in writing response: {}", e)) + }, + ); program.run(); } diff --git a/src/types.rs b/src/types.rs index a2a01ed..e130897 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] @@ -41,6 +43,24 @@ pub struct GenerateMessage { pub id: String, } +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct BroadcastMessage { + pub message: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ReadMessage { + pub messages: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct TopologyMessage { + pub topology: HashMap>, +} + #[cfg(test)] mod test {