Completed a basic version of the program that works with mdns

This commit is contained in:
Ishan Jain 2023-07-28 04:59:18 +05:30
parent b2a1e152e2
commit 4f87d80227
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
8 changed files with 191 additions and 264 deletions

64
Cargo.lock generated
View File

@ -209,6 +209,19 @@ dependencies = [
"autocfg",
]
[[package]]
name = "multicast-socket"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53eacba0998466687d051439bf144286203d7286623275a8007505d22fd21cfb"
dependencies = [
"get_if_addrs",
"libc",
"nix 0.19.1",
"socket2",
"winapi 0.3.9",
]
[[package]]
name = "multicaster"
version = "0.1.0"
@ -216,10 +229,12 @@ dependencies = [
"dns-parser",
"env_logger",
"get_if_addrs",
"libc",
"log",
"multicast-socket",
"net2",
"network-interface",
"nix",
"nix 0.26.2",
"once_cell",
"serde",
"toml",
@ -248,6 +263,18 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "nix"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2"
dependencies = [
"bitflags 1.3.2",
"cc",
"cfg-if 1.0.0",
"libc",
]
[[package]]
name = "nix"
version = "0.26.2"
@ -276,9 +303,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.65"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92de25114670a878b1261c79c9f8f729fb97e95bac93f6312f583c60dd6a1dfe"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
@ -291,9 +318,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.30"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5907a1b7c277254a8b15170f6e7c97cfa60ee7872a3217663bb81151e48184bb"
checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965"
dependencies = [
"proc-macro2",
]
@ -342,18 +369,18 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.171"
version = "1.0.177"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
checksum = "63ba2516aa6bf82e0b19ca8b50019d52df58455d3cf9bdaf6315225fdd0c560a"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.171"
version = "1.0.177"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
checksum = "401797fe7833d72109fedec6bfcbe67c0eed9b99772f26eb8afd261f0abc6fd3"
dependencies = [
"proc-macro2",
"quote",
@ -369,6 +396,17 @@ dependencies = [
"serde",
]
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if 1.0.0",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
@ -377,9 +415,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "syn"
version = "2.0.26"
version = "2.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970"
checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0"
dependencies = [
"proc-macro2",
"quote",
@ -560,9 +598,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "winnow"
version = "0.5.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81fac9742fd1ad1bd9643b991319f72dd031016d44b77039a26977eb667141e7"
checksum = "25b5872fa2e10bd067ae946f927e726d7d603eaeb6e02fa6a350e0722d2b8c11"
dependencies = [
"memchr",
]

View File

@ -7,7 +7,9 @@ edition = "2021"
dns-parser = "0.8.0"
env_logger = "0.10.0"
get_if_addrs = "0.5.3"
libc = "0.2.147"
log = "0.4.18"
multicast-socket = "0.2.2"
net2 = "0.2.39"
network-interface = "1.0.1"
nix = "0.26.2"

View File

@ -7,7 +7,7 @@ It'll allow you to be very specific about the exact traffic that is sent over.
# Working Notes
1. It needs to listen on the specified port to receive multicast traffic.
This causes problems if there are other softwares that are also listening without using `SO_ADDR_REUSE`.
This causes problems if there are other softwares that are also listening without using `SO_REUSE_ADDR`.
For now, Disable those softwares when running this. A list of such softwares,
@ -33,7 +33,7 @@ For now, Only work on IPv4. IPv6 will be added once IPv4 is ready
3. A DNS query from destination should not be forwarded to source if it is not in the allow list for the config
4. A DNS answer should not be forwarded from destination to source in any circumstances
4. A DNS answer should not be forwarded from destination to source in any circumstances. This is not enforced right now.

View File

@ -1,6 +1,4 @@
[[mdns]]
port = 5353
multicast_groups = ["224.0.0.251"]
sources = ["enp7s0", "lo"]
sources = ["enp7s0"]
destinations = ["wlp6s0"]
filters = [ "emerald.local" ]

View File

@ -1,125 +1,32 @@
use crate::Config;
use get_if_addrs::IfAddr;
use log::{debug, info, trace, warn};
use net2::UdpBuilder;
use log::{info, trace};
use multicast_socket::{Message, MulticastOptions, MulticastSocket};
use std::{
collections::HashMap,
io::{Error as IoError, Result as IoResult},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
net::SocketAddrV4,
sync::mpsc::{self, Receiver, Sender},
thread::{self, JoinHandle},
time::Duration,
};
pub struct Communications {
// TODO(ishan): Accommodate IPv6 sockets as well here
sockets: HashMap<(String, IpAddr, u16), CommSocket>,
handles: Vec<JoinHandle<()>>,
tx_chan: Sender<Event>,
rx_chan: Receiver<Event>,
}
#[derive(Debug)]
pub struct Event {
pub src_if: String,
pub local_addr: IpAddr,
pub remote_addr: SocketAddr,
pub payload: Vec<u8>,
}
#[derive(Debug)]
pub struct CommSocket(UdpSocket);
impl CommSocket {
pub fn new(addr: IpAddr, port: u16) -> Result<CommSocket, IoError> {
// TODO(ishan): Remove expect
let addr = SocketAddr::new(addr, port);
let socket = match addr {
SocketAddr::V4(_) => UdpBuilder::new_v4()?.reuse_address(true)?.bind(addr)?,
SocketAddr::V6(_) => UdpBuilder::new_v6()?.reuse_address(true)?.bind(addr)?,
};
socket.set_read_timeout(Some(Duration::from_millis(100)))?;
socket.set_nonblocking(false)?;
Ok(CommSocket(socket))
}
pub fn join_multicast_group(&self, ip_addr: &IpAddr, group: &IpAddr) -> IoResult<()> {
match (group, ip_addr) {
(IpAddr::V4(ref group), IpAddr::V4(ref v4)) => {
self.0.set_multicast_loop_v4(false)?;
// TODO(ishan): This should be an input from config.toml
self.0.join_multicast_v4(group, v4)
}
(IpAddr::V6(ref group), IpAddr::V6(_)) => {
self.0.set_multicast_loop_v6(false)?;
self.0.join_multicast_v6(group, 0)
}
(_, _) => unreachable!(),
}
}
// TODO(ishan): Add a method to send messages
pub fn send(&self, buf: &[u8], addr: SocketAddr) -> Result<usize, IoError> {
self.0.set_ttl(1)?;
self.0.send_to(buf, addr)
}
pub msg: Message,
}
impl Communications {
pub fn new(config: Config) -> Result<Self, String> {
let mut sockets = HashMap::new();
// This channel is used to send packets from this module to the processor
// TODO(ishan): Eventually, swap out std::mpsc for some thing faster
// or switch to a different model completely that doesn't use channels like this
let (rx, tx) = mpsc::channel();
let interfaces = get_if_addrs::get_if_addrs().unwrap();
trace!("Interfaces list: {:?}", interfaces);
for v in &config.mdns {
// Find addresses on the specified sources interfaces
// Listen on all addresses with SO_REUSE_ADDR and use interface's IP address
// when joining multicast group
// Only working with IPv4 addresses for now
for src_if_name in &v.sources {
let src_if = interfaces.iter().find(|x| &x.name == src_if_name).unwrap();
let listen_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let ip_addr = match src_if.addr {
IfAddr::V4(ref v4) => IpAddr::V4(v4.ip),
IfAddr::V6(ref v6) => IpAddr::V6(v6.ip),
};
let socket =
CommSocket::new(listen_addr, v.port).expect("error in bind on address");
info!("started listening on {}:{}", listen_addr, v.port);
for group in v.multicast_groups.iter() {
socket.join_multicast_group(&ip_addr, group).map_err(|e| {
format!("error in joining multicast group {}: {}", group, e)
})?;
info!(
"joined multicast group {} with address = {}",
group, ip_addr
);
}
sockets.insert((src_if_name.clone(), ip_addr, v.port), socket);
}
}
info!("{:?}", sockets);
Ok(Communications {
sockets,
handles: vec![],
rx_chan: tx,
tx_chan: rx,
@ -127,50 +34,46 @@ impl Communications {
}
pub fn start_listeners(&mut self) -> Result<(), String> {
let sockets = self
.clone_sockets()
.map_err(|e| format!("error in cloning sockets: {}", e))?;
info!("listener started");
for ((src_if, local_addr, _), v) in sockets {
let socket = v
.try_clone()
.map_err(|e| format!("error in cloning socket for read ops: {}", e))?;
let tx_chan = self.tx_chan.clone();
let tx_chan = self.tx_chan.clone();
let handle = thread::spawn(move || {
// TODO(ishan): What should be the size here ?
let handle = thread::spawn(move || {
// TODO(ishan): What should be the size here ?
let interfaces = get_if_addrs::get_if_addrs().unwrap();
trace!("Interfaces list: {:?}", interfaces);
loop {
let mut buf = [0u8; 9000];
// mdns
let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353);
let multicast_socket = MulticastSocket::with_options(
mdns_address,
multicast_socket::all_ipv4_interfaces().expect("could not fetch all interfaces"),
MulticastOptions {
loopback: false,
buffer_size: 4096,
..Default::default()
},
)
.expect("error in creating multicast socket");
match socket.recv_from(&mut buf) {
Ok((len, remote_addr)) => {
let buf = &buf[..len];
loop {
match multicast_socket.receive() {
Ok(msg) => {
tx_chan
.send(Event { msg })
.expect("error in sending to mpsc channel");
}
Err(_) => {
// TODO: Log all buy EAGAIN
// warn!("error in reading from socket {}: {}", port, e);
continue;
}
};
}
});
tx_chan
.send(Event {
src_if: src_if.clone(),
remote_addr,
local_addr,
payload: buf.to_vec(),
})
.expect("error in sending to mpsc channel");
debug!("read {}bytes from {}", len, remote_addr);
}
Err(_) => {
// TODO: Log all buy EAGAIN
// warn!("error in reading from socket {}: {}", port, e);
continue;
}
};
}
});
self.handles.push(handle);
}
self.handles.push(handle);
Ok(())
}
@ -181,17 +84,6 @@ impl Communications {
}
}
fn clone_sockets(&self) -> Result<HashMap<(String, IpAddr, u16), UdpSocket>, std::io::Error> {
let mut sockets = HashMap::new();
for (k, v) in self.sockets.iter() {
// TODO(ishan): find a better solution here
sockets.insert(k.clone(), v.0.try_clone()?);
}
Ok(sockets)
}
pub fn get_reader(&self) -> &Receiver<Event> {
&self.rx_chan
}

View File

@ -1,11 +1,6 @@
use log::debug;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
fs::File,
io::Read,
net::IpAddr,
};
use std::{collections::HashSet, fs::File, io::Read};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
@ -14,8 +9,6 @@ pub struct Config {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MdnsConfig {
pub port: u16,
pub multicast_groups: Vec<IpAddr>,
pub destinations: Vec<String>,
pub sources: Vec<String>,
pub filters: HashSet<String>,

View File

@ -24,7 +24,6 @@ fn main() {
// TODO(ishan): Start listeners and transmitters on v4 and v6 here
let mut comms = Communications::new(config.clone()).expect("error in starting comms");
comms
.start_listeners()
.expect("error in starting listeners");

View File

@ -1,117 +1,106 @@
use crate::{communications::CommSocket, Config, Event};
use crate::{Config, Event};
use dns_parser::Packet;
use get_if_addrs::IfAddr;
use log::{info, trace, warn};
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::mpsc::Receiver,
};
use log::{info, warn};
use multicast_socket::{Interface, MulticastOptions, MulticastSocket};
use std::{ffi::CString, net::SocketAddrV4, sync::mpsc::Receiver};
pub struct Processor<'a> {
reader: &'a Receiver<Event>,
config: Config,
sockets: HashMap<String, CommSocket>,
}
impl<'a> Processor<'a> {
pub fn new(reader: &'a Receiver<Event>, config: Config) -> Result<Self, String> {
let mut sockets = HashMap::new();
let interfaces = get_if_addrs::get_if_addrs().unwrap();
trace!("Interfaces list: {:?}", interfaces);
for conf in &config.mdns {
let listen_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
for if_name in conf
.destinations
.clone()
.into_iter()
.chain(conf.sources.clone())
{
if sockets.contains_key(&if_name) {
continue;
}
let dst_if = match interfaces.iter().find(|x| x.name == if_name) {
Some(v) => v,
None => {
warn!("could not find interface {}", if_name);
continue;
}
};
let dst_ip_addr = match dst_if.addr {
IfAddr::V4(ref v4) => IpAddr::V4(v4.ip),
IfAddr::V6(ref v6) => IpAddr::V6(v6.ip),
};
let socket =
CommSocket::new(listen_addr, conf.port).expect("error in bind on address");
info!("started listening on {}:{}", listen_addr, conf.port);
for group in &conf.multicast_groups {
socket
.join_multicast_group(&dst_ip_addr, group)
.map_err(|e| {
format!("error in joining multicast group {}: {}", group, e)
})?;
}
sockets.insert(if_name, socket);
}
}
Ok(Self {
reader,
config,
sockets,
})
Ok(Self { reader, config })
}
pub fn start_read_loop(&self) {
// mdns
let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353);
let socket = MulticastSocket::with_options(
mdns_address,
multicast_socket::all_ipv4_interfaces().expect("could not fetch all interfaces"),
MulticastOptions {
loopback: false,
buffer_size: 4096,
..Default::default()
},
)
.expect("error in creating multicast socket");
for evt in self.reader {
// TODO: Generalize this to parse any type of supported packet
let packet =
Packet::parse(&evt.payload).expect("failed to parse packet as a dns packet");
Packet::parse(&evt.msg.data).expect("failed to parse packet as a dns packet");
// info!("EVENT {:?}", evt);
//info!("{:?}", packet);
let interfaces = get_if_addrs::get_if_addrs().unwrap();
for query in packet.questions {
info!("src_if = {} query = {}", evt.src_if, query.qname);
let src_ifname = if let Interface::Index(idx) = evt.msg.interface {
ifidx_to_ifname(idx as u32)
} else {
"lo".to_string()
};
// Find a config that has src-if and filter
let conf = self.config.mdns.iter().filter(|config| {
(config.destinations.contains(&evt.src_if)
&& config.filters.contains(&query.qname.to_string()))
|| (config.destinations.contains(&evt.src_if) && config.filters.is_empty())
});
info!(
"EVENT src-if = {} if-index {:?} address = {}, packet: {:?} answers = {:?}",
src_ifname,
evt.msg.interface,
evt.msg.origin_address,
packet.questions.iter().map(|q| q.qname).collect::<Vec<_>>(),
packet.answers.iter().map(|q| q.name).collect::<Vec<_>>()
);
for conf in &self.config.mdns {
for query in &packet.questions {
let forward = conf.destinations.contains(&src_ifname)
&& (conf.filters.is_empty()
|| conf.filters.contains(&query.qname.to_string()));
let mut sources: HashSet<String> = conf
.clone()
.flat_map(|config| config.sources.clone())
.collect();
sources.remove(&evt.src_if);
if forward {
let dst_ifs = conf
.sources
.clone()
.into_iter()
.filter_map(|dst_if| interfaces.iter().find(|x| x.name == dst_if));
let multicast_groups: HashSet<IpAddr> = conf
.flat_map(|config| config.multicast_groups.clone())
.collect();
for dst_if in dst_ifs {
let dst_ifid = ifname_to_ifidx(dst_if.name.to_string());
if !sources.is_empty() {
info!("conf = {:?}", sources);
}
for src in sources {
// Send this MDNS query to all matching sources
// When they respond, We'll forward the message to matching destinations
if let Some(socket) = self.sockets.get(&src) {
for group in multicast_groups.clone() {
info!(
"forwarding {:?} from {:?} to {:?}({})",
packet, src_ifname, dst_if, dst_ifid
);
// TODO(ishan): Take a note of transaction id
// and avoid feedback loops
socket
.send(&evt.payload, SocketAddr::new(group, 1900))
.expect("error in sending message");
.send(&evt.msg.data, &Interface::Index(dst_ifid as i32))
.expect("error in sending mdns packet");
}
}
}
for answer in &packet.answers {
let forward = conf.sources.contains(&src_ifname)
&& (conf.filters.is_empty()
|| conf.filters.contains(&answer.name.to_string()));
if forward {
let dst_ifs = conf
.destinations
.clone()
.into_iter()
.filter_map(|dst_if| interfaces.iter().find(|x| x.name == dst_if));
for dst_if in dst_ifs {
let dst_ifid = ifname_to_ifidx(dst_if.name.to_string());
info!(
"forwarding {:?} from {:?} to {:?}({})",
packet, src_ifname, dst_if, dst_ifid
);
socket
.send(&evt.msg.data, &Interface::Index(dst_ifid as i32))
.expect("error in sending mdns packet");
}
}
}
@ -119,3 +108,19 @@ impl<'a> Processor<'a> {
}
}
}
fn ifidx_to_ifname(idx: u32) -> String {
let out = CString::new("askdjhaskdjakdjadksa").unwrap();
unsafe {
let ptr = out.into_raw();
let response = libc::if_indextoname(idx, ptr);
CString::from_raw(response).into_string().unwrap()
}
}
fn ifname_to_ifidx(name: String) -> u32 {
let out = name.as_ptr() as *const _;
unsafe { libc::if_nametoindex(out) }
}