-
Notifications
You must be signed in to change notification settings - Fork 372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Re-use `VecDeque` in `run_socket_consume` / `run_listen`
#5101
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,6 +110,14 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100; | |
/// Chosen to be able to handle 1Gbps of pure gossip traffic | ||
/// 128MB/PACKET_DATA_SIZE | ||
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; | ||
/// Capacity for the [`ClusterInfo::run_socket_consume`] and [`ClusterInfo::run_listen`] | ||
/// intermediate packet batch buffers. | ||
/// | ||
/// Uses a heuristic of 28 packets per [`PacketBatch`], which is an observed | ||
/// average of packets per batch. The buffers are re-used across processing loops, | ||
/// so any extra capacity that may be reserved due to traffic variations will be preserved, | ||
/// avoiding excessive resizing and re-allocation. | ||
const CHANNEL_RECV_BUFFER_INITIAL_CAPACITY: usize = MAX_GOSSIP_TRAFFIC.div_ceil(28); | ||
const GOSSIP_PING_CACHE_CAPACITY: usize = 126976; | ||
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280); | ||
const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(1280 / 64); | ||
|
@@ -1936,7 +1944,7 @@ impl ClusterInfo { | |
|
||
fn process_packets( | ||
&self, | ||
mut packets: VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>, | ||
packets: &mut VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>, | ||
thread_pool: &ThreadPool, | ||
recycler: &PacketBatchRecycler, | ||
response_sender: &PacketBatchSender, | ||
|
@@ -2005,7 +2013,7 @@ impl ClusterInfo { | |
let mut prune_messages = vec![]; | ||
let mut ping_messages = vec![]; | ||
let mut pong_messages = vec![]; | ||
for (from_addr, packet) in packets.into_iter().flatten() { | ||
for (from_addr, packet) in packets.drain(..).flatten() { | ||
match packet { | ||
Protocol::PullRequest(filter, caller) => { | ||
if !check_pull_request_shred_version(self_shred_version, &caller) { | ||
|
@@ -2085,6 +2093,7 @@ impl ClusterInfo { | |
epoch_specs: Option<&mut EpochSpecs>, | ||
receiver: &PacketBatchReceiver, | ||
sender: &Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>, | ||
packets: &mut VecDeque<PacketBatch>, | ||
) -> Result<(), GossipError> { | ||
const RECV_TIMEOUT: Duration = Duration::from_secs(1); | ||
fn count_dropped_packets(packets: &PacketBatch, dropped_packets_counts: &mut [u64; 7]) { | ||
|
@@ -2100,7 +2109,6 @@ impl ClusterInfo { | |
} | ||
let mut dropped_packets_counts = [0u64; 7]; | ||
let mut num_packets = 0; | ||
let mut packets = VecDeque::with_capacity(2); | ||
for packet_batch in receiver | ||
.recv_timeout(RECV_TIMEOUT) | ||
.map(std::iter::once)? | ||
|
@@ -2150,7 +2158,7 @@ impl ClusterInfo { | |
.map(EpochSpecs::current_epoch_staked_nodes) | ||
.cloned() | ||
.unwrap_or_default(); | ||
let packets: Vec<_> = { | ||
let packets_verified: Vec<_> = { | ||
let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time); | ||
thread_pool.install(|| { | ||
if packets.len() == 1 { | ||
|
@@ -2167,7 +2175,8 @@ impl ClusterInfo { | |
} | ||
}) | ||
}; | ||
Ok(sender.send(packets)?) | ||
packets.clear(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would it be worthwhile to toss up a datapoint if […] |
||
Ok(sender.send(packets_verified)?) | ||
} | ||
|
||
/// Process messages from the network | ||
|
@@ -2180,12 +2189,12 @@ impl ClusterInfo { | |
thread_pool: &ThreadPool, | ||
last_print: &mut Instant, | ||
should_check_duplicate_instance: bool, | ||
packets: &mut VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: same nit on naming convention. Won't die on this hill, but it is confusing to me. |
||
) -> Result<(), GossipError> { | ||
let _st = ScopedTimer::from(&self.stats.gossip_listen_loop_time); | ||
const RECV_TIMEOUT: Duration = Duration::from_secs(1); | ||
const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2); | ||
let mut num_packets = 0; | ||
let mut packets = VecDeque::with_capacity(2); | ||
for pkts in receiver | ||
.recv_timeout(RECV_TIMEOUT) | ||
.map(std::iter::once)? | ||
|
@@ -2220,6 +2229,7 @@ impl ClusterInfo { | |
epoch_duration, | ||
should_check_duplicate_instance, | ||
)?; | ||
packets.clear(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Similarly […] |
||
if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL { | ||
submit_gossip_stats(&self.stats, &self.gossip, &stakes); | ||
*last_print = Instant::now(); | ||
|
@@ -2243,13 +2253,15 @@ impl ClusterInfo { | |
.build() | ||
.unwrap(); | ||
let mut epoch_specs = bank_forks.map(EpochSpecs::from); | ||
let mut packets = VecDeque::with_capacity(CHANNEL_RECV_BUFFER_INITIAL_CAPACITY); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see, so previously we defined a new `VecDeque` with capacity 2 that had to grow/allocate memory every time `run_socket_consume()` was called. Now it will grow, stay that size, and get reused. Looks like the max size of the […] — any thoughts here? |
||
let run_consume = move || { | ||
while !exit.load(Ordering::Relaxed) { | ||
match self.run_socket_consume( | ||
&thread_pool, | ||
epoch_specs.as_mut(), | ||
&receiver, | ||
&sender, | ||
&mut packets, | ||
) { | ||
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break, | ||
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), | ||
|
@@ -2281,6 +2293,7 @@ impl ClusterInfo { | |
.build() | ||
.unwrap(); | ||
let mut epoch_specs = bank_forks.map(EpochSpecs::from); | ||
let mut packets = VecDeque::with_capacity(CHANNEL_RECV_BUFFER_INITIAL_CAPACITY); | ||
Builder::new() | ||
.name("solGossipListen".to_string()) | ||
.spawn(move || { | ||
|
@@ -2293,6 +2306,7 @@ impl ClusterInfo { | |
&thread_pool, | ||
&mut last_print, | ||
should_check_duplicate_instance, | ||
&mut packets, | ||
) { | ||
match err { | ||
GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: maybe rename `packets` to `packet_buffer` or something. Every time I read this I think the packets are getting passed into this function via `packets` instead of `receiver`.