-
Notifications
You must be signed in to change notification settings - Fork 711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Benph/bs estab de async heavy draft #3633
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Self-review/checklist of things that need attention.
async fn stream_final_state_and_consensus( | ||
fn stream_final_state_and_consensus( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
simple call
match tokio::time::timeout( | ||
cfg.write_timeout.into(), | ||
client.send(next_bootstrap_message), | ||
) | ||
.await | ||
{ | ||
Err(_) => Err(std::io::Error::new( | ||
std::io::ErrorKind::TimedOut, | ||
"bootstrap ask ledger part send timed out", | ||
) | ||
.into()), | ||
Ok(Err(e)) => Err(e), | ||
Ok(Ok(_)) => Ok(()), | ||
}?; | ||
let _ask_ledger_elapsed = client | ||
.blocking_send(next_bootstrap_message, Some(cfg.write_timeout.into())) | ||
.map_err(|e| e.update_io_msg_payload("bootstrap ask ledger part send timed out"))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks okay
let msg = match tokio::time::timeout(cfg.read_timeout.into(), client.next()).await { | ||
Err(_) => { | ||
return Err(std::io::Error::new( | ||
std::io::ErrorKind::TimedOut, | ||
"final state bootstrap read timed out", | ||
) | ||
.into()); | ||
} | ||
Ok(Err(e)) => return Err(e), | ||
Ok(Ok(msg)) => msg, | ||
}; | ||
let (msg, _next_elapsed) = | ||
client | ||
.blocking_next(Some(cfg.read_timeout.to_duration())) | ||
.map_err(|e| e.update_io_msg_payload("final state bootstrap read timed out"))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks okay
/// needs to be CANCELLABLE | ||
async fn bootstrap_from_server( | ||
/// TODO: setup cancellations | ||
fn bootstrap_from_server( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs attention
match tokio::time::timeout(cfg.read_error_timeout.into(), client.next()).await { | ||
Err(_) => { | ||
massa_trace!( | ||
"bootstrap.lib.bootstrap_from_server: No error sent at connection", | ||
{} | ||
); | ||
let next_tollerance = cfg.read_error_timeout.to_duration(); | ||
let res = client.blocking_next(Some(next_tollerance)); | ||
match res { | ||
Err(BootstrapError::IoError(ref e)) => { | ||
if e.kind() == ErrorKind::TimedOut { | ||
massa_trace!( | ||
"bootstrap.lib.bootstrap_from_server: No error sent at connection", | ||
{} | ||
); | ||
} else { | ||
return Err(res.unwrap_err()); | ||
} | ||
} | ||
Ok(Err(e)) => return Err(e), | ||
Ok(Ok(BootstrapServerMessage::BootstrapError { error: err })) => { | ||
return Err(BootstrapError::ReceivedError(err)) | ||
Err(e) => { | ||
return Err(e); | ||
} | ||
Ok(Ok(msg)) => return Err(BootstrapError::UnexpectedServerMessage(msg)), | ||
}; | ||
Ok((BootstrapServerMessage::BootstrapError { error }, _)) => { | ||
return Err(BootstrapError::ReceivedError(error.to_string())); | ||
} | ||
Ok((msg, _)) => return Err(BootstrapError::UnexpectedServerMessage(msg)), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
look at this side-by-side
) | ||
.await | ||
{ | ||
Err(_) => Err(std::io::Error::new( | ||
std::io::ErrorKind::TimedOut, | ||
"bootstrap clock send timed out", | ||
) | ||
.into()), | ||
Ok(Err(e)) => Err(e), | ||
Ok(Ok(_)) => Ok(()), | ||
}?; | ||
let fetch_time = server.send_msg( | ||
BootstrapServerMessage::BootstrapTime { | ||
server_time, | ||
version, | ||
}, | ||
Some(write_timeout), | ||
)?; | ||
// { | ||
// Err(_) => Err(std::io::Error::new( | ||
// std::io::ErrorKind::TimedOut, | ||
// "bootstrap clock send timed out", | ||
// ) | ||
// .into()), | ||
// Ok(Err(e)) => Err(e), | ||
// Ok(Ok(_)) => Ok(()), | ||
// }?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this right?
}, | ||
) | ||
.await | ||
{ | ||
Err(_) => Err(std::io::Error::new( | ||
std::io::ErrorKind::TimedOut, | ||
"bootstrap peers send timed out", | ||
) | ||
.into()), | ||
Ok(Err(e)) => Err(e), | ||
Ok(Ok(_)) => Ok(()), | ||
}?; | ||
} | ||
BootstrapClientMessage::AskBootstrapPart { | ||
last_slot, | ||
last_ledger_step, | ||
last_pool_step, | ||
last_cycle_step, | ||
last_credits_step, | ||
last_ops_step, | ||
last_consensus_step, | ||
} => { | ||
stream_bootstrap_information( | ||
server, | ||
final_state.clone(), | ||
consensus_controller.clone(), | ||
last_slot, | ||
last_ledger_step, | ||
last_pool_step, | ||
last_cycle_step, | ||
last_credits_step, | ||
last_ops_step, | ||
last_consensus_step, | ||
write_timeout, | ||
) | ||
.await?; | ||
} | ||
BootstrapClientMessage::BootstrapSuccess => break Ok(()), | ||
BootstrapClientMessage::BootstrapError { error } => { | ||
break Err(BootstrapError::ReceivedError(error)); | ||
} | ||
}, | ||
}; | ||
let next = server.blocking_next(Some(bootstrap_config.read_timeout.into())); | ||
todo!("handle next"); | ||
// { | ||
// Err(_) => break Ok(()), | ||
// Ok(Err(e)) => break Err(e), | ||
// Ok(Ok(msg)) => match msg { | ||
// BootstrapClientMessage::AskBootstrapPeers => { | ||
// match server | ||
// .send_msg( | ||
// write_timeout, | ||
// BootstrapServerMessage::BootstrapPeers { | ||
// peers: network_command_sender.get_bootstrap_peers().await?, | ||
// }, | ||
// ) | ||
// .await | ||
// { | ||
// Err(_) => Err(std::io::Error::new( | ||
// std::io::ErrorKind::TimedOut, | ||
// "bootstrap peers send timed out", | ||
// ) | ||
// .into()), | ||
// Ok(Err(e)) => Err(e), | ||
// Ok(Ok(_)) => Ok(()), | ||
// }?; | ||
// } | ||
// BootstrapClientMessage::AskBootstrapPart { | ||
// last_slot, | ||
// last_ledger_step, | ||
// last_pool_step, | ||
// last_cycle_step, | ||
// last_credits_step, | ||
// last_ops_step, | ||
// last_consensus_step, | ||
// } => { | ||
// stream_bootstrap_information( | ||
// server, | ||
// final_state.clone(), | ||
// consensus_controller.clone(), | ||
// last_slot, | ||
// last_ledger_step, | ||
// last_pool_step, | ||
// last_cycle_step, | ||
// last_credits_step, | ||
// last_ops_step, | ||
// last_consensus_step, | ||
// write_timeout, | ||
// ) | ||
// .await?; | ||
// } | ||
// BootstrapClientMessage::BootstrapSuccess => break Ok(()), | ||
// BootstrapClientMessage::BootstrapError { error } => { | ||
// break Err(BootstrapError::ReceivedError(error)); | ||
// } | ||
// }, | ||
// }; | ||
} | ||
todo!() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
!!!!!!!
duplex: <Limiter>::new(limit).limit(duplex), | ||
duplex, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert
pub(crate) fn close_and_send_error<F>( | ||
mut self, | ||
server_outer_rt_hnd: Handle, | ||
msg: String, | ||
addr: SocketAddr, | ||
close_fn: F, | ||
) where | ||
pub(crate) fn close_and_send_error<F>(mut self, msg: String, addr: SocketAddr, close_fn: F) | ||
where |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably deserves to live in a poll?
) => match res { | ||
) | ||
}); | ||
// bootstrap | ||
let bootstrap_state = crossbeam_channel::select! { | ||
//TODO: intercept a ctrl-c | ||
// _ = &mut stop_signal => { | ||
// info!("interrupt signal received in bootstrap loop"); | ||
// process::exit(0); | ||
// }, | ||
recv(state_rx) -> res => match res { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
surely mio can manage this?
@@ -40,6 +36,7 @@ massa_signature = { path = "../massa-signature" } | |||
massa_pos_exports = { path = "../massa-pos-exports" } | |||
massa_time = { path = "../massa-time" } | |||
crossbeam = "0.8.2" | |||
mio = {version = "0.8.6", features = ["os-poll", "net"]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mio && crossbeam should be in dependencies not "custom modules"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the catch!
let mut acc = 0; | ||
let mut remain = time_limit; | ||
let mut clock = Duration::from_secs(0); | ||
while acc < buf.len() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can improve a bit that with :
let mut acc = 0;
while acc < buf.len() {
if Instant::now().duration_since(start) > time_limit {
return Err((acc, std::io::ErrorKind::TimedOut.into()));
}
// self.duplex
// .set_read_timeout(Some(time_limit - clock))
// .expect("internal error");
self.duplex.read(&mut buf[acc..]).map_err(|er| (acc, er))?;
}
that remove remain
, clock
and his alloc
) -> Result<Result<(), BootstrapError>, Elapsed> { | ||
tokio::time::timeout(timeout, self.send(msg)).await | ||
timeout: Option<Duration>, | ||
) -> Result<Duration, BootstrapError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't know if it's useful to return the Duration
@@ -301,7 +301,10 @@ pub fn get_bootstrap_config(bootstrap_public_key: NodeId) -> BootstrapConfig { | |||
write_timeout: 1000.into(), | |||
read_error_timeout: 200.into(), | |||
write_error_timeout: 200.into(), | |||
bootstrap_list: vec![(SocketAddr::new(BASE_BOOTSTRAP_IP, 16), bootstrap_public_key)], | |||
bootstrap_list: vec![( | |||
SocketAddr::new(BASE_BOOTSTRAP_IP, 8069), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why port 8069
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
16 was broken before. this change fixed it. had to pick one at random.
closed in favor of #3685 and assonciated PRs |
Is here to use github as a tool to setup a checklist of things to address...
In the future, it might just be better to use a personal fork :/