switched to tokio, cleaned up some interfaces

This commit is contained in:
Ishan Jain 2023-07-28 15:17:52 +05:30
parent cd6f101d04
commit 29155935a0
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
6 changed files with 169 additions and 169 deletions

166
Cargo.lock generated
View File

@ -2,6 +2,21 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "addr2line"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "1.0.2" version = "1.0.2"
@ -17,6 +32,21 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.3.2" version = "1.3.2"
@ -47,12 +77,6 @@ version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -137,6 +161,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "gimli"
version = "0.27.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.14.0" version = "0.14.0"
@ -201,12 +231,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]] [[package]]
name = "memoffset" name = "miniz_oxide"
version = "0.7.1" version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
dependencies = [ dependencies = [
"autocfg", "adler",
]
[[package]]
name = "mio"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [
"libc",
"wasi",
"windows-sys",
] ]
[[package]] [[package]]
@ -217,8 +258,8 @@ checksum = "53eacba0998466687d051439bf144286203d7286623275a8007505d22fd21cfb"
dependencies = [ dependencies = [
"get_if_addrs", "get_if_addrs",
"libc", "libc",
"nix 0.19.1", "nix",
"socket2", "socket2 0.3.19",
"winapi 0.3.9", "winapi 0.3.9",
] ]
@ -232,37 +273,11 @@ dependencies = [
"libc", "libc",
"log", "log",
"multicast-socket", "multicast-socket",
"net2",
"network-interface",
"nix 0.26.2",
"once_cell",
"serde", "serde",
"tokio",
"toml", "toml",
] ]
[[package]]
name = "net2"
version = "0.2.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "network-interface"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "296ae2183dab440e352740b1fcf0589d8058afa40bc29fd163227ddb01fbf539"
dependencies = [
"cc",
"libc",
"thiserror",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.19.1" version = "0.19.1"
@ -271,35 +286,34 @@ checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"cc", "cc",
"cfg-if 1.0.0", "cfg-if",
"libc", "libc",
] ]
[[package]] [[package]]
name = "nix" name = "num_cpus"
version = "0.26.2" version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [ dependencies = [
"bitflags 1.3.2", "hermit-abi",
"cfg-if 1.0.0",
"libc", "libc",
"memoffset",
"pin-utils",
"static_assertions",
] ]
[[package]] [[package]]
name = "once_cell" name = "object"
version = "1.18.0" version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "pin-utils" name = "pin-project-lite"
version = "0.1.0" version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
@ -354,6 +368,12 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2"
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.38.4" version = "0.38.4"
@ -402,16 +422,20 @@ version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if",
"libc", "libc",
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]] [[package]]
name = "static_assertions" name = "socket2"
version = "1.1.0" version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "syn" name = "syn"
@ -434,19 +458,27 @@ dependencies = [
] ]
[[package]] [[package]]
name = "thiserror" name = "tokio"
version = "1.0.44" version = "1.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90" checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da"
dependencies = [ dependencies = [
"thiserror-impl", "autocfg",
"backtrace",
"libc",
"mio",
"num_cpus",
"pin-project-lite",
"socket2 0.4.9",
"tokio-macros",
"windows-sys",
] ]
[[package]] [[package]]
name = "thiserror-impl" name = "tokio-macros"
version = "1.0.44" version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -493,6 +525,12 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.2.8" version = "0.2.8"

View File

@ -10,9 +10,6 @@ get_if_addrs = "0.5.3"
libc = "0.2.147" libc = "0.2.147"
log = "0.4.18" log = "0.4.18"
multicast-socket = "0.2.2" multicast-socket = "0.2.2"
net2 = "0.2.39"
network-interface = "1.0.1"
nix = "0.26.2"
once_cell = "1.17.2"
serde = { version = "1.0.163", features = ["derive"] } serde = { version = "1.0.163", features = ["derive"] }
tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread", "net"] }
toml = "0.7.4" toml = "0.7.4"

View File

@ -1,92 +1,47 @@
use crate::Config; use log::{info, trace, warn};
use log::{info, trace};
use multicast_socket::{Message, MulticastOptions, MulticastSocket}; use multicast_socket::{Message, MulticastOptions, MulticastSocket};
use std::{ use std::{net::SocketAddrV4, sync::mpsc::Sender};
net::SocketAddrV4,
sync::mpsc::{self, Receiver, Sender},
thread::{self, JoinHandle},
};
pub struct Communications { pub struct Communications {
// TODO(ishan): Accommodate IPv6 sockets as well here tx_chan: Sender<Message>,
handles: Vec<JoinHandle<()>>,
tx_chan: Sender<Event>,
rx_chan: Receiver<Event>,
}
#[derive(Debug)]
pub struct Event {
pub msg: Message,
} }
impl Communications { impl Communications {
pub fn new(config: Config) -> Result<Self, String> { pub fn new(tx_chan: Sender<Message>) -> Result<Self, String> {
// This channel is used to send packets from this module to the processor Ok(Communications { tx_chan })
// TODO(ishan): Eventually, swap out std::mpsc for some thing faster
// or switch to a different model completely that doesn't use channels like this
let (rx, tx) = mpsc::channel();
Ok(Communications {
handles: vec![],
rx_chan: tx,
tx_chan: rx,
})
} }
pub fn start_listeners(&mut self) -> Result<(), String> { pub async fn start_listeners(&mut self) -> Result<(), String> {
info!("listener started"); info!("listener started");
let tx_chan = self.tx_chan.clone(); let tx_chan = self.tx_chan.clone();
let handle = thread::spawn(move || { let interfaces = get_if_addrs::get_if_addrs().unwrap();
// TODO(ishan): What should be the size here ? trace!("Interfaces list: {:?}", interfaces);
let interfaces = get_if_addrs::get_if_addrs().unwrap(); // mdns
trace!("Interfaces list: {:?}", interfaces); let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353);
let multicast_socket = MulticastSocket::with_options(
mdns_address,
multicast_socket::all_ipv4_interfaces().expect("could not fetch all interfaces"),
MulticastOptions {
loopback: false,
buffer_size: 4096,
..Default::default()
},
)
.expect("error in creating multicast socket");
// mdns loop {
let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353); match multicast_socket.receive() {
let multicast_socket = MulticastSocket::with_options( Ok(msg) => {
mdns_address, tx_chan.send(msg).expect("error in sending to mpsc channel");
multicast_socket::all_ipv4_interfaces().expect("could not fetch all interfaces"), }
MulticastOptions { Err(e) if e.to_string().contains("EAGAIN") => continue,
loopback: false, Err(e) => {
buffer_size: 4096, warn!("error in reading from socket {:?} ", e);
..Default::default() }
}, };
)
.expect("error in creating multicast socket");
loop {
match multicast_socket.receive() {
Ok(msg) => {
tx_chan
.send(Event { msg })
.expect("error in sending to mpsc channel");
}
Err(_) => {
// TODO: Log all buy EAGAIN
// warn!("error in reading from socket {}: {}", port, e);
continue;
}
};
}
});
self.handles.push(handle);
Ok(())
}
pub fn wait(&mut self) {
while let Some(handle) = self.handles.pop() {
handle.join().unwrap();
} }
} }
pub fn get_reader(&self) -> &Receiver<Event> {
&self.rx_chan
}
// TODO(ishan): Add a function to send messages from a port
} }

View File

@ -1,17 +1,20 @@
#![feature(async_closure)]
// TODO(ishan): Eventually we'll have a listener and transmitter module for every thing we want to // TODO(ishan): Eventually we'll have a listener and transmitter module for every thing we want to
// support. So, 1 for MDNS, another for WSDD? // support. So, 1 for MDNS, another for WSDD?
// Or a common listener/transmitter and then different modules to parse and transmit each type of // Or a common listener/transmitter and then different modules to parse and transmit each type of
// traffic // traffic
pub mod socket_manager;
pub use socket_manager::*;
pub mod communications; pub mod communications;
use std::sync::mpsc::{self, Receiver, Sender};
pub use communications::*; pub use communications::*;
pub mod config; pub mod config;
pub use config::*; pub use config::*;
pub mod processor; pub mod processor;
use log::info; use log::info;
use multicast_socket::Message;
pub use processor::*; pub use processor::*;
use tokio::runtime::Runtime;
fn main() { fn main() {
env_logger::init(); env_logger::init();
@ -22,16 +25,24 @@ fn main() {
println!("{:?}", config); println!("{:?}", config);
// TODO(ishan): Start listeners and transmitters on v4 and v6 here let rt = Runtime::new().expect("error in creating runtime");
let mut comms = Communications::new(config.clone()).expect("error in starting comms");
comms
.start_listeners()
.expect("error in starting listeners");
let processor = // This channel is used to send packets from this module to the processor
Processor::new(comms.get_reader(), config).expect("error in starting processor"); // TODO(ishan): Eventually, swap out std::mpsc for some thing faster
// or switch to a different model completely that doesn't use channels like this
let (tx, rx): (Sender<Message>, Receiver<Message>) = mpsc::channel();
// TODO(ishan): Start listeners and transmitters on v4 and v6 here
let mut comms = Communications::new(tx).expect("error in starting comms");
rt.spawn(async move {
comms
.start_listeners()
.await
.expect("error in comms listener");
});
let processor = Processor::new(rx, config).expect("error in starting processor");
processor.start_read_loop(); processor.start_read_loop();
comms.wait();
} }

View File

@ -1,20 +1,20 @@
use crate::{Config, Event}; use crate::Config;
use dns_parser::Packet; use dns_parser::Packet;
use log::{info, warn}; use log::info;
use multicast_socket::{Interface, MulticastOptions, MulticastSocket}; use multicast_socket::{Interface, Message, MulticastOptions, MulticastSocket};
use std::{ffi::CString, net::SocketAddrV4, sync::mpsc::Receiver}; use std::{ffi::CString, net::SocketAddrV4, sync::mpsc::Receiver};
pub struct Processor<'a> { pub struct Processor {
reader: &'a Receiver<Event>, reader: Receiver<Message>,
config: Config, config: Config,
} }
impl<'a> Processor<'a> { impl Processor {
pub fn new(reader: &'a Receiver<Event>, config: Config) -> Result<Self, String> { pub fn new(reader: Receiver<Message>, config: Config) -> Result<Self, String> {
Ok(Self { reader, config }) Ok(Self { reader, config })
} }
pub fn start_read_loop(&self) { pub fn start_read_loop(self) {
// mdns // mdns
let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353); let mdns_address = SocketAddrV4::new([224, 0, 0, 251].into(), 5353);
let socket = MulticastSocket::with_options( let socket = MulticastSocket::with_options(
@ -30,12 +30,11 @@ impl<'a> Processor<'a> {
for evt in self.reader { for evt in self.reader {
// TODO: Generalize this to parse any type of supported packet // TODO: Generalize this to parse any type of supported packet
let packet = let packet = Packet::parse(&evt.data).expect("failed to parse packet as a dns packet");
Packet::parse(&evt.msg.data).expect("failed to parse packet as a dns packet");
let interfaces = get_if_addrs::get_if_addrs().unwrap(); let interfaces = get_if_addrs::get_if_addrs().unwrap();
let src_ifname = if let Interface::Index(idx) = evt.msg.interface { let src_ifname = if let Interface::Index(idx) = evt.interface {
ifidx_to_ifname(idx as u32) ifidx_to_ifname(idx as u32)
} else { } else {
"lo".to_string() "lo".to_string()
@ -44,8 +43,8 @@ impl<'a> Processor<'a> {
info!( info!(
"EVENT src-if = {} if-index {:?} address = {}, packet: {:?} answers = {:?}", "EVENT src-if = {} if-index {:?} address = {}, packet: {:?} answers = {:?}",
src_ifname, src_ifname,
evt.msg.interface, evt.interface,
evt.msg.origin_address, evt.origin_address,
packet.questions.iter().map(|q| q.qname).collect::<Vec<_>>(), packet.questions.iter().map(|q| q.qname).collect::<Vec<_>>(),
packet.answers.iter().map(|q| q.name).collect::<Vec<_>>() packet.answers.iter().map(|q| q.name).collect::<Vec<_>>()
); );
@ -72,7 +71,7 @@ impl<'a> Processor<'a> {
// TODO(ishan): Take a note of transaction id // TODO(ishan): Take a note of transaction id
// and avoid feedback loops // and avoid feedback loops
socket socket
.send(&evt.msg.data, &Interface::Index(dst_ifid as i32)) .send(&evt.data, &Interface::Index(dst_ifid as i32))
.expect("error in sending mdns packet"); .expect("error in sending mdns packet");
} }
} }
@ -103,7 +102,7 @@ impl<'a> Processor<'a> {
); );
socket socket
.send(&evt.msg.data, &Interface::Index(dst_ifid as i32)) .send(&evt.data, &Interface::Index(dst_ifid as i32))
.expect("error in sending mdns packet"); .expect("error in sending mdns packet");
} }
} }

View File