From e64f16dd95ba92d13ad8f65a6249562eec385a26 Mon Sep 17 00:00:00 2001 From: Ishan Jain Date: Sat, 4 May 2024 21:26:06 +0530 Subject: [PATCH] wip/g-counter: async refactor --- .gitignore | 7 +++++ src/lib.rs | 86 +++++++++++++++++++++++---------------------------- src/main.rs | 7 ++--- src/seq_kv.rs | 6 ++++ 4 files changed, 55 insertions(+), 51 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..54a131d 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,8 @@ /target +output +store +test.file +input +ip +maelstorm/* +*.bz2 diff --git a/src/lib.rs b/src/lib.rs index f44008d..2ba28aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,6 @@ use crate::{seq_kv::MonotonicCounter, types::Message}; use rand::{rngs::SmallRng, Rng, SeedableRng}; use serde_json::Value; use std::{ - cell::RefCell, collections::HashMap, io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write}, sync::{Arc, Mutex, RwLock}, @@ -34,7 +33,6 @@ pub struct Node { rng: SmallRng, } pub struct MalestormIo { - stdin: Stdin, stdout: Stdout, } @@ -63,20 +61,11 @@ impl MalestormIo { writer.write_all(&[b'\n'])?; writer.flush() } - - fn read_line(&self, buf: &mut Vec) -> Result { - let mut reader = self.stdin.lock(); - - reader.read_until(b'\n', buf) - } } impl Default for MalestormIo { fn default() -> Self { - Self { - stdin: stdin(), - stdout: stdout(), - } + Self { stdout: stdout() } } } @@ -84,14 +73,14 @@ impl Malestorm { pub fn run(self, runtime: Runtime, io: MalestormIo) { let program = Arc::new(RwLock::new(self)); let io = Arc::new(Mutex::new(io)); - let loop_io = io.clone(); + let stdin = stdin(); loop { let mut buf = Vec::new(); { - let io = loop_io.lock().unwrap(); - let read = match io.read_line(&mut buf) { + let mut reader = stdin.lock(); + let read = match reader.read_until(b'\n', &mut buf) { Ok(v) => v, Err(e) => panic!("{:?}", e), }; @@ -100,9 +89,39 @@ impl Malestorm { } } - let p = program.clone(); + let message: Message = match serde_json::from_slice(&buf) { + Ok(v) => v, + Err(e) => { + eprintln!( + "error in parsing input: {:?} msg = {:?}", + e, + String::from_utf8_lossy(&buf) + ); + return; + } + }; + + let mtype = message.body.message_type.clone(); + + let handler: Arc = { + let program = program.read().unwrap(); + + match program.handlers.get(&mtype) { + Some(v) => v.clone(), + None => { + //eprintln!("no handler found for {}", message.body.message_type); + return; + } + } + }; + + let pc = program.clone(); let io = io.clone(); - runtime.spawn(async move { parse_request_and_handle(p, io, &buf).await }); + runtime.spawn(async move { + if let Err(e) = handler(message, pc, io) { + eprintln!("error in serving request: {}", e); + } + }); } } @@ -125,6 +144,9 @@ impl Malestorm { pub fn write_counter(&mut self, src: &str, v: u64) { self.node.counter.write(src, v); } + pub fn add_counter(&mut self, src: &str, v: u64) { + self.node.counter.add(src, v); + } pub fn sync( &self, @@ -187,34 +209,4 @@ async fn parse_request_and_handle( io: Arc>, buf: &[u8], ) { - let message: Message = match serde_json::from_slice(&buf) { - Ok(v) => v, - Err(e) => { - eprintln!( - "error in parsing input: {:?} msg = {:?}", - e, - String::from_utf8_lossy(&buf) - ); - return; - } - }; - - let mtype = message.body.message_type.clone(); - - let pc = program.clone(); - - let handler: Arc = { - let program = program.read().unwrap(); - - match program.handlers.get(&mtype) { - Some(v) => v.clone(), - None => { - //eprintln!("no handler found for {}", message.body.message_type); - return; - } - } - }; - if let Err(e) = handler(message, pc, io) { - eprintln!("error in serving request: {}", e); - } } diff --git a/src/main.rs b/src/main.rs index 21d0afa..ed7ed00 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,8 +43,7 @@ fn main() { msg.body.message_type = "add_ok".into(); let mut program = program.write().unwrap(); - let val = program.read_counter(&msg.src); - program.write_counter(&msg.src, val + body.delta); + program.add_counter(&msg.src, body.delta); program .send(io, msg.clone()) @@ -60,7 +59,7 @@ fn main() { -> Result<(), String> { msg.body.message_type = "read_ok".into(); - let program = program.write().unwrap(); + let program = program.read().unwrap(); let mut sum = 0; for node in program.get_nodes() { if node == msg.src { @@ -91,7 +90,7 @@ fn main() { -> Result<(), String> { msg.body.message_type = "counter_sync_ok".into(); - let program = program.write().unwrap(); + let program = program.read().unwrap(); msg.body.message_body = serde_json::to_value(GrowCounterReadMessage { value: program.read_counter(&msg.src), }) diff --git a/src/seq_kv.rs b/src/seq_kv.rs index ae67853..17a7182 100644 --- a/src/seq_kv.rs +++ b/src/seq_kv.rs @@ -19,4 +19,10 @@ impl MonotonicCounter { pub fn write(&mut self, node: &str, v: u64) { *self.counter.entry(node.to_string()).or_insert(0) = v; } + + pub fn add(&mut self, node: &str, v: u64) { + eprintln!("{:?}", self.counter); + + *self.counter.entry(node.to_string()).or_insert(0) += v; + } }