Skip to content

Commit

Permalink
sc-consensus-beefy: add peer reputation cost/benefit changes (parityt…
Browse files Browse the repository at this point in the history
…ech#13881)

* add cost/benefit to gossip messages
* report BEEFY gossip peer reputation changes
* drop WorkerParams helper struct
* add reputation costs to tests
* add peer reputation cost/benefit to on-demand-requests protocol
* include amount of signatures checked in invalid proof reputation cost

Signed-off-by: Adrian Catangiu <adrian@parity.io>
  • Loading branch information
acatangiu authored and nathanwhit committed Jul 19, 2023
1 parent 82cd447 commit 4f482ce
Show file tree
Hide file tree
Showing 12 changed files with 430 additions and 225 deletions.
264 changes: 205 additions & 59 deletions client/consensus/beefy/src/communication/gossip.rs

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions client/consensus/beefy/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,39 @@ pub fn beefy_peers_set_config(
cfg
}

// cost scalars for reporting peers.
mod cost {
use sc_network::ReputationChange as Rep;
// Message that's for an outdated round.
pub(super) const OUTDATED_MESSAGE: Rep = Rep::new(-50, "BEEFY: Past message");
// Message that's from the future relative to our current set-id.
pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-100, "BEEFY: Future message");
// Vote message containing bad signature.
pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature");
// Message received with vote from voter not in validator set.
pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter");
// A message received that cannot be evaluated relative to our current state.
pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message");
// Message containing invalid proof.
pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit");
// Reputation cost per signature checked for invalid proof.
pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
// Reputation cost per byte for un-decodable message.
pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
// On-demand request was refused by peer.
pub(super) const REFUSAL_RESPONSE: Rep = Rep::new(-100, "BEEFY: Proof request refused");
// On-demand request for a proof that can't be found in the backend.
pub(super) const UNKOWN_PROOF_REQUEST: Rep = Rep::new(-150, "BEEFY: Unknown proof request");
}

// benefit scalars for reporting peers.
mod benefit {
use sc_network::ReputationChange as Rep;
pub(super) const VOTE_MESSAGE: Rep = Rep::new(100, "BEEFY: Round vote message");
pub(super) const KNOWN_VOTE_MESSAGE: Rep = Rep::new(50, "BEEFY: Known vote");
pub(super) const VALIDATED_PROOF: Rep = Rep::new(100, "BEEFY: Justification");
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
12 changes: 8 additions & 4 deletions client/consensus/beefy/src/communication/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

//! Logic for keeping track of BEEFY peers.
// TODO (issue #12296): replace this naive peer tracking with generic one that infers data
// from multiple network protocols.

use sc_network::PeerId;
use sc_network::{PeerId, ReputationChange};
use sp_runtime::traits::{Block, NumberFor, Zero};
use std::collections::{HashMap, VecDeque};

/// Report specifying a reputation change for a given peer.
#[derive(Debug, PartialEq)]
pub(crate) struct PeerReport {
pub who: PeerId,
pub cost_benefit: ReputationChange,
}

struct PeerData<B: Block> {
last_voted_on: NumberFor<B>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ use sp_runtime::traits::Block;
use std::{marker::PhantomData, sync::Arc};

use crate::{
communication::request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
BEEFY_SYNC_LOG_TARGET,
communication::{
cost,
request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
BEEFY_SYNC_LOG_TARGET,
},
},
metric_inc,
metrics::{register_metrics, OnDemandIncomingRequestsMetrics},
Expand Down Expand Up @@ -69,17 +72,20 @@ impl<B: Block> IncomingRequest<B> {
/// Params:
/// - The raw request to decode
/// - Reputation changes to apply for the peer in case decoding fails.
pub fn try_from_raw(
pub fn try_from_raw<F>(
raw: netconfig::IncomingRequest,
reputation_changes: Vec<ReputationChange>,
) -> Result<Self, Error> {
reputation_changes_on_err: F,
) -> Result<Self, Error>
where
F: FnOnce(usize) -> Vec<ReputationChange>,
{
let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
let payload = match JustificationRequest::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let response = netconfig::OutgoingResponse {
result: Err(()),
reputation_changes,
reputation_changes: reputation_changes_on_err(payload.len()),
sent_feedback: None,
};
if let Err(_) = pending_response.send(response) {
Expand Down Expand Up @@ -111,11 +117,11 @@ impl IncomingRequestReceiver {
pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
where
B: Block,
F: FnOnce() -> Vec<ReputationChange>,
F: FnOnce(usize) -> Vec<ReputationChange>,
{
let req = match self.raw.next().await {
None => return Err(Error::RequestChannelExhausted),
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes())?,
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes)?,
};
Ok(req)
}
Expand Down Expand Up @@ -159,26 +165,20 @@ where

// Sends back justification response if justification found in client backend.
fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
// TODO (issue #12293): validate `request` and change peer reputation for invalid requests.

let maybe_encoded_proof = if let Some(hash) =
self.client.block_hash(request.payload.begin).map_err(Error::Client)?
{
self.client
.justifications(hash)
.map_err(Error::Client)?
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
// No BEEFY justification present.
.ok_or(())
} else {
Err(())
};

let mut reputation_changes = vec![];
let maybe_encoded_proof = self
.client
.block_hash(request.payload.begin)
.ok()
.flatten()
.and_then(|hash| self.client.justifications(hash).ok().flatten())
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
.ok_or_else(|| reputation_changes.push(cost::UNKOWN_PROOF_REQUEST));
request
.pending_response
.send(netconfig::OutgoingResponse {
result: maybe_encoded_proof,
reputation_changes: Vec::new(),
reputation_changes,
sent_feedback: None,
})
.map_err(|_| Error::SendResponse)
Expand All @@ -188,7 +188,17 @@ where
pub async fn run(mut self) {
trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");

while let Ok(request) = self.request_receiver.recv(|| vec![]).await {
while let Ok(request) = self
.request_receiver
.recv(|bytes| {
let bytes = bytes.min(i32::MAX as usize) as i32;
vec![ReputationChange::new(
bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
"BEEFY: Bad request payload",
)]
})
.await
{
let peer = request.peer;
match self.handle_request(request) {
Ok(()) => {
Expand All @@ -199,8 +209,8 @@ where
)
},
Err(e) => {
// peer reputation changes already applied in `self.handle_request()`
metric_inc!(self, beefy_failed_justification_responses);
// TODO (issue #12293): apply reputation changes here based on error type.
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use codec::{Decode, Encode, Error as CodecError};
use sc_network::{config::RequestResponseConfig, PeerId};
use sp_runtime::traits::{Block, NumberFor};

use crate::communication::beefy_protocol_name::justifications_protocol_name;
use crate::communication::{beefy_protocol_name::justifications_protocol_name, peers::PeerReport};
use incoming_requests_handler::IncomingRequestReceiver;

// 10 seems reasonable, considering justifs are explicitly requested only
Expand Down Expand Up @@ -76,7 +76,7 @@ pub struct JustificationRequest<B: Block> {
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
pub(crate) enum Error {
#[error(transparent)]
Client(#[from] sp_blockchain::Error),

Expand All @@ -99,5 +99,8 @@ pub enum Error {
SendResponse,

#[error("Received invalid response.")]
InvalidResponse,
InvalidResponse(PeerReport),

#[error("Internal error while getting response.")]
ResponseError,
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};

use crate::{
communication::request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
communication::{
benefit, cost,
peers::PeerReport,
request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
},
justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
metric_inc,
metrics::{register_metrics, OnDemandOutgoingRequestsMetrics},
Expand All @@ -54,6 +58,16 @@ enum State<B: Block> {
AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
}

/// Possible engine responses.
pub(crate) enum ResponseInfo<B: Block> {
/// No peer response available yet.
Pending,
/// Valid justification provided alongside peer reputation changes.
ValidProof(BeefyVersionedFinalityProof<B>, PeerReport),
/// No justification yet, only peer reputation changes.
PeerReport(PeerReport),
}

pub struct OnDemandJustificationsEngine<B: Block> {
network: Arc<dyn NetworkRequest + Send + Sync>,
protocol_name: ProtocolName,
Expand Down Expand Up @@ -84,12 +98,10 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
}

fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
self.peers_cache = self.live_peers.lock().further_than(block);
}

fn try_next_peer(&mut self) -> Option<PeerId> {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
let live = self.live_peers.lock();
while let Some(peer) = self.peers_cache.pop_front() {
if live.contains(&peer) {
Expand Down Expand Up @@ -159,56 +171,65 @@ impl<B: Block> OnDemandJustificationsEngine<B> {

fn process_response(
&mut self,
peer: PeerId,
peer: &PeerId,
req_info: &RequestInfo<B>,
response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_peer_hang_up);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
req_info.block,
peer,
e
"🥩 on-demand sc-network channel sender closed, err: {:?}", e
);
Error::InvalidResponse
Error::ResponseError
})?
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_peer_error);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
req_info.block,
peer,
e
);
Error::InvalidResponse
match e {
RequestFailure::Refused => {
metric_inc!(self, beefy_on_demand_justification_peer_refused);
let peer_report =
PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE };
Error::InvalidResponse(peer_report)
},
_ => {
metric_inc!(self, beefy_on_demand_justification_peer_error);
Error::ResponseError
},
}
})
.and_then(|encoded| {
decode_and_verify_finality_proof::<B>(
&encoded[..],
req_info.block,
&req_info.active_set,
)
.map_err(|e| {
.map_err(|(err, signatures_checked)| {
metric_inc!(self, beefy_on_demand_justification_invalid_proof);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
req_info.block, peer, e
req_info.block, peer, err
);
Error::InvalidResponse
let mut cost = cost::INVALID_PROOF;
cost.value +=
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
Error::InvalidResponse(PeerReport { who: *peer, cost_benefit: cost })
})
})
}

pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
pub(crate) async fn next(&mut self) -> ResponseInfo<B> {
let (peer, req_info, resp) = match &mut self.state {
State::Idle => {
futures::future::pending::<()>().await;
return None
return ResponseInfo::Pending
},
State::AwaitingResponse(peer, req_info, receiver) => {
let resp = receiver.await;
Expand All @@ -220,8 +241,8 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
self.state = State::Idle;

let block = req_info.block;
self.process_response(peer, &req_info, resp)
.map_err(|_| {
match self.process_response(&peer, &req_info, resp) {
Err(err) => {
// No valid justification received, try next peer in our set.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, req_info);
Expand All @@ -231,15 +252,22 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
"🥩 ran out of peers to request justif #{:?} from", block
);
}
})
.map(|proof| {
// Report peer based on error type.
if let Error::InvalidResponse(peer_report) = err {
ResponseInfo::PeerReport(peer_report)
} else {
ResponseInfo::Pending
}
},
Ok(proof) => {
metric_inc!(self, beefy_on_demand_justification_good_proof);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 received valid on-demand justif #{:?} from {:?}", block, peer
);
proof
})
.ok()
let peer_report = PeerReport { who: peer, cost_benefit: benefit::VALIDATED_PROOF };
ResponseInfo::ValidProof(proof, peer_report)
},
}
}
}
1 change: 1 addition & 0 deletions client/consensus/beefy/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ where
.ok_or_else(|| ImportError("Unknown validator set".to_string()))?;

decode_and_verify_finality_proof::<Block>(&encoded[..], number, &validator_set)
.map_err(|(err, _)| err)
}
}

Expand Down
Loading

0 comments on commit 4f482ce

Please sign in to comment.