From 5936b16b0e91c6877c7a8e01d2955e29b6e41932 Mon Sep 17 00:00:00 2001 From: Ishan Jain Date: Mon, 5 Jun 2023 04:38:36 +0530 Subject: [PATCH] Added a basic version of processor. It can listen on multiple ports and aggregate all of it in 1 place --- Cargo.lock | 203 +++++++++++++++++++++++++++++++++++++++--- Cargo.toml | 4 +- README.md | 20 +++++ config.toml | 1 + src/communications.rs | 131 +++++++++++++++++++++++++++ src/config.rs | 11 +++ src/lib.rs | 66 -------------- src/listener.rs | 2 +- src/main.rs | 29 +++++- src/processor.rs | 24 +++++ 10 files changed, 410 insertions(+), 81 deletions(-) create mode 100644 src/communications.rs delete mode 100644 src/lib.rs create mode 100644 src/processor.rs diff --git a/Cargo.lock b/Cargo.lock index 837bca6..f770819 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +dependencies = [ + "memchr", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -14,18 +23,86 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "cc" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "dns-parser" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4d33be9473d06f75f58220f71f7a9317aca647dc061dbd3c361b0bef505fbea" +dependencies = [ + "byteorder", + "quick-error", +] + +[[package]] +name = "env_logger" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "errno" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hermit-abi" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "indexmap" version = "1.9.3" @@ -36,12 +113,47 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys", +] + +[[package]] +name = "is-terminal" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" +dependencies = [ + "hermit-abi", + "io-lifetimes", + "rustix", + "windows-sys", +] + [[package]] name = "libc" version = "0.2.144" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + +[[package]] +name = "log" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de" + [[package]] name = "memchr" version = "2.5.0" @@ -61,10 +173,12 @@ dependencies = [ name = "multicaster" version = "0.1.0" dependencies = [ + "dns-parser", + "env_logger", + "log", "nix", "once_cell", "serde", - "socket2", "toml", ] @@ -103,6 +217,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quote" version = "1.0.28" @@ -112,6 +232,37 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" + +[[package]] +name = "rustix" +version = "0.37.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acf8729d8542766f1b2cf77eb034d52f40d375bb8b615d0b147089946e16613d" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys", +] + [[package]] name = "serde" version = "1.0.163" @@ -141,16 +292,6 @@ dependencies = [ "serde", ] -[[package]] -name = "socket2" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" -dependencies = [ - "libc", - "windows-sys", -] - [[package]] name = "static_assertions" version = "1.1.0" @@ -168,6 +309,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "termcolor" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +dependencies = [ + "winapi-util", +] + [[package]] name = "toml" version = "0.7.4" @@ -208,6 +358,37 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 866d42b..b462204 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] +dns-parser = "0.8.0" +env_logger = "0.10.0" +log = "0.4.18" nix = "0.26.2" once_cell = "1.17.2" serde = { version = "1.0.163", features = ["derive"] } -socket2 = "0.5.3" toml = "0.7.4" diff --git a/README.md b/README.md index ac2dc64..bd20ae9 100644 --- a/README.md +++ b/README.md @@ -12,3 +12,23 @@ This causes problems if there are other softwares that are also listening withou For now, Disable those softwares when running this. A list of such softwares, a. avahi-daemon + + +### How should this be designed?? + + +For now, I am restricting it to only consider 1 config. +It won't listen on multiple ports for multicast traffic. This will be changed once I have the basic structure ready. + + +For now, Only work on IPv4. IPv6 will be added once IPv4 is ready + + + + + + + + + + diff --git a/config.toml b/config.toml index 1c7355f..cc5aae7 100644 --- a/config.toml +++ b/config.toml @@ -2,3 +2,4 @@ port = 5353 multicast_groups = ["224.0.0.251"] destinations = ["wlp6s0"] +traffic_type = "MDNS" diff --git a/src/communications.rs b/src/communications.rs new file mode 100644 index 0000000..3db24ec --- /dev/null +++ b/src/communications.rs @@ -0,0 +1,131 @@ +use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + sync::mpsc::{self, Receiver, Sender}, + thread::{self, JoinHandle}, +}; + +use crate::{ + listener::{join_multicast, new_socket}, + Config, +}; +use log::{debug, info, warn}; +use once_cell::sync::Lazy; +pub static IPV4: Lazy = Lazy::new(|| Ipv4Addr::new(0, 0, 0, 0).into()); +pub static IPV6: Lazy = Lazy::new(|| { + Ipv6Addr::new(0x2a0a, 0x6040, 0x4004, 0x10, 0xf1c6, 0xf2b0, 0x9f45, 0xb425).into() +}); + +pub struct Communications { + // TODO(ishan): Accommodate IPv6 sockets as well here + sockets: HashMap, + handles: Vec>, + + tx_chan: Sender, + rx_chan: Receiver, +} + +pub struct Event { + pub remote_addr: SocketAddr, + pub payload: Vec, +} + +impl Communications { + pub fn new(input: Config) -> Result { + let mut sockets = HashMap::new(); + + // This channel is used to send packets from this module to the processor + // TODO(ishan): Eventually, swap out std::mpsc for some thing faster + // or switch to a different model completely that doesn't use channels like this + let (rx, tx) = mpsc::channel(); + + for (_, v) in input.config.iter() { + let socket = + new_socket(&SocketAddr::new(*IPV4, v.port)).expect("error in bind on ipv4 address"); + + info!("started listening on {}:{}", *IPV4, v.port); + + for group in v.multicast_groups.iter() { + join_multicast(&socket, group) + .map_err(|e| format!("error in joining multicast group {}: {}", group, e))?; + + info!("joined multicast group {}", group); + } + + sockets.insert(v.port, socket); + } + + Ok(Communications { + sockets, + handles: vec![], + rx_chan: tx, + tx_chan: rx, + }) + } + + pub fn start_listeners(&mut self) -> Result<(), String> { + let sockets = self + .clone_sockets() + .map_err(|e| format!("error in cloning sockets: {}", e))?; + + for (port, v) in sockets { + let socket = v + .try_clone() + .map_err(|e| format!("error in cloning socket for read ops: {}", e))?; + let tx_chan = self.tx_chan.clone(); + + let handle = thread::spawn(move || { + // TODO(ishan): What should be the size here ? + + loop { + let mut buf = [0u8; 2000]; + + match socket.recv_from(&mut buf) { + Ok((len, remote_addr)) => { + let buf = &buf[..len]; + + tx_chan + .send(Event { + remote_addr, + payload: buf.to_vec(), + }) + .expect("error in sending to mpsc channel"); + + debug!("read {}bytes from {}", len, remote_addr); + } + Err(e) => { + warn!("error in reading from socket {}: {}", port, e); + continue; + } + }; + } + }); + + self.handles.push(handle); + } + + Ok(()) + } + + pub fn wait(&mut self) { + while let Some(handle) = self.handles.pop() { + handle.join().unwrap(); + } + } + + fn clone_sockets(&self) -> Result, std::io::Error> { + let mut sockets = HashMap::new(); + + for (k, v) in self.sockets.iter() { + sockets.insert(*k, v.try_clone()?); + } + + Ok(sockets) + } + + pub fn get_reader(&self) -> &Receiver { + &self.rx_chan + } + + // TODO(ishan): Add a function to send messages from a port +} diff --git a/src/config.rs b/src/config.rs index 0374530..0ef755f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,3 +1,4 @@ +use log::debug; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, fs::File, io::Read, net::IpAddr}; @@ -11,6 +12,12 @@ pub struct Record { pub port: u16, pub multicast_groups: Vec, pub destinations: Vec, + pub traffic_type: TrafficType, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum TrafficType { + MDNS, } impl Config { @@ -18,6 +25,10 @@ impl Config { pub fn parse(mut filename: &str) -> Result { if filename.is_empty() { + debug!( + "filename is empty. using default name: {}", + Config::FILENAME + ); filename = Config::FILENAME; } diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index c352266..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,66 +0,0 @@ -// TODO(ishan): Eventually we'll have a listener and transmitter module for every thing we want to -// support. So, 1 for MDNS, another for WSDD? -// Or a common listener/transmitter and then different modules to parse and transmit each type of -// traffic -pub mod config; -pub use config::*; -pub mod listener; - -use crate::listener::{join_multicast, new_socket}; -use once_cell::sync::Lazy; -use std::{ - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, - thread, -}; - -pub static IPV4: Lazy = Lazy::new(|| Ipv4Addr::new(0, 0, 0, 0).into()); -pub static IPV6: Lazy = Lazy::new(|| { - Ipv6Addr::new(0x2a0a, 0x6040, 0x4004, 0x10, 0xf1c6, 0xf2b0, 0x9f45, 0xb425).into() -}); - -pub fn start(config: crate::Config) { - // TODO(ishan): Start listeners and transmitters on v4 and v6 here - - let mut handles = vec![]; - - for (_, v) in config.config { - let handle = thread::spawn(move || { - let mut ipv4_socket = new_socket(&SocketAddr::new(*IPV4, v.port)) - .expect("error in bind op on ipv4 address"); - - let mut ipv6_socket = new_socket(&SocketAddr::new(*IPV6, v.port)) - .expect("error in bind op on ipv6 address"); - - for group in v.multicast_groups { - join_multicast(&mut ipv4_socket, &group) - .expect("error in joining ipv4 multicast group"); - join_multicast(&mut ipv6_socket, &group) - .expect("error in joining ipv6 multicast group"); - println!( - "Listening for multicast packets on ipv4/ipv6 in group {}:{}", - group, v.port - ); - } - let mut buf = [0u8; 64]; - match ipv4_socket.recv_from(&mut buf) { - Ok((len, remote_addr)) => { - let buf = &buf[..len]; - println!("received {}bytes response from {}", len, remote_addr); - - let v = String::from_utf8_lossy(buf); - - println!("output = {:?}", v); - } - Err(e) => { - println!("got an error: {}", e); - } - } - }); - - handles.push(handle); - } - - for handle in handles { - handle.join().unwrap() - } -} diff --git a/src/listener.rs b/src/listener.rs index aebd239..f0c920c 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -17,7 +17,7 @@ pub fn new_socket(addr: &SocketAddr) -> Result { Ok(socket) } -pub fn join_multicast(socket: &mut UdpSocket, group: &IpAddr) -> IoResult<()> { +pub fn join_multicast(socket: &UdpSocket, group: &IpAddr) -> IoResult<()> { // TODO(ishan): Eventually, this should only listen on the // interfaces specified in config.toml match group { diff --git a/src/main.rs b/src/main.rs index ccc0cb3..b61e486 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,33 @@ -use multicaster::Config; +// TODO(ishan): Eventually we'll have a listener and transmitter module for every thing we want to +// support. So, 1 for MDNS, another for WSDD? +// Or a common listener/transmitter and then different modules to parse and transmit each type of +// traffic + +pub mod communications; +pub use communications::*; +pub mod config; +pub use config::*; +pub mod listener; +pub mod processor; +pub use processor::*; + fn main() { + env_logger::init(); + let config = Config::parse("config.toml").expect("error in parsing config"); println!("{:?}", config); - multicaster::start(config); + // TODO(ishan): Start listeners and transmitters on v4 and v6 here + let mut comms = Communications::new(config).expect("error in starting comms"); + + comms + .start_listeners() + .expect("error in starting listeners"); + + let processor = Processor::new(comms.get_reader()); + + processor.start_read_loop(); + + comms.wait(); } diff --git a/src/processor.rs b/src/processor.rs new file mode 100644 index 0000000..4453086 --- /dev/null +++ b/src/processor.rs @@ -0,0 +1,24 @@ +use dns_parser::Packet; +use log::info; +use std::sync::mpsc::Receiver; + +use crate::Event; + +pub struct Processor<'a> { + reader: &'a Receiver, +} + +impl<'a> Processor<'a> { + pub fn new(reader: &'a Receiver) -> Self { + Self { reader } + } + + pub fn start_read_loop(&self) { + for evt in self.reader { + let packet = + Packet::parse(&evt.payload).expect("failed to parse packet as a dns packet"); + + info!("{:?}", packet); + } + } +}