Skip to content

Commit

Permalink
fix the trait bound `service::Error<error::Error>: From<service::Erro…
Browse files Browse the repository at this point in the history
…r<oracle::errors::Error>>` is not satisfied

       --> clients/vault/src/system.rs:820:4
  • Loading branch information
b-yap committed Aug 2, 2024
1 parent 93e177a commit d8d6ec2
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 88 deletions.
58 changes: 33 additions & 25 deletions clients/stellar-relay-lib/examples/connect.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use stellar_relay_lib::{
connect_to_stellar_overlay_network,
sdk::types::{ScpStatementPledges, StellarMessage},
StellarOverlayConfig,
};
use stellar_relay_lib::{connect_to_stellar_overlay_network, Error, sdk::types::{ScpStatementPledges, StellarMessage}, StellarOverlayConfig};

use wallet::keys::get_source_secret_key_from_env;

Expand All @@ -15,41 +11,53 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let arg_network = if args.len() > 1 { &args[1] } else { "testnet" };

let cfg_file_path = if arg_network == "mainnet" {
"./clients/stellar-relay-lib/resources/config/mainnet/stellar_relay_config_mainnet_iowa.json"
"./clients/stellar-relay-lib/resources/config/mainnet/stellar_relay_config_singapore.json"
} else {
"./clients/stellar-relay-lib/resources/config/testnet/stellar_relay_config_sdftest1.json"
};
let cfg = StellarOverlayConfig::try_from_path(cfg_file_path)?;

let secret_key = get_source_secret_key_from_env(arg_network == "mainnet");

let mut overlay_connection = connect_to_stellar_overlay_network(cfg, &secret_key).await?;

while let Ok(Some(msg)) = overlay_connection.listen().await {
match msg {
StellarMessage::ScpMessage(msg) => {
let node_id = msg.statement.node_id.to_encoding();
let node_id = base64::encode(&node_id);
let slot = msg.statement.slot_index;

let stmt_type = match msg.statement.pledges {
ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare",
ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm",
ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize",
ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ",
};
tracing::info!(
let mut overlay_connection = connect_to_stellar_overlay_network(cfg, secret_key).await?;

loop {
match overlay_connection.listen().await {
Ok(Some(msg)) => match msg {
StellarMessage::ScpMessage(msg) => {
let node_id = msg.statement.node_id.to_encoding();
let node_id = base64::encode(&node_id);
let slot = msg.statement.slot_index;

let stmt_type = match msg.statement.pledges {
ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare",
ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm",
ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize",
ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ",
};
tracing::info!(
"{} sent StellarMessage of type {} for ledger {}",
node_id,
stmt_type,
slot
);
},
_ => {
let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await;
},
},
_ => {
let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await;
},
Ok(None) => {}
Err(Error::Timeout) => {
tracing::warn!("took more than a second to respond");
}
Err(e) => {
tracing::error!("Error: {:?}", e);
break
}
}

}

tracing::info!("ooops, connection stopped ");
Ok(())
}
8 changes: 4 additions & 4 deletions clients/stellar-relay-lib/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ impl StellarOverlayConfig {
}

#[allow(dead_code)]
pub(crate) fn connection_info(&self, secret_key: &str) -> Result<ConnectionInfo, Error> {
pub(crate) fn connection_info(&self, secret_key_as_string: String) -> Result<ConnectionInfo, Error> {
let cfg = &self.connection_info;
let secret_key = SecretKey::from_encoding(secret_key)?;
let secret_key = SecretKey::from_encoding(secret_key_as_string)?;

let public_key = secret_key.get_public().to_encoding();
let public_key = std::str::from_utf8(&public_key).unwrap();
Expand Down Expand Up @@ -128,9 +128,9 @@ impl ConnectionInfoCfg {
/// Returns the `StellarOverlayConnection` if connection is a success, otherwise an Error
pub async fn connect_to_stellar_overlay_network(
cfg: StellarOverlayConfig,
secret_key: &str,
secret_key_as_string: String,
) -> Result<StellarOverlayConnection, Error> {
let conn_info = cfg.connection_info(secret_key)?;
let conn_info = cfg.connection_info(secret_key_as_string)?;
let local_node = cfg.node_info;

StellarOverlayConnection::connect(local_node.into(), conn_info).await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ mod test {
let cfg =
StellarOverlayConfig::try_from_path(cfg_file_path).expect("should create a config");
let node_info = cfg.node_info();
let conn_info = cfg.connection_info(&secret_key).expect("should create a connection info");
let conn_info = cfg.connection_info(secret_key).expect("should create a connection info");
// this is a channel to communicate with the connection/config (this needs renaming)

let connector = Connector::start(node_info.clone(), conn_info.clone())
Expand Down
12 changes: 11 additions & 1 deletion clients/stellar-relay-lib/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ impl StellarOverlayConnection {

let connector = Connector::start(local_node_info, conn_info).await?;

#[cfg(tokio_unstable)]
tokio::task::Builder::new()
.name("poll_messages_from_stellar")
.spawn(poll_messages_from_stellar(
connector,
send_to_user_sender,
send_to_node_receiver,
)).unwrap();

#[cfg(not(tokio_unstable))]
tokio::spawn(poll_messages_from_stellar(
connector,
send_to_user_sender,
Expand All @@ -65,7 +75,7 @@ impl StellarOverlayConnection {
return Err(Error::Disconnected)
}

timeout(Duration::from_secs(1), self.receiver.recv()).await
timeout(Duration::from_secs(5), self.receiver.recv()).await
.map_err(|_| Error::Timeout)
}

Expand Down
2 changes: 1 addition & 1 deletion clients/stellar-relay-lib/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn overlay_infos(is_mainnet: bool) -> (NodeInfo, ConnectionInfo) {

(
cfg.node_info(),
cfg.connection_info(&secret_key(is_mainnet)).expect("should return conn info"),
cfg.connection_info(secret_key(is_mainnet)).expect("should return conn info"),
)
}

Expand Down
1 change: 0 additions & 1 deletion clients/vault/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ sp-runtime = { git = "/~https://github.com/paritytech/substrate", branch = "polkad
sp-std = { git = "/~https://github.com/paritytech/substrate", branch = "polkadot-v0.9.42" }

parking_lot = "0.12.1"
err-derive = "0.3.1"
flate2 = "1.0"

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions clients/vault/src/issue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub(crate) async fn initialize_issue_set(
issue_set: &ArcRwLock<IssueRequestsMap>,
memos_to_issue_ids: &ArcRwLock<IssueIdLookup>,
) -> Result<(), Error> {
tracing::info!("initialize_issue_set(): started");
let (mut issue_set, mut memos_to_issue_ids, requests) = future::join3(
issue_set.write(),
memos_to_issue_ids.write(),
Expand Down
13 changes: 8 additions & 5 deletions clients/vault/src/oracle/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@ async fn handle_message(
pub async fn listen_for_stellar_messages(
config: StellarOverlayConfig,
collector: Arc<RwLock<ScpMessageCollector>>,
secret_key_as_str: &str,
secret_key_as_string: String,
shutdown_sender: ShutdownSender,
) -> Result<(),service::Error<Error>> {
) -> Result<(),service::Error<crate::Error>> {
tracing::info!("listen_for_stellar_messages(): Starting connection to Stellar overlay network...");

let mut overlay_conn = connect_to_stellar_overlay_network(config.clone(), secret_key_as_str).await
.map_err(|e| service::Error::VaultError(Error::Other(format!("{e:?}"))))?;
let mut overlay_conn = connect_to_stellar_overlay_network(config.clone(), secret_key_as_string)
.await.map_err(|e|{
tracing::error!("listen_for_stellar_messages(): Failed to connect to Stellar overlay network: {e:?}");
service::Error::StartOracleAgentError
})?;

// use StellarOverlayConnection's sender to send message to Stellar
let sender = overlay_conn.sender();
Expand Down Expand Up @@ -133,7 +136,7 @@ pub async fn start_oracle_agent(

tracing::info!("start_oracle_agent(): Starting connection to Stellar overlay network...");

let mut overlay_conn = connect_to_stellar_overlay_network(config.clone(), secret_key).await?;
let mut overlay_conn = connect_to_stellar_overlay_network(config.clone(), secret_key.to_string()).await?;
// use StellarOverlayConnection's sender to send message to Stellar
let sender = overlay_conn.sender();

Expand Down
84 changes: 42 additions & 42 deletions clients/vault/src/oracle/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,39 @@ use tokio::sync::{mpsc, oneshot};

use stellar_relay_lib::sdk::StellarSdkError;

#[derive(Debug, err_derive::Error)]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(display = "{:?}", _0)]
#[error("Stellar SDK Error: {0:?}")]
StellarSdkError(StellarSdkError),

#[error(display = "{:?}", _0)]
TryFromSliceError(TryFromSliceError),
#[error("TryFromSliceError: {0}")]
TryFromSliceError(#[from] TryFromSliceError),

#[error(display = "{:?}", _0)]
SerdeError(bincode::Error),
#[error("Serde Error: {0}")]
SerdeError(#[from] bincode::Error),

#[error(display = "{:?}", _0)]
StdIoError(std::io::Error),
#[error("StdIoError: {0}")]
StdIoError(#[from] std::io::Error),

#[error(display = "{:?}", _0)]
#[error("Other: {0}")]
Other(String),

#[error(display = "{:?}", _0)]
ConnError(stellar_relay_lib::Error),
#[error("Stellar Relay Error: {0}")]
ConnError(#[from] stellar_relay_lib::Error),

#[error(display = "{:?}", _0)]
WalletError(wallet::error::Error),
#[error("Wallet Error: {0}")]
WalletError(#[from] wallet::error::Error),

#[error(display = "{:?}", _0)]
#[error("Proof Timeout: {0}")]
ProofTimeout(String),

#[error(display = "{} is not initialized", _0)]
#[error("Unititialized: {0}")]
Uninitialized(String),

#[error(display = "{}", _0)]
#[error("Archive Error: {0}")]
ArchiveError(String),

#[error(display = "{}", _0)]
#[error("ArchiveResponseError: {0}")]
ArchiveResponseError(String),
}

Expand All @@ -45,30 +45,30 @@ impl From<StellarSdkError> for Error {
Error::StellarSdkError(e)
}
}

impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::StdIoError(e)
}
}

impl From<bincode::Error> for Error {
fn from(e: bincode::Error) -> Self {
Error::SerdeError(e)
}
}

impl From<TryFromSliceError> for Error {
fn from(e: TryFromSliceError) -> Self {
Error::TryFromSliceError(e)
}
}

impl From<stellar_relay_lib::Error> for Error {
fn from(e: stellar_relay_lib::Error) -> Self {
Error::ConnError(e)
}
}
//
// impl From<std::io::Error> for Error {
// fn from(e: std::io::Error) -> Self {
// Error::StdIoError(e)
// }
// }
//
// impl From<bincode::Error> for Error {
// fn from(e: bincode::Error) -> Self {
// Error::SerdeError(e)
// }
// }
//
// impl From<TryFromSliceError> for Error {
// fn from(e: TryFromSliceError) -> Self {
// Error::TryFromSliceError(e)
// }
// }
//
// impl From<stellar_relay_lib::Error> for Error {
// fn from(e: stellar_relay_lib::Error) -> Self {
// Error::ConnError(e)
// }
// }

impl<T> From<mpsc::error::SendError<T>> for Error {
fn from(e: mpsc::error::SendError<T>) -> Self {
Expand All @@ -80,4 +80,4 @@ impl From<oneshot::error::RecvError> for Error {
fn from(e: oneshot::error::RecvError) -> Self {
Error::ConnError(stellar_relay_lib::Error::SendFailed(e.to_string()))
}
}
}
40 changes: 32 additions & 8 deletions clients/vault/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,13 @@ async fn run_and_monitor_tasks(
_ => None,
}?;
let task = monitor.instrument(task);

#[cfg(all(tokio_unstable, feature = "allow_debugger"))]
let task = tokio::task::Builder::new().name(name).spawn(task).unwrap();

#[cfg(not(feature = "allow-debugger"))]
let task = tokio::spawn(task);

Some(((name.to_string(), metrics_iterator), task))
})
.unzip();
Expand All @@ -337,11 +343,25 @@ async fn run_and_monitor_tasks(
tracing::info!("run_and_monitor_tasks(): running all tasks...");
match join(tokio_metrics, join_all(tasks)).await {
(Ok(Err(err)), _) => Err(err),
(_, results) => results
.into_iter()
.find(|res| matches!(res, Ok(Err(_))))
.and_then(|res| res.ok())
.unwrap_or(Ok(())),
(_, results) => {
let res = results.into_iter()
.find(|res| matches!(res, Ok(())))
.and_then(|res| res.ok());

if let Some(_) = res {
tracing::info!("run_and_monitor_tasks(): join ok");
Ok(())
} else {
tracing::info!("run_and_monitor_tasks(): returned None");
Ok(())
}

// results
// .into_iter()
// .find(|res| matches!(res, Ok(Err(_))))
// .and_then(|res| res.ok())
// .unwrap_or(Ok(()))
},
}
}

Expand Down Expand Up @@ -744,6 +764,10 @@ impl VaultService {
})
}

fn secret_key (&self) -> String {
self.secret_key.clone()
}

fn get_vault_id(
&self,
collateral_currency: CurrencyId,
Expand Down Expand Up @@ -797,7 +821,8 @@ impl VaultService {
let oracle_agent = self.create_oracle_agent(is_public_network, self.shutdown.clone())?;
self.agent = Some(oracle_agent.clone());

self.execute_open_requests(oracle_agent.clone());
// self.execute_open_requests(oracle_agent.clone());
tracing::info!("proceed to initializing issue sets");

// issue handling
// this vec is passed to the stellar wallet to filter out transactions that are not relevant
Expand All @@ -814,14 +839,13 @@ impl VaultService {

tracing::info!("Starting all services...");

//
let mut tasks = vec![(
"Stellar Messages Listener",
run(
listen_for_stellar_messages(
self.stellar_overlay_cfg()?,
oracle_agent.collector.clone(),
&self.secret_key,
self.secret_key(),
self.shutdown.clone()
)
)
Expand Down

0 comments on commit d8d6ec2

Please sign in to comment.