From 4f87d8022737c667983555731b35638f35330b64 Mon Sep 17 00:00:00 2001 From: Ishan Jain Date: Fri, 28 Jul 2023 04:59:18 +0530 Subject: [PATCH] Completed a basic version of the program that works with mdns --- Cargo.lock | 64 +++++++++++--- Cargo.toml | 2 + README.md | 4 +- config.toml | 4 +- src/communications.rs | 182 +++++++++------------------------------- src/config.rs | 9 +- src/main.rs | 1 - src/processor.rs | 189 ++++++++++++++++++++++-------------------- 8 files changed, 191 insertions(+), 264 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d40dda3..fb9c83e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,6 +209,19 @@ dependencies = [ "autocfg", ] +[[package]] +name = "multicast-socket" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53eacba0998466687d051439bf144286203d7286623275a8007505d22fd21cfb" +dependencies = [ + "get_if_addrs", + "libc", + "nix 0.19.1", + "socket2", + "winapi 0.3.9", +] + [[package]] name = "multicaster" version = "0.1.0" @@ -216,10 +229,12 @@ dependencies = [ "dns-parser", "env_logger", "get_if_addrs", + "libc", "log", + "multicast-socket", "net2", "network-interface", - "nix", + "nix 0.26.2", "once_cell", "serde", "toml", @@ -248,6 +263,18 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nix" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2" +dependencies = [ + "bitflags 1.3.2", + "cc", + "cfg-if 1.0.0", + "libc", +] + [[package]] name = "nix" version = "0.26.2" @@ -276,9 +303,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "proc-macro2" -version = "1.0.65" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92de25114670a878b1261c79c9f8f729fb97e95bac93f6312f583c60dd6a1dfe" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -291,9 +318,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" -version = "1.0.30" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5907a1b7c277254a8b15170f6e7c97cfa60ee7872a3217663bb81151e48184bb" +checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" dependencies = [ "proc-macro2", ] @@ -342,18 +369,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.171" +version = "1.0.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" +checksum = "63ba2516aa6bf82e0b19ca8b50019d52df58455d3cf9bdaf6315225fdd0c560a" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.171" +version = "1.0.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" +checksum = "401797fe7833d72109fedec6bfcbe67c0eed9b99772f26eb8afd261f0abc6fd3" dependencies = [ "proc-macro2", "quote", @@ -369,6 +396,17 @@ dependencies = [ "serde", ] +[[package]] +name = "socket2" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "winapi 0.3.9", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -377,9 +415,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "syn" -version = "2.0.26" +version = "2.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" +checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0" dependencies = [ "proc-macro2", "quote", @@ -560,9 +598,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fac9742fd1ad1bd9643b991319f72dd031016d44b77039a26977eb667141e7" +checksum = "25b5872fa2e10bd067ae946f927e726d7d603eaeb6e02fa6a350e0722d2b8c11" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 3667e70..a1b3741 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,9 @@ edition = "2021" dns-parser = "0.8.0" env_logger = "0.10.0" get_if_addrs = "0.5.3" +libc = "0.2.147" log = "0.4.18" +multicast-socket = "0.2.2" net2 = "0.2.39" network-interface = "1.0.1" nix = "0.26.2" diff --git a/README.md b/README.md index b9fa094..b089cea 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ It'll allow you to be very specific about the exact traffic that is sent over. # Working Notes 1. It needs to listen on the specified port to receive multicast traffic. -This causes problems if there are other softwares that are also listening without using `SO_ADDR_REUSE`. +This causes problems if there are other softwares that are also listening without using `SO_REUSE_ADDR`. For now, Disable those softwares when running this. A list of such softwares, @@ -33,7 +33,7 @@ For now, Only work on IPv4. IPv6 will be added once IPv4 is ready 3. A DNS query from destination should not be forwarded to source if it is not in the allow list for the config -4. A DNS answer should not be forwarded from destination to source in any circumstances +4. A DNS answer should not be forwarded from destination to source in any circumstances. This is not enforced right now. diff --git a/config.toml b/config.toml index 446ddbb..2c3f130 100644 --- a/config.toml +++ b/config.toml @@ -1,6 +1,4 @@ [[mdns]] -port = 5353 -multicast_groups = ["224.0.0.251"] -sources = ["enp7s0", "lo"] +sources = ["enp7s0"] destinations = ["wlp6s0"] filters = [ "emerald.local" ] diff --git a/src/communications.rs b/src/communications.rs index 881f747..fee5d43 100644 --- a/src/communications.rs +++ b/src/communications.rs @@ -1,125 +1,32 @@ use crate::Config; -use get_if_addrs::IfAddr; -use log::{debug, info, trace, warn}; -use net2::UdpBuilder; +use log::{info, trace}; +use multicast_socket::{Message, MulticastOptions, MulticastSocket}; use std::{ - collections::HashMap, - io::{Error as IoError, Result as IoResult}, - net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, + net::SocketAddrV4, sync::mpsc::{self, Receiver, Sender}, thread::{self, JoinHandle}, - time::Duration, }; pub struct Communications { // TODO(ishan): Accommodate IPv6 sockets as well here - sockets: HashMap<(String, IpAddr, u16), CommSocket>, handles: Vec>, - tx_chan: Sender, rx_chan: Receiver, } #[derive(Debug)] pub struct Event { - pub src_if: String, - pub local_addr: IpAddr, - pub remote_addr: SocketAddr, - pub payload: Vec, -} - -#[derive(Debug)] -pub struct CommSocket(UdpSocket); - -impl CommSocket { - pub fn new(addr: IpAddr, port: u16) -> Result { - // TODO(ishan): Remove expect - let addr = SocketAddr::new(addr, port); - - let socket = match addr { - SocketAddr::V4(_) => UdpBuilder::new_v4()?.reuse_address(true)?.bind(addr)?, - SocketAddr::V6(_) => UdpBuilder::new_v6()?.reuse_address(true)?.bind(addr)?, - }; - - socket.set_read_timeout(Some(Duration::from_millis(100)))?; - socket.set_nonblocking(false)?; - - Ok(CommSocket(socket)) - } - - pub fn join_multicast_group(&self, ip_addr: &IpAddr, group: &IpAddr) -> IoResult<()> { - match (group, ip_addr) { - (IpAddr::V4(ref group), IpAddr::V4(ref v4)) => { - self.0.set_multicast_loop_v4(false)?; - // TODO(ishan): This should be an input from config.toml - self.0.join_multicast_v4(group, v4) - } - (IpAddr::V6(ref group), IpAddr::V6(_)) => { - self.0.set_multicast_loop_v6(false)?; - self.0.join_multicast_v6(group, 0) - } - - (_, _) => unreachable!(), - } - } - - // TODO(ishan): Add a method to send messages - pub fn send(&self, buf: &[u8], addr: SocketAddr) -> Result { - self.0.set_ttl(1)?; - self.0.send_to(buf, addr) - } + pub msg: Message, } impl Communications { pub fn new(config: Config) -> Result { - let mut sockets = HashMap::new(); - // This channel is used to send packets from this module to the processor // TODO(ishan): Eventually, swap out std::mpsc for some thing faster // or switch to a different model completely that doesn't use channels like this let (rx, tx) = mpsc::channel(); - let interfaces = get_if_addrs::get_if_addrs().unwrap(); - trace!("Interfaces list: {:?}", interfaces); - - for v in &config.mdns { - // Find addresses on the specified sources interfaces - // Listen on all addresses with SO_REUSE_ADDR and use interface's IP address - // when joining multicast group - // Only working with IPv4 addresses for now - - for src_if_name in &v.sources { - let src_if = interfaces.iter().find(|x| &x.name == src_if_name).unwrap(); - - let listen_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); - - let ip_addr = match src_if.addr { - IfAddr::V4(ref v4) => IpAddr::V4(v4.ip), - IfAddr::V6(ref v6) => IpAddr::V6(v6.ip), - }; - - let socket = - CommSocket::new(listen_addr, v.port).expect("error in bind on address"); - - info!("started listening on {}:{}", listen_addr, v.port); - - for group in v.multicast_groups.iter() { - socket.join_multicast_group(&ip_addr, group).map_err(|e| { - format!("error in joining multicast group {}: {}", group, e) - })?; - - info!( - "joined multicast group {} with address = {}", - group, ip_addr - ); - } - sockets.insert((src_if_name.clone(), ip_addr, v.port), socket); - } - } - - info!("{:?}", sockets); Ok(Communications { - sockets, handles: vec![], rx_chan: tx, tx_chan: rx, @@ -127,50 +34,46 @@ impl Communications { } pub fn start_listeners(&mut self) -> Result<(), String> { - let sockets = self - .clone_sockets() - .map_err(|e| format!("error in cloning sockets: {}", e))?; info!("listener started"); - for ((src_if, local_addr, _), v) in sockets { - let socket = v - .try_clone() - .map_err(|e| format!("error in cloning socket for read ops: {}", e))?; + let tx_chan = self.tx_chan.clone(); - let tx_chan = self.tx_chan.clone(); + let handle = thread::spawn(move || { + // TODO(ishan): What should be the size here ? - let handle = thread::spawn(move || { - // TODO(ishan): What should be the size here ? + let interfaces = get_if_addrs::get_if_addrs().unwrap(); + trace!("Interfaces list: {:?}", interfaces); - loop { - let mut buf = [0u8; 9000]; + // mdns + let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353); + let multicast_socket = MulticastSocket::with_options( + mdns_address, + multicast_socket::all_ipv4_interfaces().expect("could not fetch all interfaces"), + MulticastOptions { + loopback: false, + buffer_size: 4096, + ..Default::default() + }, + ) + .expect("error in creating multicast socket"); - match socket.recv_from(&mut buf) { - Ok((len, remote_addr)) => { - let buf = &buf[..len]; + loop { + match multicast_socket.receive() { + Ok(msg) => { + tx_chan + .send(Event { msg }) + .expect("error in sending to mpsc channel"); + } + Err(_) => { + // TODO: Log all buy EAGAIN + // warn!("error in reading from socket {}: {}", port, e); + continue; + } + }; + } + }); - tx_chan - .send(Event { - src_if: src_if.clone(), - remote_addr, - local_addr, - payload: buf.to_vec(), - }) - .expect("error in sending to mpsc channel"); - - debug!("read {}bytes from {}", len, remote_addr); - } - Err(_) => { - // TODO: Log all buy EAGAIN - // warn!("error in reading from socket {}: {}", port, e); - continue; - } - }; - } - }); - - self.handles.push(handle); - } + self.handles.push(handle); Ok(()) } @@ -181,17 +84,6 @@ impl Communications { } } - fn clone_sockets(&self) -> Result, std::io::Error> { - let mut sockets = HashMap::new(); - - for (k, v) in self.sockets.iter() { - // TODO(ishan): find a better solution here - sockets.insert(k.clone(), v.0.try_clone()?); - } - - Ok(sockets) - } - pub fn get_reader(&self) -> &Receiver { &self.rx_chan } diff --git a/src/config.rs b/src/config.rs index 573b183..ee82c89 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,11 +1,6 @@ use log::debug; use serde::{Deserialize, Serialize}; -use std::{ - collections::{HashMap, HashSet}, - fs::File, - io::Read, - net::IpAddr, -}; +use std::{collections::HashSet, fs::File, io::Read}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { @@ -14,8 +9,6 @@ pub struct Config { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MdnsConfig { - pub port: u16, - pub multicast_groups: Vec, pub destinations: Vec, pub sources: Vec, pub filters: HashSet, diff --git a/src/main.rs b/src/main.rs index 9ea0ac9..406b2b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,6 @@ fn main() { // TODO(ishan): Start listeners and transmitters on v4 and v6 here let mut comms = Communications::new(config.clone()).expect("error in starting comms"); - comms .start_listeners() .expect("error in starting listeners"); diff --git a/src/processor.rs b/src/processor.rs index b6e34a4..eb720a4 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,117 +1,106 @@ -use crate::{communications::CommSocket, Config, Event}; +use crate::{Config, Event}; use dns_parser::Packet; -use get_if_addrs::IfAddr; -use log::{info, trace, warn}; -use std::{ - collections::{HashMap, HashSet}, - net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::mpsc::Receiver, -}; +use log::{info, warn}; +use multicast_socket::{Interface, MulticastOptions, MulticastSocket}; +use std::{ffi::CString, net::SocketAddrV4, sync::mpsc::Receiver}; pub struct Processor<'a> { reader: &'a Receiver, config: Config, - sockets: HashMap, } impl<'a> Processor<'a> { pub fn new(reader: &'a Receiver, config: Config) -> Result { - let mut sockets = HashMap::new(); - - let interfaces = get_if_addrs::get_if_addrs().unwrap(); - trace!("Interfaces list: {:?}", interfaces); - - for conf in &config.mdns { - let listen_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); - - for if_name in conf - .destinations - .clone() - .into_iter() - .chain(conf.sources.clone()) - { - if sockets.contains_key(&if_name) { - continue; - } - - let dst_if = match interfaces.iter().find(|x| x.name == if_name) { - Some(v) => v, - None => { - warn!("could not find interface {}", if_name); - continue; - } - }; - - let dst_ip_addr = match dst_if.addr { - IfAddr::V4(ref v4) => IpAddr::V4(v4.ip), - IfAddr::V6(ref v6) => IpAddr::V6(v6.ip), - }; - - let socket = - CommSocket::new(listen_addr, conf.port).expect("error in bind on address"); - - info!("started listening on {}:{}", listen_addr, conf.port); - - for group in &conf.multicast_groups { - socket - .join_multicast_group(&dst_ip_addr, group) - .map_err(|e| { - format!("error in joining multicast group {}: {}", group, e) - })?; - } - - sockets.insert(if_name, socket); - } - } - - Ok(Self { - reader, - config, - sockets, - }) + Ok(Self { reader, config }) } pub fn start_read_loop(&self) { + // mdns + let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353); + let socket = MulticastSocket::with_options( + mdns_address, + multicast_socket::all_ipv4_interfaces().expect("could not fetch all interfaces"), + MulticastOptions { + loopback: false, + buffer_size: 4096, + ..Default::default() + }, + ) + .expect("error in creating multicast socket"); + for evt in self.reader { // TODO: Generalize this to parse any type of supported packet let packet = - Packet::parse(&evt.payload).expect("failed to parse packet as a dns packet"); + Packet::parse(&evt.msg.data).expect("failed to parse packet as a dns packet"); - // info!("EVENT {:?}", evt); - //info!("{:?}", packet); + let interfaces = get_if_addrs::get_if_addrs().unwrap(); - for query in packet.questions { - info!("src_if = {} query = {}", evt.src_if, query.qname); + let src_ifname = if let Interface::Index(idx) = evt.msg.interface { + ifidx_to_ifname(idx as u32) + } else { + "lo".to_string() + }; - // Find a config that has src-if and filter - let conf = self.config.mdns.iter().filter(|config| { - (config.destinations.contains(&evt.src_if) - && config.filters.contains(&query.qname.to_string())) - || (config.destinations.contains(&evt.src_if) && config.filters.is_empty()) - }); + info!( + "EVENT src-if = {} if-index {:?} address = {}, packet: {:?} answers = {:?}", + src_ifname, + evt.msg.interface, + evt.msg.origin_address, + packet.questions.iter().map(|q| q.qname).collect::>(), + packet.answers.iter().map(|q| q.name).collect::>() + ); + for conf in &self.config.mdns { + for query in &packet.questions { + let forward = conf.destinations.contains(&src_ifname) + && (conf.filters.is_empty() + || conf.filters.contains(&query.qname.to_string())); - let mut sources: HashSet = conf - .clone() - .flat_map(|config| config.sources.clone()) - .collect(); - sources.remove(&evt.src_if); + if forward { + let dst_ifs = conf + .sources + .clone() + .into_iter() + .filter_map(|dst_if| interfaces.iter().find(|x| x.name == dst_if)); - let multicast_groups: HashSet = conf - .flat_map(|config| config.multicast_groups.clone()) - .collect(); + for dst_if in dst_ifs { + let dst_ifid = ifname_to_ifidx(dst_if.name.to_string()); - if !sources.is_empty() { - info!("conf = {:?}", sources); - } - for src in sources { - // Send this MDNS query to all matching sources - // When they respond, We'll forward the message to matching destinations - - if let Some(socket) = self.sockets.get(&src) { - for group in multicast_groups.clone() { + info!( + "forwarding {:?} from {:?} to {:?}({})", + packet, src_ifname, dst_if, dst_ifid + ); + // TODO(ishan): Take a note of transaction id + // and avoid feedback loops socket - .send(&evt.payload, SocketAddr::new(group, 1900)) - .expect("error in sending message"); + .send(&evt.msg.data, &Interface::Index(dst_ifid as i32)) + .expect("error in sending mdns packet"); + } + } + } + + for answer in &packet.answers { + let forward = conf.sources.contains(&src_ifname) + && (conf.filters.is_empty() + || conf.filters.contains(&answer.name.to_string())); + + if forward { + let dst_ifs = conf + .destinations + .clone() + .into_iter() + .filter_map(|dst_if| interfaces.iter().find(|x| x.name == dst_if)); + + for dst_if in dst_ifs { + let dst_ifid = ifname_to_ifidx(dst_if.name.to_string()); + + info!( + "forwarding {:?} from {:?} to {:?}({})", + packet, src_ifname, dst_if, dst_ifid + ); + + socket + .send(&evt.msg.data, &Interface::Index(dst_ifid as i32)) + .expect("error in sending mdns packet"); } } } @@ -119,3 +108,19 @@ impl<'a> Processor<'a> { } } } + +fn ifidx_to_ifname(idx: u32) -> String { + let out = CString::new("askdjhaskdjakdjadksa").unwrap(); + + unsafe { + let ptr = out.into_raw(); + let response = libc::if_indextoname(idx, ptr); + + CString::from_raw(response).into_string().unwrap() + } +} + +fn ifname_to_ifidx(name: String) -> u32 { + let out = name.as_ptr() as *const _; + unsafe { libc::if_nametoindex(out) } +}