Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-1592: make connection manager operate of validator network directly #790

Merged
merged 3 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions finality-aleph/src/network/io.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,43 @@
use futures::channel::mpsc;

use crate::network::{
manager::{DataInSession, VersionedAuthentication},
ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO,
use crate::{
network::{
manager::{DataInSession, VersionedAuthentication},
ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO,
},
validator_network::{Network as ValidatorNetwork, PublicKey},
};

type AuthenticationNetworkIO<D, M> = NetworkIO<VersionedAuthentication<M>, DataInSession<D>, M>;
type AuthenticationNetworkIO<M> = NetworkIO<VersionedAuthentication<M>>;

pub fn setup<D: Data, M: Multiaddress + 'static>() -> (
ConnectionManagerIO<D, M>,
AuthenticationNetworkIO<D, M>,
pub fn setup<
D: Data,
M: Multiaddress + 'static,
VN: ValidatorNetwork<M::PeerId, M, DataInSession<D>>,
>(
validator_network: VN,
) -> (
ConnectionManagerIO<D, M, VN>,
AuthenticationNetworkIO<M>,
SessionManagerIO<D>,
) {
)
where
M::PeerId: PublicKey,
{
// Prepare and start the network
let (commands_for_network, commands_from_io) = mpsc::unbounded();
let (data_for_network, data_from_user) = mpsc::unbounded();
let (messages_for_network, messages_from_user) = mpsc::unbounded();
let (commands_for_service, commands_from_user) = mpsc::unbounded();
let (messages_for_service, commands_from_manager) = mpsc::unbounded();
let (data_for_user, data_from_network) = mpsc::unbounded();
let (messages_for_user, messages_from_network) = mpsc::unbounded();

let connection_io = ConnectionManagerIO::new(
commands_for_network,
data_for_network,
messages_for_network,
commands_from_user,
commands_from_manager,
data_from_network,
messages_from_network,
validator_network,
);
let channels_for_network = NetworkIO::new(
data_from_user,
messages_from_user,
data_for_user,
messages_for_user,
commands_from_io,
);
let channels_for_network = NetworkIO::new(messages_from_user, messages_for_user);
let channels_for_session_manager =
SessionManagerIO::new(commands_for_service, messages_for_service);

Expand Down
69 changes: 39 additions & 30 deletions finality-aleph/src/network/manager/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
},
AddressedData, ConnectionCommand, Data, Multiaddress, NetworkIdentity, PeerId,
},
validator_network::{Network as ValidatorNetwork, PublicKey},
MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL,
};

Expand Down Expand Up @@ -595,22 +596,22 @@ impl<NI: NetworkIdentity, D: Data> Service<NI, D> {
}
}

/// Input/output interface for the connectiona manager service.
pub struct IO<D: Data, M: Multiaddress> {
commands_for_network: mpsc::UnboundedSender<ConnectionCommand<M>>,
data_for_network: mpsc::UnboundedSender<AddressedData<DataInSession<D>, M::PeerId>>,
/// Input/output interface for the connection manager service.
pub struct IO<D: Data, M: Multiaddress, VN: ValidatorNetwork<M::PeerId, M, DataInSession<D>>>
where
M::PeerId: PublicKey,
{
authentications_for_network: mpsc::UnboundedSender<VersionedAuthentication<M>>,
commands_from_user: mpsc::UnboundedReceiver<SessionCommand<D>>,
messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>,
data_from_network: mpsc::UnboundedReceiver<DataInSession<D>>,
authentications_from_network: mpsc::UnboundedReceiver<VersionedAuthentication<M>>,
validator_network: VN,
}

/// Errors that can happen during the network service operations.
#[derive(Debug, PartialEq, Eq)]
pub enum Error {
NetworkSend,
CommandSend,
/// Should never be fatal.
UserSend,
/// Should never be fatal.
Expand All @@ -620,31 +621,28 @@ pub enum Error {
NetworkChannel,
}

impl<D: Data, M: Multiaddress> IO<D, M> {
impl<D: Data, M: Multiaddress, VN: ValidatorNetwork<M::PeerId, M, DataInSession<D>>> IO<D, M, VN>
where
M::PeerId: PublicKey,
{
pub fn new(
commands_for_network: mpsc::UnboundedSender<ConnectionCommand<M>>,
data_for_network: mpsc::UnboundedSender<AddressedData<DataInSession<D>, M::PeerId>>,
authentications_for_network: mpsc::UnboundedSender<VersionedAuthentication<M>>,
commands_from_user: mpsc::UnboundedReceiver<SessionCommand<D>>,
messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>,
data_from_network: mpsc::UnboundedReceiver<DataInSession<D>>,
authentications_from_network: mpsc::UnboundedReceiver<VersionedAuthentication<M>>,
) -> IO<D, M> {
validator_network: VN,
) -> IO<D, M, VN> {
IO {
commands_for_network,
data_for_network,
authentications_for_network,
commands_from_user,
messages_from_user,
data_from_network,
authentications_from_network,
validator_network,
}
}

fn send_data(&self, to_send: AddressedData<DataInSession<D>, M::PeerId>) -> Result<(), Error> {
self.data_for_network
.unbounded_send(to_send)
.map_err(|_| Error::NetworkSend)
fn send_data(&self, to_send: AddressedData<DataInSession<D>, M::PeerId>) {
self.validator_network.send(to_send.0, to_send.1)
}

fn send_authentication(&self, to_send: DiscoveryMessage<M>) -> Result<(), Error> {
Expand All @@ -653,21 +651,32 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
.map_err(|_| Error::NetworkSend)
}

fn send_command(&self, to_send: ConnectionCommand<M>) -> Result<(), Error> {
self.commands_for_network
.unbounded_send(to_send)
.map_err(|_| Error::CommandSend)
fn handle_connection_command(&mut self, connection_command: ConnectionCommand<M>) {
match connection_command {
ConnectionCommand::AddReserved(addresses) => {
for multi in addresses {
if let Some(peer_id) = multi.get_peer_id() {
self.validator_network.add_connection(peer_id, vec![multi]);
}
}
}
ConnectionCommand::DelReserved(peers) => {
for peer in peers {
self.validator_network.remove_connection(peer);
}
}
};
}

fn send(
&self,
fn handle_service_actions(
&mut self,
ServiceActions {
maybe_command,
maybe_message,
}: ServiceActions<M>,
) -> Result<(), Error> {
if let Some(command) = maybe_command {
self.send_command(command)?;
self.handle_connection_command(command);
}
if let Some(message) = maybe_message {
self.send_authentication(message)?;
Expand Down Expand Up @@ -695,7 +704,7 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
trace!(target: "aleph-network", "Manager received a command from user");
match maybe_command {
Some(command) => match service.on_command(command).await {
Ok(to_send) => self.send(to_send)?,
Ok(to_send) => self.handle_service_actions(to_send)?,
Err(e) => warn!(target: "aleph-network", "Failed to update handler: {:?}", e),
},
None => return Err(Error::CommandsChannel),
Expand All @@ -705,12 +714,12 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
trace!(target: "aleph-network", "Manager received a message from user");
match maybe_message {
Some((message, session_id, recipient)) => for message in service.on_user_message(message, session_id, recipient) {
self.send_data(message)?;
self.send_data(message);
},
None => return Err(Error::MessageChannel),
}
},
maybe_data = self.data_from_network.next() => {
maybe_data = self.validator_network.next() => {
trace!(target: "aleph-network", "Manager received some data from network");
match maybe_data {
Some(DataInSession{data, session_id}) => if let Err(e) = service.send_session_data(&session_id, data) {
Expand All @@ -727,7 +736,7 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
trace!(target: "aleph-network", "Manager received an authentication from network");
match maybe_authentication {
Some(authentication) => match authentication.try_into() {
Ok(message) => self.send(service.on_discovery_message(message))?,
Ok(message) => self.handle_service_actions(service.on_discovery_message(message))?,
Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e),
},
None => return Err(Error::NetworkChannel),
Expand All @@ -736,7 +745,7 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
_ = maintenance.tick() => {
debug!(target: "aleph-network", "Manager starts maintenence");
match service.retry_session_start().await {
Ok(to_send) => self.send(to_send)?,
Ok(to_send) => self.handle_service_actions(to_send)?,
Err(e) => warn!(target: "aleph-network", "Retry failed to update handler: {:?}", e),
}
for to_send in service.discovery() {
Expand Down
42 changes: 1 addition & 41 deletions finality-aleph/src/network/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ use tokio::time::timeout;

use crate::{
crypto::{AuthorityPen, AuthorityVerifier},
network::{
manager::VersionedAuthentication, AddressedData, ConnectionCommand, Event, EventStream,
Multiaddress, Network, NetworkIdentity, NetworkSender, NetworkServiceIO, Protocol,
},
network::{Event, EventStream, Network, NetworkIdentity, NetworkSender, Protocol},
testing::mocks::validator_network::{random_identity, MockMultiaddress},
validator_network::mock::MockPublicKey,
AuthorityId, NodeIndex,
Expand Down Expand Up @@ -101,43 +98,6 @@ pub type MockEvent = Event<MockMultiaddress, MockPublicKey>;

pub type MockData = Vec<u8>;

pub struct MockIO<M: Multiaddress> {
pub messages_for_network: mpsc::UnboundedSender<VersionedAuthentication<M>>,
pub data_for_network: mpsc::UnboundedSender<AddressedData<MockData, M::PeerId>>,
pub messages_from_network: mpsc::UnboundedReceiver<VersionedAuthentication<M>>,
pub data_from_network: mpsc::UnboundedReceiver<MockData>,
pub commands_for_network: mpsc::UnboundedSender<ConnectionCommand<M>>,
}

impl<M: Multiaddress + 'static> MockIO<M> {
pub fn new() -> (
MockIO<M>,
NetworkServiceIO<VersionedAuthentication<M>, MockData, M>,
) {
let (messages_for_network, messages_from_user) = mpsc::unbounded();
let (data_for_network, data_from_user) = mpsc::unbounded();
let (messages_for_user, messages_from_network) = mpsc::unbounded();
let (data_for_user, data_from_network) = mpsc::unbounded();
let (commands_for_network, commands_from_manager) = mpsc::unbounded();
(
MockIO {
messages_for_network,
data_for_network,
messages_from_network,
data_from_network,
commands_for_network,
},
NetworkServiceIO::new(
data_from_user,
messages_from_user,
data_for_user,
messages_for_user,
commands_from_manager,
),
)
}
}

pub struct MockEventStream(mpsc::UnboundedReceiver<MockEvent>);

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send
}

/// Represents the address of an arbitrary node.
pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync {
pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync + 'static {
type PeerId: PeerId;

/// Returns the peer id associated with this multiaddress if it exists and is unique.
Expand Down
Loading