diff --git a/src/lib.rs b/src/lib.rs index 591128c..fef562a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,25 +1,83 @@ -use std::io::{stdin, stdout, Stdin, Stdout}; +pub mod types; +use crate::types::Message; +use std::{ + collections::HashMap, + io::{stdin, stdout, BufRead, BufReader, Error as IoError, Stdin, Stdout, Write}, + rc::Rc, + sync::{Arc, Mutex}, +}; -pub struct Malestorm { +pub struct Malestorm<'a> { stdin: Stdin, - stdout: Stdout, + pub stdout: Stdout, + + pub handlers: HashMap<&'a str, Box>, } -impl Default for Malestorm { +impl<'a> Default for Malestorm<'a> { fn default() -> Self { Self { stdin: stdin(), stdout: stdout(), + handlers: HashMap::new(), } } } -impl Malestorm { +impl<'a> Malestorm<'a> { pub fn run(self) { - loop {} + let mut reader = BufReader::new(self.stdin); + + let mut buf = String::new(); + + loop { + let read = match reader.read_line(&mut buf) { + Ok(v) => v, + Err(e) => panic!("{:?}", e), + }; + + if read == 0 { + continue; + } + + let message: Message = match serde_json::from_str(&buf) { + Ok(v) => v, + Err(e) => { + eprintln!("error in parsing input: {:?}", e); + continue; + } + }; + + let mtype = message.body.message_type.clone(); + + let handler = match self.handlers.get(&mtype.as_str()) { + Some(v) => v, + None => { + eprintln!("no handler found for {}", message.body.message_type); + continue; + } + }; + + handler(message); + } } - fn read() {} + pub fn register(&mut self, name: &'a str, func: impl Fn(Message) + 'a + 'static) { + self.handlers + .insert(name, Box::new(move |msg: Message| func(msg))); + } - fn write() {} + pub fn send(&self, msg: Message) -> Result<(), IoError> { + let out = serde_json::to_vec(&msg)?; + match self.write(&out) { + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + + fn write(&self, buf: &[u8]) -> Result { + let mut writer = self.stdout.lock(); + + writer.write(buf) + } } diff --git a/src/main.rs b/src/main.rs index 5cd4cdd..07746a8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,44 @@ #![allow(special_module_name)] -mod lib; -mod types; +use std::{ + io::{stdout, Write}, + rc::Rc, + sync::{Arc, Mutex}, +}; -use crate::lib::Malestorm; +use distributed_systems_flyio::{ + types::{EchoMessage, Message, MessageBody}, + Malestorm, +}; fn main() { - let program = Malestorm::default(); + let mut program = Malestorm::default(); + + let stdout = Arc::new(stdout()); + + let handlers = &mut program.handlers; + + handlers.insert( + "echo", + Box::new(move |mut msg: Message| { + let message_body: EchoMessage = serde_json::from_value(msg.body.message_body) + .expect("error in parsing message body"); + + msg.body.message_type = "echo_ok".into(); + msg.body.message_body = serde_json::to_value(message_body).unwrap(); + + match serde_json::to_vec(&msg).map(|buf| { + let mut writer = stdout.lock(); + writer.write_all(&buf).expect("error in writing content"); + writer.flush() + }) { + Ok(_) => (), + Err(e) => { + eprintln!("error in writing response: {}", e); + } + }; + }), + ); + + program.run(); } diff --git a/src/types.rs b/src/types.rs index 2dd231e..b2ed2f7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -9,21 +9,15 @@ pub struct Message { } #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct MessageBody { - msg_id: Option, - in_reply_to: Option, +pub struct MessageBody { + pub msg_id: Option, + pub in_reply_to: Option, #[serde(rename = "type")] - message_type: String, + pub message_type: String, #[serde(flatten)] - pub message_body: MessageEnum, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(untagged, deny_unknown_fields)] -pub enum MessageEnum { - Echo(EchoMessage), + pub message_body: serde_json::Value, } #[derive(Debug, Serialize, Deserialize)] @@ -37,7 +31,6 @@ mod test { use super::*; - #[test] pub fn extraneous_fields_fail() { let body = "{ \"src\": \"c1\",