diff --git a/client/consensus/beefy/src/communication/gossip.rs b/client/consensus/beefy/src/communication/gossip.rs index 376172fc23370..9be648f8796c3 100644 --- a/client/consensus/beefy/src/communication/gossip.rs +++ b/client/consensus/beefy/src/communication/gossip.rs @@ -18,7 +18,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use sc_network::PeerId; +use sc_network::{PeerId, ReputationChange}; use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext}; use sp_core::hashing::twox_64; use sp_runtime::traits::{Block, Hash, Header, NumberFor}; @@ -26,10 +26,14 @@ use sp_runtime::traits::{Block, Hash, Header, NumberFor}; use codec::{Decode, Encode}; use log::{debug, trace}; use parking_lot::{Mutex, RwLock}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use wasm_timer::Instant; use crate::{ - communication::peers::KnownPeers, + communication::{ + benefit, cost, + peers::{KnownPeers, PeerReport}, + }, justification::{ proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof, }, @@ -47,6 +51,27 @@ const REBROADCAST_AFTER: Duration = Duration::from_secs(60); #[cfg(test)] const REBROADCAST_AFTER: Duration = Duration::from_secs(5); +#[derive(Debug, PartialEq)] +pub(super) enum Action { + // repropagate under given topic, to the given peers, applying cost/benefit to originator. + Keep(H, ReputationChange), + // discard, applying cost/benefit to originator. + Discard(ReputationChange), +} + +/// An outcome of examining a message. +#[derive(Debug, PartialEq, Clone, Copy)] +enum Consider { + /// Accept the message. + Accept, + /// Message is too early. Reject. + RejectPast, + /// Message is from the future. Reject. + RejectFuture, + /// Message cannot be evaluated. Reject. + RejectOutOfScope, +} + /// BEEFY gossip message type that gets encoded and sent on the network. #[derive(Debug, Encode, Decode)] pub(crate) enum GossipMessage { @@ -135,26 +160,47 @@ impl Filter { } } - /// Return true if `max(session_start, best_beefy) <= round <= best_grandpa`, + /// Accept if `max(session_start, best_beefy) <= round <= best_grandpa`, /// and vote `set_id` matches session set id. /// /// Latest concluded round is still considered alive to allow proper gossiping for it. - fn is_vote_accepted(&self, round: NumberFor, set_id: ValidatorSetId) -> bool { + fn consider_vote(&self, round: NumberFor, set_id: ValidatorSetId) -> Consider { self.inner .as_ref() - .map(|f| set_id == f.validator_set.id() && round >= f.start && round <= f.end) - .unwrap_or(false) + .map(|f| + // only from current set and only [filter.start, filter.end] + if set_id < f.validator_set.id() { + Consider::RejectPast + } else if set_id > f.validator_set.id() { + Consider::RejectFuture + } else if round < f.start { + Consider::RejectPast + } else if round > f.end { + Consider::RejectFuture + } else { + Consider::Accept + }) + .unwrap_or(Consider::RejectOutOfScope) } /// Return true if `round` is >= than `max(session_start, best_beefy)`, /// and proof `set_id` matches session set id. /// /// Latest concluded round is still considered alive to allow proper gossiping for it. - fn is_finality_proof_accepted(&self, round: NumberFor, set_id: ValidatorSetId) -> bool { + fn consider_finality_proof(&self, round: NumberFor, set_id: ValidatorSetId) -> Consider { self.inner .as_ref() - .map(|f| set_id == f.validator_set.id() && round >= f.start) - .unwrap_or(false) + .map(|f| + // only from current set and only >= filter.start + if round < f.start || set_id < f.validator_set.id() { + Consider::RejectPast + } else if set_id > f.validator_set.id() { + Consider::RejectFuture + } else { + Consider::Accept + } + ) + .unwrap_or(Consider::RejectOutOfScope) } /// Add new _known_ `hash` to the round's known votes. @@ -189,20 +235,26 @@ where gossip_filter: RwLock>, next_rebroadcast: Mutex, known_peers: Arc>>, + report_sender: TracingUnboundedSender, } impl GossipValidator where B: Block, { - pub fn new(known_peers: Arc>>) -> GossipValidator { - GossipValidator { + pub(crate) fn new( + known_peers: Arc>>, + ) -> (GossipValidator, TracingUnboundedReceiver) { + let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 10_000); + let val = GossipValidator { votes_topic: votes_topic::(), justifs_topic: proofs_topic::(), gossip_filter: RwLock::new(Filter::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), known_peers, - } + report_sender: tx, + }; + (val, rx) } /// Update gossip validator filter. @@ -213,12 +265,16 @@ where self.gossip_filter.write().update(filter); } + fn report(&self, who: PeerId, cost_benefit: ReputationChange) { + let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit }); + } + fn validate_vote( &self, vote: VoteMessage, AuthorityId, Signature>, sender: &PeerId, data: &[u8], - ) -> ValidationResult { + ) -> Action { let msg_hash = twox_64(data); let round = vote.commitment.block_number; let set_id = vote.commitment.validator_set_id; @@ -230,25 +286,37 @@ where { let filter = self.gossip_filter.read(); - if !filter.is_vote_accepted(round, set_id) { - return ValidationResult::Discard + match filter.consider_vote(round, set_id) { + Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE), + Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE), + Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE), + Consider::Accept => {}, } if filter.is_known_vote(round, &msg_hash) { - return ValidationResult::ProcessAndKeep(self.votes_topic) + return Action::Keep(self.votes_topic, benefit::KNOWN_VOTE_MESSAGE) + } + + // ensure authority is part of the set. + if !filter + .validator_set() + .map(|set| set.validators().contains(&vote.id)) + .unwrap_or(false) + { + debug!(target: LOG_TARGET, "Message from voter not in validator set: {}", vote.id); + return Action::Discard(cost::UNKNOWN_VOTER) } } if BeefyKeystore::verify(&vote.id, &vote.signature, &vote.commitment.encode()) { self.gossip_filter.write().add_known_vote(round, msg_hash); - ValidationResult::ProcessAndKeep(self.votes_topic) + Action::Keep(self.votes_topic, benefit::VOTE_MESSAGE) } else { - // TODO: report peer debug!( target: LOG_TARGET, "🥩 Bad signature on message: {:?}, from: {:?}", vote, sender ); - ValidationResult::Discard + Action::Discard(cost::BAD_SIGNATURE) } } @@ -256,31 +324,38 @@ where &self, proof: BeefyVersionedFinalityProof, sender: &PeerId, - ) -> ValidationResult { + ) -> Action { let (round, set_id) = proof_block_num_and_set_id::(&proof); self.known_peers.lock().note_vote_for(*sender, round); let guard = self.gossip_filter.read(); - // Verify general usefulness of the justifications. - if !guard.is_finality_proof_accepted(round, set_id) { - return ValidationResult::Discard + // Verify general usefulness of the justification. + match guard.consider_finality_proof(round, set_id) { + Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE), + Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE), + Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE), + Consider::Accept => {}, } // Verify justification signatures. guard .validator_set() .map(|validator_set| { - if let Ok(()) = verify_with_validator_set::(round, validator_set, &proof) { - ValidationResult::ProcessAndKeep(self.justifs_topic) - } else { - // TODO: report peer + if let Err((_, signatures_checked)) = + verify_with_validator_set::(round, validator_set, &proof) + { debug!( target: LOG_TARGET, "🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender ); - ValidationResult::Discard + let mut cost = cost::INVALID_PROOF; + cost.value += + cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32); + Action::Discard(cost) + } else { + Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF) } }) - .unwrap_or(ValidationResult::Discard) + .unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)) } } @@ -294,15 +369,32 @@ where fn validate( &self, - _context: &mut dyn ValidatorContext, + context: &mut dyn ValidatorContext, sender: &PeerId, mut data: &[u8], ) -> ValidationResult { - match GossipMessage::::decode(&mut data) { - Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, data), + let raw = data; + let action = match GossipMessage::::decode(&mut data) { + Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, raw), Ok(GossipMessage::FinalityProof(proof)) => self.validate_finality_proof(proof, sender), Err(e) => { debug!(target: LOG_TARGET, "Error decoding message: {}", e); + let bytes = raw.len().min(i32::MAX as usize) as i32; + let cost = ReputationChange::new( + bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE), + "BEEFY: Bad packet", + ); + Action::Discard(cost) + }, + }; + match action { + Action::Keep(topic, cb) => { + self.report(*sender, cb); + context.broadcast_message(topic, data.to_vec(), false); + ValidationResult::ProcessAndKeep(topic) + }, + Action::Discard(cb) => { + self.report(*sender, cb); ValidationResult::Discard }, } @@ -314,13 +406,13 @@ where Ok(GossipMessage::Vote(msg)) => { let round = msg.commitment.block_number; let set_id = msg.commitment.validator_set_id; - let expired = !filter.is_vote_accepted(round, set_id); + let expired = filter.consider_vote(round, set_id) != Consider::Accept; trace!(target: LOG_TARGET, "🥩 Vote for round #{} expired: {}", round, expired); expired }, Ok(GossipMessage::FinalityProof(proof)) => { let (round, set_id) = proof_block_num_and_set_id::(&proof); - let expired = !filter.is_finality_proof_accepted(round, set_id); + let expired = filter.consider_finality_proof(round, set_id) != Consider::Accept; trace!( target: LOG_TARGET, "🥩 Finality proof for round #{} expired: {}", @@ -358,13 +450,13 @@ where Ok(GossipMessage::Vote(msg)) => { let round = msg.commitment.block_number; let set_id = msg.commitment.validator_set_id; - let allowed = filter.is_vote_accepted(round, set_id); + let allowed = filter.consider_vote(round, set_id) == Consider::Accept; trace!(target: LOG_TARGET, "🥩 Vote for round #{} allowed: {}", round, allowed); allowed }, Ok(GossipMessage::FinalityProof(proof)) => { let (round, set_id) = proof_block_num_and_set_id::(&proof); - let allowed = filter.is_finality_proof_accepted(round, set_id); + let allowed = filter.consider_finality_proof(round, set_id) == Consider::Accept; trace!( target: LOG_TARGET, "🥩 Finality proof for round #{} allowed: {}", @@ -409,15 +501,16 @@ pub(crate) mod tests { assert_eq!(filter.live_votes.len(), 3); assert!(filter.inner.is_none()); - assert!(!filter.is_vote_accepted(1, 1)); + assert_eq!(filter.consider_vote(1, 1), Consider::RejectOutOfScope); filter.update(GossipFilterCfg { start: 3, end: 10, validator_set: &validator_set }); assert_eq!(filter.live_votes.len(), 1); assert!(filter.live_votes.contains_key(&3)); - assert!(!filter.is_vote_accepted(2, 1)); - assert!(filter.is_vote_accepted(3, 1)); - assert!(filter.is_vote_accepted(4, 1)); - assert!(!filter.is_vote_accepted(4, 2)); + assert_eq!(filter.consider_vote(2, 1), Consider::RejectPast); + assert_eq!(filter.consider_vote(3, 1), Consider::Accept); + assert_eq!(filter.consider_vote(4, 1), Consider::Accept); + assert_eq!(filter.consider_vote(20, 1), Consider::RejectFuture); + assert_eq!(filter.consider_vote(4, 2), Consider::RejectFuture); let validator_set = ValidatorSet::::new(keys, 2).unwrap(); filter.update(GossipFilterCfg { start: 5, end: 10, validator_set: &validator_set }); @@ -430,9 +523,7 @@ pub(crate) mod tests { todo!() } - fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec, _force: bool) { - todo!() - } + fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec, _force: bool) {} fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec) { todo!() @@ -485,18 +576,39 @@ pub(crate) mod tests { fn should_validate_messages() { let keys = vec![Keyring::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); - gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); - let sender = sc_network::PeerId::random(); + let (gv, mut report_stream) = + GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + let sender = PeerId::random(); let mut context = TestContext; + // reject message, decoding error + let bad_encoding = b"0000000000".as_slice(); + let expected_cost = ReputationChange::new( + (bad_encoding.len() as i32).saturating_mul(cost::PER_UNDECODABLE_BYTE), + "BEEFY: Bad packet", + ); + let mut expected_report = PeerReport { who: sender, cost_benefit: expected_cost }; + let res = gv.validate(&mut context, &sender, bad_encoding); + assert!(matches!(res, ValidationResult::Discard)); + assert_eq!(report_stream.try_recv().unwrap(), expected_report); + + // verify votes validation + let vote = dummy_vote(3); - let gossip_vote = GossipMessage::::Vote(vote.clone()); + let encoded = GossipMessage::::Vote(vote.clone()).encode(); - // first time the cache should be populated - let res = gv.validate(&mut context, &sender, &gossip_vote.encode()); + // filter not initialized + let res = gv.validate(&mut context, &sender, &encoded); + assert!(matches!(res, ValidationResult::Discard)); + expected_report.cost_benefit = cost::OUT_OF_SCOPE_MESSAGE; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); + gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); + // nothing in cache first time + let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); + expected_report.cost_benefit = benefit::VOTE_MESSAGE; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); assert_eq!( gv.gossip_filter .read() @@ -507,43 +619,74 @@ pub(crate) mod tests { ); // second time we should hit the cache - let res = gv.validate(&mut context, &sender, &gossip_vote.encode()); + let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); + expected_report.cost_benefit = benefit::KNOWN_VOTE_MESSAGE; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); + + // reject vote, voter not in validator set + let mut bad_vote = vote.clone(); + bad_vote.id = Keyring::Bob.public(); + let bad_vote = GossipMessage::::Vote(bad_vote).encode(); + let res = gv.validate(&mut context, &sender, &bad_vote); + assert!(matches!(res, ValidationResult::Discard)); + expected_report.cost_benefit = cost::UNKNOWN_VOTER; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); - // next we should quickly reject if the round is not live - gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set }); - + // reject if the round is not GRANDPA finalized + gv.update_filter(GossipFilterCfg { start: 1, end: 2, validator_set: &validator_set }); let number = vote.commitment.block_number; let set_id = vote.commitment.validator_set_id; - assert!(!gv.gossip_filter.read().is_vote_accepted(number, set_id)); + assert_eq!(gv.gossip_filter.read().consider_vote(number, set_id), Consider::RejectFuture); + let res = gv.validate(&mut context, &sender, &encoded); + assert!(matches!(res, ValidationResult::Discard)); + expected_report.cost_benefit = cost::FUTURE_MESSAGE; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); - let res = gv.validate(&mut context, &sender, &vote.encode()); + // reject if the round is not live anymore + gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set }); + let number = vote.commitment.block_number; + let set_id = vote.commitment.validator_set_id; + assert_eq!(gv.gossip_filter.read().consider_vote(number, set_id), Consider::RejectPast); + let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); + expected_report.cost_benefit = cost::OUTDATED_MESSAGE; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); + + // now verify proofs validation // reject old proof let proof = dummy_proof(5, &validator_set); let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); + expected_report.cost_benefit = cost::OUTDATED_MESSAGE; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); // accept next proof with good set_id let proof = dummy_proof(7, &validator_set); let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); + expected_report.cost_benefit = benefit::VALIDATED_PROOF; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); // accept future proof with good set_id let proof = dummy_proof(20, &validator_set); let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); + expected_report.cost_benefit = benefit::VALIDATED_PROOF; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); - // reject proof, wrong set_id + // reject proof, future set_id let bad_validator_set = ValidatorSet::::new(keys, 1).unwrap(); let proof = dummy_proof(20, &bad_validator_set); let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); + expected_report.cost_benefit = cost::FUTURE_MESSAGE; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); // reject proof, bad signatures (Bob instead of Alice) let bad_validator_set = @@ -552,13 +695,16 @@ pub(crate) mod tests { let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); + expected_report.cost_benefit = cost::INVALID_PROOF; + expected_report.cost_benefit.value += cost::PER_SIGNATURE_CHECKED; + assert_eq!(report_stream.try_recv().unwrap(), expected_report); } #[test] fn messages_allowed_and_expired() { let keys = vec![Keyring::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + let (gv, _) = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let topic = Default::default(); @@ -635,7 +781,7 @@ pub(crate) mod tests { fn messages_rebroadcast() { let keys = vec![Keyring::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + let (gv, _) = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let topic = Default::default(); diff --git a/client/consensus/beefy/src/communication/mod.rs b/client/consensus/beefy/src/communication/mod.rs index 13735a9d3211b..d8e4d22053628 100644 --- a/client/consensus/beefy/src/communication/mod.rs +++ b/client/consensus/beefy/src/communication/mod.rs @@ -73,6 +73,39 @@ pub fn beefy_peers_set_config( cfg } +// cost scalars for reporting peers. +mod cost { + use sc_network::ReputationChange as Rep; + // Message that's for an outdated round. + pub(super) const OUTDATED_MESSAGE: Rep = Rep::new(-50, "BEEFY: Past message"); + // Message that's from the future relative to our current set-id. + pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-100, "BEEFY: Future message"); + // Vote message containing bad signature. + pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature"); + // Message received with vote from voter not in validator set. + pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter"); + // A message received that cannot be evaluated relative to our current state. + pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message"); + // Message containing invalid proof. + pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit"); + // Reputation cost per signature checked for invalid proof. + pub(super) const PER_SIGNATURE_CHECKED: i32 = -25; + // Reputation cost per byte for un-decodable message. + pub(super) const PER_UNDECODABLE_BYTE: i32 = -5; + // On-demand request was refused by peer. + pub(super) const REFUSAL_RESPONSE: Rep = Rep::new(-100, "BEEFY: Proof request refused"); + // On-demand request for a proof that can't be found in the backend. + pub(super) const UNKOWN_PROOF_REQUEST: Rep = Rep::new(-150, "BEEFY: Unknown proof request"); +} + +// benefit scalars for reporting peers. +mod benefit { + use sc_network::ReputationChange as Rep; + pub(super) const VOTE_MESSAGE: Rep = Rep::new(100, "BEEFY: Round vote message"); + pub(super) const KNOWN_VOTE_MESSAGE: Rep = Rep::new(50, "BEEFY: Known vote"); + pub(super) const VALIDATED_PROOF: Rep = Rep::new(100, "BEEFY: Justification"); +} + #[cfg(test)] mod tests { use super::*; diff --git a/client/consensus/beefy/src/communication/peers.rs b/client/consensus/beefy/src/communication/peers.rs index c2fb06faddf0c..4704b8dcf4576 100644 --- a/client/consensus/beefy/src/communication/peers.rs +++ b/client/consensus/beefy/src/communication/peers.rs @@ -18,13 +18,17 @@ //! Logic for keeping track of BEEFY peers. -// TODO (issue #12296): replace this naive peer tracking with generic one that infers data -// from multiple network protocols. - -use sc_network::PeerId; +use sc_network::{PeerId, ReputationChange}; use sp_runtime::traits::{Block, NumberFor, Zero}; use std::collections::{HashMap, VecDeque}; +/// Report specifying a reputation change for a given peer. +#[derive(Debug, PartialEq)] +pub(crate) struct PeerReport { + pub who: PeerId, + pub cost_benefit: ReputationChange, +} + struct PeerData { last_voted_on: NumberFor, } diff --git a/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs b/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs index 1670e99828831..d4f4b59f0195e 100644 --- a/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs +++ b/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs @@ -32,9 +32,12 @@ use sp_runtime::traits::Block; use std::{marker::PhantomData, sync::Arc}; use crate::{ - communication::request_response::{ - on_demand_justifications_protocol_config, Error, JustificationRequest, - BEEFY_SYNC_LOG_TARGET, + communication::{ + cost, + request_response::{ + on_demand_justifications_protocol_config, Error, JustificationRequest, + BEEFY_SYNC_LOG_TARGET, + }, }, metric_inc, metrics::{register_metrics, OnDemandIncomingRequestsMetrics}, @@ -69,17 +72,20 @@ impl IncomingRequest { /// Params: /// - The raw request to decode /// - Reputation changes to apply for the peer in case decoding fails. - pub fn try_from_raw( + pub fn try_from_raw( raw: netconfig::IncomingRequest, - reputation_changes: Vec, - ) -> Result { + reputation_changes_on_err: F, + ) -> Result + where + F: FnOnce(usize) -> Vec, + { let netconfig::IncomingRequest { payload, peer, pending_response } = raw; let payload = match JustificationRequest::decode(&mut payload.as_ref()) { Ok(payload) => payload, Err(err) => { let response = netconfig::OutgoingResponse { result: Err(()), - reputation_changes, + reputation_changes: reputation_changes_on_err(payload.len()), sent_feedback: None, }; if let Err(_) = pending_response.send(response) { @@ -111,11 +117,11 @@ impl IncomingRequestReceiver { pub async fn recv(&mut self, reputation_changes: F) -> Result, Error> where B: Block, - F: FnOnce() -> Vec, + F: FnOnce(usize) -> Vec, { let req = match self.raw.next().await { None => return Err(Error::RequestChannelExhausted), - Some(raw) => IncomingRequest::::try_from_raw(raw, reputation_changes())?, + Some(raw) => IncomingRequest::::try_from_raw(raw, reputation_changes)?, }; Ok(req) } @@ -159,26 +165,20 @@ where // Sends back justification response if justification found in client backend. fn handle_request(&self, request: IncomingRequest) -> Result<(), Error> { - // TODO (issue #12293): validate `request` and change peer reputation for invalid requests. - - let maybe_encoded_proof = if let Some(hash) = - self.client.block_hash(request.payload.begin).map_err(Error::Client)? - { - self.client - .justifications(hash) - .map_err(Error::Client)? - .and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned()) - // No BEEFY justification present. - .ok_or(()) - } else { - Err(()) - }; - + let mut reputation_changes = vec![]; + let maybe_encoded_proof = self + .client + .block_hash(request.payload.begin) + .ok() + .flatten() + .and_then(|hash| self.client.justifications(hash).ok().flatten()) + .and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned()) + .ok_or_else(|| reputation_changes.push(cost::UNKOWN_PROOF_REQUEST)); request .pending_response .send(netconfig::OutgoingResponse { result: maybe_encoded_proof, - reputation_changes: Vec::new(), + reputation_changes, sent_feedback: None, }) .map_err(|_| Error::SendResponse) @@ -188,7 +188,17 @@ where pub async fn run(mut self) { trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler"); - while let Ok(request) = self.request_receiver.recv(|| vec![]).await { + while let Ok(request) = self + .request_receiver + .recv(|bytes| { + let bytes = bytes.min(i32::MAX as usize) as i32; + vec![ReputationChange::new( + bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE), + "BEEFY: Bad request payload", + )] + }) + .await + { let peer = request.peer; match self.handle_request(request) { Ok(()) => { @@ -199,8 +209,8 @@ where ) }, Err(e) => { + // peer reputation changes already applied in `self.handle_request()` metric_inc!(self, beefy_failed_justification_responses); - // TODO (issue #12293): apply reputation changes here based on error type. debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e, diff --git a/client/consensus/beefy/src/communication/request_response/mod.rs b/client/consensus/beefy/src/communication/request_response/mod.rs index c528d06bbe0c5..545ab18cf1d34 100644 --- a/client/consensus/beefy/src/communication/request_response/mod.rs +++ b/client/consensus/beefy/src/communication/request_response/mod.rs @@ -30,7 +30,7 @@ use codec::{Decode, Encode, Error as CodecError}; use sc_network::{config::RequestResponseConfig, PeerId}; use sp_runtime::traits::{Block, NumberFor}; -use crate::communication::beefy_protocol_name::justifications_protocol_name; +use crate::communication::{beefy_protocol_name::justifications_protocol_name, peers::PeerReport}; use incoming_requests_handler::IncomingRequestReceiver; // 10 seems reasonable, considering justifs are explicitly requested only @@ -76,7 +76,7 @@ pub struct JustificationRequest { } #[derive(Debug, thiserror::Error)] -pub enum Error { +pub(crate) enum Error { #[error(transparent)] Client(#[from] sp_blockchain::Error), @@ -99,5 +99,8 @@ pub enum Error { SendResponse, #[error("Received invalid response.")] - InvalidResponse, + InvalidResponse(PeerReport), + + #[error("Internal error while getting response.")] + ResponseError, } diff --git a/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs b/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs index fbf464bd639d9..10105ff2d417d 100644 --- a/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs +++ b/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs @@ -31,7 +31,11 @@ use sp_runtime::traits::{Block, NumberFor}; use std::{collections::VecDeque, result::Result, sync::Arc}; use crate::{ - communication::request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET}, + communication::{ + benefit, cost, + peers::PeerReport, + request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET}, + }, justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof}, metric_inc, metrics::{register_metrics, OnDemandOutgoingRequestsMetrics}, @@ -54,6 +58,16 @@ enum State { AwaitingResponse(PeerId, RequestInfo, ResponseReceiver), } +/// Possible engine responses. +pub(crate) enum ResponseInfo { + /// No peer response available yet. + Pending, + /// Valid justification provided alongside peer reputation changes. + ValidProof(BeefyVersionedFinalityProof, PeerReport), + /// No justification yet, only peer reputation changes. + PeerReport(PeerReport), +} + pub struct OnDemandJustificationsEngine { network: Arc, protocol_name: ProtocolName, @@ -84,12 +98,10 @@ impl OnDemandJustificationsEngine { } fn reset_peers_cache_for_block(&mut self, block: NumberFor) { - // TODO (issue #12296): replace peer selection with generic one that involves all protocols. self.peers_cache = self.live_peers.lock().further_than(block); } fn try_next_peer(&mut self) -> Option { - // TODO (issue #12296): replace peer selection with generic one that involves all protocols. let live = self.live_peers.lock(); while let Some(peer) = self.peers_cache.pop_front() { if live.contains(&peer) { @@ -159,24 +171,19 @@ impl OnDemandJustificationsEngine { fn process_response( &mut self, - peer: PeerId, + peer: &PeerId, req_info: &RequestInfo, response: Result, ) -> Result, Error> { response .map_err(|e| { - metric_inc!(self, beefy_on_demand_justification_peer_hang_up); debug!( target: BEEFY_SYNC_LOG_TARGET, - "🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}", - req_info.block, - peer, - e + "🥩 on-demand sc-network channel sender closed, err: {:?}", e ); - Error::InvalidResponse + Error::ResponseError })? .map_err(|e| { - metric_inc!(self, beefy_on_demand_justification_peer_error); debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 for on demand justification #{:?}, peer {:?} error: {:?}", @@ -184,7 +191,18 @@ impl OnDemandJustificationsEngine { peer, e ); - Error::InvalidResponse + match e { + RequestFailure::Refused => { + metric_inc!(self, beefy_on_demand_justification_peer_refused); + let peer_report = + PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE }; + Error::InvalidResponse(peer_report) + }, + _ => { + metric_inc!(self, beefy_on_demand_justification_peer_error); + Error::ResponseError + }, + } }) .and_then(|encoded| { decode_and_verify_finality_proof::( @@ -192,23 +210,26 @@ impl OnDemandJustificationsEngine { req_info.block, &req_info.active_set, ) - .map_err(|e| { + .map_err(|(err, signatures_checked)| { metric_inc!(self, beefy_on_demand_justification_invalid_proof); debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}", - req_info.block, peer, e + req_info.block, peer, err ); - Error::InvalidResponse + let mut cost = cost::INVALID_PROOF; + cost.value += + cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32); + Error::InvalidResponse(PeerReport { who: *peer, cost_benefit: cost }) }) }) } - pub async fn next(&mut self) -> Option> { + pub(crate) async fn next(&mut self) -> ResponseInfo { let (peer, req_info, resp) = match &mut self.state { State::Idle => { futures::future::pending::<()>().await; - return None + return ResponseInfo::Pending }, State::AwaitingResponse(peer, req_info, receiver) => { let resp = receiver.await; @@ -220,8 +241,8 @@ impl OnDemandJustificationsEngine { self.state = State::Idle; let block = req_info.block; - self.process_response(peer, &req_info, resp) - .map_err(|_| { + match self.process_response(&peer, &req_info, resp) { + Err(err) => { // No valid justification received, try next peer in our set. if let Some(peer) = self.try_next_peer() { self.request_from_peer(peer, req_info); @@ -231,15 +252,22 @@ impl OnDemandJustificationsEngine { "🥩 ran out of peers to request justif #{:?} from", block ); } - }) - .map(|proof| { + // Report peer based on error type. + if let Error::InvalidResponse(peer_report) = err { + ResponseInfo::PeerReport(peer_report) + } else { + ResponseInfo::Pending + } + }, + Ok(proof) => { metric_inc!(self, beefy_on_demand_justification_good_proof); debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 received valid on-demand justif #{:?} from {:?}", block, peer ); - proof - }) - .ok() + let peer_report = PeerReport { who: peer, cost_benefit: benefit::VALIDATED_PROOF }; + ResponseInfo::ValidProof(proof, peer_report) + }, + } } } diff --git a/client/consensus/beefy/src/import.rs b/client/consensus/beefy/src/import.rs index dd2ed92ef8353..bda8169d95013 100644 --- a/client/consensus/beefy/src/import.rs +++ b/client/consensus/beefy/src/import.rs @@ -109,6 +109,7 @@ where .ok_or_else(|| ImportError("Unknown validator set".to_string()))?; decode_and_verify_finality_proof::(&encoded[..], number, &validator_set) + .map_err(|(err, _)| err) } } diff --git a/client/consensus/beefy/src/justification.rs b/client/consensus/beefy/src/justification.rs index 5175fd17d4ea3..731acdfa63389 100644 --- a/client/consensus/beefy/src/justification.rs +++ b/client/consensus/beefy/src/justification.rs @@ -42,9 +42,9 @@ pub(crate) fn decode_and_verify_finality_proof( encoded: &[u8], target_number: NumberFor, validator_set: &ValidatorSet, -) -> Result, ConsensusError> { +) -> Result, (ConsensusError, u32)> { let proof = >::decode(&mut &*encoded) - .map_err(|_| ConsensusError::InvalidJustification)?; + .map_err(|_| (ConsensusError::InvalidJustification, 0))?; verify_with_validator_set::(target_number, validator_set, &proof).map(|_| proof) } @@ -53,14 +53,15 @@ pub(crate) fn verify_with_validator_set( target_number: NumberFor, validator_set: &ValidatorSet, proof: &BeefyVersionedFinalityProof, -) -> Result<(), ConsensusError> { +) -> Result<(), (ConsensusError, u32)> { + let mut signatures_checked = 0u32; match proof { VersionedFinalityProof::V1(signed_commitment) => { if signed_commitment.signatures.len() != validator_set.len() || signed_commitment.commitment.validator_set_id != validator_set.id() || signed_commitment.commitment.block_number != target_number { - return Err(ConsensusError::InvalidJustification) + return Err((ConsensusError::InvalidJustification, 0)) } // Arrangement of signatures in the commitment should be in the same order @@ -73,14 +74,17 @@ pub(crate) fn verify_with_validator_set( .filter(|(id, signature)| { signature .as_ref() - .map(|sig| BeefyKeystore::verify(id, sig, &message[..])) + .map(|sig| { + signatures_checked += 1; + BeefyKeystore::verify(id, sig, &message[..]) + }) .unwrap_or(false) }) .count(); if valid_signatures >= crate::round::threshold(validator_set.len()) { Ok(()) } else { - Err(ConsensusError::InvalidJustification) + Err((ConsensusError::InvalidJustification, signatures_checked)) } }, } @@ -127,16 +131,16 @@ pub(crate) mod tests { // wrong block number -> should fail verification let good_proof = proof.clone().into(); match verify_with_validator_set::(block_num + 1, &validator_set, &good_proof) { - Err(ConsensusError::InvalidJustification) => (), - _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + Err((ConsensusError::InvalidJustification, 0)) => (), + e => assert!(false, "Got unexpected {:?}", e), }; // wrong validator set id -> should fail verification let good_proof = proof.clone().into(); let other = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); match verify_with_validator_set::(block_num, &other, &good_proof) { - Err(ConsensusError::InvalidJustification) => (), - _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + Err((ConsensusError::InvalidJustification, 0)) => (), + e => assert!(false, "Got unexpected {:?}", e), }; // wrong signatures length -> should fail verification @@ -147,8 +151,8 @@ pub(crate) mod tests { }; bad_signed_commitment.signatures.pop().flatten().unwrap(); match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { - Err(ConsensusError::InvalidJustification) => (), - _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + Err((ConsensusError::InvalidJustification, 0)) => (), + e => assert!(false, "Got unexpected {:?}", e), }; // not enough signatures -> should fail verification @@ -158,9 +162,9 @@ pub(crate) mod tests { }; // remove a signature (but same length) *bad_signed_commitment.signatures.first_mut().unwrap() = None; - match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { - Err(ConsensusError::InvalidJustification) => (), - _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + match verify_with_validator_set::(block_num, &validator_set, &bad_proof.into()) { + Err((ConsensusError::InvalidJustification, 2)) => (), + e => assert!(false, "Got unexpected {:?}", e), }; // not enough _correct_ signatures -> should fail verification @@ -171,9 +175,9 @@ pub(crate) mod tests { // change a signature to a different key *bad_signed_commitment.signatures.first_mut().unwrap() = Some(Keyring::Dave.sign(&bad_signed_commitment.commitment.encode())); - match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { - Err(ConsensusError::InvalidJustification) => (), - _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + match verify_with_validator_set::(block_num, &validator_set, &bad_proof.into()) { + Err((ConsensusError::InvalidJustification, 3)) => (), + e => assert!(false, "Got unexpected {:?}", e), }; } diff --git a/client/consensus/beefy/src/lib.rs b/client/consensus/beefy/src/lib.rs index 3c66cc6eb716d..d3e5e4bc68936 100644 --- a/client/consensus/beefy/src/lib.rs +++ b/client/consensus/beefy/src/lib.rs @@ -52,7 +52,11 @@ use sp_consensus_beefy::{ use sp_keystore::KeystorePtr; use sp_mmr_primitives::MmrApi; use sp_runtime::traits::{Block, Zero}; -use std::{collections::VecDeque, marker::PhantomData, sync::Arc}; +use std::{ + collections::{BTreeMap, VecDeque}, + marker::PhantomData, + sync::Arc, +}; mod aux_schema; mod error; @@ -249,9 +253,10 @@ pub async fn start_beefy_gadget( let known_peers = Arc::new(Mutex::new(KnownPeers::new())); // Default votes filter is to discard everything. // Validator is updated later with correct starting round and set id. - let gossip_validator = - Arc::new(communication::gossip::GossipValidator::new(known_peers.clone())); - let mut gossip_engine = sc_network_gossip::GossipEngine::new( + let (gossip_validator, gossip_report_stream) = + communication::gossip::GossipValidator::new(known_peers.clone()); + let gossip_validator = Arc::new(gossip_validator); + let mut gossip_engine = GossipEngine::new( network.clone(), sync.clone(), gossip_protocol_name, @@ -295,7 +300,7 @@ pub async fn start_beefy_gadget( return } - let worker_params = worker::WorkerParams { + let worker = worker::BeefyWorker { backend, payload_provider, runtime, @@ -303,14 +308,14 @@ pub async fn start_beefy_gadget( key_store: key_store.into(), gossip_engine, gossip_validator, + gossip_report_stream, on_demand_justifications, links, metrics, + pending_justifications: BTreeMap::new(), persisted_state, }; - let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params); - futures::future::join( worker.run(block_import_justif, finality_notifications), on_demand_justifications_handler.run(), diff --git a/client/consensus/beefy/src/metrics.rs b/client/consensus/beefy/src/metrics.rs index 6653763fc6754..031748bdceab5 100644 --- a/client/consensus/beefy/src/metrics.rs +++ b/client/consensus/beefy/src/metrics.rs @@ -228,8 +228,8 @@ impl PrometheusRegister for OnDemandIncomingRequestsMetrics { pub struct OnDemandOutgoingRequestsMetrics { /// Number of times there was no good peer to request justification from pub beefy_on_demand_justification_no_peer_to_request_from: Counter, - /// Number of on-demand justification peer hang up - pub beefy_on_demand_justification_peer_hang_up: Counter, + /// Number of on-demand justification peer refused valid requests + pub beefy_on_demand_justification_peer_refused: Counter, /// Number of on-demand justification peer error pub beefy_on_demand_justification_peer_error: Counter, /// Number of on-demand justification invalid proof @@ -249,10 +249,10 @@ impl PrometheusRegister for OnDemandOutgoingRequestsMetrics { )?, registry, )?, - beefy_on_demand_justification_peer_hang_up: register( + beefy_on_demand_justification_peer_refused: register( Counter::new( - "substrate_beefy_on_demand_justification_peer_hang_up", - "Number of on-demand justification peer hang up", + "beefy_on_demand_justification_peer_refused", + "Number of on-demand justification peer refused valid requests", )?, registry, )?, diff --git a/client/consensus/beefy/src/tests.rs b/client/consensus/beefy/src/tests.rs index f36c2cd68f97f..48ecebdac3581 100644 --- a/client/consensus/beefy/src/tests.rs +++ b/client/consensus/beefy/src/tests.rs @@ -24,6 +24,7 @@ use crate::{ communication::{ gossip::{ proofs_topic, tests::sign_commitment, votes_topic, GossipFilterCfg, GossipMessage, + GossipValidator, }, request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler}, }, @@ -357,8 +358,8 @@ async fn voter_init_setup( ) -> sp_blockchain::Result> { let backend = net.peer(0).client().as_backend(); let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let gossip_validator = - Arc::new(crate::communication::gossip::GossipValidator::new(known_peers)); + let (gossip_validator, _) = GossipValidator::new(known_peers); + let gossip_validator = Arc::new(gossip_validator); let mut gossip_engine = sc_network_gossip::GossipEngine::new( net.peer(0).network_service().clone(), net.peer(0).sync_service().clone(), @@ -1262,8 +1263,8 @@ async fn gossipped_finality_proofs() { let charlie = &net.peers[2]; let known_peers = Arc::new(Mutex::new(KnownPeers::::new())); // Charlie will run just the gossip engine and not the full voter. - let charlie_gossip_validator = - Arc::new(crate::communication::gossip::GossipValidator::new(known_peers)); + let (gossip_validator, _) = GossipValidator::new(known_peers); + let charlie_gossip_validator = Arc::new(gossip_validator); charlie_gossip_validator.update_filter(GossipFilterCfg:: { start: 1, end: 10, diff --git a/client/consensus/beefy/src/worker.rs b/client/consensus/beefy/src/worker.rs index 19225ec214578..c05de197d58fd 100644 --- a/client/consensus/beefy/src/worker.rs +++ b/client/consensus/beefy/src/worker.rs @@ -19,7 +19,8 @@ use crate::{ communication::{ gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator}, - request_response::outgoing_requests_engine::OnDemandJustificationsEngine, + peers::PeerReport, + request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo}, }, error::Error, justification::BeefyVersionedFinalityProof, @@ -34,7 +35,7 @@ use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, log_enabled, trace, warn}; use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend}; use sc_network_gossip::GossipEngine; -use sc_utils::notification::NotificationReceiver; +use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver}; use sp_api::{BlockId, ProvideRuntimeApi}; use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; use sp_consensus::SyncOracle; @@ -255,20 +256,6 @@ impl VoterOracle { } } -pub(crate) struct WorkerParams { - pub backend: Arc, - pub payload_provider: P, - pub runtime: Arc, - pub sync: Arc, - pub key_store: BeefyKeystore, - pub gossip_engine: GossipEngine, - pub gossip_validator: Arc>, - pub on_demand_justifications: OnDemandJustificationsEngine, - pub links: BeefyVoterLinks, - pub metrics: Option, - pub persisted_state: PersistedState, -} - #[derive(Debug, Decode, Encode, PartialEq)] pub(crate) struct PersistedState { /// Best block we voted on. @@ -311,28 +298,29 @@ impl PersistedState { /// A BEEFY worker plays the BEEFY protocol pub(crate) struct BeefyWorker { // utilities - backend: Arc, - payload_provider: P, - runtime: Arc, - sync: Arc, - key_store: BeefyKeystore, + pub backend: Arc, + pub payload_provider: P, + pub runtime: Arc, + pub sync: Arc, + pub key_store: BeefyKeystore, // communication - gossip_engine: GossipEngine, - gossip_validator: Arc>, - on_demand_justifications: OnDemandJustificationsEngine, + pub gossip_engine: GossipEngine, + pub gossip_validator: Arc>, + pub gossip_report_stream: TracingUnboundedReceiver, + pub on_demand_justifications: OnDemandJustificationsEngine, // channels /// Links between the block importer, the background voter and the RPC layer. - links: BeefyVoterLinks, + pub links: BeefyVoterLinks, // voter state /// BEEFY client metrics. - metrics: Option, + pub metrics: Option, /// Buffer holding justifications for future processing. - pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, + pub pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, /// Persisted voter state. - persisted_state: PersistedState, + pub persisted_state: PersistedState, } impl BeefyWorker @@ -344,43 +332,6 @@ where R: ProvideRuntimeApi, R::Api: BeefyApi, { - /// Return a new BEEFY worker instance. - /// - /// Note that a BEEFY worker is only fully functional if a corresponding - /// BEEFY pallet has been deployed on-chain. - /// - /// The BEEFY pallet is needed in order to keep track of the BEEFY authority set. - pub(crate) fn new(worker_params: WorkerParams) -> Self { - let WorkerParams { - backend, - payload_provider, - runtime, - key_store, - sync, - gossip_engine, - gossip_validator, - on_demand_justifications, - links, - metrics, - persisted_state, - } = worker_params; - - BeefyWorker { - backend, - payload_provider, - runtime, - sync, - key_store, - gossip_engine, - gossip_validator, - on_demand_justifications, - links, - metrics, - pending_justifications: BTreeMap::new(), - persisted_state, - } - } - fn best_grandpa_block(&self) -> NumberFor { *self.persisted_state.voting_oracle.best_grandpa_block_header.number() } @@ -849,7 +800,12 @@ where // Act on changed 'state'. self.process_new_state(); + // Mutable reference used to drive the gossip engine. let mut gossip_engine = &mut self.gossip_engine; + // Use temp val and report after async section, + // to avoid having to Mutex-wrap `gossip_engine`. + let mut gossip_report: Option = None; + // Wait for, and handle external events. // The branches below only change 'state', actual voting happens afterwards, // based on the new resulting 'state'. @@ -870,11 +826,16 @@ where return; }, // Process incoming justifications as these can make some in-flight votes obsolete. - justif = self.on_demand_justifications.next().fuse() => { - if let Some(justif) = justif { - if let Err(err) = self.triage_incoming_justif(justif) { - debug!(target: LOG_TARGET, "🥩 {}", err); - } + response_info = self.on_demand_justifications.next().fuse() => { + match response_info { + ResponseInfo::ValidProof(justif, peer_report) => { + if let Err(err) = self.triage_incoming_justif(justif) { + debug!(target: LOG_TARGET, "🥩 {}", err); + } + gossip_report = Some(peer_report); + }, + ResponseInfo::PeerReport(peer_report) => gossip_report = Some(peer_report), + ResponseInfo::Pending => (), } }, justif = block_import_justif.next() => { @@ -918,6 +879,13 @@ where return; } }, + // Process peer reports. + report = self.gossip_report_stream.next() => { + gossip_report = report; + }, + } + if let Some(PeerReport { who, cost_benefit }) = gossip_report { + self.gossip_engine.report(who, cost_benefit); } } } @@ -1122,7 +1090,8 @@ pub(crate) mod tests { let network = peer.network_service().clone(); let sync = peer.sync_service().clone(); let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let gossip_validator = Arc::new(GossipValidator::new(known_peers.clone())); + let (gossip_validator, gossip_report_stream) = GossipValidator::new(known_peers.clone()); + let gossip_validator = Arc::new(gossip_validator); let gossip_engine = GossipEngine::new( network.clone(), sync.clone(), @@ -1152,7 +1121,7 @@ pub(crate) mod tests { ) .unwrap(); let payload_provider = MmrRootProvider::new(api.clone()); - let worker_params = crate::worker::WorkerParams { + BeefyWorker { backend, payload_provider, runtime: api, @@ -1160,12 +1129,13 @@ pub(crate) mod tests { links, gossip_engine, gossip_validator, + gossip_report_stream, metrics, sync: Arc::new(sync), on_demand_justifications, + pending_justifications: BTreeMap::new(), persisted_state, - }; - BeefyWorker::<_, _, _, _, _>::new(worker_params) + } } #[test]