1
0

refactored library

This commit is contained in:
Ishan Jain 2024-01-03 13:00:48 +05:30
parent 74107a9c68
commit 7ff21c5ada
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
2 changed files with 61 additions and 57 deletions

View File

@ -2,36 +2,53 @@ pub mod types;
use crate::types::Message;
use std::{
collections::HashMap,
io::{stdin, stdout, BufRead, BufReader, Error as IoError, Stdin, Stdout, Write},
io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write},
rc::Rc,
sync::{Arc, Mutex},
};
pub struct Malestorm<'a> {
stdin: Stdin,
pub stdout: Stdout,
io: MalestormIo,
pub handlers: HashMap<&'a str, Box<dyn Fn(Message, Rc<Malestorm>)>>,
}
pub handlers: HashMap<&'a str, Box<dyn Fn(Message)>>,
struct MalestormIo {
stdin: Stdin,
stdout: Stdout,
}
impl<'a> Default for Malestorm<'a> {
fn default() -> Self {
Self {
io: MalestormIo {
stdin: stdin(),
stdout: stdout(),
},
handlers: HashMap::new(),
}
}
}
impl MalestormIo {
fn write(&self, buf: &[u8]) -> Result<(), IoError> {
let mut writer = self.stdout.lock();
writer.write_all(buf)?;
writer.flush()
}
fn read_line(&self, buf: &mut Vec<u8>) -> Result<usize, IoError> {
let mut reader = self.stdin.lock();
reader.read_until(b'\n', buf)
}
}
impl<'a> Malestorm<'a> {
pub fn run(self) {
let mut reader = BufReader::new(self.stdin);
let mut buf = String::new();
let program = Rc::new(self);
let mut buf = Vec::new();
loop {
let read = match reader.read_line(&mut buf) {
let read = match program.io.read_line(&mut buf) {
Ok(v) => v,
Err(e) => panic!("{:?}", e),
};
@ -40,7 +57,7 @@ impl<'a> Malestorm<'a> {
continue;
}
let message: Message = match serde_json::from_str(&buf) {
let message: Message = match serde_json::from_slice(&buf) {
Ok(v) => v,
Err(e) => {
eprintln!("error in parsing input: {:?}", e);
@ -50,7 +67,9 @@ impl<'a> Malestorm<'a> {
let mtype = message.body.message_type.clone();
let handler = match self.handlers.get(&mtype.as_str()) {
let pc = program.clone();
let handler = match program.handlers.get(&mtype.as_str()) {
Some(v) => v,
None => {
eprintln!("no handler found for {}", message.body.message_type);
@ -58,26 +77,26 @@ impl<'a> Malestorm<'a> {
}
};
handler(message);
handler(message, pc);
}
}
pub fn register(&mut self, name: &'a str, func: impl Fn(Message) + 'a + 'static) {
self.handlers
.insert(name, Box::new(move |msg: Message| func(msg)));
pub fn register(
&mut self,
name: &'a str,
func: impl Fn(Message, Rc<Malestorm>) + 'a + 'static,
) {
self.handlers.insert(name, Box::new(func));
}
pub fn send(&self, msg: Message) -> Result<(), IoError> {
pub fn reply(&self, mut msg: Message) -> Result<(), IoError> {
// Before replying, Swap src / dst in original message
// Add the correct value for in_reply_to
std::mem::swap(&mut msg.src, &mut msg.dest);
msg.body.in_reply_to = msg.body.msg_id;
let out = serde_json::to_vec(&msg)?;
match self.write(&out) {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
fn write(&self, buf: &[u8]) -> Result<usize, IoError> {
let mut writer = self.stdout.lock();
writer.write(buf)
self.io.write(&out).map(|_| Ok(()))?
}
}

View File

@ -1,44 +1,29 @@
#![allow(special_module_name)]
use std::{
io::{stdout, Write},
rc::Rc,
sync::{Arc, Mutex},
};
use std::rc::Rc;
use distributed_systems_flyio::{
types::{EchoMessage, Message, MessageBody},
types::{EchoMessage, Message},
Malestorm,
};
fn main() {
let mut program = Malestorm::default();
let stdout = Arc::new(stdout());
let handlers = &mut program.handlers;
handlers.insert(
"echo",
Box::new(move |mut msg: Message| {
let message_body: EchoMessage = serde_json::from_value(msg.body.message_body)
.expect("error in parsing message body");
program.register("echo", |mut msg: Message, program: Rc<Malestorm>| {
let message_body: EchoMessage =
serde_json::from_value(msg.body.message_body).expect("error in parsing message body");
msg.body.message_type = "echo_ok".into();
msg.body.message_body = serde_json::to_value(message_body).unwrap();
match serde_json::to_vec(&msg).map(|buf| {
let mut writer = stdout.lock();
writer.write_all(&buf).expect("error in writing content");
writer.flush()
}) {
match program.reply(msg) {
Ok(_) => (),
Err(e) => {
eprintln!("error in writing response: {}", e);
}
};
}),
);
}
});
program.run();
}