From e911ee72d4918446c5105da36da161f1e1304864 Mon Sep 17 00:00:00 2001 From: Ishan Jain Date: Thu, 4 Jan 2024 00:28:06 +0530 Subject: [PATCH] Added code to initialize node --- src/lib.rs | 85 ++++++++++++++++++++++++++++++++++------------------ src/main.rs | 30 ++++++++++++++----- src/types.rs | 9 ++++++ 3 files changed, 87 insertions(+), 37 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 465e609..de5c1ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,23 @@ pub mod types; use crate::types::Message; use std::{ + cell::RefCell, collections::HashMap, io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write}, rc::Rc, }; pub struct Malestorm<'a> { + node: RefCell, io: MalestormIo, pub handlers: HashMap<&'a str, Box)>>, } +struct Node { + id: String, + nodes: Vec, +} + struct MalestormIo { stdin: Stdin, stdout: Stdout, @@ -19,6 +26,10 @@ struct MalestormIo { impl<'a> Default for Malestorm<'a> { fn default() -> Self { Self { + node: RefCell::new(Node { + id: String::new(), + nodes: Vec::new(), + }), io: MalestormIo { stdin: stdin(), stdout: stdout(), @@ -47,38 +58,51 @@ impl<'a> Malestorm<'a> { let program = Rc::new(self); let mut buf = Vec::new(); - loop { - let read = match program.io.read_line(&mut buf) { - Ok(v) => v, - Err(e) => panic!("{:?}", e), - }; - if read == 0 { - continue; - } + let read = match program.io.read_line(&mut buf) { + Ok(v) => v, + Err(e) => panic!("{:?}", e), + }; - let message: Message = match serde_json::from_slice(&buf) { - Ok(v) => v, - Err(e) => { - eprintln!("error in parsing input: {:?}", e); - continue; - } - }; - - let mtype = message.body.message_type.clone(); - - let pc = program.clone(); - - let handler = match program.handlers.get(&mtype.as_str()) { - Some(v) => v, - None => { - eprintln!("no handler found for {}", message.body.message_type); - continue; - } - }; - - handler(message, pc); + if read == 0 { + return; } + + let message: Message = match serde_json::from_slice(&buf) { + Ok(v) => v, + Err(e) => { + eprintln!( + "error in parsing input: {:?} msg = {}", + e, + String::from_utf8_lossy(&buf) + ); + return; + } + }; + + let mtype = message.body.message_type.clone(); + + let pc = program.clone(); + + let handler = match program.handlers.get(&mtype.as_str()) { + Some(v) => v, + None => { + eprintln!("no handler found for {}", message.body.message_type); + return; + } + }; + + handler(message, pc); + } + + pub fn set_node_id(&self, node_id: String) { + let mut node = self.node.borrow_mut(); + node.id = node_id; + } + + pub fn set_nodes(&self, nodes: Vec) { + let mut node = self.node.borrow_mut(); + node.nodes = nodes; } pub fn register( @@ -94,6 +118,9 @@ impl<'a> Malestorm<'a> { // Add the correct value for in_reply_to std::mem::swap(&mut msg.src, &mut msg.dest); + let node = self.node.borrow(); + msg.src = node.id.clone(); + msg.body.in_reply_to = msg.body.msg_id; let out = serde_json::to_vec(&msg)?; diff --git a/src/main.rs b/src/main.rs index 928a715..8da0f75 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,32 @@ -#![allow(special_module_name)] - -use std::rc::Rc; - use distributed_systems_flyio::{ - types::{EchoMessage, Message}, + types::{EchoMessage, InitMessage, Message}, Malestorm, }; +use std::rc::Rc; fn main() { let mut program = Malestorm::default(); + program.register("init", |mut msg: Message, program: Rc| { + let message_body: InitMessage = serde_json::from_value(msg.body.message_body.take()) + .expect("error in parsing message body"); + + program.set_node_id(message_body.node_id); + program.set_nodes(message_body.nodes); + + msg.body.message_type = "init_ok".into(); + + match program.reply(msg) { + Ok(_) => (), + Err(e) => { + eprintln!("init: error in writing response: {}", e); + } + } + }); + program.register("echo", |mut msg: Message, program: Rc| { - let message_body: EchoMessage = - serde_json::from_value(msg.body.message_body).expect("error in parsing message body"); + let message_body: EchoMessage = serde_json::from_value(msg.body.message_body.take()) + .expect("error in parsing message body"); msg.body.message_type = "echo_ok".into(); msg.body.message_body = serde_json::to_value(message_body).unwrap(); @@ -20,7 +34,7 @@ fn main() { match program.reply(msg) { Ok(_) => (), Err(e) => { - eprintln!("error in writing response: {}", e); + eprintln!("echo: error in writing response: {}", e); } } }); diff --git a/src/types.rs b/src/types.rs index b2ed2f7..6c6763a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct Message { + pub id: Option, pub src: String, pub dest: String, pub body: MessageBody, @@ -26,6 +27,14 @@ pub struct EchoMessage { echo: String, } +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct InitMessage { + pub node_id: String, + #[serde(rename = "node_ids")] + pub nodes: Vec, +} + #[cfg(test)] mod test {