Re-use VecDeque in run_socket_consume / run_listen #5101

Open
wants to merge 1 commit into
base: master
26 changes: 20 additions & 6 deletions gossip/src/cluster_info.rs
@@ -110,6 +110,14 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// Chosen to be able to handle 1Gbps of pure gossip traffic
/// 128MB/PACKET_DATA_SIZE
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
/// Capacity for the [`ClusterInfo::run_socket_consume`] and [`ClusterInfo::run_listen`]
/// intermediate packet batch buffers.
///
/// Uses a heuristic of 28 packets per [`PacketBatch`], which is an observed
/// average of packets per batch. The buffers are re-used across processing loops,
/// so any extra capacity that may be reserved due to traffic variations will be preserved,
/// avoiding excessive resizing and re-allocation.
const CHANNEL_RECV_BUFFER_INITIAL_CAPACITY: usize = MAX_GOSSIP_TRAFFIC.div_ceil(28);
const GOSSIP_PING_CACHE_CAPACITY: usize = 126976;
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(1280 / 64);
@@ -1936,7 +1944,7 @@ impl ClusterInfo {

fn process_packets(
&self,
mut packets: VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>,
packets: &mut VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>,
thread_pool: &ThreadPool,
recycler: &PacketBatchRecycler,
response_sender: &PacketBatchSender,
@@ -2005,7 +2013,7 @@ impl ClusterInfo {
let mut prune_messages = vec![];
let mut ping_messages = vec![];
let mut pong_messages = vec![];
for (from_addr, packet) in packets.into_iter().flatten() {
for (from_addr, packet) in packets.drain(..).flatten() {
match packet {
Protocol::PullRequest(filter, caller) => {
if !check_pull_request_shred_version(self_shred_version, &caller) {
@@ -2085,6 +2093,7 @@ impl ClusterInfo {
epoch_specs: Option<&mut EpochSpecs>,
receiver: &PacketBatchReceiver,
sender: &Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
packets: &mut VecDeque<PacketBatch>,

nit: maybe rename packets to packet_buffer or something similar. Every time I read this, I think the packets are being passed into this function via packets rather than via receiver.

) -> Result<(), GossipError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
fn count_dropped_packets(packets: &PacketBatch, dropped_packets_counts: &mut [u64; 7]) {
@@ -2100,7 +2109,6 @@ impl ClusterInfo {
}
let mut dropped_packets_counts = [0u64; 7];
let mut num_packets = 0;
let mut packets = VecDeque::with_capacity(2);
for packet_batch in receiver
.recv_timeout(RECV_TIMEOUT)
.map(std::iter::once)?
@@ -2150,7 +2158,7 @@ impl ClusterInfo {
.map(EpochSpecs::current_epoch_staked_nodes)
.cloned()
.unwrap_or_default();
let packets: Vec<_> = {
let packets_verified: Vec<_> = {
let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
thread_pool.install(|| {
if packets.len() == 1 {
@@ -2167,7 +2175,8 @@ impl ClusterInfo {
}
})
};
Ok(sender.send(packets)?)
packets.clear();

Would it be worthwhile to emit a datapoint if packets was resized here, so we know when it makes sense to bump CHANNEL_RECV_BUFFER_INITIAL_CAPACITY?
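
One way such a check could look is sketched below. This is only an illustration of the idea in the comment, not code from the PR: `capacity_growth` is a hypothetical helper, the element type is a stand-in for `PacketBatch`, and the `println!` marks the spot where a metrics call would go in the real code.

```rust
use std::collections::VecDeque;

/// Hypothetical helper (not part of the PR): runs `fill` against a re-used
/// buffer and returns how many slots of capacity were added, if any.
fn capacity_growth<T>(
    buffer: &mut VecDeque<T>,
    fill: impl FnOnce(&mut VecDeque<T>),
) -> Option<usize> {
    let before = buffer.capacity();
    fill(buffer);
    let after = buffer.capacity();
    // Capacity only changes when the deque had to re-allocate.
    (after > before).then(|| after - before)
}

fn main() {
    // Small stand-in capacity so the growth is easy to trigger.
    let mut buffer: VecDeque<u32> = VecDeque::with_capacity(4);

    // A burst that fits within the reserved capacity: no growth to report.
    let grew = capacity_growth(&mut buffer, |b| b.extend(0..3));
    assert_eq!(grew, None);
    buffer.clear();

    // A burst that exceeds the reserved capacity: growth is reported.
    let grew = capacity_growth(&mut buffer, |b| b.extend(0..1000));
    if let Some(extra_slots) = grew {
        // Placeholder for a metrics datapoint (assumption).
        println!("receive buffer grew by {extra_slots} slots");
    }
}
```

Comparing `capacity()` before and after the fill keeps the check cheap, since it adds two reads per loop iteration and only reports when a re-allocation actually happened.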

Ok(sender.send(packets_verified)?)
}

/// Process messages from the network
@@ -2180,12 +2189,12 @@ impl ClusterInfo {
thread_pool: &ThreadPool,
last_print: &mut Instant,
should_check_duplicate_instance: bool,
packets: &mut VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>,

nit: same naming nit as above. I won't die on this hill, but it is confusing to me.

) -> Result<(), GossipError> {
let _st = ScopedTimer::from(&self.stats.gossip_listen_loop_time);
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2);
let mut num_packets = 0;
let mut packets = VecDeque::with_capacity(2);
for pkts in receiver
.recv_timeout(RECV_TIMEOUT)
.map(std::iter::once)?
@@ -2220,6 +2229,7 @@ impl ClusterInfo {
epoch_duration,
should_check_duplicate_instance,
)?;
packets.clear();

Similarly, could we emit a datapoint here if packets was resized?

if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL {
submit_gossip_stats(&self.stats, &self.gossip, &stakes);
*last_print = Instant::now();
@@ -2243,13 +2253,15 @@ impl ClusterInfo {
.build()
.unwrap();
let mut epoch_specs = bank_forks.map(EpochSpecs::from);
let mut packets = VecDeque::with_capacity(CHANNEL_RECV_BUFFER_INITIAL_CAPACITY);

I see. So previously we created a new VecDeque with capacity 2 on every call to run_socket_consume(), and it had to grow and allocate memory each time. Now it will grow once, stay that size, and get reused.

It looks like the max size of the packets VecDeque would be 2 * MAX_GOSSIP_TRAFFIC entries, assuming worst-case batch sizes (1 packet per batch). So approximately: 2 * MAX_GOSSIP_TRAFFIC * 8 bytes/ptr / 1,000,000 ≈ 1.66 MB. That doesn't seem too bad.

Any thoughts here?
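
The estimate and the reuse behavior above can be checked with a small standalone sketch. It rests on assumptions that are not in the diff: `PACKET_DATA_SIZE` is taken to be 1232 bytes, the 8 bytes per entry is the figure from the comment rather than a measured size of `PacketBatch`, and `worst_case_bytes` and `capacity_after_burst` are hypothetical helpers written only for this illustration.

```rust
use std::collections::VecDeque;

// Assumed value for illustration; the real constant lives elsewhere in the codebase.
const PACKET_DATA_SIZE: usize = 1232;
// Same expressions as in the diff.
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
const CHANNEL_RECV_BUFFER_INITIAL_CAPACITY: usize = MAX_GOSSIP_TRAFFIC.div_ceil(28);

/// Hypothetical helper: worst-case bytes for the deque's own storage, using the
/// comment's assumptions (up to 2 * MAX_GOSSIP_TRAFFIC entries, one packet per batch).
fn worst_case_bytes(bytes_per_entry: usize) -> usize {
    2 * MAX_GOSSIP_TRAFFIC * bytes_per_entry
}

/// Hypothetical helper: fills and then empties a re-used buffer, returning the
/// capacity that remains reserved afterwards.
fn capacity_after_burst(buffer: &mut VecDeque<usize>, burst: usize) -> usize {
    buffer.extend(0..burst);
    // `clear` drops the elements but keeps the allocation for the next loop.
    buffer.clear();
    buffer.capacity()
}

fn main() {
    println!("initial capacity: {CHANNEL_RECV_BUFFER_INITIAL_CAPACITY} batches");
    println!(
        "worst case at 8 bytes/entry: {:.2} MB",
        worst_case_bytes(8) as f64 / 1_000_000.0
    );

    // Old behavior: a fresh deque with capacity 2 on every call.
    // New behavior: one deque whose capacity survives across calls.
    let mut buffer = VecDeque::with_capacity(2);
    let retained = capacity_after_burst(&mut buffer, 1_000);
    assert!(buffer.is_empty());
    assert!(retained >= 1_000);
}
```

If the actual per-entry size of the queued type is larger than 8 bytes, the same helper gives the corresponding upper bound by passing that size instead.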

let run_consume = move || {
while !exit.load(Ordering::Relaxed) {
match self.run_socket_consume(
&thread_pool,
epoch_specs.as_mut(),
&receiver,
&sender,
&mut packets,
) {
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
@@ -2281,6 +2293,7 @@ impl ClusterInfo {
.build()
.unwrap();
let mut epoch_specs = bank_forks.map(EpochSpecs::from);
let mut packets = VecDeque::with_capacity(CHANNEL_RECV_BUFFER_INITIAL_CAPACITY);
Builder::new()
.name("solGossipListen".to_string())
.spawn(move || {
@@ -2293,6 +2306,7 @@ impl ClusterInfo {
&thread_pool,
&mut last_print,
should_check_duplicate_instance,
&mut packets,
) {
match err {
GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,