From 864324139f25e2f6c1a83c7885a5c08f4df82b90 Mon Sep 17 00:00:00 2001 From: Ishan Jain Date: Mon, 6 May 2024 18:05:26 +0530 Subject: [PATCH] wip/g-counter: not working yet --- src/lib.rs | 199 +++++++++++++++++++++++---------------------------- src/main.rs | 127 ++++++++++++++++++-------------- src/types.rs | 6 +- 3 files changed, 166 insertions(+), 166 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1e3bbc5..2b99fc0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,19 +6,18 @@ pub mod types; use crate::types::{Message, MessageBody, SeqKvInput}; use futures::future::BoxFuture; -use rand::{rngs::SmallRng, SeedableRng}; -use serde_json::Value; +use rand::{rngs::SmallRng, Rng, SeedableRng}; use std::{ collections::HashMap, future::Future, io::{stdin, stdout, BufRead, Error as IoError, Stdout, Write}, - pin::Pin, - sync::{Arc, Mutex, RwLock}, + sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tokio::{ - runtime::Handle, - sync::oneshot::{self, Receiver, Sender}, + +use tokio::sync::{ + oneshot::{self, Receiver, Sender}, + Mutex, RwLock, }; trait Callback = FnOnce(Message) -> Result<(), String> + Send + Sync + 'static; @@ -34,14 +33,14 @@ type Handler = Arc< >; pub struct Maelstorm { - mutex: Mutex<()>, pub node: Node, pub handlers: HashMap, - callbacks: HashMap>, + callbacks: HashMap>, } pub struct Node { pub id: String, + pub value: HashMap, pub nodes: Vec, rng: SmallRng, } @@ -54,13 +53,12 @@ impl Default for Maelstorm { let seed = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("time went backwards??"); - Self { - mutex: Mutex::new(()), node: Node { id: String::new(), nodes: Vec::new(), rng: SmallRng::seed_from_u64(seed.as_secs()), + value: HashMap::new(), }, handlers: HashMap::new(), callbacks: HashMap::new(), @@ -116,7 +114,7 @@ impl Maelstorm { }; if let Some(reply_msg_id) = message.body.in_reply_to { - let mut program = program.write().unwrap(); + let mut program = program.write().await; let callback = match program.callbacks.remove(&reply_msg_id) { Some(v) => v, @@ -135,7 +133,7 @@ impl Maelstorm { } let handler = { - let program = program.read().unwrap(); + let program = program.read().await; match program.handlers.get(&message.body.message_type) { Some(v) => v.clone(), @@ -161,31 +159,6 @@ impl Maelstorm { self.node.id = node_id; } - pub fn sync( - &self, - dest: &str, - prev_msg: Message, - io: Arc>, - ) -> Result<(), IoError> { - let msg = Message { - id: Some(prev_msg.id.unwrap_or(0) + 1), - src: self.node.id.clone(), - dest: dest.to_string(), - body: types::MessageBody { - msg_id: Some(prev_msg.body.msg_id.unwrap_or(0) + 1), - in_reply_to: None, - message_type: "counter_sync".to_string(), - message_body: Value::String("".to_string()), - }, - }; - - let out = serde_json::to_vec(&msg)?; - let io = io.lock().unwrap(); - io.write(&out)?; - - Ok(()) - } - pub fn register(&mut self, name: &str, func: F) where F: Fn(Message, Arc>, Arc>) -> Fut @@ -210,15 +183,18 @@ impl Maelstorm { self.rpc( io, msg, - Box::new(|msg| { - tx.send(msg) - .map_err(|e| format!("error in sending to tx chan: {:?}", e))?; - Ok(()) + Box::new(|msg| match tx.send(msg) { + Ok(v) => Ok(()), + Err(e) => { + format!("error in sending to tx chan: {:?}", e); + Ok(()) + } }), ) + .await .map_err(|e| e.to_string())?; - match tokio::time::timeout(Duration::from_secs(2), rx).await { + match tokio::time::timeout(Duration::from_secs(1), rx).await { Ok(result) => match result { Ok(v) => Ok(v), Err(e) => { @@ -233,7 +209,7 @@ impl Maelstorm { } } - fn rpc( + async fn rpc( &mut self, io: Arc>, mut msg: Message, @@ -244,16 +220,20 @@ impl Maelstorm { msg.body.msg_id = Some(next_msg_id); - self.send(io, msg.clone(), &msg.src) + self.send(io, msg.clone(), &msg.dest).await } - pub fn reply(&mut self, io: Arc>, mut msg: Message) -> Result<(), IoError> { + pub async fn reply( + &mut self, + io: Arc>, + mut msg: Message, + ) -> Result<(), IoError> { msg.body.in_reply_to = msg.body.msg_id; msg.body.msg_id = None; - self.send(io, msg.clone(), &msg.src) + self.send(io, msg.clone(), &msg.src).await } - fn send( + async fn send( &self, io: Arc>, mut msg: Message, @@ -266,7 +246,7 @@ impl Maelstorm { msg.src = self.node.id.clone(); let out = serde_json::to_vec(&msg)?; - let io = io.lock().unwrap(); + let io = io.lock().await; io.write(&out).map(|_| Ok(()))? } @@ -276,62 +256,65 @@ impl Maelstorm { } } -//impl Maelstorm { -// pub fn read_counter( -// &mut self, -// io: Arc>, -// store: &str, -// node_to_read: String, -// ) -> Result { -// let msg = Message { -// id: None, -// src: self.node.id.clone(), -// dest: store.to_string(), -// body: MessageBody { -// msg_id: Some(0), -// in_reply_to: None, -// message_type: "read".to_string(), -// message_body: serde_json::to_value(SeqKvInput { -// key: node_to_read, -// value: None, -// }) -// .unwrap(), -// }, -// }; -// let mut result = self.sync_rpc(io, msg)?; -// -// let body: SeqKvInput = serde_json::from_value(result.body.message_body.take()) -// .map_err(|e| format!("error in parsing response body: {}", e))?; -// -// Ok(body.value.unwrap()) -// } -// -// pub fn write_counter( -// &mut self, -// io: Arc>, -// store: &str, -// val: u64, -// ) -> Result<(), String> { -// let msg = Message { -// id: None, -// src: self.node.id.clone(), -// dest: store.to_string(), -// body: MessageBody { -// msg_id: Some(0), -// in_reply_to: None, -// message_type: "write".to_string(), -// message_body: serde_json::to_value(SeqKvInput { -// key: self.node.id.clone(), -// value: Some(val), -// }) -// .unwrap(), -// }, -// }; -// let mut result = self.sync_rpc(io, msg)?; -// -// let body: SeqKvInput = serde_json::from_value(result.body.message_body.take()) -// .map_err(|e| format!("error in parsing response body: {}", e))?; -// -// Ok(()) -// } -//} +impl Maelstorm { + pub async fn read_counter( + &mut self, + io: Arc>, + node_to_read: String, + ) -> Result { + let msg_id: u64 = self.node.rng.gen(); + + let msg = Message { + id: None, + src: self.node.id.clone(), + dest: "seq-kv".to_string(), + body: MessageBody { + msg_id: Some(msg_id), + in_reply_to: None, + message_type: "read".to_string(), + message_body: serde_json::to_value(SeqKvInput { + key: node_to_read, + value: None, + }) + .unwrap(), + }, + }; + let mut result = self.sync_rpc(io, msg).await?; + + let body: SeqKvInput = serde_json::from_value(result.body.message_body.take()) + .map_err(|e| format!("error in parsing response body: {}", e))?; + + Ok(body.value.unwrap()) + } + + pub async fn write_counter( + &mut self, + io: Arc>, + val: u64, + ) -> Result<(), String> { + let msg_id: u64 = self.node.rng.gen(); + + let msg = Message { + id: None, + src: self.node.id.clone(), + dest: "seq-kv".to_string(), + body: MessageBody { + msg_id: Some(msg_id), + in_reply_to: None, + message_type: "write".to_string(), + message_body: serde_json::to_value(SeqKvInput { + key: self.node.id.clone(), + value: Some(val), + }) + .unwrap(), + }, + }; + let mut result = self.sync_rpc(io, msg).await?; + + // TODO: could return the parsed confirmation from here + serde_json::from_value(result.body.message_body.take()) + .map_err(|e| format!("error in parsing response body: {}", e))?; + + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index e26d49a..e547a4b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,15 +4,8 @@ use distributed_systems_flyio::{ types::{GrowCounterReadMessage, GrowCounterUpdateMessage, InitMessage, Message}, Maelstorm, MaelstormIo, }; -use std::sync::{Arc, Mutex, RwLock}; - -async fn test( - msg: Message, - program: Arc>, - io: Arc>, -) -> Result<(), String> { - Ok(()) -} +use std::sync::Arc; +use tokio::sync::{Mutex, RwLock}; #[tokio::main] async fn main() { @@ -28,72 +21,96 @@ async fn main() { let message_body: InitMessage = serde_json::from_value(msg.body.message_body.take()) .map_err(|e| format!("error in parsing response body: {}", e))?; - let mut program = program.write().unwrap(); + let mut program = program.write().await; program.init(message_body.node_id, message_body.nodes); msg.body.message_type = "init_ok".into(); program .reply(io, msg) + .await .map_err(|e| format!("init: error in writing response: {}", e)) }, ); - // program.register( - // "add", - // async |mut msg: Message, - // program: Arc>, - // io: Arc>| - // -> Result<(), String> { - // let body: GrowCounterUpdateMessage = - // serde_json::from_value(msg.body.message_body.take()) - // .map_err(|e| format!("error in parsing response body: {}", e))?; + program.register( + "add", + async |mut msg: Message, + program: Arc>, + io: Arc>| + -> Result<(), String> { + let body: GrowCounterUpdateMessage = + serde_json::from_value(msg.body.message_body.take()) + .map_err(|e| format!("error in parsing response body: {}", e))?; - // msg.body.message_type = "add_ok".into(); + msg.body.message_type = "add_ok".into(); - // let mut program = program.write().unwrap(); + let mut program = program.write().await; - // let id = program.node.id.clone(); + let id = program.node.id.clone(); + let current = *program.node.value.entry(id).or_insert(0); - // // let current = program - // // .read_counter(io.clone(), "seq-kv", id) - // // .expect("error in reading value"); + match program + .write_counter(io.clone(), current + body.delta) + .await + { + Ok(v) => { + let id = program.node.id.clone(); + *program.node.value.entry(id).or_insert(0) = current + body.delta; - // // program.write_counter(io.clone(), "seq-kv", current + body.delta)?; + v + } + Err(e) if e.to_string() == "deadline has elapsed" => { + let id = program.node.id.clone(); + *program.node.value.entry(id).or_insert(0) = current + body.delta; + } + Err(e) => return Err(e), + }; + program + .reply(io, msg) + .await + .map_err(|e| format!("add: error in writing response: {}", e)) + }, + ); - // program - // .reply(io, msg) - // .map_err(|e| format!("add: error in writing response: {}", e)) - // }, - // ); + program.register( + "read", + async |mut msg: Message, + program: Arc>, + io: Arc>| + -> Result<(), String> { + msg.body.message_type = "read_ok".into(); - // program.register( - // "read", - // async |mut msg: Message, - // program: Arc>, - // io: Arc>| - // -> Result<(), String> { - // msg.body.message_type = "read_ok".into(); + let mut program = program.write().await; + let mut sum = 0; + for node in program.node.nodes.clone() { + eprintln!("reading {} from {}", node, msg.src); + if *node == msg.dest { + sum += *program.node.value.entry(node).or_insert(0); + continue; + } + // Sync first then add - // let mut program = program.write().unwrap(); - // let mut sum = 0; - // for node in program.node.nodes.clone() { - // if *node == msg.src { - // sum += program.read_counter(io.clone(), "seq-kv", node)?; - // continue; - // } - // // Sync first then add + sum += match program.read_counter(io.clone(), node).await { + Ok(v) => v, + Err(e) if e.to_string() == "deadline has elapsed" => { + let id = program.node.id.clone(); + *program.node.value.entry(id).or_insert(0) + } + Err(e) => return Err(e), + }; + } - // sum += program.read_counter(io.clone(), "seq-kv", node)?; - // } + msg.body.message_body = + serde_json::to_value(GrowCounterReadMessage { value: sum }).unwrap(); - // msg.body.message_body = - // serde_json::to_value(GrowCounterReadMessage { value: sum }).unwrap(); + eprintln!("node = {} response = {:?}", msg.dest, msg); - // program - // .reply(io, msg.clone()) - // .map_err(|e| format!("read: error in writing response: {}", e)) - // }, - // ); + program + .reply(io, msg.clone()) + .await + .map_err(|e| format!("read: error in writing response: {}", e)) + }, + ); let io = MaelstormIo::default(); diff --git a/src/types.rs b/src/types.rs index 415f51b..d0b002c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] pub struct Message { #[serde(skip_serializing_if = "Option::is_none")] - pub id: Option, + pub id: Option, pub src: String, pub dest: String, pub body: MessageBody, @@ -13,9 +13,9 @@ pub struct Message { #[derive(Debug, Serialize, Clone, Deserialize)] pub struct MessageBody { #[serde(skip_serializing_if = "Option::is_none")] - pub msg_id: Option, + pub msg_id: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub in_reply_to: Option, + pub in_reply_to: Option, #[serde(rename = "type")] pub message_type: String,