removed duplicate code, optimized processor

This commit is contained in:
Ishan Jain 2023-07-28 15:57:54 +05:30
parent 29155935a0
commit 1ede90cc29
Signed by: ishan
GPG Key ID: 0506DB2A1CC75C27
3 changed files with 36 additions and 51 deletions

View File

@ -14,8 +14,6 @@ impl Communications {
pub async fn start_listeners(&mut self) -> Result<(), String> {
info!("listener started");
let tx_chan = self.tx_chan.clone();
let interfaces = get_if_addrs::get_if_addrs().unwrap();
trace!("Interfaces list: {:?}", interfaces);
@ -35,7 +33,9 @@ impl Communications {
loop {
match multicast_socket.receive() {
Ok(msg) => {
tx_chan.send(msg).expect("error in sending to mpsc channel");
self.tx_chan
.send(msg)
.expect("error in sending to mpsc channel");
}
Err(e) if e.to_string().contains("EAGAIN") => continue,
Err(e) => {

View File

@ -3,18 +3,17 @@
// support. So, 1 for MDNS, another for WSDD?
// Or a common listener/transmitter and then different modules to parse and transmit each type of
// traffic
use log::info;
use multicast_socket::Message;
use std::sync::mpsc::{self, Receiver, Sender};
use tokio::runtime::Runtime;
pub mod communications;
use std::sync::mpsc::{self, Receiver, Sender};
pub use communications::*;
pub mod config;
pub use config::*;
pub mod processor;
use log::info;
use multicast_socket::Message;
pub use processor::*;
use tokio::runtime::Runtime;
fn main() {
env_logger::init();

View File

@ -49,64 +49,50 @@ impl Processor {
packet.answers.iter().map(|q| q.name).collect::<Vec<_>>()
);
for conf in &self.config.mdns {
let mut forward = false;
let mut dst_ifs = vec![];
for query in &packet.questions {
let forward = conf.destinations.contains(&src_ifname)
forward |= conf.destinations.contains(&src_ifname)
&& (conf.filters.is_empty()
|| conf.filters.contains(&query.qname.to_string()));
if forward {
let dst_ifs = conf
.sources
.clone()
.into_iter()
.filter_map(|dst_if| interfaces.iter().find(|x| x.name == dst_if));
for dst_if in dst_ifs {
let dst_ifid = ifname_to_ifidx(dst_if.name.to_string());
info!(
"forwarding {:?} from {:?} to {:?}({})",
packet, src_ifname, dst_if, dst_ifid
);
// TODO(ishan): Take a note of transaction id
// and avoid feedback loops
socket
.send(&evt.data, &Interface::Index(dst_ifid as i32))
.expect("error in sending mdns packet");
}
dst_ifs.extend(
conf.sources
.iter()
.filter_map(|dst_if| interfaces.iter().find(|x| &x.name == dst_if)),
);
}
}
for answer in &packet.answers {
let forward = conf.sources.contains(&src_ifname)
forward |= conf.sources.contains(&src_ifname)
&& (conf.filters.is_empty()
|| conf.filters.contains(&answer.name.to_string()));
if forward {
let dst_ifs = conf
.destinations
.clone()
.into_iter()
.filter_map(|dst_if| interfaces.iter().find(|x| x.name == dst_if));
// TODO(ishan): Stop blasting this every where.
// Try to limit answers traffic between destinations
// it should not be broadcasting an answer to all networks just because 1
// asked for it
for dst_if in dst_ifs {
let dst_ifid = ifname_to_ifidx(dst_if.name.to_string());
info!(
"forwarding {:?} from {:?} to {:?}({})",
packet, src_ifname, dst_if, dst_ifid
);
socket
.send(&evt.data, &Interface::Index(dst_ifid as i32))
.expect("error in sending mdns packet");
}
dst_ifs.extend(
conf.destinations
.iter()
.filter_map(|dst_if| interfaces.iter().find(|x| &x.name == dst_if)),
);
}
}
for dst_if in dst_ifs {
let dst_ifid = ifname_to_ifidx(dst_if.name.to_string());
info!(
"forwarding {:?} from {:?} to {:?}({})",
packet, src_ifname, dst_if, dst_ifid
);
// TODO(ishan): Take a note of transaction id
// and avoid feedback loops
socket
.send(&evt.data, &Interface::Index(dst_ifid as i32))
.expect("error in sending mdns packet");
}
}
}
}