From e9ec4a0deff0681eba7a8d94d56ca83df5b9f423 Mon Sep 17 00:00:00 2001 From: Ishan Jain Date: Sat, 4 May 2024 19:51:27 +0530 Subject: [PATCH] g-counter: async refactor --- Cargo.lock | 392 ++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + src/lib.rs | 215 ++++++++++++++++----------- src/main.rs | 130 ++++++++--------- src/seq_kv.rs | 22 +++ src/types.rs | 32 +---- 6 files changed, 618 insertions(+), 175 deletions(-) create mode 100644 src/seq_kv.rs diff --git a/Cargo.lock b/Cargo.lock index 679c1fa..3641f73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,19 +2,88 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + +[[package]] +name = "backtrace" +version = "0.3.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bitflags" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" + +[[package]] +name = "bytes" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" + +[[package]] +name = "cc" +version = "1.0.96" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065a29261d53ba54260972629f9ca6bffa69bac13cd1fed61420f7fa68b9f8bd" + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "distributed-systems-flyio" version = "0.1.0" dependencies = [ + "dashmap", "rand", "serde", "serde_json", + "tokio", ] [[package]] @@ -28,6 +97,24 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "itoa" version = "1.0.10" @@ -40,6 +127,96 @@ version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" + +[[package]] +name = "miniz_oxide" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +dependencies = [ + "adler", +] + +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.48.0", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "parking_lot" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.5", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -94,12 +271,33 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "ryu" version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.193" @@ -131,6 +329,31 @@ dependencies = [ "serde", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "syn" version = "2.0.45" @@ -142,6 +365,36 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tokio" +version = "1.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.12" @@ -153,3 +406,142 @@ name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +dependencies = [ + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" diff --git a/Cargo.toml b/Cargo.toml index bec9802..a6d8872 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +dashmap = "5.5.3" rand = { version = "0.8.5", features = ["small_rng"] } serde = { version = "1.0.193", features = ["serde_derive"] } serde_json = "1.0.109" +tokio = { version = "1.37.0", features = ["full"] } diff --git a/src/lib.rs b/src/lib.rs index adaf95a..f44008d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,55 +1,55 @@ +#![feature(async_closure)] +#![feature(hash_set_entry)] #![feature(trait_alias)] +pub mod seq_kv; pub mod types; -use crate::types::{BroadcastMessage, Message, TopologyMessage}; +use crate::{seq_kv::MonotonicCounter, types::Message}; use rand::{rngs::SmallRng, Rng, SeedableRng}; +use serde_json::Value; use std::{ cell::RefCell, collections::HashMap, io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write}, - rc::Rc, + sync::{Arc, Mutex, RwLock}, time::{SystemTime, UNIX_EPOCH}, }; +use tokio::runtime::Runtime; -pub trait Handler = Fn(Message, Rc) -> Result<(), String> + 'static; +pub trait Handler = Fn(Message, Arc>, Arc>) -> Result<(), String> + + Send + + Sync + + 'static; -pub struct Malestorm<'a> { - node: RefCell, - io: MalestormIo, - pub handlers: HashMap<&'a str, Box>, +pub struct Malestorm { + pub node: Node, + pub handlers: HashMap>, } -struct Node { +pub struct Node { id: String, nodes: Vec, - broadcasts: Vec, - topology: HashMap>, + counter: MonotonicCounter, rng: SmallRng, } - -struct MalestormIo { +pub struct MalestormIo { stdin: Stdin, stdout: Stdout, } -impl<'a> Default for Malestorm<'a> { +impl Default for Malestorm { fn default() -> Self { let seed = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("time went backwards??"); Self { - node: RefCell::new(Node { + node: Node { id: String::new(), nodes: Vec::new(), - broadcasts: Vec::new(), - topology: HashMap::new(), + counter: MonotonicCounter::new(), rng: SmallRng::seed_from_u64(seed.as_secs()), - }), - io: MalestormIo { - stdin: stdin(), - stdout: stdout(), }, handlers: HashMap::new(), } @@ -71,105 +71,150 @@ impl MalestormIo { } } -impl<'a> Malestorm<'a> { - pub fn run(self) { - let program = Rc::new(self); +impl Default for MalestormIo { + fn default() -> Self { + Self { + stdin: stdin(), + stdout: stdout(), + } + } +} - let mut buf = Vec::new(); +impl Malestorm { + pub fn run(self, runtime: Runtime, io: MalestormIo) { + let program = Arc::new(RwLock::new(self)); + let io = Arc::new(Mutex::new(io)); + let loop_io = io.clone(); loop { - let read = match program.io.read_line(&mut buf) { - Ok(v) => v, - Err(e) => panic!("{:?}", e), - }; + let mut buf = Vec::new(); - if read == 0 { - continue; - } - - let message: Message = match serde_json::from_slice(&buf) { - Ok(v) => v, - Err(e) => { - eprintln!( - "error in parsing input: {:?} msg = {:?}", - e, - String::from_utf8_lossy(&buf) - ); + { + let io = loop_io.lock().unwrap(); + let read = match io.read_line(&mut buf) { + Ok(v) => v, + Err(e) => panic!("{:?}", e), + }; + if read == 0 { continue; } - }; - - buf.clear(); - - let mtype = message.body.message_type.clone(); - - let pc = program.clone(); - - let handler = match program.handlers.get(&mtype.as_str()) { - Some(v) => v, - None => { - eprintln!("no handler found for {}", message.body.message_type); - continue; - } - }; - - if let Err(e) = handler(message, pc) { - eprintln!("error in serving request: {}", e); } + + let p = program.clone(); + let io = io.clone(); + runtime.spawn(async move { parse_request_and_handle(p, io, &buf).await }); } } - pub fn set_node_id(&self, node_id: String) { - let mut node = self.node.borrow_mut(); - node.id = node_id; + pub fn set_node_id(&mut self, node_id: String) { + self.node.id = node_id; } - pub fn set_nodes(&self, nodes: Vec) { - let mut node = self.node.borrow_mut(); - node.nodes = nodes; + pub fn set_nodes(&mut self, nodes: Vec) { + self.node.nodes = nodes; } - pub fn record_broadcast(&self, broadcast: BroadcastMessage) { - let mut node = self.node.borrow_mut(); - node.broadcasts.push(broadcast.message); + pub fn get_nodes(&self) -> Vec { + self.node.nodes.clone() } - pub fn read_broadcasts(&self) -> Vec { - let node = self.node.borrow(); - node.broadcasts.clone() + pub fn read_counter(&self, src: &str) -> u64 { + self.node.counter.read(src) } - // TODO: this'll probably be more complex later on if we have ops to update toplogy - pub fn save_toplogy(&self, msg: TopologyMessage) { - let mut node = self.node.borrow_mut(); - - node.topology = msg.topology; + pub fn write_counter(&mut self, src: &str, v: u64) { + self.node.counter.write(src, v); } - pub fn generate_client_id(&self) -> String { - let mut node = self.node.borrow_mut(); + pub fn sync( + &self, + dest: &str, + prev_msg: Message, + io: Arc>, + ) -> Result<(), IoError> { + let msg = Message { + id: Some(prev_msg.id.unwrap_or(0) + 1), + src: self.node.id.clone(), + dest: dest.to_string(), + body: types::MessageBody { + msg_id: Some(prev_msg.body.msg_id.unwrap_or(0) + 1), + in_reply_to: None, + message_type: "counter_sync".to_string(), + message_body: Value::String("".to_string()), + }, + }; - let s2: u64 = node.rng.gen(); - let s1 = &node.id; + let out = serde_json::to_vec(&msg)?; + eprintln!("wrote = {:?}", String::from_utf8_lossy(&out)); + let io = io.lock().unwrap(); + io.write(&out)?; + Ok(()) + } + + pub fn generate_client_id(&mut self) -> String { + let s2: u64 = self.node.rng.gen(); + let s1 = &self.node.id; format!("{}_{}", s1, s2) } - pub fn register(&mut self, name: &'a str, func: impl Handler) { - self.handlers.insert(name, Box::new(func)); + pub fn register(&mut self, name: &str, func: impl Handler) { + self.handlers.insert(name.to_string(), Arc::new(func)); } - pub fn reply(&self, mut msg: Message) -> Result<(), IoError> { + pub fn send(&self, io: Arc>, mut msg: Message) -> Result<(), IoError> { // Before replying, Swap src / dst in original message // Add the correct value for in_reply_to std::mem::swap(&mut msg.src, &mut msg.dest); - let node = self.node.borrow(); - msg.src = node.id.clone(); + msg.src = self.node.id.clone(); msg.body.in_reply_to = msg.body.msg_id; let out = serde_json::to_vec(&msg)?; - self.io.write(&out).map(|_| Ok(()))? + let io = io.lock().unwrap(); + + io.write(&out).map(|_| Ok(()))? + } + + pub fn id(&self) -> String { + self.node.id.clone() + } +} + +async fn parse_request_and_handle( + program: Arc>, + io: Arc>, + buf: &[u8], +) { + let message: Message = match serde_json::from_slice(&buf) { + Ok(v) => v, + Err(e) => { + eprintln!( + "error in parsing input: {:?} msg = {:?}", + e, + String::from_utf8_lossy(&buf) + ); + return; + } + }; + + let mtype = message.body.message_type.clone(); + + let pc = program.clone(); + + let handler: Arc = { + let program = program.read().unwrap(); + + match program.handlers.get(&mtype) { + Some(v) => v.clone(), + None => { + //eprintln!("no handler found for {}", message.body.message_type); + return; + } + } + }; + if let Err(e) = handler(message, pc, io) { + eprintln!("error in serving request: {}", e); } } diff --git a/src/main.rs b/src/main.rs index bb39d1f..21d0afa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ +#![feature(async_closure)] + use distributed_systems_flyio::{ - types::{ - BroadcastMessage, EchoMessage, GenerateMessage, InitMessage, Message, ReadMessage, - TopologyMessage, - }, - Malestorm, + types::{GrowCounterReadMessage, GrowCounterUpdateMessage, InitMessage, Message}, + Malestorm, MalestormIo, }; -use std::rc::Rc; +use std::sync::{Arc, Mutex, RwLock}; +use tokio::runtime::Runtime; fn main() { let mut program = Malestorm::default(); @@ -13,96 +13,98 @@ fn main() { program.register( "init", // TODO: Replace error string with a type - |mut msg: Message, program: Rc| -> Result<(), String> { + |mut msg: Message, + program: Arc>, + io: Arc>| + -> Result<(), String> { let message_body: InitMessage = serde_json::from_value(msg.body.message_body.take()) .map_err(|e| format!("error in parsing response body: {}", e))?; + let mut program = program.write().unwrap(); program.set_node_id(message_body.node_id); program.set_nodes(message_body.nodes); - msg.body.message_type = "init_ok".into(); program - .reply(msg) + .send(io, msg) .map_err(|e| format!("init: error in writing response: {}", e)) }, ); program.register( - "echo", - |mut msg: Message, program: Rc| -> Result<(), String> { - let message_body: EchoMessage = serde_json::from_value(msg.body.message_body.take()) - .map_err(|e| format!("error in parsing response body: {}", e))?; + "add", + |mut msg: Message, + program: Arc>, + io: Arc>| + -> Result<(), String> { + let body: GrowCounterUpdateMessage = + serde_json::from_value(msg.body.message_body.take()) + .map_err(|e| format!("error in parsing response body: {}", e))?; + msg.body.message_type = "add_ok".into(); - msg.body.message_type = "echo_ok".into(); - msg.body.message_body = serde_json::to_value(message_body).unwrap(); + let mut program = program.write().unwrap(); + let val = program.read_counter(&msg.src); + program.write_counter(&msg.src, val + body.delta); program - .reply(msg) - .map_err(|e| format!("echo: error in writing response: {}", e)) - }, - ); - - program.register( - "generate", - |mut msg: Message, program: Rc| -> Result<(), String> { - msg.body.message_type = "generate_ok".into(); - msg.body.message_body = serde_json::to_value(GenerateMessage { - id: program.generate_client_id(), - }) - .unwrap(); - - program - .reply(msg) - .map_err(|e| format!("generate: error in writing response: {}", e)) - }, - ); - - program.register( - "broadcast", - |mut msg: Message, program: Rc| -> Result<(), String> { - let body: BroadcastMessage = serde_json::from_value(msg.body.message_body.take()) - .map_err(|e| format!("error in parsing response body: {}", e))?; - msg.body.message_type = "broadcast_ok".into(); - - program.record_broadcast(body); - - program - .reply(msg) - .map_err(|e| format!("broadcast: error in writing response: {}", e)) + .send(io, msg.clone()) + .map_err(|e| format!("add: error in writing response: {}", e)) }, ); program.register( "read", - |mut msg: Message, program: Rc| -> Result<(), String> { + |mut msg: Message, + program: Arc>, + io: Arc>| + -> Result<(), String> { msg.body.message_type = "read_ok".into(); - let body = serde_json::to_value(ReadMessage { - messages: program.read_broadcasts(), - }) - .map_err(|e| format!("error in marshalling response: {}", e))?; - msg.body.message_body = body; + let program = program.write().unwrap(); + let mut sum = 0; + for node in program.get_nodes() { + if node == msg.src { + sum += program.read_counter(&node); + continue; + } + // Sync first then add + + program.sync(&node, msg.clone(), io.clone()); + + sum += program.read_counter(&msg.src); + } + + msg.body.message_body = + serde_json::to_value(GrowCounterReadMessage { value: sum }).unwrap(); program - .reply(msg) - .map_err(|e| format!("broadcast: error in writing response: {}", e)) + .send(io, msg.clone()) + .map_err(|e| format!("read: error in writing response: {}", e)) }, ); program.register( - "topology", - |mut msg: Message, program: Rc| -> Result<(), String> { - let topology: TopologyMessage = serde_json::from_value(msg.body.message_body.take()) - .map_err(|e| format!("error in parsing response: {}", e))?; + "counter_sync", + |mut msg: Message, + program: Arc>, + io: Arc>| + -> Result<(), String> { + msg.body.message_type = "counter_sync_ok".into(); - msg.body.message_type = "topology_ok".into(); - program.save_toplogy(topology); + let program = program.write().unwrap(); + msg.body.message_body = serde_json::to_value(GrowCounterReadMessage { + value: program.read_counter(&msg.src), + }) + .unwrap(); program - .reply(msg) - .map_err(|e| format!("broadcast: error in writing response: {}", e)) + .send(io, msg.clone()) + .map_err(|e| format!("read: error in writing response: {}", e)) }, ); - program.run(); + + let io = MalestormIo::default(); + let runtime = Runtime::new().unwrap(); + + program.run(runtime, io); } diff --git a/src/seq_kv.rs b/src/seq_kv.rs new file mode 100644 index 0000000..ae67853 --- /dev/null +++ b/src/seq_kv.rs @@ -0,0 +1,22 @@ +use std::collections::HashMap; + +#[derive(Debug)] +pub struct MonotonicCounter { + counter: HashMap, +} + +impl MonotonicCounter { + pub fn new() -> Self { + Self { + counter: HashMap::new(), + } + } + + pub fn read(&self, node: &str) -> u64 { + *self.counter.get(node).unwrap_or(&0) + } + + pub fn write(&mut self, node: &str, v: u64) { + *self.counter.entry(node.to_string()).or_insert(0) = v; + } +} diff --git a/src/types.rs b/src/types.rs index e130897..f4d4d45 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,8 +1,6 @@ -use std::collections::HashMap; - use serde::{Deserialize, Serialize}; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct Message { pub id: Option, @@ -11,7 +9,7 @@ pub struct Message { pub body: MessageBody, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Clone, Deserialize)] pub struct MessageBody { pub msg_id: Option, pub in_reply_to: Option, @@ -23,12 +21,6 @@ pub struct MessageBody { pub message_body: serde_json::Value, } -#[derive(Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] -pub struct EchoMessage { - echo: String, -} - #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct InitMessage { @@ -39,26 +31,14 @@ pub struct InitMessage { #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] -pub struct GenerateMessage { - pub id: String, +pub struct GrowCounterUpdateMessage { + pub delta: u64, } #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] -pub struct BroadcastMessage { - pub message: i64, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] -pub struct ReadMessage { - pub messages: Vec, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] -pub struct TopologyMessage { - pub topology: HashMap>, +pub struct GrowCounterReadMessage { + pub value: u64, } #[cfg(test)]