Skip to content

Commit

Permalink
refactor!: align api naming between RPC and direct calls
Browse files Browse the repository at this point in the history
## Description

- make the `join` explicit 
- align API naming between RPC and direct calls

## Breaking Changes

- rename `net::Gossip::join` -> `net::Gossip::subscribe_and_join`
- added `net::Gossip::subscribe`
- rename `net::Gossip::join_with_opts` -> `net::Gossip::subscribe_with_opts` 
- rename `net::Gossip::join_with_stream` -> `net::Gossip::subscribe_with_stream`
  • Loading branch information
dignifiedquire authored Dec 6, 2024
1 parent 0e6fd20 commit 35d73db
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
2 changes: 1 addition & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async fn main() -> Result<()> {
endpoint.add_node_addr(peer)?;
}
};
let (sender, receiver) = gossip.join(topic, peer_ids).await?.split();
let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split();
println!("> connected!");

// broadcast our name, if set
Expand Down
43 changes: 28 additions & 15 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,34 +183,47 @@ impl Gossip {
}

/// Join a gossip topic with the default options and wait for at least one active connection.
pub async fn join(&self, topic_id: TopicId, bootstrap: Vec<NodeId>) -> Result<GossipTopic> {
let mut sub = self.join_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap));
pub async fn subscribe_and_join(
&self,
topic_id: TopicId,
bootstrap: Vec<NodeId>,
) -> Result<GossipTopic> {
let mut sub = self.subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap));
sub.joined().await?;
Ok(sub)
}

/// Join a gossip topic with the default options.
///
/// Note that this will not wait for any bootstrap node to be available. To ensure the topic is connected to at least one node, use [`GossipTopic::joined`] or [`Gossip::subscribe_and_join`]
pub fn subscribe(&self, topic_id: TopicId, bootstrap: Vec<NodeId>) -> Result<GossipTopic> {
let sub = self.subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap));

Ok(sub)
}

/// Join a gossip topic with options.
///
/// Returns a [`GossipTopic`] instantly. To wait for at least one connection to be established,
/// you can await [`GossipTopic::joined`].
///
/// Messages will be queued until a first connection is available. If the internal channel becomes full,
/// the oldest messages will be dropped from the channel.
pub fn join_with_opts(&self, topic_id: TopicId, opts: JoinOptions) -> GossipTopic {
pub fn subscribe_with_opts(&self, topic_id: TopicId, opts: JoinOptions) -> GossipTopic {
let (command_tx, command_rx) = async_channel::bounded(TOPIC_COMMANDS_DEFAULT_CAP);
let command_rx: CommandStream = Box::pin(command_rx);
let event_rx = self.join_with_stream(topic_id, opts, command_rx);
let event_rx = self.subscribe_with_stream(topic_id, opts, command_rx);
GossipTopic::new(command_tx, event_rx)
}

/// Join a gossip topic with options and an externally-created update stream.
///
/// This method differs from [`Self::join_with_opts`] by letting you pass in a `updates` command stream yourself
/// This method differs from [`Self::subscribe_with_opts`] by letting you pass in a `updates` command stream yourself
/// instead of using a channel created for you.
///
/// It returns a stream of events. If you want to wait for the topic to become active, wait for
/// the [`GossipEvent::Joined`] event.
pub fn join_with_stream(
pub fn subscribe_with_stream(
&self,
topic_id: TopicId,
options: JoinOptions,
Expand Down Expand Up @@ -1289,9 +1302,9 @@ mod test {
debug!("----- joining ----- ");
// join the topics and wait for the connection to succeed
let [sub1, mut sub2, mut sub3] = [
go1.join(topic, vec![]),
go2.join(topic, vec![pi1]),
go3.join(topic, vec![pi2]),
go1.subscribe_and_join(topic, vec![]),
go2.subscribe_and_join(topic, vec![pi1]),
go3.subscribe_and_join(topic, vec![pi2]),
]
.try_join()
.await
Expand Down Expand Up @@ -1415,7 +1428,7 @@ mod test {
// second node
let ct2 = ct.clone();
let go2_task = async move {
let (_pub_tx, mut sub_rx) = go2.join(topic, vec![]).await?.split();
let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();

let subscribe_fut = async {
while let Some(ev) = sub_rx.try_next().await? {
Expand Down Expand Up @@ -1449,16 +1462,16 @@ mod test {
let go1_task = async move {
// first subscribe is done immediately
tracing::info!("subscribing the first time");
let sub_1a = go1.join(topic, vec![node_id2]).await;
let sub_1a = go1.subscribe_and_join(topic, vec![node_id2]).await;

// wait for signal to subscribe a second time
rx.recv().await.expect("signal for second join");
rx.recv().await.expect("signal for second subscribe");
tracing::info!("subscribing a second time");
let sub_1b = go1.join(topic, vec![node_id2]).await;
let sub_1b = go1.subscribe_and_join(topic, vec![node_id2]).await;
drop(sub_1a);

// wait for signal to drop the second handle as well
rx.recv().await.expect("signal for second join");
rx.recv().await.expect("signal for second subscribe");
tracing::info!("dropping all handles");
drop(sub_1b);

Expand All @@ -1470,7 +1483,7 @@ mod test {
let go1_handle = tokio::spawn(go1_task);

// advance and check that the topic is now subscribed
actor.steps(3).await?; // handle our join;
actor.steps(3).await?; // handle our subscribe;
// get peer connection;
// receive the other peer's information for a NeighborUp
let state = actor.topics.get(&topic).context("get registered topic")?;
Expand Down
2 changes: 1 addition & 1 deletion src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Gossip {
Subscribe(msg) => {
let this = self.clone();
chan.bidi_streaming(msg, this, move |handler, req, updates| {
let stream = handler.join_with_stream(
let stream = handler.subscribe_with_stream(
req.topic,
crate::net::JoinOptions {
bootstrap: req.bootstrap,
Expand Down

0 comments on commit 35d73db

Please sign in to comment.