1
0

wip/g-counter: async refactor

This commit is contained in:
Ishan Jain 2024-05-04 21:26:06 +05:30
parent e9ec4a0def
commit e64f16dd95
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
4 changed files with 55 additions and 51 deletions

7
.gitignore vendored
View File

@ -1 +1,8 @@
/target /target
output
store
test.file
input
ip
maelstorm/*
*.bz2

View File

@ -9,7 +9,6 @@ use crate::{seq_kv::MonotonicCounter, types::Message};
use rand::{rngs::SmallRng, Rng, SeedableRng}; use rand::{rngs::SmallRng, Rng, SeedableRng};
use serde_json::Value; use serde_json::Value;
use std::{ use std::{
cell::RefCell,
collections::HashMap, collections::HashMap,
io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write}, io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write},
sync::{Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
@ -34,7 +33,6 @@ pub struct Node {
rng: SmallRng, rng: SmallRng,
} }
pub struct MalestormIo { pub struct MalestormIo {
stdin: Stdin,
stdout: Stdout, stdout: Stdout,
} }
@ -63,20 +61,11 @@ impl MalestormIo {
writer.write_all(&[b'\n'])?; writer.write_all(&[b'\n'])?;
writer.flush() writer.flush()
} }
fn read_line(&self, buf: &mut Vec<u8>) -> Result<usize, IoError> {
let mut reader = self.stdin.lock();
reader.read_until(b'\n', buf)
}
} }
impl Default for MalestormIo { impl Default for MalestormIo {
fn default() -> Self { fn default() -> Self {
Self { Self { stdout: stdout() }
stdin: stdin(),
stdout: stdout(),
}
} }
} }
@ -84,14 +73,14 @@ impl Malestorm {
pub fn run(self, runtime: Runtime, io: MalestormIo) { pub fn run(self, runtime: Runtime, io: MalestormIo) {
let program = Arc::new(RwLock::new(self)); let program = Arc::new(RwLock::new(self));
let io = Arc::new(Mutex::new(io)); let io = Arc::new(Mutex::new(io));
let loop_io = io.clone(); let stdin = stdin();
loop { loop {
let mut buf = Vec::new(); let mut buf = Vec::new();
{ {
let io = loop_io.lock().unwrap(); let mut reader = stdin.lock();
let read = match io.read_line(&mut buf) { let read = match reader.read_until(b'\n', &mut buf) {
Ok(v) => v, Ok(v) => v,
Err(e) => panic!("{:?}", e), Err(e) => panic!("{:?}", e),
}; };
@ -100,9 +89,39 @@ impl Malestorm {
} }
} }
let p = program.clone(); let message: Message = match serde_json::from_slice(&buf) {
Ok(v) => v,
Err(e) => {
eprintln!(
"error in parsing input: {:?} msg = {:?}",
e,
String::from_utf8_lossy(&buf)
);
return;
}
};
let mtype = message.body.message_type.clone();
let handler: Arc<dyn Handler> = {
let program = program.read().unwrap();
match program.handlers.get(&mtype) {
Some(v) => v.clone(),
None => {
//eprintln!("no handler found for {}", message.body.message_type);
return;
}
}
};
let pc = program.clone();
let io = io.clone(); let io = io.clone();
runtime.spawn(async move { parse_request_and_handle(p, io, &buf).await }); runtime.spawn(async move {
if let Err(e) = handler(message, pc, io) {
eprintln!("error in serving request: {}", e);
}
});
} }
} }
@ -125,6 +144,9 @@ impl Malestorm {
pub fn write_counter(&mut self, src: &str, v: u64) { pub fn write_counter(&mut self, src: &str, v: u64) {
self.node.counter.write(src, v); self.node.counter.write(src, v);
} }
pub fn add_counter(&mut self, src: &str, v: u64) {
self.node.counter.add(src, v);
}
pub fn sync( pub fn sync(
&self, &self,
@ -187,34 +209,4 @@ async fn parse_request_and_handle(
io: Arc<Mutex<MalestormIo>>, io: Arc<Mutex<MalestormIo>>,
buf: &[u8], buf: &[u8],
) { ) {
let message: Message = match serde_json::from_slice(&buf) {
Ok(v) => v,
Err(e) => {
eprintln!(
"error in parsing input: {:?} msg = {:?}",
e,
String::from_utf8_lossy(&buf)
);
return;
}
};
let mtype = message.body.message_type.clone();
let pc = program.clone();
let handler: Arc<dyn Handler> = {
let program = program.read().unwrap();
match program.handlers.get(&mtype) {
Some(v) => v.clone(),
None => {
//eprintln!("no handler found for {}", message.body.message_type);
return;
}
}
};
if let Err(e) = handler(message, pc, io) {
eprintln!("error in serving request: {}", e);
}
} }

View File

@ -43,8 +43,7 @@ fn main() {
msg.body.message_type = "add_ok".into(); msg.body.message_type = "add_ok".into();
let mut program = program.write().unwrap(); let mut program = program.write().unwrap();
let val = program.read_counter(&msg.src); program.add_counter(&msg.src, body.delta);
program.write_counter(&msg.src, val + body.delta);
program program
.send(io, msg.clone()) .send(io, msg.clone())
@ -60,7 +59,7 @@ fn main() {
-> Result<(), String> { -> Result<(), String> {
msg.body.message_type = "read_ok".into(); msg.body.message_type = "read_ok".into();
let program = program.write().unwrap(); let program = program.read().unwrap();
let mut sum = 0; let mut sum = 0;
for node in program.get_nodes() { for node in program.get_nodes() {
if node == msg.src { if node == msg.src {
@ -91,7 +90,7 @@ fn main() {
-> Result<(), String> { -> Result<(), String> {
msg.body.message_type = "counter_sync_ok".into(); msg.body.message_type = "counter_sync_ok".into();
let program = program.write().unwrap(); let program = program.read().unwrap();
msg.body.message_body = serde_json::to_value(GrowCounterReadMessage { msg.body.message_body = serde_json::to_value(GrowCounterReadMessage {
value: program.read_counter(&msg.src), value: program.read_counter(&msg.src),
}) })

View File

@ -19,4 +19,10 @@ impl MonotonicCounter {
pub fn write(&mut self, node: &str, v: u64) { pub fn write(&mut self, node: &str, v: u64) {
*self.counter.entry(node.to_string()).or_insert(0) = v; *self.counter.entry(node.to_string()).or_insert(0) = v;
} }
pub fn add(&mut self, node: &str, v: u64) {
eprintln!("{:?}", self.counter);
*self.counter.entry(node.to_string()).or_insert(0) += v;
}
} }