1
0

g-counter: wip

This commit is contained in:
Ishan Jain 2024-05-07 21:28:10 +05:30
parent 16df5532e6
commit 90fe99f0c6
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
2 changed files with 88 additions and 69 deletions

View File

@ -12,7 +12,7 @@ use std::{
future::Future, future::Future,
io::{stdin, stdout, BufRead, Error as IoError, Stdout, Write}, io::{stdin, stdout, BufRead, Error as IoError, Stdout, Write},
sync::Arc, sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, Instant, SystemTime, UNIX_EPOCH},
}; };
use tokio::sync::{ use tokio::sync::{
@ -20,7 +20,7 @@ use tokio::sync::{
Mutex, RwLock, Mutex, RwLock,
}; };
trait Callback = FnOnce(Message) -> Result<(), String> + Send + Sync + 'static; type Callback = Box<dyn FnOnce(Message) -> BoxFuture<'static, Result<(), String>> + Send + Sync>;
type Handler = Arc< type Handler = Arc<
dyn Fn( dyn Fn(
@ -35,11 +35,13 @@ type Handler = Arc<
pub struct Maelstorm { pub struct Maelstorm {
pub node: Node, pub node: Node,
pub handlers: HashMap<String, Handler>, pub handlers: HashMap<String, Handler>,
callbacks: HashMap<u64, Box<dyn Callback>>, callbacks: HashMap<u64, Callback>,
} }
pub struct Node { pub struct Node {
pub id: String, pub id: String,
pub counter: u64,
pub other_counter: u64,
pub nodes: Vec<String>, pub nodes: Vec<String>,
rng: SmallRng, rng: SmallRng,
} }
@ -55,8 +57,10 @@ impl Default for Maelstorm {
Self { Self {
node: Node { node: Node {
id: String::new(), id: String::new(),
counter: 0,
other_counter: 0,
nodes: Vec::new(), nodes: Vec::new(),
rng: SmallRng::seed_from_u64(seed.as_secs()), rng: SmallRng::seed_from_u64(seed.as_nanos() as u64),
}, },
handlers: HashMap::new(), handlers: HashMap::new(),
callbacks: HashMap::new(), callbacks: HashMap::new(),
@ -111,7 +115,10 @@ impl Maelstorm {
} }
}; };
eprintln!("INCOMING MESSAGE = {:?}", message);
if let Some(reply_msg_id) = message.body.in_reply_to { if let Some(reply_msg_id) = message.body.in_reply_to {
eprintln!("REPLY ID FOR CALLBACK FOUND = {:?}", message);
let mut program = program.write().await; let mut program = program.write().await;
let callback = match program.callbacks.remove(&reply_msg_id) { let callback = match program.callbacks.remove(&reply_msg_id) {
@ -122,11 +129,12 @@ impl Maelstorm {
} }
}; };
if let Err(e) = callback(message) if let Err(e) = callback(message.clone()).await {
/* add await*/
{
eprintln!("error in callback: {e}"); eprintln!("error in callback: {e}");
} else {
eprintln!("SUCCESSFULLY CALLED CALLBACK FOR = {:?}", message);
} }
continue; continue;
} }
@ -177,44 +185,66 @@ impl Maelstorm {
msg: Message, msg: Message,
) -> Result<Message, String> { ) -> Result<Message, String> {
let (tx, rx): (Sender<Message>, Receiver<Message>) = oneshot::channel(); let (tx, rx): (Sender<Message>, Receiver<Message>) = oneshot::channel();
let m = msg.clone();
self.rpc( self.rpc(io, msg, async |msg: Message| match tx.send(msg.clone()) {
io, Ok(v) => {
msg, eprintln!("SENT RESPONSE INTO CHANNEL = {:?}", msg);
Box::new(|msg| match tx.send(msg) {
Ok(v) => Ok(()), Ok(())
Err(e) => { }
format!("error in sending to tx chan: {:?}", e); Err(e) => {
Ok(()) format!("error in sending to tx chan: {:?}", e);
} Ok(())
}), }
) })
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let t = Instant::now();
match tokio::time::timeout(Duration::from_secs(1), rx).await { match tokio::time::timeout(Duration::from_secs(1), rx).await {
Ok(result) => match result { Ok(result) => {
Ok(v) => Ok(v), eprintln!(
Err(e) => { "GOT RESULT BEFORE TIMEOUT = {:?} elapsed = {:?}",
eprintln!("sync callback error: {}", e); result,
Err(e.to_string()) t.elapsed()
);
match result {
Ok(v) => Ok(v),
Err(e) => {
eprintln!("sync callback error: {}", e);
Err(e.to_string())
}
} }
}, }
Err(e) => { Err(e) => {
eprintln!("sync callback timeout: {}", e); eprintln!(
"sync callback timeout: {} msg = {:?} callback queue {:?}",
e,
m,
self.callbacks.keys()
);
Err(e.to_string()) Err(e.to_string())
} }
} }
} }
async fn rpc( async fn rpc<F, Fut>(
&mut self, &mut self,
io: Arc<Mutex<MaelstormIo>>, io: Arc<Mutex<MaelstormIo>>,
mut msg: Message, mut msg: Message,
handler: impl Callback, handler: F,
) -> Result<(), IoError> { ) -> Result<(), IoError>
let next_msg_id = msg.body.msg_id.unwrap() + 1; where
self.callbacks.insert(next_msg_id, Box::new(handler)); F: FnOnce(Message) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), String>> + Send + 'static,
{
let next_msg_id = msg.body.msg_id.unwrap();
eprintln!("INSERTING {} {:?}", next_msg_id, msg);
self.callbacks
.insert(next_msg_id, Box::new(move |a| Box::pin(handler(a))));
msg.body.msg_id = Some(next_msg_id); msg.body.msg_id = Some(next_msg_id);

View File

@ -45,11 +45,22 @@ async fn main() {
msg.body.message_type = "add_ok".into(); msg.body.message_type = "add_ok".into();
let mut program = program.write().await; let mut program = program.write().await;
let current = program.read_counter(io.clone(), msg.dest.clone()).await?; program.node.counter += body.delta;
let current = program.node.counter;
program.write_counter(io.clone(), current).await?;
let mut other_counter = 0;
for node in program.node.nodes.clone() {
if node == msg.dest {
continue;
}
let resp = program.read_counter(io.clone(), node).await?;
other_counter += resp;
}
program.node.other_counter = other_counter;
program
.write_counter(io.clone(), current + body.delta)
.await?;
program program
.reply(io, msg) .reply(io, msg)
.await .await
@ -66,52 +77,30 @@ async fn main() {
msg.body.message_type = "read_ok".into(); msg.body.message_type = "read_ok".into();
let mut program = program.write().await; let mut program = program.write().await;
let mut sum = 0; let mut other_counter = 0;
for node in program.node.nodes.clone() { for node in program.node.nodes.clone() {
if node == msg.dest { if node == msg.dest {
sum += program.read_counter(io.clone(), node.clone()).await?;
continue; continue;
} }
let resp = program.read_counter(io.clone(), node).await?;
let mut msg = msg.clone(); other_counter += resp;
msg.body.message_type = "local_read".to_string();
msg.body.in_reply_to = None;
let mut resp = program.sync_rpc(io.clone(), msg).await?;
let body: GrowCounterReadMessage =
serde_json::from_value(resp.body.message_body.take())
.map_err(|e| format!("error in parsing response body: {}", e))?;
sum += body.value;
} }
program.node.other_counter = other_counter;
program eprintln!(
.reply(io, msg.clone()) "READ OP {} {}",
.await program.node.counter, program.node.other_counter
.map_err(|e| format!("read: error in writing response: {}", e)) );
}, let resp = GrowCounterReadMessage {
); value: program.node.counter + program.node.other_counter,
};
program.register( msg.body.message_body = serde_json::to_value(resp).unwrap();
"local_read",
async |mut msg: Message,
program: Arc<RwLock<Maelstorm>>,
io: Arc<Mutex<MaelstormIo>>|
-> Result<(), String> {
msg.body.message_type = "local_read_ok".into();
let mut program = program.write().await;
let current = program.read_counter(io.clone(), msg.dest.clone()).await?;
msg.body.message_body =
serde_json::to_value(GrowCounterReadMessage { value: current }).unwrap();
program program
.reply(io, msg) .reply(io, msg)
.await .await
.map_err(|e| format!("add: error in writing response: {}", e)) .map_err(|e| format!("read: error in writing response: {}", e))
}, },
); );