diff --git a/src/lib.rs b/src/lib.rs index fef562a..465e609 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,36 +2,53 @@ pub mod types; use crate::types::Message; use std::{ collections::HashMap, - io::{stdin, stdout, BufRead, BufReader, Error as IoError, Stdin, Stdout, Write}, + io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write}, rc::Rc, - sync::{Arc, Mutex}, }; pub struct Malestorm<'a> { - stdin: Stdin, - pub stdout: Stdout, + io: MalestormIo, + pub handlers: HashMap<&'a str, Box)>>, +} - pub handlers: HashMap<&'a str, Box>, +struct MalestormIo { + stdin: Stdin, + stdout: Stdout, } impl<'a> Default for Malestorm<'a> { fn default() -> Self { Self { - stdin: stdin(), - stdout: stdout(), + io: MalestormIo { + stdin: stdin(), + stdout: stdout(), + }, handlers: HashMap::new(), } } } +impl MalestormIo { + fn write(&self, buf: &[u8]) -> Result<(), IoError> { + let mut writer = self.stdout.lock(); + writer.write_all(buf)?; + writer.flush() + } + + fn read_line(&self, buf: &mut Vec) -> Result { + let mut reader = self.stdin.lock(); + + reader.read_until(b'\n', buf) + } +} + impl<'a> Malestorm<'a> { pub fn run(self) { - let mut reader = BufReader::new(self.stdin); - - let mut buf = String::new(); + let program = Rc::new(self); + let mut buf = Vec::new(); loop { - let read = match reader.read_line(&mut buf) { + let read = match program.io.read_line(&mut buf) { Ok(v) => v, Err(e) => panic!("{:?}", e), }; @@ -40,7 +57,7 @@ impl<'a> Malestorm<'a> { continue; } - let message: Message = match serde_json::from_str(&buf) { + let message: Message = match serde_json::from_slice(&buf) { Ok(v) => v, Err(e) => { eprintln!("error in parsing input: {:?}", e); @@ -50,7 +67,9 @@ impl<'a> Malestorm<'a> { let mtype = message.body.message_type.clone(); - let handler = match self.handlers.get(&mtype.as_str()) { + let pc = program.clone(); + + let handler = match program.handlers.get(&mtype.as_str()) { Some(v) => v, None => { eprintln!("no handler found for {}", message.body.message_type); @@ -58,26 +77,26 @@ impl<'a> Malestorm<'a> { } }; - handler(message); + handler(message, pc); } } - pub fn register(&mut self, name: &'a str, func: impl Fn(Message) + 'a + 'static) { - self.handlers - .insert(name, Box::new(move |msg: Message| func(msg))); + pub fn register( + &mut self, + name: &'a str, + func: impl Fn(Message, Rc) + 'a + 'static, + ) { + self.handlers.insert(name, Box::new(func)); } - pub fn send(&self, msg: Message) -> Result<(), IoError> { + pub fn reply(&self, mut msg: Message) -> Result<(), IoError> { + // Before replying, Swap src / dst in original message + // Add the correct value for in_reply_to + + std::mem::swap(&mut msg.src, &mut msg.dest); + msg.body.in_reply_to = msg.body.msg_id; + let out = serde_json::to_vec(&msg)?; - match self.write(&out) { - Ok(_) => Ok(()), - Err(e) => Err(e), - } - } - - fn write(&self, buf: &[u8]) -> Result { - let mut writer = self.stdout.lock(); - - writer.write(buf) + self.io.write(&out).map(|_| Ok(()))? } } diff --git a/src/main.rs b/src/main.rs index 07746a8..928a715 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,44 +1,29 @@ #![allow(special_module_name)] -use std::{ - io::{stdout, Write}, - rc::Rc, - sync::{Arc, Mutex}, -}; +use std::rc::Rc; use distributed_systems_flyio::{ - types::{EchoMessage, Message, MessageBody}, + types::{EchoMessage, Message}, Malestorm, }; fn main() { let mut program = Malestorm::default(); - let stdout = Arc::new(stdout()); + program.register("echo", |mut msg: Message, program: Rc| { + let message_body: EchoMessage = + serde_json::from_value(msg.body.message_body).expect("error in parsing message body"); - let handlers = &mut program.handlers; + msg.body.message_type = "echo_ok".into(); + msg.body.message_body = serde_json::to_value(message_body).unwrap(); - handlers.insert( - "echo", - Box::new(move |mut msg: Message| { - let message_body: EchoMessage = serde_json::from_value(msg.body.message_body) - .expect("error in parsing message body"); - - msg.body.message_type = "echo_ok".into(); - msg.body.message_body = serde_json::to_value(message_body).unwrap(); - - match serde_json::to_vec(&msg).map(|buf| { - let mut writer = stdout.lock(); - writer.write_all(&buf).expect("error in writing content"); - writer.flush() - }) { - Ok(_) => (), - Err(e) => { - eprintln!("error in writing response: {}", e); - } - }; - }), - ); + match program.reply(msg) { + Ok(_) => (), + Err(e) => { + eprintln!("error in writing response: {}", e); + } + } + }); program.run(); }