Added a basic version of processor. It can listen on multiple ports and aggregate all of it in 1 place

This commit is contained in:
Ishan Jain 2023-06-05 04:38:36 +05:30
parent 46b21c2013
commit 5936b16b0e
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
10 changed files with 410 additions and 81 deletions

203
Cargo.lock generated
View File

@ -2,6 +2,15 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "aho-corasick"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -14,18 +23,86 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "cc"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "dns-parser"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4d33be9473d06f75f58220f71f7a9317aca647dc061dbd3c361b0bef505fbea"
dependencies = [
"byteorder",
"quick-error",
]
[[package]]
name = "env_logger"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0"
dependencies = [
"humantime",
"is-terminal",
"log",
"regex",
"termcolor",
]
[[package]]
name = "errno"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [
"errno-dragonfly",
"libc",
"windows-sys",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
dependencies = [
"cc",
"libc",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "1.9.3" version = "1.9.3"
@ -36,12 +113,47 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
"windows-sys",
]
[[package]]
name = "is-terminal"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi",
"io-lifetimes",
"rustix",
"windows-sys",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.144" version = "0.2.144"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "log"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.5.0" version = "2.5.0"
@ -61,10 +173,12 @@ dependencies = [
name = "multicaster" name = "multicaster"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"dns-parser",
"env_logger",
"log",
"nix", "nix",
"once_cell", "once_cell",
"serde", "serde",
"socket2",
"toml", "toml",
] ]
@ -103,6 +217,12 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.28" version = "1.0.28"
@ -112,6 +232,37 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "regex"
version = "1.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
[[package]]
name = "rustix"
version = "0.37.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acf8729d8542766f1b2cf77eb034d52f40d375bb8b615d0b147089946e16613d"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys",
]
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.163" version = "1.0.163"
@ -141,16 +292,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "socket2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
dependencies = [
"libc",
"windows-sys",
]
[[package]] [[package]]
name = "static_assertions" name = "static_assertions"
version = "1.1.0" version = "1.1.0"
@ -168,6 +309,15 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "termcolor"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.7.4" version = "0.7.4"
@ -208,6 +358,37 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"

View File

@ -4,8 +4,10 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
dns-parser = "0.8.0"
env_logger = "0.10.0"
log = "0.4.18"
nix = "0.26.2" nix = "0.26.2"
once_cell = "1.17.2" once_cell = "1.17.2"
serde = { version = "1.0.163", features = ["derive"] } serde = { version = "1.0.163", features = ["derive"] }
socket2 = "0.5.3"
toml = "0.7.4" toml = "0.7.4"

View File

@ -12,3 +12,23 @@ This causes problems if there are other softwares that are also listening withou
For now, Disable those softwares when running this. A list of such softwares, For now, Disable those softwares when running this. A list of such softwares,
a. avahi-daemon a. avahi-daemon
### How should this be designed??
For now, I am restricting it to only consider 1 config.
It won't listen on multiple ports for multicast traffic. This will be changed once I have the basic structure ready.
For now, Only work on IPv4. IPv6 will be added once IPv4 is ready

View File

@ -2,3 +2,4 @@
port = 5353 port = 5353
multicast_groups = ["224.0.0.251"] multicast_groups = ["224.0.0.251"]
destinations = ["wlp6s0"] destinations = ["wlp6s0"]
traffic_type = "MDNS"

131
src/communications.rs Normal file
View File

@ -0,0 +1,131 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
sync::mpsc::{self, Receiver, Sender},
thread::{self, JoinHandle},
};
use crate::{
listener::{join_multicast, new_socket},
Config,
};
use log::{debug, info, warn};
use once_cell::sync::Lazy;
pub static IPV4: Lazy<IpAddr> = Lazy::new(|| Ipv4Addr::new(0, 0, 0, 0).into());
pub static IPV6: Lazy<IpAddr> = Lazy::new(|| {
Ipv6Addr::new(0x2a0a, 0x6040, 0x4004, 0x10, 0xf1c6, 0xf2b0, 0x9f45, 0xb425).into()
});
pub struct Communications {
// TODO(ishan): Accommodate IPv6 sockets as well here
sockets: HashMap<u16, UdpSocket>,
handles: Vec<JoinHandle<()>>,
tx_chan: Sender<Event>,
rx_chan: Receiver<Event>,
}
pub struct Event {
pub remote_addr: SocketAddr,
pub payload: Vec<u8>,
}
impl Communications {
pub fn new(input: Config) -> Result<Self, String> {
let mut sockets = HashMap::new();
// This channel is used to send packets from this module to the processor
// TODO(ishan): Eventually, swap out std::mpsc for some thing faster
// or switch to a different model completely that doesn't use channels like this
let (rx, tx) = mpsc::channel();
for (_, v) in input.config.iter() {
let socket =
new_socket(&SocketAddr::new(*IPV4, v.port)).expect("error in bind on ipv4 address");
info!("started listening on {}:{}", *IPV4, v.port);
for group in v.multicast_groups.iter() {
join_multicast(&socket, group)
.map_err(|e| format!("error in joining multicast group {}: {}", group, e))?;
info!("joined multicast group {}", group);
}
sockets.insert(v.port, socket);
}
Ok(Communications {
sockets,
handles: vec![],
rx_chan: tx,
tx_chan: rx,
})
}
pub fn start_listeners(&mut self) -> Result<(), String> {
let sockets = self
.clone_sockets()
.map_err(|e| format!("error in cloning sockets: {}", e))?;
for (port, v) in sockets {
let socket = v
.try_clone()
.map_err(|e| format!("error in cloning socket for read ops: {}", e))?;
let tx_chan = self.tx_chan.clone();
let handle = thread::spawn(move || {
// TODO(ishan): What should be the size here ?
loop {
let mut buf = [0u8; 2000];
match socket.recv_from(&mut buf) {
Ok((len, remote_addr)) => {
let buf = &buf[..len];
tx_chan
.send(Event {
remote_addr,
payload: buf.to_vec(),
})
.expect("error in sending to mpsc channel");
debug!("read {}bytes from {}", len, remote_addr);
}
Err(e) => {
warn!("error in reading from socket {}: {}", port, e);
continue;
}
};
}
});
self.handles.push(handle);
}
Ok(())
}
pub fn wait(&mut self) {
while let Some(handle) = self.handles.pop() {
handle.join().unwrap();
}
}
fn clone_sockets(&self) -> Result<HashMap<u16, UdpSocket>, std::io::Error> {
let mut sockets = HashMap::new();
for (k, v) in self.sockets.iter() {
sockets.insert(*k, v.try_clone()?);
}
Ok(sockets)
}
pub fn get_reader(&self) -> &Receiver<Event> {
&self.rx_chan
}
// TODO(ishan): Add a function to send messages from a port
}

View File

@ -1,3 +1,4 @@
use log::debug;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fs::File, io::Read, net::IpAddr}; use std::{collections::HashMap, fs::File, io::Read, net::IpAddr};
@ -11,6 +12,12 @@ pub struct Record {
pub port: u16, pub port: u16,
pub multicast_groups: Vec<IpAddr>, pub multicast_groups: Vec<IpAddr>,
pub destinations: Vec<String>, pub destinations: Vec<String>,
pub traffic_type: TrafficType,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum TrafficType {
MDNS,
} }
impl Config { impl Config {
@ -18,6 +25,10 @@ impl Config {
pub fn parse(mut filename: &str) -> Result<Config, String> { pub fn parse(mut filename: &str) -> Result<Config, String> {
if filename.is_empty() { if filename.is_empty() {
debug!(
"filename is empty. using default name: {}",
Config::FILENAME
);
filename = Config::FILENAME; filename = Config::FILENAME;
} }

View File

@ -1,66 +0,0 @@
// TODO(ishan): Eventually we'll have a listener and transmitter module for every thing we want to
// support. So, 1 for MDNS, another for WSDD?
// Or a common listener/transmitter and then different modules to parse and transmit each type of
// traffic
pub mod config;
pub use config::*;
pub mod listener;
use crate::listener::{join_multicast, new_socket};
use once_cell::sync::Lazy;
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
thread,
};
pub static IPV4: Lazy<IpAddr> = Lazy::new(|| Ipv4Addr::new(0, 0, 0, 0).into());
pub static IPV6: Lazy<IpAddr> = Lazy::new(|| {
Ipv6Addr::new(0x2a0a, 0x6040, 0x4004, 0x10, 0xf1c6, 0xf2b0, 0x9f45, 0xb425).into()
});
pub fn start(config: crate::Config) {
// TODO(ishan): Start listeners and transmitters on v4 and v6 here
let mut handles = vec![];
for (_, v) in config.config {
let handle = thread::spawn(move || {
let mut ipv4_socket = new_socket(&SocketAddr::new(*IPV4, v.port))
.expect("error in bind op on ipv4 address");
let mut ipv6_socket = new_socket(&SocketAddr::new(*IPV6, v.port))
.expect("error in bind op on ipv6 address");
for group in v.multicast_groups {
join_multicast(&mut ipv4_socket, &group)
.expect("error in joining ipv4 multicast group");
join_multicast(&mut ipv6_socket, &group)
.expect("error in joining ipv6 multicast group");
println!(
"Listening for multicast packets on ipv4/ipv6 in group {}:{}",
group, v.port
);
}
let mut buf = [0u8; 64];
match ipv4_socket.recv_from(&mut buf) {
Ok((len, remote_addr)) => {
let buf = &buf[..len];
println!("received {}bytes response from {}", len, remote_addr);
let v = String::from_utf8_lossy(buf);
println!("output = {:?}", v);
}
Err(e) => {
println!("got an error: {}", e);
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap()
}
}

View File

@ -17,7 +17,7 @@ pub fn new_socket(addr: &SocketAddr) -> Result<UdpSocket, Error> {
Ok(socket) Ok(socket)
} }
pub fn join_multicast(socket: &mut UdpSocket, group: &IpAddr) -> IoResult<()> { pub fn join_multicast(socket: &UdpSocket, group: &IpAddr) -> IoResult<()> {
// TODO(ishan): Eventually, this should only listen on the // TODO(ishan): Eventually, this should only listen on the
// interfaces specified in config.toml // interfaces specified in config.toml
match group { match group {

View File

@ -1,8 +1,33 @@
use multicaster::Config; // TODO(ishan): Eventually we'll have a listener and transmitter module for every thing we want to
// support. So, 1 for MDNS, another for WSDD?
// Or a common listener/transmitter and then different modules to parse and transmit each type of
// traffic
pub mod communications;
pub use communications::*;
pub mod config;
pub use config::*;
pub mod listener;
pub mod processor;
pub use processor::*;
fn main() { fn main() {
env_logger::init();
let config = Config::parse("config.toml").expect("error in parsing config"); let config = Config::parse("config.toml").expect("error in parsing config");
println!("{:?}", config); println!("{:?}", config);
multicaster::start(config); // TODO(ishan): Start listeners and transmitters on v4 and v6 here
let mut comms = Communications::new(config).expect("error in starting comms");
comms
.start_listeners()
.expect("error in starting listeners");
let processor = Processor::new(comms.get_reader());
processor.start_read_loop();
comms.wait();
} }

24
src/processor.rs Normal file
View File

@ -0,0 +1,24 @@
use dns_parser::Packet;
use log::info;
use std::sync::mpsc::Receiver;
use crate::Event;
pub struct Processor<'a> {
reader: &'a Receiver<Event>,
}
impl<'a> Processor<'a> {
pub fn new(reader: &'a Receiver<Event>) -> Self {
Self { reader }
}
pub fn start_read_loop(&self) {
for evt in self.reader {
let packet =
Packet::parse(&evt.payload).expect("failed to parse packet as a dns packet");
info!("{:?}", packet);
}
}
}