g-counter: async refactor
This commit is contained in:
parent
42fd8e997a
commit
e9ec4a0def
392
Cargo.lock
generated
392
Cargo.lock
generated
|
@ -2,19 +2,88 @@
|
|||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.21.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
|
||||
dependencies = [
|
||||
"gimli",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "adler"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.71"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"miniz_oxide",
|
||||
"object",
|
||||
"rustc-demangle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.0.96"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "065a29261d53ba54260972629f9ca6bffa69bac13cd1fed61420f7fa68b9f8bd"
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "5.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"hashbrown",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "distributed-systems-flyio"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"dashmap",
|
||||
"rand",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -28,6 +97,24 @@ dependencies = [
|
|||
"wasi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.28.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.14.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.3.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.10"
|
||||
|
@ -40,6 +127,96 @@ version = "0.2.151"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4"
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7"
|
||||
dependencies = [
|
||||
"adler",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.8.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"wasi",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
|
||||
dependencies = [
|
||||
"hermit-abi",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.32.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.9.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"smallvec",
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.17"
|
||||
|
@ -94,12 +271,33 @@ dependencies = [
|
|||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc-demangle"
|
||||
version = "0.1.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.193"
|
||||
|
@ -131,6 +329,31 @@ dependencies = [
|
|||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.13.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.5.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.45"
|
||||
|
@ -142,6 +365,36 @@ dependencies = [
|
|||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.37.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-macros"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.12"
|
||||
|
@ -153,3 +406,142 @@ name = "wasi"
|
|||
version = "0.11.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
|
||||
dependencies = [
|
||||
"windows-targets 0.48.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
|
||||
dependencies = [
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm 0.48.5",
|
||||
"windows_aarch64_msvc 0.48.5",
|
||||
"windows_i686_gnu 0.48.5",
|
||||
"windows_i686_msvc 0.48.5",
|
||||
"windows_x86_64_gnu 0.48.5",
|
||||
"windows_x86_64_gnullvm 0.48.5",
|
||||
"windows_x86_64_msvc 0.48.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm 0.52.5",
|
||||
"windows_aarch64_msvc 0.52.5",
|
||||
"windows_i686_gnu 0.52.5",
|
||||
"windows_i686_gnullvm",
|
||||
"windows_i686_msvc 0.52.5",
|
||||
"windows_x86_64_gnu 0.52.5",
|
||||
"windows_x86_64_gnullvm 0.52.5",
|
||||
"windows_x86_64_msvc 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnullvm"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.52.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
|
||||
|
|
|
@ -6,6 +6,8 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
dashmap = "5.5.3"
|
||||
rand = { version = "0.8.5", features = ["small_rng"] }
|
||||
serde = { version = "1.0.193", features = ["serde_derive"] }
|
||||
serde_json = "1.0.109"
|
||||
tokio = { version = "1.37.0", features = ["full"] }
|
||||
|
|
227
src/lib.rs
227
src/lib.rs
|
@ -1,55 +1,55 @@
|
|||
#![feature(async_closure)]
|
||||
#![feature(hash_set_entry)]
|
||||
#![feature(trait_alias)]
|
||||
|
||||
pub mod seq_kv;
|
||||
pub mod types;
|
||||
|
||||
use crate::types::{BroadcastMessage, Message, TopologyMessage};
|
||||
use crate::{seq_kv::MonotonicCounter, types::Message};
|
||||
use rand::{rngs::SmallRng, Rng, SeedableRng};
|
||||
use serde_json::Value;
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
collections::HashMap,
|
||||
io::{stdin, stdout, BufRead, Error as IoError, Stdin, Stdout, Write},
|
||||
rc::Rc,
|
||||
sync::{Arc, Mutex, RwLock},
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
pub trait Handler = Fn(Message, Rc<Malestorm>) -> Result<(), String> + 'static;
|
||||
pub trait Handler = Fn(Message, Arc<RwLock<Malestorm>>, Arc<Mutex<MalestormIo>>) -> Result<(), String>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static;
|
||||
|
||||
pub struct Malestorm<'a> {
|
||||
node: RefCell<Node>,
|
||||
io: MalestormIo,
|
||||
pub handlers: HashMap<&'a str, Box<dyn Handler>>,
|
||||
pub struct Malestorm {
|
||||
pub node: Node,
|
||||
pub handlers: HashMap<String, Arc<dyn Handler>>,
|
||||
}
|
||||
|
||||
struct Node {
|
||||
pub struct Node {
|
||||
id: String,
|
||||
nodes: Vec<String>,
|
||||
broadcasts: Vec<i64>,
|
||||
topology: HashMap<String, Vec<String>>,
|
||||
counter: MonotonicCounter,
|
||||
rng: SmallRng,
|
||||
}
|
||||
|
||||
struct MalestormIo {
|
||||
pub struct MalestormIo {
|
||||
stdin: Stdin,
|
||||
stdout: Stdout,
|
||||
}
|
||||
|
||||
impl<'a> Default for Malestorm<'a> {
|
||||
impl Default for Malestorm {
|
||||
fn default() -> Self {
|
||||
let seed = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("time went backwards??");
|
||||
|
||||
Self {
|
||||
node: RefCell::new(Node {
|
||||
node: Node {
|
||||
id: String::new(),
|
||||
nodes: Vec::new(),
|
||||
broadcasts: Vec::new(),
|
||||
topology: HashMap::new(),
|
||||
counter: MonotonicCounter::new(),
|
||||
rng: SmallRng::seed_from_u64(seed.as_secs()),
|
||||
}),
|
||||
io: MalestormIo {
|
||||
stdin: stdin(),
|
||||
stdout: stdout(),
|
||||
},
|
||||
handlers: HashMap::new(),
|
||||
}
|
||||
|
@ -71,22 +71,122 @@ impl MalestormIo {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a> Malestorm<'a> {
|
||||
pub fn run(self) {
|
||||
let program = Rc::new(self);
|
||||
impl Default for MalestormIo {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
stdin: stdin(),
|
||||
stdout: stdout(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut buf = Vec::new();
|
||||
impl Malestorm {
|
||||
pub fn run(self, runtime: Runtime, io: MalestormIo) {
|
||||
let program = Arc::new(RwLock::new(self));
|
||||
let io = Arc::new(Mutex::new(io));
|
||||
let loop_io = io.clone();
|
||||
|
||||
loop {
|
||||
let read = match program.io.read_line(&mut buf) {
|
||||
let mut buf = Vec::new();
|
||||
|
||||
{
|
||||
let io = loop_io.lock().unwrap();
|
||||
let read = match io.read_line(&mut buf) {
|
||||
Ok(v) => v,
|
||||
Err(e) => panic!("{:?}", e),
|
||||
};
|
||||
|
||||
if read == 0 {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let p = program.clone();
|
||||
let io = io.clone();
|
||||
runtime.spawn(async move { parse_request_and_handle(p, io, &buf).await });
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_node_id(&mut self, node_id: String) {
|
||||
self.node.id = node_id;
|
||||
}
|
||||
|
||||
pub fn set_nodes(&mut self, nodes: Vec<String>) {
|
||||
self.node.nodes = nodes;
|
||||
}
|
||||
|
||||
pub fn get_nodes(&self) -> Vec<String> {
|
||||
self.node.nodes.clone()
|
||||
}
|
||||
|
||||
pub fn read_counter(&self, src: &str) -> u64 {
|
||||
self.node.counter.read(src)
|
||||
}
|
||||
|
||||
pub fn write_counter(&mut self, src: &str, v: u64) {
|
||||
self.node.counter.write(src, v);
|
||||
}
|
||||
|
||||
pub fn sync(
|
||||
&self,
|
||||
dest: &str,
|
||||
prev_msg: Message,
|
||||
io: Arc<Mutex<MalestormIo>>,
|
||||
) -> Result<(), IoError> {
|
||||
let msg = Message {
|
||||
id: Some(prev_msg.id.unwrap_or(0) + 1),
|
||||
src: self.node.id.clone(),
|
||||
dest: dest.to_string(),
|
||||
body: types::MessageBody {
|
||||
msg_id: Some(prev_msg.body.msg_id.unwrap_or(0) + 1),
|
||||
in_reply_to: None,
|
||||
message_type: "counter_sync".to_string(),
|
||||
message_body: Value::String("".to_string()),
|
||||
},
|
||||
};
|
||||
|
||||
let out = serde_json::to_vec(&msg)?;
|
||||
eprintln!("wrote = {:?}", String::from_utf8_lossy(&out));
|
||||
let io = io.lock().unwrap();
|
||||
io.write(&out)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn generate_client_id(&mut self) -> String {
|
||||
let s2: u64 = self.node.rng.gen();
|
||||
let s1 = &self.node.id;
|
||||
format!("{}_{}", s1, s2)
|
||||
}
|
||||
|
||||
pub fn register(&mut self, name: &str, func: impl Handler) {
|
||||
self.handlers.insert(name.to_string(), Arc::new(func));
|
||||
}
|
||||
|
||||
pub fn send(&self, io: Arc<Mutex<MalestormIo>>, mut msg: Message) -> Result<(), IoError> {
|
||||
// Before replying, Swap src / dst in original message
|
||||
// Add the correct value for in_reply_to
|
||||
|
||||
std::mem::swap(&mut msg.src, &mut msg.dest);
|
||||
msg.src = self.node.id.clone();
|
||||
|
||||
msg.body.in_reply_to = msg.body.msg_id;
|
||||
|
||||
let out = serde_json::to_vec(&msg)?;
|
||||
let io = io.lock().unwrap();
|
||||
|
||||
io.write(&out).map(|_| Ok(()))?
|
||||
}
|
||||
|
||||
pub fn id(&self) -> String {
|
||||
self.node.id.clone()
|
||||
}
|
||||
}
|
||||
|
||||
async fn parse_request_and_handle(
|
||||
program: Arc<RwLock<Malestorm>>,
|
||||
io: Arc<Mutex<MalestormIo>>,
|
||||
buf: &[u8],
|
||||
) {
|
||||
let message: Message = match serde_json::from_slice(&buf) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
|
@ -95,81 +195,26 @@ impl<'a> Malestorm<'a> {
|
|||
e,
|
||||
String::from_utf8_lossy(&buf)
|
||||
);
|
||||
continue;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
buf.clear();
|
||||
|
||||
let mtype = message.body.message_type.clone();
|
||||
|
||||
let pc = program.clone();
|
||||
|
||||
let handler = match program.handlers.get(&mtype.as_str()) {
|
||||
Some(v) => v,
|
||||
let handler: Arc<dyn Handler> = {
|
||||
let program = program.read().unwrap();
|
||||
|
||||
match program.handlers.get(&mtype) {
|
||||
Some(v) => v.clone(),
|
||||
None => {
|
||||
eprintln!("no handler found for {}", message.body.message_type);
|
||||
continue;
|
||||
//eprintln!("no handler found for {}", message.body.message_type);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = handler(message, pc) {
|
||||
if let Err(e) = handler(message, pc, io) {
|
||||
eprintln!("error in serving request: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_node_id(&self, node_id: String) {
|
||||
let mut node = self.node.borrow_mut();
|
||||
node.id = node_id;
|
||||
}
|
||||
|
||||
pub fn set_nodes(&self, nodes: Vec<String>) {
|
||||
let mut node = self.node.borrow_mut();
|
||||
node.nodes = nodes;
|
||||
}
|
||||
|
||||
pub fn record_broadcast(&self, broadcast: BroadcastMessage) {
|
||||
let mut node = self.node.borrow_mut();
|
||||
node.broadcasts.push(broadcast.message);
|
||||
}
|
||||
|
||||
pub fn read_broadcasts(&self) -> Vec<i64> {
|
||||
let node = self.node.borrow();
|
||||
node.broadcasts.clone()
|
||||
}
|
||||
|
||||
// TODO: this'll probably be more complex later on if we have ops to update toplogy
|
||||
pub fn save_toplogy(&self, msg: TopologyMessage) {
|
||||
let mut node = self.node.borrow_mut();
|
||||
|
||||
node.topology = msg.topology;
|
||||
}
|
||||
|
||||
pub fn generate_client_id(&self) -> String {
|
||||
let mut node = self.node.borrow_mut();
|
||||
|
||||
let s2: u64 = node.rng.gen();
|
||||
let s1 = &node.id;
|
||||
|
||||
format!("{}_{}", s1, s2)
|
||||
}
|
||||
|
||||
pub fn register(&mut self, name: &'a str, func: impl Handler) {
|
||||
self.handlers.insert(name, Box::new(func));
|
||||
}
|
||||
|
||||
pub fn reply(&self, mut msg: Message) -> Result<(), IoError> {
|
||||
// Before replying, Swap src / dst in original message
|
||||
// Add the correct value for in_reply_to
|
||||
|
||||
std::mem::swap(&mut msg.src, &mut msg.dest);
|
||||
let node = self.node.borrow();
|
||||
msg.src = node.id.clone();
|
||||
|
||||
msg.body.in_reply_to = msg.body.msg_id;
|
||||
|
||||
let out = serde_json::to_vec(&msg)?;
|
||||
self.io.write(&out).map(|_| Ok(()))?
|
||||
}
|
||||
}
|
||||
|
|
128
src/main.rs
128
src/main.rs
|
@ -1,11 +1,11 @@
|
|||
#![feature(async_closure)]
|
||||
|
||||
use distributed_systems_flyio::{
|
||||
types::{
|
||||
BroadcastMessage, EchoMessage, GenerateMessage, InitMessage, Message, ReadMessage,
|
||||
TopologyMessage,
|
||||
},
|
||||
Malestorm,
|
||||
types::{GrowCounterReadMessage, GrowCounterUpdateMessage, InitMessage, Message},
|
||||
Malestorm, MalestormIo,
|
||||
};
|
||||
use std::rc::Rc;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
fn main() {
|
||||
let mut program = Malestorm::default();
|
||||
|
@ -13,96 +13,98 @@ fn main() {
|
|||
program.register(
|
||||
"init",
|
||||
// TODO: Replace error string with a type
|
||||
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
|
||||
|mut msg: Message,
|
||||
program: Arc<RwLock<Malestorm>>,
|
||||
io: Arc<Mutex<MalestormIo>>|
|
||||
-> Result<(), String> {
|
||||
let message_body: InitMessage = serde_json::from_value(msg.body.message_body.take())
|
||||
.map_err(|e| format!("error in parsing response body: {}", e))?;
|
||||
|
||||
let mut program = program.write().unwrap();
|
||||
program.set_node_id(message_body.node_id);
|
||||
program.set_nodes(message_body.nodes);
|
||||
|
||||
msg.body.message_type = "init_ok".into();
|
||||
|
||||
program
|
||||
.reply(msg)
|
||||
.send(io, msg)
|
||||
.map_err(|e| format!("init: error in writing response: {}", e))
|
||||
},
|
||||
);
|
||||
|
||||
program.register(
|
||||
"echo",
|
||||
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
|
||||
let message_body: EchoMessage = serde_json::from_value(msg.body.message_body.take())
|
||||
"add",
|
||||
|mut msg: Message,
|
||||
program: Arc<RwLock<Malestorm>>,
|
||||
io: Arc<Mutex<MalestormIo>>|
|
||||
-> Result<(), String> {
|
||||
let body: GrowCounterUpdateMessage =
|
||||
serde_json::from_value(msg.body.message_body.take())
|
||||
.map_err(|e| format!("error in parsing response body: {}", e))?;
|
||||
msg.body.message_type = "add_ok".into();
|
||||
|
||||
msg.body.message_type = "echo_ok".into();
|
||||
msg.body.message_body = serde_json::to_value(message_body).unwrap();
|
||||
let mut program = program.write().unwrap();
|
||||
let val = program.read_counter(&msg.src);
|
||||
program.write_counter(&msg.src, val + body.delta);
|
||||
|
||||
program
|
||||
.reply(msg)
|
||||
.map_err(|e| format!("echo: error in writing response: {}", e))
|
||||
},
|
||||
);
|
||||
|
||||
program.register(
|
||||
"generate",
|
||||
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
|
||||
msg.body.message_type = "generate_ok".into();
|
||||
msg.body.message_body = serde_json::to_value(GenerateMessage {
|
||||
id: program.generate_client_id(),
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
program
|
||||
.reply(msg)
|
||||
.map_err(|e| format!("generate: error in writing response: {}", e))
|
||||
},
|
||||
);
|
||||
|
||||
program.register(
|
||||
"broadcast",
|
||||
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
|
||||
let body: BroadcastMessage = serde_json::from_value(msg.body.message_body.take())
|
||||
.map_err(|e| format!("error in parsing response body: {}", e))?;
|
||||
msg.body.message_type = "broadcast_ok".into();
|
||||
|
||||
program.record_broadcast(body);
|
||||
|
||||
program
|
||||
.reply(msg)
|
||||
.map_err(|e| format!("broadcast: error in writing response: {}", e))
|
||||
.send(io, msg.clone())
|
||||
.map_err(|e| format!("add: error in writing response: {}", e))
|
||||
},
|
||||
);
|
||||
|
||||
program.register(
|
||||
"read",
|
||||
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
|
||||
|mut msg: Message,
|
||||
program: Arc<RwLock<Malestorm>>,
|
||||
io: Arc<Mutex<MalestormIo>>|
|
||||
-> Result<(), String> {
|
||||
msg.body.message_type = "read_ok".into();
|
||||
|
||||
let body = serde_json::to_value(ReadMessage {
|
||||
messages: program.read_broadcasts(),
|
||||
})
|
||||
.map_err(|e| format!("error in marshalling response: {}", e))?;
|
||||
msg.body.message_body = body;
|
||||
let program = program.write().unwrap();
|
||||
let mut sum = 0;
|
||||
for node in program.get_nodes() {
|
||||
if node == msg.src {
|
||||
sum += program.read_counter(&node);
|
||||
continue;
|
||||
}
|
||||
// Sync first then add
|
||||
|
||||
program.sync(&node, msg.clone(), io.clone());
|
||||
|
||||
sum += program.read_counter(&msg.src);
|
||||
}
|
||||
|
||||
msg.body.message_body =
|
||||
serde_json::to_value(GrowCounterReadMessage { value: sum }).unwrap();
|
||||
|
||||
program
|
||||
.reply(msg)
|
||||
.map_err(|e| format!("broadcast: error in writing response: {}", e))
|
||||
.send(io, msg.clone())
|
||||
.map_err(|e| format!("read: error in writing response: {}", e))
|
||||
},
|
||||
);
|
||||
|
||||
program.register(
|
||||
"topology",
|
||||
|mut msg: Message, program: Rc<Malestorm>| -> Result<(), String> {
|
||||
let topology: TopologyMessage = serde_json::from_value(msg.body.message_body.take())
|
||||
.map_err(|e| format!("error in parsing response: {}", e))?;
|
||||
"counter_sync",
|
||||
|mut msg: Message,
|
||||
program: Arc<RwLock<Malestorm>>,
|
||||
io: Arc<Mutex<MalestormIo>>|
|
||||
-> Result<(), String> {
|
||||
msg.body.message_type = "counter_sync_ok".into();
|
||||
|
||||
msg.body.message_type = "topology_ok".into();
|
||||
program.save_toplogy(topology);
|
||||
let program = program.write().unwrap();
|
||||
msg.body.message_body = serde_json::to_value(GrowCounterReadMessage {
|
||||
value: program.read_counter(&msg.src),
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
program
|
||||
.reply(msg)
|
||||
.map_err(|e| format!("broadcast: error in writing response: {}", e))
|
||||
.send(io, msg.clone())
|
||||
.map_err(|e| format!("read: error in writing response: {}", e))
|
||||
},
|
||||
);
|
||||
program.run();
|
||||
|
||||
let io = MalestormIo::default();
|
||||
let runtime = Runtime::new().unwrap();
|
||||
|
||||
program.run(runtime, io);
|
||||
}
|
||||
|
|
22
src/seq_kv.rs
Normal file
22
src/seq_kv.rs
Normal file
|
@ -0,0 +1,22 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MonotonicCounter {
|
||||
counter: HashMap<String, u64>,
|
||||
}
|
||||
|
||||
impl MonotonicCounter {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
counter: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read(&self, node: &str) -> u64 {
|
||||
*self.counter.get(node).unwrap_or(&0)
|
||||
}
|
||||
|
||||
pub fn write(&mut self, node: &str, v: u64) {
|
||||
*self.counter.entry(node.to_string()).or_insert(0) = v;
|
||||
}
|
||||
}
|
32
src/types.rs
32
src/types.rs
|
@ -1,8 +1,6 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug, Serialize, Clone, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct Message {
|
||||
pub id: Option<i64>,
|
||||
|
@ -11,7 +9,7 @@ pub struct Message {
|
|||
pub body: MessageBody,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug, Serialize, Clone, Deserialize)]
|
||||
pub struct MessageBody {
|
||||
pub msg_id: Option<i64>,
|
||||
pub in_reply_to: Option<i64>,
|
||||
|
@ -23,12 +21,6 @@ pub struct MessageBody {
|
|||
pub message_body: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct EchoMessage {
|
||||
echo: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct InitMessage {
|
||||
|
@ -39,26 +31,14 @@ pub struct InitMessage {
|
|||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct GenerateMessage {
|
||||
pub id: String,
|
||||
pub struct GrowCounterUpdateMessage {
|
||||
pub delta: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct BroadcastMessage {
|
||||
pub message: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct ReadMessage {
|
||||
pub messages: Vec<i64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TopologyMessage {
|
||||
pub topology: HashMap<String, Vec<String>>,
|
||||
pub struct GrowCounterReadMessage {
|
||||
pub value: u64,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
Loading…
Reference in New Issue
Block a user