diff --git a/src/lib.rs b/src/lib.rs index c1e15f0..c896bd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ use std::{ future::Future, io::{stdin, stdout, BufRead, Error as IoError, Stdout, Write}, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use tokio::sync::{ @@ -20,7 +20,7 @@ use tokio::sync::{ Mutex, RwLock, }; -trait Callback = FnOnce(Message) -> Result<(), String> + Send + Sync + 'static; +type Callback = Box BoxFuture<'static, Result<(), String>> + Send + Sync>; type Handler = Arc< dyn Fn( @@ -35,11 +35,13 @@ type Handler = Arc< pub struct Maelstorm { pub node: Node, pub handlers: HashMap, - callbacks: HashMap>, + callbacks: HashMap, } pub struct Node { pub id: String, + pub counter: u64, + pub other_counter: u64, pub nodes: Vec, rng: SmallRng, } @@ -55,8 +57,10 @@ impl Default for Maelstorm { Self { node: Node { id: String::new(), + counter: 0, + other_counter: 0, nodes: Vec::new(), - rng: SmallRng::seed_from_u64(seed.as_secs()), + rng: SmallRng::seed_from_u64(seed.as_nanos() as u64), }, handlers: HashMap::new(), callbacks: HashMap::new(), @@ -111,7 +115,10 @@ impl Maelstorm { } }; + eprintln!("INCOMING MESSAGE = {:?}", message); + if let Some(reply_msg_id) = message.body.in_reply_to { + eprintln!("REPLY ID FOR CALLBACK FOUND = {:?}", message); let mut program = program.write().await; let callback = match program.callbacks.remove(&reply_msg_id) { @@ -122,11 +129,12 @@ impl Maelstorm { } }; - if let Err(e) = callback(message) - /* add await*/ - { + if let Err(e) = callback(message.clone()).await { eprintln!("error in callback: {e}"); + } else { + eprintln!("SUCCESSFULLY CALLED CALLBACK FOR = {:?}", message); } + continue; } @@ -177,44 +185,66 @@ impl Maelstorm { msg: Message, ) -> Result { let (tx, rx): (Sender, Receiver) = oneshot::channel(); + let m = msg.clone(); - self.rpc( - io, - msg, - Box::new(|msg| match tx.send(msg) { - Ok(v) => Ok(()), - Err(e) => { - format!("error in sending to tx chan: {:?}", e); - Ok(()) - } - }), - ) + self.rpc(io, msg, async |msg: Message| match tx.send(msg.clone()) { + Ok(v) => { + eprintln!("SENT RESPONSE INTO CHANNEL = {:?}", msg); + + Ok(()) + } + Err(e) => { + format!("error in sending to tx chan: {:?}", e); + Ok(()) + } + }) .await .map_err(|e| e.to_string())?; + let t = Instant::now(); match tokio::time::timeout(Duration::from_secs(1), rx).await { - Ok(result) => match result { - Ok(v) => Ok(v), - Err(e) => { - eprintln!("sync callback error: {}", e); - Err(e.to_string()) + Ok(result) => { + eprintln!( + "GOT RESULT BEFORE TIMEOUT = {:?} elapsed = {:?}", + result, + t.elapsed() + ); + + match result { + Ok(v) => Ok(v), + Err(e) => { + eprintln!("sync callback error: {}", e); + Err(e.to_string()) + } } - }, + } Err(e) => { - eprintln!("sync callback timeout: {}", e); + eprintln!( + "sync callback timeout: {} msg = {:?} callback queue {:?}", + e, + m, + self.callbacks.keys() + ); Err(e.to_string()) } } } - async fn rpc( + async fn rpc( &mut self, io: Arc>, mut msg: Message, - handler: impl Callback, - ) -> Result<(), IoError> { - let next_msg_id = msg.body.msg_id.unwrap() + 1; - self.callbacks.insert(next_msg_id, Box::new(handler)); + handler: F, + ) -> Result<(), IoError> + where + F: FnOnce(Message) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + let next_msg_id = msg.body.msg_id.unwrap(); + + eprintln!("INSERTING {} {:?}", next_msg_id, msg); + self.callbacks + .insert(next_msg_id, Box::new(move |a| Box::pin(handler(a)))); msg.body.msg_id = Some(next_msg_id); diff --git a/src/main.rs b/src/main.rs index 5f52eb9..6dab5a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,11 +45,22 @@ async fn main() { msg.body.message_type = "add_ok".into(); let mut program = program.write().await; - let current = program.read_counter(io.clone(), msg.dest.clone()).await?; + program.node.counter += body.delta; + let current = program.node.counter; + + program.write_counter(io.clone(), current).await?; + + let mut other_counter = 0; + + for node in program.node.nodes.clone() { + if node == msg.dest { + continue; + } + let resp = program.read_counter(io.clone(), node).await?; + other_counter += resp; + } + program.node.other_counter = other_counter; - program - .write_counter(io.clone(), current + body.delta) - .await?; program .reply(io, msg) .await @@ -66,52 +77,30 @@ async fn main() { msg.body.message_type = "read_ok".into(); let mut program = program.write().await; - let mut sum = 0; + let mut other_counter = 0; for node in program.node.nodes.clone() { if node == msg.dest { - sum += program.read_counter(io.clone(), node.clone()).await?; continue; } - - let mut msg = msg.clone(); - msg.body.message_type = "local_read".to_string(); - msg.body.in_reply_to = None; - - let mut resp = program.sync_rpc(io.clone(), msg).await?; - let body: GrowCounterReadMessage = - serde_json::from_value(resp.body.message_body.take()) - .map_err(|e| format!("error in parsing response body: {}", e))?; - - sum += body.value; + let resp = program.read_counter(io.clone(), node).await?; + other_counter += resp; } + program.node.other_counter = other_counter; - program - .reply(io, msg.clone()) - .await - .map_err(|e| format!("read: error in writing response: {}", e)) - }, - ); - - program.register( - "local_read", - async |mut msg: Message, - program: Arc>, - io: Arc>| - -> Result<(), String> { - msg.body.message_type = "local_read_ok".into(); - - let mut program = program.write().await; - - let current = program.read_counter(io.clone(), msg.dest.clone()).await?; - - msg.body.message_body = - serde_json::to_value(GrowCounterReadMessage { value: current }).unwrap(); + eprintln!( + "READ OP {} {}", + program.node.counter, program.node.other_counter + ); + let resp = GrowCounterReadMessage { + value: program.node.counter + program.node.other_counter, + }; + msg.body.message_body = serde_json::to_value(resp).unwrap(); program .reply(io, msg) .await - .map_err(|e| format!("add: error in writing response: {}", e)) + .map_err(|e| format!("read: error in writing response: {}", e)) }, );