diff --git a/Cargo.toml b/Cargo.toml index 0cd9642..5d2d6f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,8 +3,6 @@ name = "onebrc" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] fxhash = "0.2.1" rayon = "1.8.0" diff --git a/README.md b/README.md index 0e71426..01ee63d 100644 --- a/README.md +++ b/README.md @@ -2,5 +2,4 @@ * Fastest single threaded runtime: 29.7s (4.5s to read the file into memory, 24s to process rows and ~150micros to generate the output) - -* Fastest multi threaded runtime on a machine with 12 cores: - +* Fastest multi threaded runtime on a machine with 12 cores: read = 4.682276171s processed = 2.057576429s output_gen = 188.113µs diff --git a/src/main.rs b/src/main.rs index 8454c85..676f712 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,8 @@ use fxhash::FxHashMap; -use std::fs::File; -use std::io::Read; -use std::time::Instant; +use rayon::prelude::*; +use std::{fs::File, io::Read, time::Instant}; fn main() { - let mut map = FxHashMap::default(); let mut buf = Vec::new(); let t1 = Instant::now(); @@ -18,36 +16,58 @@ fn main() { let t2 = Instant::now(); - for line in buf.split(|&x| x == b'\n') { - let l = line.len(); - if l == 0 { - continue; - } + let map: FxHashMap<&[u8], (i16, i32, i16, usize)> = buf + .par_split(|&x| x == b'\n') + .filter_map(|line| { + let l = line.len(); + if l == 0 { + return None; + } - let loc = if line[l - 6] == b';' { - l - 6 - } else if line[l - 5] == b';' { - l - 5 - } else if line[l - 4] == b';' { - l - 4 - } else { - unreachable!(); - }; + let loc = if line[l - 6] == b';' { + l - 6 + } else if line[l - 5] == b';' { + l - 5 + } else if line[l - 4] == b';' { + l - 4 + } else { + unreachable!(); + }; - let (city, val) = line.split_at(loc); - let val = &val[1..]; + let (city, val) = line.split_at(loc); + let val = &val[1..]; - let val = parse_float(val); + let val = parse_float(val); - let (min, sum, max, counter) = map - .entry(city) - .or_insert_with(|| (std::i16::MAX, 0, std::i16::MIN, 0)); + Some((city, val)) + }) + .fold(FxHashMap::default, |mut a, (city, val)| { + let (min, sum, max, counter) = a + .entry(city) + .or_insert_with(|| (std::i16::MAX, 0, std::i16::MIN, 0usize)); - *min = val.min(*min); - *max = val.max(*max); - *counter += 1; - *sum += val as i32; - } + *min = val.min(*min); + *max = val.max(*max); + *counter += 1; + *sum += val as i32; + + a + }) + .reduce_with(|mut m1, m2| { + for (k, (min1, sum1, max1, counter1)) in m2 { + let (min, sum, max, counter) = m1 + .entry(k) + .or_insert_with(|| (std::i16::MAX, 0, std::i16::MIN, 0usize)); + + *min = min1.min(*min); + *max = max1.max(*max); + *counter += counter1; + *sum += sum1; + } + + m1 + }) + .unwrap(); let t2_elapsed = t2.elapsed();