1
0

wip/g-counter: not working yet

This commit is contained in:
Ishan Jain 2024-05-06 22:13:19 +05:30
parent 864324139f
commit 16df5532e6
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
2 changed files with 48 additions and 39 deletions

View File

@ -40,7 +40,6 @@ pub struct Maelstorm {
pub struct Node { pub struct Node {
pub id: String, pub id: String,
pub value: HashMap<String, u64>,
pub nodes: Vec<String>, pub nodes: Vec<String>,
rng: SmallRng, rng: SmallRng,
} }
@ -58,7 +57,6 @@ impl Default for Maelstorm {
id: String::new(), id: String::new(),
nodes: Vec::new(), nodes: Vec::new(),
rng: SmallRng::seed_from_u64(seed.as_secs()), rng: SmallRng::seed_from_u64(seed.as_secs()),
value: HashMap::new(),
}, },
handlers: HashMap::new(), handlers: HashMap::new(),
callbacks: HashMap::new(), callbacks: HashMap::new(),
@ -279,7 +277,11 @@ impl Maelstorm {
.unwrap(), .unwrap(),
}, },
}; };
let mut result = self.sync_rpc(io, msg).await?; let mut result = match self.sync_rpc(io, msg).await {
Ok(v) => v,
Err(e) if e.to_string() == "deadline has elapsed" => return Ok(0),
Err(e) => return Err(e),
};
let body: SeqKvInput = serde_json::from_value(result.body.message_body.take()) let body: SeqKvInput = serde_json::from_value(result.body.message_body.take())
.map_err(|e| format!("error in parsing response body: {}", e))?; .map_err(|e| format!("error in parsing response body: {}", e))?;
@ -309,7 +311,11 @@ impl Maelstorm {
.unwrap(), .unwrap(),
}, },
}; };
let mut result = self.sync_rpc(io, msg).await?; let mut result = match self.sync_rpc(io, msg).await {
Ok(v) => v,
Err(e) if e.to_string() == "deadline has elapsed" => return Ok(()),
Err(e) => return Err(e),
};
// TODO: could return the parsed confirmation from here // TODO: could return the parsed confirmation from here
serde_json::from_value(result.body.message_body.take()) serde_json::from_value(result.body.message_body.take())

View File

@ -45,26 +45,11 @@ async fn main() {
msg.body.message_type = "add_ok".into(); msg.body.message_type = "add_ok".into();
let mut program = program.write().await; let mut program = program.write().await;
let current = program.read_counter(io.clone(), msg.dest.clone()).await?;
let id = program.node.id.clone(); program
let current = *program.node.value.entry(id).or_insert(0);
match program
.write_counter(io.clone(), current + body.delta) .write_counter(io.clone(), current + body.delta)
.await .await?;
{
Ok(v) => {
let id = program.node.id.clone();
*program.node.value.entry(id).or_insert(0) = current + body.delta;
v
}
Err(e) if e.to_string() == "deadline has elapsed" => {
let id = program.node.id.clone();
*program.node.value.entry(id).or_insert(0) = current + body.delta;
}
Err(e) => return Err(e),
};
program program
.reply(io, msg) .reply(io, msg)
.await .await
@ -82,29 +67,25 @@ async fn main() {
let mut program = program.write().await; let mut program = program.write().await;
let mut sum = 0; let mut sum = 0;
for node in program.node.nodes.clone() { for node in program.node.nodes.clone() {
eprintln!("reading {} from {}", node, msg.src); if node == msg.dest {
if *node == msg.dest { sum += program.read_counter(io.clone(), node.clone()).await?;
sum += *program.node.value.entry(node).or_insert(0);
continue; continue;
} }
// Sync first then add
sum += match program.read_counter(io.clone(), node).await { let mut msg = msg.clone();
Ok(v) => v, msg.body.message_type = "local_read".to_string();
Err(e) if e.to_string() == "deadline has elapsed" => { msg.body.in_reply_to = None;
let id = program.node.id.clone();
*program.node.value.entry(id).or_insert(0) let mut resp = program.sync_rpc(io.clone(), msg).await?;
} let body: GrowCounterReadMessage =
Err(e) => return Err(e), serde_json::from_value(resp.body.message_body.take())
}; .map_err(|e| format!("error in parsing response body: {}", e))?;
sum += body.value;
} }
msg.body.message_body =
serde_json::to_value(GrowCounterReadMessage { value: sum }).unwrap();
eprintln!("node = {} response = {:?}", msg.dest, msg);
program program
.reply(io, msg.clone()) .reply(io, msg.clone())
.await .await
@ -112,6 +93,28 @@ async fn main() {
}, },
); );
program.register(
"local_read",
async |mut msg: Message,
program: Arc<RwLock<Maelstorm>>,
io: Arc<Mutex<MaelstormIo>>|
-> Result<(), String> {
msg.body.message_type = "local_read_ok".into();
let mut program = program.write().await;
let current = program.read_counter(io.clone(), msg.dest.clone()).await?;
msg.body.message_body =
serde_json::to_value(GrowCounterReadMessage { value: current }).unwrap();
program
.reply(io, msg)
.await
.map_err(|e| format!("add: error in writing response: {}", e))
},
);
let io = MaelstormIo::default(); let io = MaelstormIo::default();
program.run(io).await; program.run(io).await;