Skip to content

Commit

Permalink
feat(iroh-gossip): configure the max message size (#2340)
Browse files Browse the repository at this point in the history
## Description

Add configuration option for `max_message_size` for
`iroh-gossip::proto::Config`.

This `Config` gets used in `iroh-gossip::Gossip::from_endpoint`.

`iroh-docs` still uses the default 4096 bytes. The `max_message_size`
configuration is useful for folks using `iroh-gossip::Gossip` as its own
library.

closes #2312

## Breaking Changes
Adds:
`iroh-gossip::Gossip::max_message_size` - that reports the configured
maximum message size for the gossip actor.

Changes:
`iroh_gossip::net::util::read_message` now takes a `max_message_size:
usize` parameter
`iroh_gossip::net::util::write_message` now takes a `max_message_size:
usize` parameter
`iroh_gossip::net::util::read_lp` now takes a `max_message_size: usize`
parameter

Removes:
  `iroh-gossip::proto:: MAX_MESSAGE_SIZE` const

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] All breaking changes documented.
  • Loading branch information
ramfox authored Jun 5, 2024
1 parent d0662c2 commit 7153a38
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 15 deletions.
2 changes: 1 addition & 1 deletion iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
match details
.outcome
.heads_received
.encode(Some(iroh_gossip::net::MAX_MESSAGE_SIZE))
.encode(Some(self.gossip.max_message_size()))
{
Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
Ok(heads) => {
Expand Down
30 changes: 23 additions & 7 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ pub mod util;

/// ALPN protocol name
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/0";
/// Maximum message size is limited currently. The limit is more-or-less arbitrary.
// TODO: Make the limit configurable.
pub const MAX_MESSAGE_SIZE: usize = 4096;

/// Channel capacity for all subscription broadcast channels (single)
const SUBSCRIBE_ALL_CAP: usize = 2048;
/// Channel capacity for topic subscription broadcast channels (one per topic)
Expand Down Expand Up @@ -76,6 +72,7 @@ pub struct Gossip {
to_actor_tx: mpsc::Sender<ToActor>,
on_endpoints_tx: mpsc::Sender<Vec<iroh_net::config::Endpoint>>,
_actor_handle: Arc<JoinHandle<anyhow::Result<()>>>,
max_message_size: usize,
}

impl Gossip {
Expand All @@ -94,6 +91,7 @@ impl Gossip {
let (on_endpoints_tx, on_endpoints_rx) = mpsc::channel(ON_ENDPOINTS_CAP);

let me = endpoint.node_id().fmt_short();
let max_message_size = state.max_message_size();
let actor = Actor {
endpoint,
state,
Expand Down Expand Up @@ -125,9 +123,15 @@ impl Gossip {
to_actor_tx,
on_endpoints_tx,
_actor_handle: Arc::new(actor_handle),
max_message_size,
}
}

/// Get the maximum message size configured for this gossip actor.
pub fn max_message_size(&self) -> usize {
self.max_message_size
}

/// Join a topic and connect to peers.
///
///
Expand Down Expand Up @@ -427,12 +431,23 @@ impl Actor {
let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
self.conn_send_tx.insert(peer_id, send_tx.clone());

let max_message_size = self.state.max_message_size();

// Spawn a task for this connection
let in_event_tx = self.in_event_tx.clone();
tokio::spawn(
async move {
debug!("connection established");
match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await {
match connection_loop(
peer_id,
conn,
origin,
send_rx,
&in_event_tx,
max_message_size,
)
.await
{
Ok(()) => {
debug!("connection closed without error")
}
Expand Down Expand Up @@ -605,6 +620,7 @@ async fn connection_loop(
origin: ConnOrigin,
mut send_rx: mpsc::Receiver<ProtoMessage>,
in_event_tx: &mpsc::Sender<InEvent>,
max_message_size: usize,
) -> anyhow::Result<()> {
let (mut send, mut recv) = match origin {
ConnOrigin::Accept => conn.accept_bi().await?,
Expand All @@ -621,10 +637,10 @@ async fn connection_loop(
// but the other side may still want to use it to
// send data to us.
Some(msg) = send_rx.recv(), if !send_rx.is_closed() => {
write_message(&mut send, &mut send_buf, &msg).await?
write_message(&mut send, &mut send_buf, &msg, max_message_size).await?
}

msg = read_message(&mut recv, &mut recv_buf) => {
msg = read_message(&mut recv, &mut recv_buf, max_message_size) => {
let msg = msg?;
match msg {
None => break,
Expand Down
13 changes: 8 additions & 5 deletions iroh-gossip/src/net/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ use tokio::{

use crate::proto::util::TimerMap;

use super::{ProtoMessage, MAX_MESSAGE_SIZE};
use super::ProtoMessage;

/// Write a `ProtoMessage` as a length-prefixed, postcard-encoded message.
pub async fn write_message<W: AsyncWrite + Unpin>(
writer: &mut W,
buffer: &mut BytesMut,
frame: &ProtoMessage,
max_message_size: usize,
) -> Result<()> {
let len = postcard::experimental::serialized_size(&frame)?;
ensure!(len < MAX_MESSAGE_SIZE);
ensure!(len < max_message_size);
buffer.clear();
buffer.resize(len, 0u8);
let slice = postcard::to_slice(&frame, buffer)?;
Expand All @@ -33,8 +34,9 @@ pub async fn write_message<W: AsyncWrite + Unpin>(
pub async fn read_message(
reader: impl AsyncRead + Unpin,
buffer: &mut BytesMut,
max_message_size: usize,
) -> Result<Option<ProtoMessage>> {
match read_lp(reader, buffer).await? {
match read_lp(reader, buffer, max_message_size).await? {
None => Ok(None),
Some(data) => {
let message = postcard::from_bytes(&data)?;
Expand All @@ -52,6 +54,7 @@ pub async fn read_message(
pub async fn read_lp(
mut reader: impl AsyncRead + Unpin,
buffer: &mut BytesMut,
max_message_size: usize,
) -> Result<Option<Bytes>> {
let size = match reader.read_u32().await {
Ok(size) => size,
Expand All @@ -60,8 +63,8 @@ pub async fn read_lp(
};
let mut reader = reader.take(size as u64);
let size = usize::try_from(size).context("frame larger than usize")?;
if size > MAX_MESSAGE_SIZE {
bail!("Incoming message exceeds MAX_MESSAGE_SIZE");
if size > max_message_size {
bail!("Incoming message exceeds the maximum message size of {max_message_size} bytes");
}
buffer.reserve(size);
loop {
Expand Down
5 changes: 5 additions & 0 deletions iroh-gossip/src/proto/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ impl<PI: PeerIdentity, R: Rng + Clone> State<PI, R> {
.unwrap_or(false)
}

/// Returns the maximum message size configured in the gossip protocol.
pub fn max_message_size(&self) -> usize {
self.config.max_message_size
}

/// Handle an [`InEvent`]
///
/// This returns an iterator of [`OutEvent`]s that must be processed.
Expand Down
25 changes: 24 additions & 1 deletion iroh-gossip/src/proto/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ use super::{
};
use super::{PeerData, PeerIdentity};

/// The default maximum size in bytes for a gossip message.
/// This is a sane but arbitrary default and can be changed in the [`Config`].
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096;

/// Input event to the topic state handler.
#[derive(Clone, Debug)]
pub enum InEvent<PI> {
Expand Down Expand Up @@ -170,13 +174,32 @@ impl<PI: Clone> IO<PI> for VecDeque<OutEvent<PI>> {
self.push_back(event.into())
}
}

/// Protocol configuration
#[derive(Clone, Default, Debug)]
#[derive(Clone, Debug)]
pub struct Config {
/// Configuration for the swarm membership layer
pub membership: hyparview::Config,
/// Configuration for the gossip broadcast layer
pub broadcast: plumtree::Config,
/// Max message size in bytes.
///
/// This size should be the same across a network to ensure all nodes can transmit and read large messages.
///
/// At minimum, this size should be large enough to send gossip control messages. This can vary, depending on the size of the [`PeerIdentity`] you use and the size of the [`PeerData`] you transmit in your messages.
///
/// The default is [`DEFAULT_MAX_MESSAGE_SIZE`].
pub max_message_size: usize,
}

impl Default for Config {
fn default() -> Self {
Self {
membership: Default::default(),
broadcast: Default::default(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
}
}
}

/// The topic state maintains the swarm membership and broadcast tree for a particular topic.
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/net/interfaces/bsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl WireFormat {

Ok(Some(WireMessage::Route(m)))
}
#[cfg(any(target_os = "openbsd",))]
#[cfg(target_os = "openbsd")]
MessageType::Route => {
if data.len() < self.body_off {
return Err(RouteError::MessageTooShort);
Expand Down

0 comments on commit 7153a38

Please sign in to comment.