Updated communicator & processor

Trying to figure out the way to know the right ingress interface
This commit is contained in:
Ishan Jain 2023-07-27 23:43:22 +05:30
parent 5936b16b0e
commit b2a1e152e2
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
10 changed files with 407 additions and 154 deletions

235
Cargo.lock generated
View File

@ -23,18 +23,36 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "c_linked_list"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4964518bd3b4a8190e832886cdc0da9794f12e8e6c1613a9e90ff331c4c8724b"
[[package]]
name = "cc"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -64,6 +82,12 @@ dependencies = [
"termcolor",
]
[[package]]
name = "equivalent"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.1"
@ -86,16 +110,44 @@ dependencies = [
]
[[package]]
name = "hashbrown"
version = "0.12.3"
name = "gcc"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
[[package]]
name = "get_if_addrs"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abddb55a898d32925f3148bd281174a68eeb68bbfd9a5938a57b18f506ee4ef7"
dependencies = [
"c_linked_list",
"get_if_addrs-sys",
"libc",
"winapi 0.2.8",
]
[[package]]
name = "get_if_addrs-sys"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d04f9fb746cf36b191c00f3ede8bde9c8e64f9f4b05ae2694a9ccf5e3f5ab48"
dependencies = [
"gcc",
"libc",
]
[[package]]
name = "hashbrown"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
[[package]]
name = "hermit-abi"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
[[package]]
name = "humantime"
@ -105,54 +157,42 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "indexmap"
version = "1.9.3"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
dependencies = [
"autocfg",
"equivalent",
"hashbrown",
]
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
"windows-sys",
]
[[package]]
name = "is-terminal"
version = "0.4.7"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi",
"io-lifetimes",
"rustix",
"windows-sys",
]
[[package]]
name = "libc"
version = "0.2.144"
version = "0.2.147"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0"
[[package]]
name = "log"
version = "0.4.18"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "memchr"
@ -175,21 +215,47 @@ version = "0.1.0"
dependencies = [
"dns-parser",
"env_logger",
"get_if_addrs",
"log",
"net2",
"network-interface",
"nix",
"once_cell",
"serde",
"toml",
]
[[package]]
name = "net2"
version = "0.2.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "network-interface"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "296ae2183dab440e352740b1fcf0589d8058afa40bc29fd163227ddb01fbf539"
dependencies = [
"cc",
"libc",
"thiserror",
"winapi 0.3.9",
]
[[package]]
name = "nix"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags",
"cfg-if",
"bitflags 1.3.2",
"cfg-if 1.0.0",
"libc",
"memoffset",
"pin-utils",
@ -198,9 +264,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.17.2"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9670a07f94779e00908f3e686eab508878ebb390ba6e604d3a284c00e8d0487b"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "pin-utils"
@ -210,9 +276,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.59"
version = "1.0.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b"
checksum = "92de25114670a878b1261c79c9f8f729fb97e95bac93f6312f583c60dd6a1dfe"
dependencies = [
"unicode-ident",
]
@ -225,18 +291,30 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.28"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488"
checksum = "5907a1b7c277254a8b15170f6e7c97cfa60ee7872a3217663bb81151e48184bb"
dependencies = [
"proc-macro2",
]
[[package]]
name = "regex"
version = "1.8.3"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390"
checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310"
dependencies = [
"aho-corasick",
"memchr",
@ -245,19 +323,18 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.7.2"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2"
[[package]]
name = "rustix"
version = "0.37.19"
version = "0.38.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acf8729d8542766f1b2cf77eb034d52f40d375bb8b615d0b147089946e16613d"
checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5"
dependencies = [
"bitflags",
"bitflags 2.3.3",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys",
@ -265,18 +342,18 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.163"
version = "1.0.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2"
checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.163"
version = "1.0.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
dependencies = [
"proc-macro2",
"quote",
@ -285,9 +362,9 @@ dependencies = [
[[package]]
name = "serde_spanned"
version = "0.6.2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d"
checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186"
dependencies = [
"serde",
]
@ -300,9 +377,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "syn"
version = "2.0.18"
version = "2.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e"
checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970"
dependencies = [
"proc-macro2",
"quote",
@ -319,10 +396,30 @@ dependencies = [
]
[[package]]
name = "toml"
version = "0.7.4"
name = "thiserror"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec"
checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "toml"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17e963a819c331dcacd7ab957d80bc2b9a9c1e71c804826d2f283dd65306542"
dependencies = [
"serde",
"serde_spanned",
@ -332,18 +429,18 @@ dependencies = [
[[package]]
name = "toml_datetime"
version = "0.6.2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f"
checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.19.10"
version = "0.19.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739"
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
dependencies = [
"indexmap",
"serde",
@ -354,9 +451,15 @@ dependencies = [
[[package]]
name = "unicode-ident"
version = "1.0.9"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0"
checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]]
name = "winapi"
@ -380,7 +483,7 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -400,9 +503,9 @@ dependencies = [
[[package]]
name = "windows-targets"
version = "0.48.0"
version = "0.48.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
@ -457,9 +560,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "winnow"
version = "0.4.6"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699"
checksum = "81fac9742fd1ad1bd9643b991319f72dd031016d44b77039a26977eb667141e7"
dependencies = [
"memchr",
]

View File

@ -6,7 +6,10 @@ edition = "2021"
[dependencies]
dns-parser = "0.8.0"
env_logger = "0.10.0"
get_if_addrs = "0.5.3"
log = "0.4.18"
net2 = "0.2.39"
network-interface = "1.0.1"
nix = "0.26.2"
once_cell = "1.17.2"
serde = { version = "1.0.163", features = ["derive"] }

View File

@ -25,7 +25,15 @@ For now, Only work on IPv4. IPv6 will be added once IPv4 is ready
#### MDNS
1. If a DNS Query comes on the source interface, We don't forward it to the destination. We want the destination to be able to resolve mdns hosts in source. A Query from source should not be forwarded to the destination.
2. A DNS answer from source should be forwarded to the destination _if_ the domain name is in the allow list for that config.
3. A DNS query from destination should not be forwarded to source if it is not in the allow list for the config
4. A DNS answer should not be forwarded from destination to source in any circumstances

View File

@ -1,5 +1,6 @@
[config.mdns]
[[mdns]]
port = 5353
multicast_groups = ["224.0.0.251"]
sources = ["enp7s0", "lo"]
destinations = ["wlp6s0"]
traffic_type = "MDNS"
filters = [ "emerald.local" ]

View File

@ -1,37 +1,77 @@
use crate::Config;
use get_if_addrs::IfAddr;
use log::{debug, info, trace, warn};
use net2::UdpBuilder;
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
io::{Error as IoError, Result as IoResult},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::mpsc::{self, Receiver, Sender},
thread::{self, JoinHandle},
time::Duration,
};
use crate::{
listener::{join_multicast, new_socket},
Config,
};
use log::{debug, info, warn};
use once_cell::sync::Lazy;
pub static IPV4: Lazy<IpAddr> = Lazy::new(|| Ipv4Addr::new(0, 0, 0, 0).into());
pub static IPV6: Lazy<IpAddr> = Lazy::new(|| {
Ipv6Addr::new(0x2a0a, 0x6040, 0x4004, 0x10, 0xf1c6, 0xf2b0, 0x9f45, 0xb425).into()
});
pub struct Communications {
// TODO(ishan): Accommodate IPv6 sockets as well here
sockets: HashMap<u16, UdpSocket>,
sockets: HashMap<(String, IpAddr, u16), CommSocket>,
handles: Vec<JoinHandle<()>>,
tx_chan: Sender<Event>,
rx_chan: Receiver<Event>,
}
#[derive(Debug)]
pub struct Event {
pub src_if: String,
pub local_addr: IpAddr,
pub remote_addr: SocketAddr,
pub payload: Vec<u8>,
}
#[derive(Debug)]
pub struct CommSocket(UdpSocket);
impl CommSocket {
pub fn new(addr: IpAddr, port: u16) -> Result<CommSocket, IoError> {
// TODO(ishan): Remove expect
let addr = SocketAddr::new(addr, port);
let socket = match addr {
SocketAddr::V4(_) => UdpBuilder::new_v4()?.reuse_address(true)?.bind(addr)?,
SocketAddr::V6(_) => UdpBuilder::new_v6()?.reuse_address(true)?.bind(addr)?,
};
socket.set_read_timeout(Some(Duration::from_millis(100)))?;
socket.set_nonblocking(false)?;
Ok(CommSocket(socket))
}
pub fn join_multicast_group(&self, ip_addr: &IpAddr, group: &IpAddr) -> IoResult<()> {
match (group, ip_addr) {
(IpAddr::V4(ref group), IpAddr::V4(ref v4)) => {
self.0.set_multicast_loop_v4(false)?;
// TODO(ishan): This should be an input from config.toml
self.0.join_multicast_v4(group, v4)
}
(IpAddr::V6(ref group), IpAddr::V6(_)) => {
self.0.set_multicast_loop_v6(false)?;
self.0.join_multicast_v6(group, 0)
}
(_, _) => unreachable!(),
}
}
// TODO(ishan): Add a method to send messages
pub fn send(&self, buf: &[u8], addr: SocketAddr) -> Result<usize, IoError> {
self.0.set_ttl(1)?;
self.0.send_to(buf, addr)
}
}
impl Communications {
pub fn new(input: Config) -> Result<Self, String> {
pub fn new(config: Config) -> Result<Self, String> {
let mut sockets = HashMap::new();
// This channel is used to send packets from this module to the processor
@ -39,22 +79,45 @@ impl Communications {
// or switch to a different model completely that doesn't use channels like this
let (rx, tx) = mpsc::channel();
for (_, v) in input.config.iter() {
let socket =
new_socket(&SocketAddr::new(*IPV4, v.port)).expect("error in bind on ipv4 address");
let interfaces = get_if_addrs::get_if_addrs().unwrap();
trace!("Interfaces list: {:?}", interfaces);
info!("started listening on {}:{}", *IPV4, v.port);
for v in &config.mdns {
// Find addresses on the specified sources interfaces
// Listen on all addresses with SO_REUSE_ADDR and use interface's IP address
// when joining multicast group
// Only working with IPv4 addresses for now
for group in v.multicast_groups.iter() {
join_multicast(&socket, group)
.map_err(|e| format!("error in joining multicast group {}: {}", group, e))?;
for src_if_name in &v.sources {
let src_if = interfaces.iter().find(|x| &x.name == src_if_name).unwrap();
info!("joined multicast group {}", group);
let listen_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let ip_addr = match src_if.addr {
IfAddr::V4(ref v4) => IpAddr::V4(v4.ip),
IfAddr::V6(ref v6) => IpAddr::V6(v6.ip),
};
let socket =
CommSocket::new(listen_addr, v.port).expect("error in bind on address");
info!("started listening on {}:{}", listen_addr, v.port);
for group in v.multicast_groups.iter() {
socket.join_multicast_group(&ip_addr, group).map_err(|e| {
format!("error in joining multicast group {}: {}", group, e)
})?;
info!(
"joined multicast group {} with address = {}",
group, ip_addr
);
}
sockets.insert((src_if_name.clone(), ip_addr, v.port), socket);
}
sockets.insert(v.port, socket);
}
info!("{:?}", sockets);
Ok(Communications {
sockets,
handles: vec![],
@ -67,18 +130,20 @@ impl Communications {
let sockets = self
.clone_sockets()
.map_err(|e| format!("error in cloning sockets: {}", e))?;
info!("listener started");
for (port, v) in sockets {
for ((src_if, local_addr, _), v) in sockets {
let socket = v
.try_clone()
.map_err(|e| format!("error in cloning socket for read ops: {}", e))?;
let tx_chan = self.tx_chan.clone();
let handle = thread::spawn(move || {
// TODO(ishan): What should be the size here ?
loop {
let mut buf = [0u8; 2000];
let mut buf = [0u8; 9000];
match socket.recv_from(&mut buf) {
Ok((len, remote_addr)) => {
@ -86,15 +151,18 @@ impl Communications {
tx_chan
.send(Event {
src_if: src_if.clone(),
remote_addr,
local_addr,
payload: buf.to_vec(),
})
.expect("error in sending to mpsc channel");
debug!("read {}bytes from {}", len, remote_addr);
}
Err(e) => {
warn!("error in reading from socket {}: {}", port, e);
Err(_) => {
// TODO: Log all buy EAGAIN
// warn!("error in reading from socket {}: {}", port, e);
continue;
}
};
@ -113,11 +181,12 @@ impl Communications {
}
}
fn clone_sockets(&self) -> Result<HashMap<u16, UdpSocket>, std::io::Error> {
fn clone_sockets(&self) -> Result<HashMap<(String, IpAddr, u16), UdpSocket>, std::io::Error> {
let mut sockets = HashMap::new();
for (k, v) in self.sockets.iter() {
sockets.insert(*k, v.try_clone()?);
// TODO(ishan): find a better solution here
sockets.insert(k.clone(), v.0.try_clone()?);
}
Ok(sockets)

View File

@ -1,23 +1,24 @@
use log::debug;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fs::File, io::Read, net::IpAddr};
use std::{
collections::{HashMap, HashSet},
fs::File,
io::Read,
net::IpAddr,
};
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub config: HashMap<String, Record>,
pub mdns: Vec<MdnsConfig>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Record {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MdnsConfig {
pub port: u16,
pub multicast_groups: Vec<IpAddr>,
pub destinations: Vec<String>,
pub traffic_type: TrafficType,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum TrafficType {
MDNS,
pub sources: Vec<String>,
pub filters: HashSet<String>,
}
impl Config {

View File

@ -1,34 +0,0 @@
use std::{
io::{Error, Result as IoResult},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
os::fd::AsRawFd,
};
use nix::sys::socket::sockopt::ReuseAddr;
pub fn new_socket(addr: &SocketAddr) -> Result<UdpSocket, Error> {
let socket = UdpSocket::bind(addr)?;
#[cfg(unix)]
nix::sys::socket::setsockopt(socket.as_raw_fd(), ReuseAddr, &true)?;
// socket.set_read_timeout(Some(Duration::from_millis(100)))?;
Ok(socket)
}
pub fn join_multicast(socket: &UdpSocket, group: &IpAddr) -> IoResult<()> {
// TODO(ishan): Eventually, this should only listen on the
// interfaces specified in config.toml
match group {
IpAddr::V4(ref mdns_v4) => {
socket.multicast_loop_v4()?;
socket.join_multicast_v4(mdns_v4, &Ipv4Addr::new(0, 0, 0, 0))
}
IpAddr::V6(ref mdns_v6) => {
socket.multicast_loop_v6()?;
socket.join_multicast_v6(mdns_v6, 0)
}
}
}

View File

@ -3,29 +3,34 @@
// Or a common listener/transmitter and then different modules to parse and transmit each type of
// traffic
pub mod socket_manager;
pub use socket_manager::*;
pub mod communications;
pub use communications::*;
pub mod config;
pub use config::*;
pub mod listener;
pub mod processor;
use log::info;
pub use processor::*;
fn main() {
env_logger::init();
info!("starting up");
let config = Config::parse("config.toml").expect("error in parsing config");
println!("{:?}", config);
// TODO(ishan): Start listeners and transmitters on v4 and v6 here
let mut comms = Communications::new(config).expect("error in starting comms");
let mut comms = Communications::new(config.clone()).expect("error in starting comms");
comms
.start_listeners()
.expect("error in starting listeners");
let processor = Processor::new(comms.get_reader());
let processor =
Processor::new(comms.get_reader(), config).expect("error in starting processor");
processor.start_read_loop();

View File

@ -1,24 +1,121 @@
use crate::{communications::CommSocket, Config, Event};
use dns_parser::Packet;
use log::info;
use std::sync::mpsc::Receiver;
use crate::Event;
use get_if_addrs::IfAddr;
use log::{info, trace, warn};
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::mpsc::Receiver,
};
pub struct Processor<'a> {
reader: &'a Receiver<Event>,
config: Config,
sockets: HashMap<String, CommSocket>,
}
impl<'a> Processor<'a> {
pub fn new(reader: &'a Receiver<Event>) -> Self {
Self { reader }
pub fn new(reader: &'a Receiver<Event>, config: Config) -> Result<Self, String> {
let mut sockets = HashMap::new();
let interfaces = get_if_addrs::get_if_addrs().unwrap();
trace!("Interfaces list: {:?}", interfaces);
for conf in &config.mdns {
let listen_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
for if_name in conf
.destinations
.clone()
.into_iter()
.chain(conf.sources.clone())
{
if sockets.contains_key(&if_name) {
continue;
}
let dst_if = match interfaces.iter().find(|x| x.name == if_name) {
Some(v) => v,
None => {
warn!("could not find interface {}", if_name);
continue;
}
};
let dst_ip_addr = match dst_if.addr {
IfAddr::V4(ref v4) => IpAddr::V4(v4.ip),
IfAddr::V6(ref v6) => IpAddr::V6(v6.ip),
};
let socket =
CommSocket::new(listen_addr, conf.port).expect("error in bind on address");
info!("started listening on {}:{}", listen_addr, conf.port);
for group in &conf.multicast_groups {
socket
.join_multicast_group(&dst_ip_addr, group)
.map_err(|e| {
format!("error in joining multicast group {}: {}", group, e)
})?;
}
sockets.insert(if_name, socket);
}
}
Ok(Self {
reader,
config,
sockets,
})
}
pub fn start_read_loop(&self) {
for evt in self.reader {
// TODO: Generalize this to parse any type of supported packet
let packet =
Packet::parse(&evt.payload).expect("failed to parse packet as a dns packet");
info!("{:?}", packet);
// info!("EVENT {:?}", evt);
//info!("{:?}", packet);
for query in packet.questions {
info!("src_if = {} query = {}", evt.src_if, query.qname);
// Find a config that has src-if and filter
let conf = self.config.mdns.iter().filter(|config| {
(config.destinations.contains(&evt.src_if)
&& config.filters.contains(&query.qname.to_string()))
|| (config.destinations.contains(&evt.src_if) && config.filters.is_empty())
});
let mut sources: HashSet<String> = conf
.clone()
.flat_map(|config| config.sources.clone())
.collect();
sources.remove(&evt.src_if);
let multicast_groups: HashSet<IpAddr> = conf
.flat_map(|config| config.multicast_groups.clone())
.collect();
if !sources.is_empty() {
info!("conf = {:?}", sources);
}
for src in sources {
// Send this MDNS query to all matching sources
// When they respond, We'll forward the message to matching destinations
if let Some(socket) = self.sockets.get(&src) {
for group in multicast_groups.clone() {
socket
.send(&evt.payload, SocketAddr::new(group, 1900))
.expect("error in sending message");
}
}
}
}
}
}
}

0
src/socket_manager.rs Normal file
View File