From b2a1e152e259969463024541ed713da78ec71d8f Mon Sep 17 00:00:00 2001 From: Ishan Jain Date: Thu, 27 Jul 2023 23:43:22 +0530 Subject: [PATCH] Updated communicator & processor Trying to figure out the way to know the right ingress interface --- Cargo.lock | 235 ++++++++++++++++++++++++++++++------------ Cargo.toml | 3 + README.md | 10 +- config.toml | 5 +- src/communications.rs | 129 +++++++++++++++++------ src/config.rs | 23 +++-- src/listener.rs | 34 ------ src/main.rs | 11 +- src/processor.rs | 111 ++++++++++++++++++-- src/socket_manager.rs | 0 10 files changed, 407 insertions(+), 154 deletions(-) delete mode 100644 src/listener.rs create mode 100644 src/socket_manager.rs diff --git a/Cargo.lock b/Cargo.lock index f770819..d40dda3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,18 +23,36 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" + [[package]] name = "byteorder" version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "c_linked_list" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4964518bd3b4a8190e832886cdc0da9794f12e8e6c1613a9e90ff331c4c8724b" + [[package]] name = "cc" version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -64,6 +82,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.3.1" @@ -86,16 +110,44 @@ dependencies = [ ] [[package]] -name = "hashbrown" -version = "0.12.3" +name = "gcc" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" + +[[package]] +name = "get_if_addrs" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abddb55a898d32925f3148bd281174a68eeb68bbfd9a5938a57b18f506ee4ef7" +dependencies = [ + "c_linked_list", + "get_if_addrs-sys", + "libc", + "winapi 0.2.8", +] + +[[package]] +name = "get_if_addrs-sys" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d04f9fb746cf36b191c00f3ede8bde9c8e64f9f4b05ae2694a9ccf5e3f5ab48" +dependencies = [ + "gcc", + "libc", +] + +[[package]] +name = "hashbrown" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" [[package]] name = "hermit-abi" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" [[package]] name = "humantime" @@ -105,54 +157,42 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "indexmap" -version = "1.9.3" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ - "autocfg", + "equivalent", "hashbrown", ] -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys", -] - [[package]] name = "is-terminal" -version = "0.4.7" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" +checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", - "io-lifetimes", "rustix", "windows-sys", ] [[package]] name = "libc" -version = "0.2.144" +version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "linux-raw-sys" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" +checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" [[package]] name = "log" -version = "0.4.18" +version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de" +checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" [[package]] name = "memchr" @@ -175,21 +215,47 @@ version = "0.1.0" dependencies = [ "dns-parser", "env_logger", + "get_if_addrs", "log", + "net2", + "network-interface", "nix", "once_cell", "serde", "toml", ] +[[package]] +name = "net2" +version = "0.2.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "network-interface" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "296ae2183dab440e352740b1fcf0589d8058afa40bc29fd163227ddb01fbf539" +dependencies = [ + "cc", + "libc", + "thiserror", + "winapi 0.3.9", +] + [[package]] name = "nix" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" dependencies = [ - "bitflags", - "cfg-if", + "bitflags 1.3.2", + "cfg-if 1.0.0", "libc", "memoffset", "pin-utils", @@ -198,9 +264,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.17.2" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9670a07f94779e00908f3e686eab508878ebb390ba6e604d3a284c00e8d0487b" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] name = "pin-utils" @@ -210,9 +276,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "proc-macro2" -version = "1.0.59" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b" +checksum = "92de25114670a878b1261c79c9f8f729fb97e95bac93f6312f583c60dd6a1dfe" dependencies = [ "unicode-ident", ] @@ -225,18 +291,30 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" -version = "1.0.28" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +checksum = "5907a1b7c277254a8b15170f6e7c97cfa60ee7872a3217663bb81151e48184bb" dependencies = [ "proc-macro2", ] [[package]] name = "regex" -version = "1.8.3" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390" +checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310" dependencies = [ "aho-corasick", "memchr", @@ -245,19 +323,18 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.7.2" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" [[package]] name = "rustix" -version = "0.37.19" +version = "0.38.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acf8729d8542766f1b2cf77eb034d52f40d375bb8b615d0b147089946e16613d" +checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" dependencies = [ - "bitflags", + "bitflags 2.3.3", "errno", - "io-lifetimes", "libc", "linux-raw-sys", "windows-sys", @@ -265,18 +342,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.163" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2" +checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.163" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" +checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" dependencies = [ "proc-macro2", "quote", @@ -285,9 +362,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d" +checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186" dependencies = [ "serde", ] @@ -300,9 +377,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "syn" -version = "2.0.18" +version = "2.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" dependencies = [ "proc-macro2", "quote", @@ -319,10 +396,30 @@ dependencies = [ ] [[package]] -name = "toml" -version = "0.7.4" +name = "thiserror" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec" +checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "toml" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17e963a819c331dcacd7ab957d80bc2b9a9c1e71c804826d2f283dd65306542" dependencies = [ "serde", "serde_spanned", @@ -332,18 +429,18 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f" +checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" dependencies = [ "serde", ] [[package]] name = "toml_edit" -version = "0.19.10" +version = "0.19.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739" +checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" dependencies = [ "indexmap", "serde", @@ -354,9 +451,15 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.9" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" + +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" [[package]] name = "winapi" @@ -380,7 +483,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -400,9 +503,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.48.0" +version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ "windows_aarch64_gnullvm", "windows_aarch64_msvc", @@ -457,9 +560,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.4.6" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699" +checksum = "81fac9742fd1ad1bd9643b991319f72dd031016d44b77039a26977eb667141e7" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index b462204..3667e70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,10 @@ edition = "2021" [dependencies] dns-parser = "0.8.0" env_logger = "0.10.0" +get_if_addrs = "0.5.3" log = "0.4.18" +net2 = "0.2.39" +network-interface = "1.0.1" nix = "0.26.2" once_cell = "1.17.2" serde = { version = "1.0.163", features = ["derive"] } diff --git a/README.md b/README.md index bd20ae9..b9fa094 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,15 @@ For now, Only work on IPv4. IPv6 will be added once IPv4 is ready - +#### MDNS + +1. If a DNS Query comes on the source interface, We don't forward it to the destination. We want the destination to be able to resolve mdns hosts in source. A Query from source should not be forwarded to the destination. + +2. A DNS answer from source should be forwarded to the destination _if_ the domain name is in the allow list for that config. + +3. A DNS query from destination should not be forwarded to source if it is not in the allow list for the config + +4. A DNS answer should not be forwarded from destination to source in any circumstances diff --git a/config.toml b/config.toml index cc5aae7..446ddbb 100644 --- a/config.toml +++ b/config.toml @@ -1,5 +1,6 @@ -[config.mdns] +[[mdns]] port = 5353 multicast_groups = ["224.0.0.251"] +sources = ["enp7s0", "lo"] destinations = ["wlp6s0"] -traffic_type = "MDNS" +filters = [ "emerald.local" ] diff --git a/src/communications.rs b/src/communications.rs index 3db24ec..881f747 100644 --- a/src/communications.rs +++ b/src/communications.rs @@ -1,37 +1,77 @@ +use crate::Config; +use get_if_addrs::IfAddr; +use log::{debug, info, trace, warn}; +use net2::UdpBuilder; use std::{ collections::HashMap, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + io::{Error as IoError, Result as IoResult}, + net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::mpsc::{self, Receiver, Sender}, thread::{self, JoinHandle}, + time::Duration, }; -use crate::{ - listener::{join_multicast, new_socket}, - Config, -}; -use log::{debug, info, warn}; -use once_cell::sync::Lazy; -pub static IPV4: Lazy = Lazy::new(|| Ipv4Addr::new(0, 0, 0, 0).into()); -pub static IPV6: Lazy = Lazy::new(|| { - Ipv6Addr::new(0x2a0a, 0x6040, 0x4004, 0x10, 0xf1c6, 0xf2b0, 0x9f45, 0xb425).into() -}); - pub struct Communications { // TODO(ishan): Accommodate IPv6 sockets as well here - sockets: HashMap, + sockets: HashMap<(String, IpAddr, u16), CommSocket>, handles: Vec>, tx_chan: Sender, rx_chan: Receiver, } +#[derive(Debug)] pub struct Event { + pub src_if: String, + pub local_addr: IpAddr, pub remote_addr: SocketAddr, pub payload: Vec, } +#[derive(Debug)] +pub struct CommSocket(UdpSocket); + +impl CommSocket { + pub fn new(addr: IpAddr, port: u16) -> Result { + // TODO(ishan): Remove expect + let addr = SocketAddr::new(addr, port); + + let socket = match addr { + SocketAddr::V4(_) => UdpBuilder::new_v4()?.reuse_address(true)?.bind(addr)?, + SocketAddr::V6(_) => UdpBuilder::new_v6()?.reuse_address(true)?.bind(addr)?, + }; + + socket.set_read_timeout(Some(Duration::from_millis(100)))?; + socket.set_nonblocking(false)?; + + Ok(CommSocket(socket)) + } + + pub fn join_multicast_group(&self, ip_addr: &IpAddr, group: &IpAddr) -> IoResult<()> { + match (group, ip_addr) { + (IpAddr::V4(ref group), IpAddr::V4(ref v4)) => { + self.0.set_multicast_loop_v4(false)?; + // TODO(ishan): This should be an input from config.toml + self.0.join_multicast_v4(group, v4) + } + (IpAddr::V6(ref group), IpAddr::V6(_)) => { + self.0.set_multicast_loop_v6(false)?; + self.0.join_multicast_v6(group, 0) + } + + (_, _) => unreachable!(), + } + } + + // TODO(ishan): Add a method to send messages + pub fn send(&self, buf: &[u8], addr: SocketAddr) -> Result { + self.0.set_ttl(1)?; + self.0.send_to(buf, addr) + } +} + impl Communications { - pub fn new(input: Config) -> Result { + pub fn new(config: Config) -> Result { let mut sockets = HashMap::new(); // This channel is used to send packets from this module to the processor @@ -39,22 +79,45 @@ impl Communications { // or switch to a different model completely that doesn't use channels like this let (rx, tx) = mpsc::channel(); - for (_, v) in input.config.iter() { - let socket = - new_socket(&SocketAddr::new(*IPV4, v.port)).expect("error in bind on ipv4 address"); + let interfaces = get_if_addrs::get_if_addrs().unwrap(); + trace!("Interfaces list: {:?}", interfaces); - info!("started listening on {}:{}", *IPV4, v.port); + for v in &config.mdns { + // Find addresses on the specified sources interfaces + // Listen on all addresses with SO_REUSE_ADDR and use interface's IP address + // when joining multicast group + // Only working with IPv4 addresses for now - for group in v.multicast_groups.iter() { - join_multicast(&socket, group) - .map_err(|e| format!("error in joining multicast group {}: {}", group, e))?; + for src_if_name in &v.sources { + let src_if = interfaces.iter().find(|x| &x.name == src_if_name).unwrap(); - info!("joined multicast group {}", group); + let listen_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + + let ip_addr = match src_if.addr { + IfAddr::V4(ref v4) => IpAddr::V4(v4.ip), + IfAddr::V6(ref v6) => IpAddr::V6(v6.ip), + }; + + let socket = + CommSocket::new(listen_addr, v.port).expect("error in bind on address"); + + info!("started listening on {}:{}", listen_addr, v.port); + + for group in v.multicast_groups.iter() { + socket.join_multicast_group(&ip_addr, group).map_err(|e| { + format!("error in joining multicast group {}: {}", group, e) + })?; + + info!( + "joined multicast group {} with address = {}", + group, ip_addr + ); + } + sockets.insert((src_if_name.clone(), ip_addr, v.port), socket); } - - sockets.insert(v.port, socket); } + info!("{:?}", sockets); Ok(Communications { sockets, handles: vec![], @@ -67,18 +130,20 @@ impl Communications { let sockets = self .clone_sockets() .map_err(|e| format!("error in cloning sockets: {}", e))?; + info!("listener started"); - for (port, v) in sockets { + for ((src_if, local_addr, _), v) in sockets { let socket = v .try_clone() .map_err(|e| format!("error in cloning socket for read ops: {}", e))?; + let tx_chan = self.tx_chan.clone(); let handle = thread::spawn(move || { // TODO(ishan): What should be the size here ? loop { - let mut buf = [0u8; 2000]; + let mut buf = [0u8; 9000]; match socket.recv_from(&mut buf) { Ok((len, remote_addr)) => { @@ -86,15 +151,18 @@ impl Communications { tx_chan .send(Event { + src_if: src_if.clone(), remote_addr, + local_addr, payload: buf.to_vec(), }) .expect("error in sending to mpsc channel"); debug!("read {}bytes from {}", len, remote_addr); } - Err(e) => { - warn!("error in reading from socket {}: {}", port, e); + Err(_) => { + // TODO: Log all buy EAGAIN + // warn!("error in reading from socket {}: {}", port, e); continue; } }; @@ -113,11 +181,12 @@ impl Communications { } } - fn clone_sockets(&self) -> Result, std::io::Error> { + fn clone_sockets(&self) -> Result, std::io::Error> { let mut sockets = HashMap::new(); for (k, v) in self.sockets.iter() { - sockets.insert(*k, v.try_clone()?); + // TODO(ishan): find a better solution here + sockets.insert(k.clone(), v.0.try_clone()?); } Ok(sockets) diff --git a/src/config.rs b/src/config.rs index 0ef755f..573b183 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,23 +1,24 @@ use log::debug; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, fs::File, io::Read, net::IpAddr}; +use std::{ + collections::{HashMap, HashSet}, + fs::File, + io::Read, + net::IpAddr, +}; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { - pub config: HashMap, + pub mdns: Vec, } -#[derive(Debug, Serialize, Deserialize)] -pub struct Record { +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MdnsConfig { pub port: u16, pub multicast_groups: Vec, pub destinations: Vec, - pub traffic_type: TrafficType, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum TrafficType { - MDNS, + pub sources: Vec, + pub filters: HashSet, } impl Config { diff --git a/src/listener.rs b/src/listener.rs deleted file mode 100644 index f0c920c..0000000 --- a/src/listener.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::{ - io::{Error, Result as IoResult}, - net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, - os::fd::AsRawFd, -}; - -use nix::sys::socket::sockopt::ReuseAddr; - -pub fn new_socket(addr: &SocketAddr) -> Result { - let socket = UdpSocket::bind(addr)?; - - #[cfg(unix)] - nix::sys::socket::setsockopt(socket.as_raw_fd(), ReuseAddr, &true)?; - - // socket.set_read_timeout(Some(Duration::from_millis(100)))?; - - Ok(socket) -} - -pub fn join_multicast(socket: &UdpSocket, group: &IpAddr) -> IoResult<()> { - // TODO(ishan): Eventually, this should only listen on the - // interfaces specified in config.toml - match group { - IpAddr::V4(ref mdns_v4) => { - socket.multicast_loop_v4()?; - socket.join_multicast_v4(mdns_v4, &Ipv4Addr::new(0, 0, 0, 0)) - } - IpAddr::V6(ref mdns_v6) => { - socket.multicast_loop_v6()?; - - socket.join_multicast_v6(mdns_v6, 0) - } - } -} diff --git a/src/main.rs b/src/main.rs index b61e486..9ea0ac9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,29 +3,34 @@ // Or a common listener/transmitter and then different modules to parse and transmit each type of // traffic +pub mod socket_manager; +pub use socket_manager::*; pub mod communications; pub use communications::*; pub mod config; pub use config::*; -pub mod listener; pub mod processor; +use log::info; pub use processor::*; fn main() { env_logger::init(); + info!("starting up"); + let config = Config::parse("config.toml").expect("error in parsing config"); println!("{:?}", config); // TODO(ishan): Start listeners and transmitters on v4 and v6 here - let mut comms = Communications::new(config).expect("error in starting comms"); + let mut comms = Communications::new(config.clone()).expect("error in starting comms"); comms .start_listeners() .expect("error in starting listeners"); - let processor = Processor::new(comms.get_reader()); + let processor = + Processor::new(comms.get_reader(), config).expect("error in starting processor"); processor.start_read_loop(); diff --git a/src/processor.rs b/src/processor.rs index 4453086..b6e34a4 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,24 +1,121 @@ +use crate::{communications::CommSocket, Config, Event}; use dns_parser::Packet; -use log::info; -use std::sync::mpsc::Receiver; - -use crate::Event; +use get_if_addrs::IfAddr; +use log::{info, trace, warn}; +use std::{ + collections::{HashMap, HashSet}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::mpsc::Receiver, +}; pub struct Processor<'a> { reader: &'a Receiver, + config: Config, + sockets: HashMap, } impl<'a> Processor<'a> { - pub fn new(reader: &'a Receiver) -> Self { - Self { reader } + pub fn new(reader: &'a Receiver, config: Config) -> Result { + let mut sockets = HashMap::new(); + + let interfaces = get_if_addrs::get_if_addrs().unwrap(); + trace!("Interfaces list: {:?}", interfaces); + + for conf in &config.mdns { + let listen_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + + for if_name in conf + .destinations + .clone() + .into_iter() + .chain(conf.sources.clone()) + { + if sockets.contains_key(&if_name) { + continue; + } + + let dst_if = match interfaces.iter().find(|x| x.name == if_name) { + Some(v) => v, + None => { + warn!("could not find interface {}", if_name); + continue; + } + }; + + let dst_ip_addr = match dst_if.addr { + IfAddr::V4(ref v4) => IpAddr::V4(v4.ip), + IfAddr::V6(ref v6) => IpAddr::V6(v6.ip), + }; + + let socket = + CommSocket::new(listen_addr, conf.port).expect("error in bind on address"); + + info!("started listening on {}:{}", listen_addr, conf.port); + + for group in &conf.multicast_groups { + socket + .join_multicast_group(&dst_ip_addr, group) + .map_err(|e| { + format!("error in joining multicast group {}: {}", group, e) + })?; + } + + sockets.insert(if_name, socket); + } + } + + Ok(Self { + reader, + config, + sockets, + }) } pub fn start_read_loop(&self) { for evt in self.reader { + // TODO: Generalize this to parse any type of supported packet let packet = Packet::parse(&evt.payload).expect("failed to parse packet as a dns packet"); - info!("{:?}", packet); + // info!("EVENT {:?}", evt); + //info!("{:?}", packet); + + for query in packet.questions { + info!("src_if = {} query = {}", evt.src_if, query.qname); + + // Find a config that has src-if and filter + let conf = self.config.mdns.iter().filter(|config| { + (config.destinations.contains(&evt.src_if) + && config.filters.contains(&query.qname.to_string())) + || (config.destinations.contains(&evt.src_if) && config.filters.is_empty()) + }); + + let mut sources: HashSet = conf + .clone() + .flat_map(|config| config.sources.clone()) + .collect(); + sources.remove(&evt.src_if); + + let multicast_groups: HashSet = conf + .flat_map(|config| config.multicast_groups.clone()) + .collect(); + + if !sources.is_empty() { + info!("conf = {:?}", sources); + } + for src in sources { + // Send this MDNS query to all matching sources + // When they respond, We'll forward the message to matching destinations + + if let Some(socket) = self.sockets.get(&src) { + for group in multicast_groups.clone() { + socket + .send(&evt.payload, SocketAddr::new(group, 1900)) + .expect("error in sending message"); + } + } + } + } } } } diff --git a/src/socket_manager.rs b/src/socket_manager.rs new file mode 100644 index 0000000..e69de29