1
0

Added code to initialize node

This commit is contained in:
Ishan Jain 2024-01-04 00:28:06 +05:30
parent 7ff21c5ada
commit e911ee72d4
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
3 changed files with 87 additions and 37 deletions

View File

@ -1,16 +1,23 @@
pub mod types; pub mod types;
use crate::types::Message; use crate::types::Message;
use std::{ use std::{
cell::RefCell,
collections::HashMap, collections::HashMap,
io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write}, io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write},
rc::Rc, rc::Rc,
}; };
pub struct Malestorm<'a> { pub struct Malestorm<'a> {
node: RefCell<Node>,
io: MalestormIo, io: MalestormIo,
pub handlers: HashMap<&'a str, Box<dyn Fn(Message, Rc<Malestorm>)>>, pub handlers: HashMap<&'a str, Box<dyn Fn(Message, Rc<Malestorm>)>>,
} }
struct Node {
id: String,
nodes: Vec<String>,
}
struct MalestormIo { struct MalestormIo {
stdin: Stdin, stdin: Stdin,
stdout: Stdout, stdout: Stdout,
@ -19,6 +26,10 @@ struct MalestormIo {
impl<'a> Default for Malestorm<'a> { impl<'a> Default for Malestorm<'a> {
fn default() -> Self { fn default() -> Self {
Self { Self {
node: RefCell::new(Node {
id: String::new(),
nodes: Vec::new(),
}),
io: MalestormIo { io: MalestormIo {
stdin: stdin(), stdin: stdin(),
stdout: stdout(), stdout: stdout(),
@ -47,21 +58,25 @@ impl<'a> Malestorm<'a> {
let program = Rc::new(self); let program = Rc::new(self);
let mut buf = Vec::new(); let mut buf = Vec::new();
loop {
let read = match program.io.read_line(&mut buf) { let read = match program.io.read_line(&mut buf) {
Ok(v) => v, Ok(v) => v,
Err(e) => panic!("{:?}", e), Err(e) => panic!("{:?}", e),
}; };
if read == 0 { if read == 0 {
continue; return;
} }
let message: Message = match serde_json::from_slice(&buf) { let message: Message = match serde_json::from_slice(&buf) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
eprintln!("error in parsing input: {:?}", e); eprintln!(
continue; "error in parsing input: {:?} msg = {}",
e,
String::from_utf8_lossy(&buf)
);
return;
} }
}; };
@ -73,12 +88,21 @@ impl<'a> Malestorm<'a> {
Some(v) => v, Some(v) => v,
None => { None => {
eprintln!("no handler found for {}", message.body.message_type); eprintln!("no handler found for {}", message.body.message_type);
continue; return;
} }
}; };
handler(message, pc); handler(message, pc);
} }
pub fn set_node_id(&self, node_id: String) {
let mut node = self.node.borrow_mut();
node.id = node_id;
}
pub fn set_nodes(&self, nodes: Vec<String>) {
let mut node = self.node.borrow_mut();
node.nodes = nodes;
} }
pub fn register( pub fn register(
@ -94,6 +118,9 @@ impl<'a> Malestorm<'a> {
// Add the correct value for in_reply_to // Add the correct value for in_reply_to
std::mem::swap(&mut msg.src, &mut msg.dest); std::mem::swap(&mut msg.src, &mut msg.dest);
let node = self.node.borrow();
msg.src = node.id.clone();
msg.body.in_reply_to = msg.body.msg_id; msg.body.in_reply_to = msg.body.msg_id;
let out = serde_json::to_vec(&msg)?; let out = serde_json::to_vec(&msg)?;

View File

@ -1,18 +1,32 @@
#![allow(special_module_name)]
use std::rc::Rc;
use distributed_systems_flyio::{ use distributed_systems_flyio::{
types::{EchoMessage, Message}, types::{EchoMessage, InitMessage, Message},
Malestorm, Malestorm,
}; };
use std::rc::Rc;
fn main() { fn main() {
let mut program = Malestorm::default(); let mut program = Malestorm::default();
program.register("init", |mut msg: Message, program: Rc<Malestorm>| {
let message_body: InitMessage = serde_json::from_value(msg.body.message_body.take())
.expect("error in parsing message body");
program.set_node_id(message_body.node_id);
program.set_nodes(message_body.nodes);
msg.body.message_type = "init_ok".into();
match program.reply(msg) {
Ok(_) => (),
Err(e) => {
eprintln!("init: error in writing response: {}", e);
}
}
});
program.register("echo", |mut msg: Message, program: Rc<Malestorm>| { program.register("echo", |mut msg: Message, program: Rc<Malestorm>| {
let message_body: EchoMessage = let message_body: EchoMessage = serde_json::from_value(msg.body.message_body.take())
serde_json::from_value(msg.body.message_body).expect("error in parsing message body"); .expect("error in parsing message body");
msg.body.message_type = "echo_ok".into(); msg.body.message_type = "echo_ok".into();
msg.body.message_body = serde_json::to_value(message_body).unwrap(); msg.body.message_body = serde_json::to_value(message_body).unwrap();
@ -20,7 +34,7 @@ fn main() {
match program.reply(msg) { match program.reply(msg) {
Ok(_) => (), Ok(_) => (),
Err(e) => { Err(e) => {
eprintln!("error in writing response: {}", e); eprintln!("echo: error in writing response: {}", e);
} }
} }
}); });

View File

@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)] #[serde(deny_unknown_fields)]
pub struct Message { pub struct Message {
pub id: Option<i64>,
pub src: String, pub src: String,
pub dest: String, pub dest: String,
pub body: MessageBody, pub body: MessageBody,
@ -26,6 +27,14 @@ pub struct EchoMessage {
echo: String, echo: String,
} }
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct InitMessage {
pub node_id: String,
#[serde(rename = "node_ids")]
pub nodes: Vec<String>,
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {