Skip to content

Commit

Permalink
fix: dispatcher pipelines is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa authored and rvql committed Sep 14, 2024
1 parent 96241af commit d072408
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 50 deletions.
14 changes: 10 additions & 4 deletions agent/src/dispatcher/base_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,8 +806,11 @@ impl BaseDispatcherListener {
}
}
});
if deleted.len() > 0 {
info!("Dispatcher{} Removing VMs: {:?}", self.log_id, deleted);
if !deleted.is_empty() {
info!(
"Dispatcher{} Removing VMs: {:?} by {:?} + {:?}",
self.log_id, deleted, keys, vm_macs
);
}
if pipelines.len() == keys.len() {
return;
Expand Down Expand Up @@ -845,8 +848,11 @@ impl BaseDispatcherListener {
})),
);
}
if added.len() > 0 {
info!("Dispatcher{} Adding VMs: {:?}", self.log_id, added);
if !added.is_empty() {
info!(
"Dispatcher{} Adding VMs: {:?} by {:?} + {:?}",
self.log_id, added, keys, vm_macs
);
}
}

Expand Down
59 changes: 37 additions & 22 deletions agent/src/dispatcher/local_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,42 @@ pub struct LocalModeDispatcherListener {
rewriter: MacRewriter,
}

pub fn skip_by_blacklist(
id: &String,
keys: Vec<u32>,
macs: Vec<MacAddr>,
blacklist: &Vec<u64>,
) -> (Vec<u32>, Vec<MacAddr>) {
if blacklist.is_empty() {
return (keys, macs);
}

// 当虚拟机内的容器节点已部署采集器时,宿主机采集器需要排除容器节点的接口,避免采集双份重复流量
let mut blackset = HashSet::with_capacity(blacklist.len());
for mac in blacklist {
blackset.insert(*mac & 0xffffff);
}
let mut inject_keys = vec![];
let mut inject_macs = vec![];
let mut rejected = vec![];
for (i, mac) in macs.iter().enumerate() {
if blackset.contains(&(u64::from(mac.clone()) & 0xffffff)) {
rejected.push(mac);
} else {
inject_keys.push(keys[i]);
inject_macs.push(macs[i]);
}
}

if !rejected.is_empty() {
debug!(
"Dispatcher{} Tap interfaces {:?} rejected by blacklist",
id, rejected
);
}
(inject_keys, inject_macs)
}

impl LocalModeDispatcherListener {
pub(super) fn new(
base: BaseDispatcherListener,
Expand Down Expand Up @@ -352,28 +388,6 @@ impl LocalModeDispatcherListener {
blacklist: &Vec<u64>,
) {
let mut interfaces = interfaces.to_vec();
if !blacklist.is_empty() {
// 当虚拟机内的容器节点已部署采集器时,宿主机采集器需要排除容器节点的接口,避免采集双份重复流量
let mut blackset = HashSet::with_capacity(blacklist.len());
for mac in blacklist {
blackset.insert(*mac & 0xffffffff);
}
let mut rejected = vec![];
interfaces.retain(|iface| {
if blackset.contains(&(u64::from(iface.mac_addr) & 0xffffffff)) {
rejected.push(iface.mac_addr);
false
} else {
true
}
});
if !rejected.is_empty() {
debug!(
"Dispatcher{} Tap interfaces {:?} rejected by blacklist",
self.base.log_id, rejected
);
}
}
// interfaces为实际TAP口的集合,macs为TAP口对应主机的MAC地址集合
interfaces.sort_by_key(|link| link.if_index);
let keys = interfaces
Expand All @@ -387,6 +401,7 @@ impl LocalModeDispatcherListener {
#[cfg(target_os = "linux")]
&self.base.options.lock().unwrap().tap_mac_script,
);
let (keys, macs) = skip_by_blacklist(&self.base.log_id, keys, macs, blacklist);
self.base.on_vm_change(&keys, &macs);
self.base.on_tap_interface_change(interfaces, if_mac_source);
}
Expand Down
26 changes: 2 additions & 24 deletions agent/src/dispatcher/local_plus_mode_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#[cfg(any(target_os = "linux", target_os = "android"))]
use std::collections::HashMap;
use std::collections::HashSet;
use std::mem::drop;
#[cfg(any(target_os = "linux", target_os = "android"))]
use std::process::Command;
Expand All @@ -36,7 +35,7 @@ use nix::{

use super::base_dispatcher::{BaseDispatcher, BaseDispatcherListener};
use super::error::Result;
use super::local_mode_dispatcher::{LocalModeDispatcherListener, MacRewriter};
use super::local_mode_dispatcher::{skip_by_blacklist, LocalModeDispatcherListener, MacRewriter};
use super::Packet;

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -519,28 +518,6 @@ impl LocalPlusModeDispatcherListener {
blacklist: &Vec<u64>,
) {
let mut interfaces = interfaces.to_vec();
if !blacklist.is_empty() {
// 当虚拟机内的容器节点已部署采集器时,宿主机采集器需要排除容器节点的接口,避免采集双份重复流量
let mut blackset = HashSet::with_capacity(blacklist.len());
for mac in blacklist {
blackset.insert(*mac & 0xffffffff);
}
let mut rejected = vec![];
interfaces.retain(|iface| {
if blackset.contains(&(u64::from(iface.mac_addr) & 0xffffffff)) {
rejected.push(iface.mac_addr);
false
} else {
true
}
});
if !rejected.is_empty() {
debug!(
"Dispatcher{} Tap interfaces {:?} rejected by blacklist",
self.base.log_id, rejected
);
}
}
// interfaces为实际TAP口的集合,macs为TAP口对应主机的MAC地址集合
interfaces.sort_by_key(|link| link.if_index);
let keys = interfaces
Expand All @@ -554,6 +531,7 @@ impl LocalPlusModeDispatcherListener {
#[cfg(target_os = "linux")]
&self.base.options.lock().unwrap().tap_mac_script,
);
let (keys, macs) = skip_by_blacklist(&self.base.log_id, keys, macs, blacklist);
self.base.on_vm_change(&keys, &macs);
self.base.on_tap_interface_change(interfaces, if_mac_source);
}
Expand Down

0 comments on commit d072408

Please sign in to comment.