Skip to content

Commit

Permalink
reduce timeouts; write log every 2 minutes
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yap committed Aug 12, 2024
1 parent 1193f75 commit 2a46e99
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub(crate) async fn poll_messages_from_stellar(
.unwrap_or_else(|_| format!("{stellar_msg_as_base64_xdr:?}"))
);
}
tokio::task::yield_now().await;
},
Ok(None) => {},
Err(e) => {
Expand Down
5 changes: 2 additions & 3 deletions clients/stellar-relay-lib/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl StellarOverlayConnection {

#[cfg(tokio_unstable)]
tokio::task::Builder::new()
.name("poll stellar messages")
.name("Poll Stellar Messages")
.spawn(poll_messages_from_stellar(
connector,
send_to_user_sender,
Expand Down Expand Up @@ -75,8 +75,7 @@ impl StellarOverlayConnection {
return Err(Error::Disconnected)
}

timeout(Duration::from_secs(5), self.receiver.recv()).await
.map_err(|_| Error::Timeout)
Ok(self.receiver.recv().await)
}

pub fn is_alive(&mut self) -> bool {
Expand Down
54 changes: 25 additions & 29 deletions clients/vault/src/oracle/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::{
sync::{mpsc, RwLock},
time::{sleep, timeout},
};
use tokio::time::Instant;

use runtime::ShutdownSender;
use runtime::stellar::SecretKey;
Expand Down Expand Up @@ -113,39 +114,34 @@ where F: Future<Output = Result<(), service::Error<crate::Error>>> + Send + 'sta
// use StellarOverlayConnection's sender to send message to Stellar
let sender = overlay_conn.sender();

// occasionally log a new message received.
let mut log_counter:u8 = 0;
// log a new message received, every 2 minutes.
let interval = Duration::from_secs(120);
let mut next_time = Instant::now() + interval;
loop {
if log_counter == u8::MAX {
log_counter = 0;
} else {
log_counter += 1;
}

tokio::select! {
_ = sleep(Duration::from_millis(100)) => {},
result = overlay_conn.listen() => match result {
Ok(None) => {},
Ok(Some(msg)) => {
let msg_as_str = to_base64_xdr_string(&msg);

if log_counter % 100 == 0 {
tracing::info!("listen_for_stellar_messages(): received message: {msg_as_str}");
}
match overlay_conn.listen().await {
Ok(None) => {
tracing::info!("listen_for_stellar_messages(): yielding to outside tasks");
tokio::task::yield_now().await;
},
Ok(Some(msg)) => {
let msg_as_str = to_base64_xdr_string(&msg);
if Instant::now() >= next_time {
tracing::info!("listen_for_stellar_messages(): received message: {msg_as_str}");
next_time += interval;
}

if let Err(e) = handle_message(msg, collector.clone(), &sender).await {
tracing::error!("listen_for_stellar_messages(): failed to handle message: {msg_as_str}: {e:?}");
}
if let Err(e) = handle_message(msg, collector.clone(), &sender).await {
tracing::error!("listen_for_stellar_messages(): failed to handle message: {msg_as_str}: {e:?}");
}
// connection got lost
Err(e) => {
tracing::error!("listen_for_stellar_messages(): encounter error in overlay: {e:?}");

if let Err(e) = shutdown_sender.send(()) {
tracing::error!("listen_for_stellar_messages(): Failed to send shutdown signal in thread: {e:?}");
}
break
}
// connection got lost
Err(e) => {
tracing::error!("listen_for_stellar_messages(): encounter error in overlay: {e:?}");

if let Err(e) = shutdown_sender.send(()) {
tracing::error!("listen_for_stellar_messages(): Failed to send shutdown signal in thread: {e:?}");
}
break
}
}
}
Expand Down

0 comments on commit 2a46e99

Please sign in to comment.