Skip to content

Commit

Permalink
uses Box<[SocketAddr]> instead of Vec<SocketAddr>
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Feb 27, 2025
1 parent cc74cb9 commit d60edaa
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
32 changes: 18 additions & 14 deletions turbine/src/addr_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ pub(crate) struct AddrCache {
struct CacheEntry {
// Root distance and socket addresses cached either speculatively or when
// retransmitting incoming shreds.
code: Vec<Option<(/*root_distance:*/ u8, Vec<SocketAddr>)>>,
data: Vec<Option<(/*root_distance:*/ u8, Vec<SocketAddr>)>>,
code: Vec<Option<(/*root_distance:*/ u8, Box<[SocketAddr]>)>>,
data: Vec<Option<(/*root_distance:*/ u8, Box<[SocketAddr]>)>>,
// Code and data indices where [..index] are fully populated.
index_code: usize,
index_data: usize,
Expand Down Expand Up @@ -81,15 +81,19 @@ impl AddrCache {

// Returns (root-distance, socket-addresses) cached for the given shred-id.
#[inline]
pub(crate) fn get(&self, shred: &ShredId) -> Option<(/*root_distance:*/ u8, &Vec<SocketAddr>)> {
pub(crate) fn get(&self, shred: &ShredId) -> Option<(/*root_distance:*/ u8, &[SocketAddr])> {
self.cache
.get(&shred.slot())?
.get(shred.shred_type(), shred.index())
}

// Stores (root-distance, socket-addresses) precomputed speculatively for
// the given shred-id.
pub(crate) fn put(&mut self, shred: &ShredId, entry: (/*root_distance:*/ u8, Vec<SocketAddr>)) {
pub(crate) fn put(
&mut self,
shred: &ShredId,
entry: (/*root_distance:*/ u8, Box<[SocketAddr]>),
) {
self.get_cache_entry_mut(shred.slot())
.put(shred.shred_type(), shred.index(), entry);
self.maybe_trim_cache();
Expand Down Expand Up @@ -262,14 +266,14 @@ impl CacheEntry {
&self,
shred_type: ShredType,
shred_index: u32,
) -> Option<(/*root_distance:*/ u8, &Vec<SocketAddr>)> {
) -> Option<(/*root_distance:*/ u8, &[SocketAddr])> {
match shred_type {
ShredType::Code => &self.code,
ShredType::Data => &self.data,
}
.get(shred_index as usize)?
.as_ref()
.map(|(root_distance, addrs)| (*root_distance, addrs))
.map(|(root_distance, addrs)| (*root_distance, addrs.as_ref()))
}

// Stores (root-distance, socket-addresses) for the given shred type and
Expand All @@ -279,7 +283,7 @@ impl CacheEntry {
&mut self,
shred_type: ShredType,
shred_index: u32,
entry: (/*root_distance:*/ u8, Vec<SocketAddr>),
entry: (/*root_distance:*/ u8, Box<[SocketAddr]>),
) {
let cache = match shred_type {
ShredType::Code => &mut self.code,
Expand Down Expand Up @@ -356,9 +360,9 @@ mod tests {
assert_eq!(entry.index_code, 0);
assert_eq!(entry.index_data, 0);

entry.put(ShredType::Code, 0, (0, vec![]));
entry.put(ShredType::Code, 2, (0, vec![]));
entry.put(ShredType::Data, 1, (0, vec![]));
entry.put(ShredType::Code, 0, (0, Box::new([])));
entry.put(ShredType::Code, 2, (0, Box::new([])));
entry.put(ShredType::Data, 1, (0, Box::new([])));
assert!(entry.get_shreds(5).eq([
(ShredType::Code, 1),
(ShredType::Data, 0),
Expand All @@ -371,10 +375,10 @@ mod tests {
assert_eq!(entry.index_code, 1);
assert_eq!(entry.index_data, 0);

entry.put(ShredType::Code, 1, (0, vec![]));
entry.put(ShredType::Code, 4, (0, vec![]));
entry.put(ShredType::Data, 0, (0, vec![]));
entry.put(ShredType::Data, 3, (0, vec![]));
entry.put(ShredType::Code, 1, (0, Box::new([])));
entry.put(ShredType::Code, 4, (0, Box::new([])));
entry.put(ShredType::Data, 0, (0, Box::new([])));
entry.put(ShredType::Data, 3, (0, Box::new([])));
assert!(entry.get_shreds(5).eq([
(ShredType::Code, 3),
(ShredType::Data, 2),
Expand Down
19 changes: 8 additions & 11 deletions turbine/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct RetransmitShredOutput {
// Number of nodes the shred was retransmitted to.
num_nodes: usize,
// Addresses the shred was sent to if there was a cache miss.
addrs: Option<Vec<SocketAddr>>,
addrs: Option<Box<[SocketAddr]>>,
}

#[derive(Default)]
Expand All @@ -87,7 +87,7 @@ pub(crate) struct RetransmitSlotStats {
num_shreds_sent: [usize; MAX_NUM_TURBINE_HOPS],
// Root distance and socket-addresses the shreds were sent to if there was
// a cache miss.
pub(crate) addrs: Vec<(ShredId, /*root_distance:*/ u8, Vec<SocketAddr>)>,
pub(crate) addrs: Vec<(ShredId, /*root_distance:*/ u8, Box<[SocketAddr]>)>,
}

struct RetransmitStats {
Expand Down Expand Up @@ -411,7 +411,7 @@ fn retransmit_shred(
root_distance,
num_nodes,
addrs: match addrs {
Cow::Owned(addrs) => Some(addrs),
Cow::Owned(addrs) => Some(addrs.into_boxed_slice()),
Cow::Borrowed(_) => None,
},
})
Expand All @@ -424,7 +424,7 @@ fn get_retransmit_addrs<'a>(
addr_cache: &'a AddrCache,
socket_addr_space: &SocketAddrSpace,
stats: &RetransmitStats,
) -> Option<(/*root_distance:*/ u8, Cow<'a, Vec<SocketAddr>>)> {
) -> Option<(/*root_distance:*/ u8, Cow<'a, [SocketAddr]>)> {
if let Some((root_distance, addrs)) = addr_cache.get(shred) {
stats.addr_cache_hit.fetch_add(1, Ordering::Relaxed);
return Some((root_distance, Cow::Borrowed(addrs)));
Expand Down Expand Up @@ -481,13 +481,10 @@ fn cache_retransmit_addrs(
let get_retransmit_addrs = |shred: ShredId| {
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(shred.slot(), &root_bank);
let (slot_leader, cluster_nodes) = cache.get(&shred.slot())?;
let addrs = cluster_nodes.get_retransmit_addrs(
slot_leader,
&shred,
data_plane_fanout,
socket_addr_space,
);
Some((shred, addrs.ok()?))
let (root_distance, addrs) = cluster_nodes
.get_retransmit_addrs(slot_leader, &shred, data_plane_fanout, socket_addr_space)
.ok()?;
Some((shred, (root_distance, addrs.into_boxed_slice())))
};
let mut out = false;
if shreds.len() < PAR_ITER_MIN_NUM_SHREDS {
Expand Down

0 comments on commit d60edaa

Please sign in to comment.