From 29155935a09571333a0e7b9d958f83138ab83dcb Mon Sep 17 00:00:00 2001 From: Ishan Jain Date: Fri, 28 Jul 2023 15:17:52 +0530 Subject: [PATCH] switched to tokio, cleaned up some interfaces --- Cargo.lock | 166 ++++++++++++++++++++++++++---------------- Cargo.toml | 5 +- src/communications.rs | 105 ++++++++------------------ src/main.rs | 33 ++++++--- src/processor.rs | 29 ++++---- src/socket_manager.rs | 0 6 files changed, 169 insertions(+), 169 deletions(-) delete mode 100644 src/socket_manager.rs diff --git a/Cargo.lock b/Cargo.lock index fb9c83e..bdf6547 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aho-corasick" version = "1.0.2" @@ -17,6 +32,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -47,12 +77,6 @@ version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - [[package]] name = "cfg-if" version = "1.0.0" @@ -137,6 +161,12 @@ dependencies = [ "libc", ] +[[package]] +name = "gimli" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" + [[package]] name = "hashbrown" version = "0.14.0" @@ -201,12 +231,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] -name = "memoffset" +name = "miniz_oxide" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" dependencies = [ - "autocfg", + "adler", +] + +[[package]] +name = "mio" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +dependencies = [ + "libc", + "wasi", + "windows-sys", ] [[package]] @@ -217,8 +258,8 @@ checksum = "53eacba0998466687d051439bf144286203d7286623275a8007505d22fd21cfb" dependencies = [ "get_if_addrs", "libc", - "nix 0.19.1", - "socket2", + "nix", + "socket2 0.3.19", "winapi 0.3.9", ] @@ -232,37 +273,11 @@ dependencies = [ "libc", "log", "multicast-socket", - "net2", - "network-interface", - "nix 0.26.2", - "once_cell", "serde", + "tokio", "toml", ] -[[package]] -name = "net2" -version = "0.2.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac" -dependencies = [ - "cfg-if 0.1.10", - "libc", - "winapi 0.3.9", -] - -[[package]] -name = "network-interface" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "296ae2183dab440e352740b1fcf0589d8058afa40bc29fd163227ddb01fbf539" -dependencies = [ - "cc", - "libc", - "thiserror", - "winapi 0.3.9", -] - [[package]] name = "nix" version = "0.19.1" @@ -271,35 +286,34 @@ checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2" dependencies = [ "bitflags 1.3.2", "cc", - "cfg-if 1.0.0", + "cfg-if", "libc", ] [[package]] -name = "nix" -version = "0.26.2" +name = "num_cpus" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "bitflags 1.3.2", - "cfg-if 1.0.0", + "hermit-abi", "libc", - "memoffset", - "pin-utils", - "static_assertions", ] [[package]] -name = "once_cell" -version = "1.18.0" +name = "object" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1" +dependencies = [ + "memchr", +] [[package]] -name = "pin-utils" -version = "0.1.0" +name = "pin-project-lite" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" [[package]] name = "proc-macro2" @@ -354,6 +368,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustix" version = "0.38.4" @@ -402,16 +422,20 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "winapi 0.3.9", ] [[package]] -name = "static_assertions" -version = "1.1.0" +name = "socket2" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi 0.3.9", +] [[package]] name = "syn" @@ -434,19 +458,27 @@ dependencies = [ ] [[package]] -name = "thiserror" -version = "1.0.44" +name = "tokio" +version = "1.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90" +checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" dependencies = [ - "thiserror-impl", + "autocfg", + "backtrace", + "libc", + "mio", + "num_cpus", + "pin-project-lite", + "socket2 0.4.9", + "tokio-macros", + "windows-sys", ] [[package]] -name = "thiserror-impl" -version = "1.0.44" +name = "tokio-macros" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", @@ -493,6 +525,12 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "winapi" version = "0.2.8" diff --git a/Cargo.toml b/Cargo.toml index a1b3741..7e102c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,6 @@ get_if_addrs = "0.5.3" libc = "0.2.147" log = "0.4.18" multicast-socket = "0.2.2" -net2 = "0.2.39" -network-interface = "1.0.1" -nix = "0.26.2" -once_cell = "1.17.2" serde = { version = "1.0.163", features = ["derive"] } +tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread", "net"] } toml = "0.7.4" diff --git a/src/communications.rs b/src/communications.rs index fee5d43..2881cce 100644 --- a/src/communications.rs +++ b/src/communications.rs @@ -1,92 +1,47 @@ -use crate::Config; -use log::{info, trace}; +use log::{info, trace, warn}; use multicast_socket::{Message, MulticastOptions, MulticastSocket}; -use std::{ - net::SocketAddrV4, - sync::mpsc::{self, Receiver, Sender}, - thread::{self, JoinHandle}, -}; +use std::{net::SocketAddrV4, sync::mpsc::Sender}; pub struct Communications { - // TODO(ishan): Accommodate IPv6 sockets as well here - handles: Vec>, - tx_chan: Sender, - rx_chan: Receiver, -} - -#[derive(Debug)] -pub struct Event { - pub msg: Message, + tx_chan: Sender, } impl Communications { - pub fn new(config: Config) -> Result { - // This channel is used to send packets from this module to the processor - // TODO(ishan): Eventually, swap out std::mpsc for some thing faster - // or switch to a different model completely that doesn't use channels like this - let (rx, tx) = mpsc::channel(); - - Ok(Communications { - handles: vec![], - rx_chan: tx, - tx_chan: rx, - }) + pub fn new(tx_chan: Sender) -> Result { + Ok(Communications { tx_chan }) } - pub fn start_listeners(&mut self) -> Result<(), String> { + pub async fn start_listeners(&mut self) -> Result<(), String> { info!("listener started"); let tx_chan = self.tx_chan.clone(); - let handle = thread::spawn(move || { - // TODO(ishan): What should be the size here ? + let interfaces = get_if_addrs::get_if_addrs().unwrap(); + trace!("Interfaces list: {:?}", interfaces); - let interfaces = get_if_addrs::get_if_addrs().unwrap(); - trace!("Interfaces list: {:?}", interfaces); + // mdns + let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353); + let multicast_socket = MulticastSocket::with_options( + mdns_address, + multicast_socket::all_ipv4_interfaces().expect("could not fetch all interfaces"), + MulticastOptions { + loopback: false, + buffer_size: 4096, + ..Default::default() + }, + ) + .expect("error in creating multicast socket"); - // mdns - let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353); - let multicast_socket = MulticastSocket::with_options( - mdns_address, - multicast_socket::all_ipv4_interfaces().expect("could not fetch all interfaces"), - MulticastOptions { - loopback: false, - buffer_size: 4096, - ..Default::default() - }, - ) - .expect("error in creating multicast socket"); - - loop { - match multicast_socket.receive() { - Ok(msg) => { - tx_chan - .send(Event { msg }) - .expect("error in sending to mpsc channel"); - } - Err(_) => { - // TODO: Log all buy EAGAIN - // warn!("error in reading from socket {}: {}", port, e); - continue; - } - }; - } - }); - - self.handles.push(handle); - - Ok(()) - } - - pub fn wait(&mut self) { - while let Some(handle) = self.handles.pop() { - handle.join().unwrap(); + loop { + match multicast_socket.receive() { + Ok(msg) => { + tx_chan.send(msg).expect("error in sending to mpsc channel"); + } + Err(e) if e.to_string().contains("EAGAIN") => continue, + Err(e) => { + warn!("error in reading from socket {:?} ", e); + } + }; } } - - pub fn get_reader(&self) -> &Receiver { - &self.rx_chan - } - - // TODO(ishan): Add a function to send messages from a port } diff --git a/src/main.rs b/src/main.rs index 406b2b2..6490447 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,20 @@ +#![feature(async_closure)] // TODO(ishan): Eventually we'll have a listener and transmitter module for every thing we want to // support. So, 1 for MDNS, another for WSDD? // Or a common listener/transmitter and then different modules to parse and transmit each type of // traffic -pub mod socket_manager; -pub use socket_manager::*; pub mod communications; +use std::sync::mpsc::{self, Receiver, Sender}; + pub use communications::*; pub mod config; pub use config::*; pub mod processor; use log::info; +use multicast_socket::Message; pub use processor::*; +use tokio::runtime::Runtime; fn main() { env_logger::init(); @@ -22,16 +25,24 @@ fn main() { println!("{:?}", config); - // TODO(ishan): Start listeners and transmitters on v4 and v6 here - let mut comms = Communications::new(config.clone()).expect("error in starting comms"); - comms - .start_listeners() - .expect("error in starting listeners"); + let rt = Runtime::new().expect("error in creating runtime"); - let processor = - Processor::new(comms.get_reader(), config).expect("error in starting processor"); + // This channel is used to send packets from this module to the processor + // TODO(ishan): Eventually, swap out std::mpsc for some thing faster + // or switch to a different model completely that doesn't use channels like this + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + + // TODO(ishan): Start listeners and transmitters on v4 and v6 here + let mut comms = Communications::new(tx).expect("error in starting comms"); + + rt.spawn(async move { + comms + .start_listeners() + .await + .expect("error in comms listener"); + }); + + let processor = Processor::new(rx, config).expect("error in starting processor"); processor.start_read_loop(); - - comms.wait(); } diff --git a/src/processor.rs b/src/processor.rs index b3b0b29..0192cc1 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,20 +1,20 @@ -use crate::{Config, Event}; +use crate::Config; use dns_parser::Packet; -use log::{info, warn}; -use multicast_socket::{Interface, MulticastOptions, MulticastSocket}; +use log::info; +use multicast_socket::{Interface, Message, MulticastOptions, MulticastSocket}; use std::{ffi::CString, net::SocketAddrV4, sync::mpsc::Receiver}; -pub struct Processor<'a> { - reader: &'a Receiver, +pub struct Processor { + reader: Receiver, config: Config, } -impl<'a> Processor<'a> { - pub fn new(reader: &'a Receiver, config: Config) -> Result { +impl Processor { + pub fn new(reader: Receiver, config: Config) -> Result { Ok(Self { reader, config }) } - pub fn start_read_loop(&self) { + pub fn start_read_loop(self) { // mdns let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353); let socket = MulticastSocket::with_options( @@ -30,12 +30,11 @@ impl<'a> Processor<'a> { for evt in self.reader { // TODO: Generalize this to parse any type of supported packet - let packet = - Packet::parse(&evt.msg.data).expect("failed to parse packet as a dns packet"); + let packet = Packet::parse(&evt.data).expect("failed to parse packet as a dns packet"); let interfaces = get_if_addrs::get_if_addrs().unwrap(); - let src_ifname = if let Interface::Index(idx) = evt.msg.interface { + let src_ifname = if let Interface::Index(idx) = evt.interface { ifidx_to_ifname(idx as u32) } else { "lo".to_string() @@ -44,8 +43,8 @@ impl<'a> Processor<'a> { info!( "EVENT src-if = {} if-index {:?} address = {}, packet: {:?} answers = {:?}", src_ifname, - evt.msg.interface, - evt.msg.origin_address, + evt.interface, + evt.origin_address, packet.questions.iter().map(|q| q.qname).collect::>(), packet.answers.iter().map(|q| q.name).collect::>() ); @@ -72,7 +71,7 @@ impl<'a> Processor<'a> { // TODO(ishan): Take a note of transaction id // and avoid feedback loops socket - .send(&evt.msg.data, &Interface::Index(dst_ifid as i32)) + .send(&evt.data, &Interface::Index(dst_ifid as i32)) .expect("error in sending mdns packet"); } } @@ -103,7 +102,7 @@ impl<'a> Processor<'a> { ); socket - .send(&evt.msg.data, &Interface::Index(dst_ifid as i32)) + .send(&evt.data, &Interface::Index(dst_ifid as i32)) .expect("error in sending mdns packet"); } } diff --git a/src/socket_manager.rs b/src/socket_manager.rs deleted file mode 100644 index e69de29..0000000