1
0

wip/g-counter: not working yet

This commit is contained in:
Ishan Jain 2024-05-06 18:05:26 +05:30
parent cec2eeda62
commit 864324139f
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
3 changed files with 166 additions and 166 deletions

View File

@ -6,19 +6,18 @@ pub mod types;
use crate::types::{Message, MessageBody, SeqKvInput}; use crate::types::{Message, MessageBody, SeqKvInput};
use futures::future::BoxFuture; use futures::future::BoxFuture;
use rand::{rngs::SmallRng, SeedableRng}; use rand::{rngs::SmallRng, Rng, SeedableRng};
use serde_json::Value;
use std::{ use std::{
collections::HashMap, collections::HashMap,
future::Future, future::Future,
io::{stdin, stdout, BufRead, Error as IoError, Stdout, Write}, io::{stdin, stdout, BufRead, Error as IoError, Stdout, Write},
pin::Pin, sync::Arc,
sync::{Arc, Mutex, RwLock},
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
use tokio::{
runtime::Handle, use tokio::sync::{
sync::oneshot::{self, Receiver, Sender}, oneshot::{self, Receiver, Sender},
Mutex, RwLock,
}; };
trait Callback = FnOnce(Message) -> Result<(), String> + Send + Sync + 'static; trait Callback = FnOnce(Message) -> Result<(), String> + Send + Sync + 'static;
@ -34,14 +33,14 @@ type Handler = Arc<
>; >;
pub struct Maelstorm { pub struct Maelstorm {
mutex: Mutex<()>,
pub node: Node, pub node: Node,
pub handlers: HashMap<String, Handler>, pub handlers: HashMap<String, Handler>,
callbacks: HashMap<i64, Box<dyn Callback>>, callbacks: HashMap<u64, Box<dyn Callback>>,
} }
pub struct Node { pub struct Node {
pub id: String, pub id: String,
pub value: HashMap<String, u64>,
pub nodes: Vec<String>, pub nodes: Vec<String>,
rng: SmallRng, rng: SmallRng,
} }
@ -54,13 +53,12 @@ impl Default for Maelstorm {
let seed = SystemTime::now() let seed = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.expect("time went backwards??"); .expect("time went backwards??");
Self { Self {
mutex: Mutex::new(()),
node: Node { node: Node {
id: String::new(), id: String::new(),
nodes: Vec::new(), nodes: Vec::new(),
rng: SmallRng::seed_from_u64(seed.as_secs()), rng: SmallRng::seed_from_u64(seed.as_secs()),
value: HashMap::new(),
}, },
handlers: HashMap::new(), handlers: HashMap::new(),
callbacks: HashMap::new(), callbacks: HashMap::new(),
@ -116,7 +114,7 @@ impl Maelstorm {
}; };
if let Some(reply_msg_id) = message.body.in_reply_to { if let Some(reply_msg_id) = message.body.in_reply_to {
let mut program = program.write().unwrap(); let mut program = program.write().await;
let callback = match program.callbacks.remove(&reply_msg_id) { let callback = match program.callbacks.remove(&reply_msg_id) {
Some(v) => v, Some(v) => v,
@ -135,7 +133,7 @@ impl Maelstorm {
} }
let handler = { let handler = {
let program = program.read().unwrap(); let program = program.read().await;
match program.handlers.get(&message.body.message_type) { match program.handlers.get(&message.body.message_type) {
Some(v) => v.clone(), Some(v) => v.clone(),
@ -161,31 +159,6 @@ impl Maelstorm {
self.node.id = node_id; self.node.id = node_id;
} }
pub fn sync(
&self,
dest: &str,
prev_msg: Message,
io: Arc<Mutex<MaelstormIo>>,
) -> Result<(), IoError> {
let msg = Message {
id: Some(prev_msg.id.unwrap_or(0) + 1),
src: self.node.id.clone(),
dest: dest.to_string(),
body: types::MessageBody {
msg_id: Some(prev_msg.body.msg_id.unwrap_or(0) + 1),
in_reply_to: None,
message_type: "counter_sync".to_string(),
message_body: Value::String("".to_string()),
},
};
let out = serde_json::to_vec(&msg)?;
let io = io.lock().unwrap();
io.write(&out)?;
Ok(())
}
pub fn register<F, Fut>(&mut self, name: &str, func: F) pub fn register<F, Fut>(&mut self, name: &str, func: F)
where where
F: Fn(Message, Arc<RwLock<Maelstorm>>, Arc<Mutex<MaelstormIo>>) -> Fut F: Fn(Message, Arc<RwLock<Maelstorm>>, Arc<Mutex<MaelstormIo>>) -> Fut
@ -210,15 +183,18 @@ impl Maelstorm {
self.rpc( self.rpc(
io, io,
msg, msg,
Box::new(|msg| { Box::new(|msg| match tx.send(msg) {
tx.send(msg) Ok(v) => Ok(()),
.map_err(|e| format!("error in sending to tx chan: {:?}", e))?; Err(e) => {
format!("error in sending to tx chan: {:?}", e);
Ok(()) Ok(())
}
}), }),
) )
.await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
match tokio::time::timeout(Duration::from_secs(2), rx).await { match tokio::time::timeout(Duration::from_secs(1), rx).await {
Ok(result) => match result { Ok(result) => match result {
Ok(v) => Ok(v), Ok(v) => Ok(v),
Err(e) => { Err(e) => {
@ -233,7 +209,7 @@ impl Maelstorm {
} }
} }
fn rpc( async fn rpc(
&mut self, &mut self,
io: Arc<Mutex<MaelstormIo>>, io: Arc<Mutex<MaelstormIo>>,
mut msg: Message, mut msg: Message,
@ -244,16 +220,20 @@ impl Maelstorm {
msg.body.msg_id = Some(next_msg_id); msg.body.msg_id = Some(next_msg_id);
self.send(io, msg.clone(), &msg.src) self.send(io, msg.clone(), &msg.dest).await
} }
pub fn reply(&mut self, io: Arc<Mutex<MaelstormIo>>, mut msg: Message) -> Result<(), IoError> { pub async fn reply(
&mut self,
io: Arc<Mutex<MaelstormIo>>,
mut msg: Message,
) -> Result<(), IoError> {
msg.body.in_reply_to = msg.body.msg_id; msg.body.in_reply_to = msg.body.msg_id;
msg.body.msg_id = None; msg.body.msg_id = None;
self.send(io, msg.clone(), &msg.src) self.send(io, msg.clone(), &msg.src).await
} }
fn send( async fn send(
&self, &self,
io: Arc<Mutex<MaelstormIo>>, io: Arc<Mutex<MaelstormIo>>,
mut msg: Message, mut msg: Message,
@ -266,7 +246,7 @@ impl Maelstorm {
msg.src = self.node.id.clone(); msg.src = self.node.id.clone();
let out = serde_json::to_vec(&msg)?; let out = serde_json::to_vec(&msg)?;
let io = io.lock().unwrap(); let io = io.lock().await;
io.write(&out).map(|_| Ok(()))? io.write(&out).map(|_| Ok(()))?
} }
@ -276,62 +256,65 @@ impl Maelstorm {
} }
} }
//impl Maelstorm { impl Maelstorm {
// pub fn read_counter( pub async fn read_counter(
// &mut self, &mut self,
// io: Arc<Mutex<MaelstormIo>>, io: Arc<Mutex<MaelstormIo>>,
// store: &str, node_to_read: String,
// node_to_read: String, ) -> Result<u64, String> {
// ) -> Result<u64, String> { let msg_id: u64 = self.node.rng.gen();
// let msg = Message {
// id: None, let msg = Message {
// src: self.node.id.clone(), id: None,
// dest: store.to_string(), src: self.node.id.clone(),
// body: MessageBody { dest: "seq-kv".to_string(),
// msg_id: Some(0), body: MessageBody {
// in_reply_to: None, msg_id: Some(msg_id),
// message_type: "read".to_string(), in_reply_to: None,
// message_body: serde_json::to_value(SeqKvInput { message_type: "read".to_string(),
// key: node_to_read, message_body: serde_json::to_value(SeqKvInput {
// value: None, key: node_to_read,
// }) value: None,
// .unwrap(), })
// }, .unwrap(),
// }; },
// let mut result = self.sync_rpc(io, msg)?; };
// let mut result = self.sync_rpc(io, msg).await?;
// let body: SeqKvInput = serde_json::from_value(result.body.message_body.take())
// .map_err(|e| format!("error in parsing response body: {}", e))?; let body: SeqKvInput = serde_json::from_value(result.body.message_body.take())
// .map_err(|e| format!("error in parsing response body: {}", e))?;
// Ok(body.value.unwrap())
// } Ok(body.value.unwrap())
// }
// pub fn write_counter(
// &mut self, pub async fn write_counter(
// io: Arc<Mutex<MaelstormIo>>, &mut self,
// store: &str, io: Arc<Mutex<MaelstormIo>>,
// val: u64, val: u64,
// ) -> Result<(), String> { ) -> Result<(), String> {
// let msg = Message { let msg_id: u64 = self.node.rng.gen();
// id: None,
// src: self.node.id.clone(), let msg = Message {
// dest: store.to_string(), id: None,
// body: MessageBody { src: self.node.id.clone(),
// msg_id: Some(0), dest: "seq-kv".to_string(),
// in_reply_to: None, body: MessageBody {
// message_type: "write".to_string(), msg_id: Some(msg_id),
// message_body: serde_json::to_value(SeqKvInput { in_reply_to: None,
// key: self.node.id.clone(), message_type: "write".to_string(),
// value: Some(val), message_body: serde_json::to_value(SeqKvInput {
// }) key: self.node.id.clone(),
// .unwrap(), value: Some(val),
// }, })
// }; .unwrap(),
// let mut result = self.sync_rpc(io, msg)?; },
// };
// let body: SeqKvInput = serde_json::from_value(result.body.message_body.take()) let mut result = self.sync_rpc(io, msg).await?;
// .map_err(|e| format!("error in parsing response body: {}", e))?;
// // TODO: could return the parsed confirmation from here
// Ok(()) serde_json::from_value(result.body.message_body.take())
// } .map_err(|e| format!("error in parsing response body: {}", e))?;
//}
Ok(())
}
}

View File

@ -4,15 +4,8 @@ use distributed_systems_flyio::{
types::{GrowCounterReadMessage, GrowCounterUpdateMessage, InitMessage, Message}, types::{GrowCounterReadMessage, GrowCounterUpdateMessage, InitMessage, Message},
Maelstorm, MaelstormIo, Maelstorm, MaelstormIo,
}; };
use std::sync::{Arc, Mutex, RwLock}; use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
async fn test(
msg: Message,
program: Arc<RwLock<Maelstorm>>,
io: Arc<Mutex<MaelstormIo>>,
) -> Result<(), String> {
Ok(())
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -28,72 +21,96 @@ async fn main() {
let message_body: InitMessage = serde_json::from_value(msg.body.message_body.take()) let message_body: InitMessage = serde_json::from_value(msg.body.message_body.take())
.map_err(|e| format!("error in parsing response body: {}", e))?; .map_err(|e| format!("error in parsing response body: {}", e))?;
let mut program = program.write().unwrap(); let mut program = program.write().await;
program.init(message_body.node_id, message_body.nodes); program.init(message_body.node_id, message_body.nodes);
msg.body.message_type = "init_ok".into(); msg.body.message_type = "init_ok".into();
program program
.reply(io, msg) .reply(io, msg)
.await
.map_err(|e| format!("init: error in writing response: {}", e)) .map_err(|e| format!("init: error in writing response: {}", e))
}, },
); );
// program.register( program.register(
// "add", "add",
// async |mut msg: Message, async |mut msg: Message,
// program: Arc<RwLock<Maelstorm>>, program: Arc<RwLock<Maelstorm>>,
// io: Arc<Mutex<MaelstormIo>>| io: Arc<Mutex<MaelstormIo>>|
// -> Result<(), String> { -> Result<(), String> {
// let body: GrowCounterUpdateMessage = let body: GrowCounterUpdateMessage =
// serde_json::from_value(msg.body.message_body.take()) serde_json::from_value(msg.body.message_body.take())
// .map_err(|e| format!("error in parsing response body: {}", e))?; .map_err(|e| format!("error in parsing response body: {}", e))?;
// msg.body.message_type = "add_ok".into(); msg.body.message_type = "add_ok".into();
// let mut program = program.write().unwrap(); let mut program = program.write().await;
// let id = program.node.id.clone(); let id = program.node.id.clone();
let current = *program.node.value.entry(id).or_insert(0);
// // let current = program match program
// // .read_counter(io.clone(), "seq-kv", id) .write_counter(io.clone(), current + body.delta)
// // .expect("error in reading value"); .await
{
Ok(v) => {
let id = program.node.id.clone();
*program.node.value.entry(id).or_insert(0) = current + body.delta;
// // program.write_counter(io.clone(), "seq-kv", current + body.delta)?; v
}
Err(e) if e.to_string() == "deadline has elapsed" => {
let id = program.node.id.clone();
*program.node.value.entry(id).or_insert(0) = current + body.delta;
}
Err(e) => return Err(e),
};
program
.reply(io, msg)
.await
.map_err(|e| format!("add: error in writing response: {}", e))
},
);
// program program.register(
// .reply(io, msg) "read",
// .map_err(|e| format!("add: error in writing response: {}", e)) async |mut msg: Message,
// }, program: Arc<RwLock<Maelstorm>>,
// ); io: Arc<Mutex<MaelstormIo>>|
-> Result<(), String> {
msg.body.message_type = "read_ok".into();
// program.register( let mut program = program.write().await;
// "read", let mut sum = 0;
// async |mut msg: Message, for node in program.node.nodes.clone() {
// program: Arc<RwLock<Maelstorm>>, eprintln!("reading {} from {}", node, msg.src);
// io: Arc<Mutex<MaelstormIo>>| if *node == msg.dest {
// -> Result<(), String> { sum += *program.node.value.entry(node).or_insert(0);
// msg.body.message_type = "read_ok".into(); continue;
}
// Sync first then add
// let mut program = program.write().unwrap(); sum += match program.read_counter(io.clone(), node).await {
// let mut sum = 0; Ok(v) => v,
// for node in program.node.nodes.clone() { Err(e) if e.to_string() == "deadline has elapsed" => {
// if *node == msg.src { let id = program.node.id.clone();
// sum += program.read_counter(io.clone(), "seq-kv", node)?; *program.node.value.entry(id).or_insert(0)
// continue; }
// } Err(e) => return Err(e),
// // Sync first then add };
}
// sum += program.read_counter(io.clone(), "seq-kv", node)?; msg.body.message_body =
// } serde_json::to_value(GrowCounterReadMessage { value: sum }).unwrap();
// msg.body.message_body = eprintln!("node = {} response = {:?}", msg.dest, msg);
// serde_json::to_value(GrowCounterReadMessage { value: sum }).unwrap();
// program program
// .reply(io, msg.clone()) .reply(io, msg.clone())
// .map_err(|e| format!("read: error in writing response: {}", e)) .await
// }, .map_err(|e| format!("read: error in writing response: {}", e))
// ); },
);
let io = MaelstormIo::default(); let io = MaelstormIo::default();

View File

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
#[serde(deny_unknown_fields)] #[serde(deny_unknown_fields)]
pub struct Message { pub struct Message {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<i64>, pub id: Option<u64>,
pub src: String, pub src: String,
pub dest: String, pub dest: String,
pub body: MessageBody, pub body: MessageBody,
@ -13,9 +13,9 @@ pub struct Message {
#[derive(Debug, Serialize, Clone, Deserialize)] #[derive(Debug, Serialize, Clone, Deserialize)]
pub struct MessageBody { pub struct MessageBody {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub msg_id: Option<i64>, pub msg_id: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub in_reply_to: Option<i64>, pub in_reply_to: Option<u64>,
#[serde(rename = "type")] #[serde(rename = "type")]
pub message_type: String, pub message_type: String,