diff --git a/src/lib.rs b/src/lib.rs index 2b99fc0..c1e15f0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,7 +40,6 @@ pub struct Maelstorm { pub struct Node { pub id: String, - pub value: HashMap, pub nodes: Vec, rng: SmallRng, } @@ -58,7 +57,6 @@ impl Default for Maelstorm { id: String::new(), nodes: Vec::new(), rng: SmallRng::seed_from_u64(seed.as_secs()), - value: HashMap::new(), }, handlers: HashMap::new(), callbacks: HashMap::new(), @@ -279,7 +277,11 @@ impl Maelstorm { .unwrap(), }, }; - let mut result = self.sync_rpc(io, msg).await?; + let mut result = match self.sync_rpc(io, msg).await { + Ok(v) => v, + Err(e) if e.to_string() == "deadline has elapsed" => return Ok(0), + Err(e) => return Err(e), + }; let body: SeqKvInput = serde_json::from_value(result.body.message_body.take()) .map_err(|e| format!("error in parsing response body: {}", e))?; @@ -309,7 +311,11 @@ impl Maelstorm { .unwrap(), }, }; - let mut result = self.sync_rpc(io, msg).await?; + let mut result = match self.sync_rpc(io, msg).await { + Ok(v) => v, + Err(e) if e.to_string() == "deadline has elapsed" => return Ok(()), + Err(e) => return Err(e), + }; // TODO: could return the parsed confirmation from here serde_json::from_value(result.body.message_body.take()) diff --git a/src/main.rs b/src/main.rs index e547a4b..5f52eb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,26 +45,11 @@ async fn main() { msg.body.message_type = "add_ok".into(); let mut program = program.write().await; + let current = program.read_counter(io.clone(), msg.dest.clone()).await?; - let id = program.node.id.clone(); - let current = *program.node.value.entry(id).or_insert(0); - - match program + program .write_counter(io.clone(), current + body.delta) - .await - { - Ok(v) => { - let id = program.node.id.clone(); - *program.node.value.entry(id).or_insert(0) = current + body.delta; - - v - } - Err(e) if e.to_string() == "deadline has elapsed" => { - let id = program.node.id.clone(); - *program.node.value.entry(id).or_insert(0) = current + body.delta; - } - Err(e) => return Err(e), - }; + .await?; program .reply(io, msg) .await @@ -82,29 +67,25 @@ async fn main() { let mut program = program.write().await; let mut sum = 0; + for node in program.node.nodes.clone() { - eprintln!("reading {} from {}", node, msg.src); - if *node == msg.dest { - sum += *program.node.value.entry(node).or_insert(0); + if node == msg.dest { + sum += program.read_counter(io.clone(), node.clone()).await?; continue; } - // Sync first then add - sum += match program.read_counter(io.clone(), node).await { - Ok(v) => v, - Err(e) if e.to_string() == "deadline has elapsed" => { - let id = program.node.id.clone(); - *program.node.value.entry(id).or_insert(0) - } - Err(e) => return Err(e), - }; + let mut msg = msg.clone(); + msg.body.message_type = "local_read".to_string(); + msg.body.in_reply_to = None; + + let mut resp = program.sync_rpc(io.clone(), msg).await?; + let body: GrowCounterReadMessage = + serde_json::from_value(resp.body.message_body.take()) + .map_err(|e| format!("error in parsing response body: {}", e))?; + + sum += body.value; } - msg.body.message_body = - serde_json::to_value(GrowCounterReadMessage { value: sum }).unwrap(); - - eprintln!("node = {} response = {:?}", msg.dest, msg); - program .reply(io, msg.clone()) .await @@ -112,6 +93,28 @@ async fn main() { }, ); + program.register( + "local_read", + async |mut msg: Message, + program: Arc>, + io: Arc>| + -> Result<(), String> { + msg.body.message_type = "local_read_ok".into(); + + let mut program = program.write().await; + + let current = program.read_counter(io.clone(), msg.dest.clone()).await?; + + msg.body.message_body = + serde_json::to_value(GrowCounterReadMessage { value: current }).unwrap(); + + program + .reply(io, msg) + .await + .map_err(|e| format!("add: error in writing response: {}", e)) + }, + ); + let io = MaelstormIo::default(); program.run(io).await;