1
0

Completed broadcasts

This commit is contained in:
Ishan Jain 2024-01-04 02:28:26 +05:30
parent d78ea520ca
commit 42fd8e997a
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
3 changed files with 93 additions and 2 deletions

View File

@ -2,7 +2,7 @@
pub mod types; pub mod types;
use crate::types::Message; use crate::types::{BroadcastMessage, Message, TopologyMessage};
use rand::{rngs::SmallRng, Rng, SeedableRng}; use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{ use std::{
cell::RefCell, cell::RefCell,
@ -23,6 +23,8 @@ pub struct Malestorm<'a> {
struct Node { struct Node {
id: String, id: String,
nodes: Vec<String>, nodes: Vec<String>,
broadcasts: Vec<i64>,
topology: HashMap<String, Vec<String>>,
rng: SmallRng, rng: SmallRng,
} }
@ -41,6 +43,8 @@ impl<'a> Default for Malestorm<'a> {
node: RefCell::new(Node { node: RefCell::new(Node {
id: String::new(), id: String::new(),
nodes: Vec::new(), nodes: Vec::new(),
broadcasts: Vec::new(),
topology: HashMap::new(),
rng: SmallRng::seed_from_u64(seed.as_secs()), rng: SmallRng::seed_from_u64(seed.as_secs()),
}), }),
io: MalestormIo { io: MalestormIo {
@ -125,6 +129,23 @@ impl<'a> Malestorm<'a> {
node.nodes = nodes; node.nodes = nodes;
} }
pub fn record_broadcast(&self, broadcast: BroadcastMessage) {
let mut node = self.node.borrow_mut();
node.broadcasts.push(broadcast.message);
}
pub fn read_broadcasts(&self) -> Vec<i64> {
let node = self.node.borrow();
node.broadcasts.clone()
}
// TODO: this'll probably be more complex later on if we have ops to update toplogy
pub fn save_toplogy(&self, msg: TopologyMessage) {
let mut node = self.node.borrow_mut();
node.topology = msg.topology;
}
pub fn generate_client_id(&self) -> String { pub fn generate_client_id(&self) -> String {
let mut node = self.node.borrow_mut(); let mut node = self.node.borrow_mut();

View File

@ -1,5 +1,8 @@
use distributed_systems_flyio::{ use distributed_systems_flyio::{
types::{EchoMessage, GenerateMessage, InitMessage, Message}, types::{
BroadcastMessage, EchoMessage, GenerateMessage, InitMessage, Message, ReadMessage,
TopologyMessage,
},
Malestorm, Malestorm,
}; };
use std::rc::Rc; use std::rc::Rc;
@ -54,5 +57,52 @@ fn main() {
.map_err(|e| format!("generate: error in writing response: {}", e)) .map_err(|e| format!("generate: error in writing response: {}", e))
}, },
); );
program.register(
"broadcast",
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
let body: BroadcastMessage = serde_json::from_value(msg.body.message_body.take())
.map_err(|e| format!("error in parsing response body: {}", e))?;
msg.body.message_type = "broadcast_ok".into();
program.record_broadcast(body);
program
.reply(msg)
.map_err(|e| format!("broadcast: error in writing response: {}", e))
},
);
program.register(
"read",
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
msg.body.message_type = "read_ok".into();
let body = serde_json::to_value(ReadMessage {
messages: program.read_broadcasts(),
})
.map_err(|e| format!("error in marshalling response: {}", e))?;
msg.body.message_body = body;
program
.reply(msg)
.map_err(|e| format!("broadcast: error in writing response: {}", e))
},
);
program.register(
"topology",
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
let topology: TopologyMessage = serde_json::from_value(msg.body.message_body.take())
.map_err(|e| format!("error in parsing response: {}", e))?;
msg.body.message_type = "topology_ok".into();
program.save_toplogy(topology);
program
.reply(msg)
.map_err(|e| format!("broadcast: error in writing response: {}", e))
},
);
program.run(); program.run();
} }

View File

@ -1,3 +1,5 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -41,6 +43,24 @@ pub struct GenerateMessage {
pub id: String, pub id: String,
} }
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BroadcastMessage {
pub message: i64,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReadMessage {
pub messages: Vec<i64>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TopologyMessage {
pub topology: HashMap<String, Vec<String>>,
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {