Skip to content

Commit

Permalink
Avoid a duplicate block request when syncing from a fork (paritytech#…
Browse files Browse the repository at this point in the history
…11094)

* Separate queueing blocks for import from removal

* Add regression tests

* Remove unnecessary log

* Clear queued blocks when processed

* Move check out of match block

* Track queued block ranges

* Update client/network/sync/src/blocks.rs

* Update client/network/sync/src/blocks.rs

* Update client/network/sync/src/blocks.rs

* Update client/network/sync/src/blocks.rs

* FMT

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Bastian Köcher <info@kchr.de>
  • Loading branch information
3 people authored and ark0f committed Feb 27, 2023
1 parent d7e1d69 commit 7933f6c
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 43 deletions.
10 changes: 10 additions & 0 deletions client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ pub enum BlockImportStatus<N: std::fmt::Debug + PartialEq> {
ImportedUnknown(N, ImportedAux, Option<Origin>),
}

impl<N: std::fmt::Debug + PartialEq> BlockImportStatus<N> {
/// Returns the imported block number.
pub fn number(&self) -> &N {
match self {
BlockImportStatus::ImportedKnown(n, _) |
BlockImportStatus::ImportedUnknown(n, _, _) => n,
}
}
}

/// Block import error.
#[derive(Debug, thiserror::Error)]
pub enum BlockImportError {
Expand Down
114 changes: 87 additions & 27 deletions client/network/sync/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::message;
use libp2p::PeerId;
use log::trace;
use log::{debug, trace};
use sp_runtime::traits::{Block as BlockT, NumberFor, One};
use std::{
cmp,
Expand All @@ -39,13 +39,15 @@ pub struct BlockData<B: BlockT> {
enum BlockRangeState<B: BlockT> {
Downloading { len: NumberFor<B>, downloading: u32 },
Complete(Vec<BlockData<B>>),
Queued { len: NumberFor<B> },
}

impl<B: BlockT> BlockRangeState<B> {
pub fn len(&self) -> NumberFor<B> {
match *self {
Self::Downloading { len, .. } => len,
Self::Complete(ref blocks) => (blocks.len() as u32).into(),
Self::Queued { len } => len,
}
}
}
Expand All @@ -56,12 +58,19 @@ pub struct BlockCollection<B: BlockT> {
/// Downloaded blocks.
blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
peer_requests: HashMap<PeerId, NumberFor<B>>,
/// Block ranges downloaded and queued for import.
/// Maps start_hash => (start_num, end_num).
queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
}

impl<B: BlockT> BlockCollection<B> {
/// Create a new instance.
pub fn new() -> Self {
Self { blocks: BTreeMap::new(), peer_requests: HashMap::new() }
Self {
blocks: BTreeMap::new(),
peer_requests: HashMap::new(),
queued_blocks: HashMap::new(),
}
}

/// Clear everything.
Expand Down Expand Up @@ -170,29 +179,52 @@ impl<B: BlockT> BlockCollection<B> {
}

/// Get a valid chain of blocks ordered in descending order and ready for importing into
/// blockchain.
pub fn drain(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut drained = Vec::new();
let mut ranges = Vec::new();
/// the blockchain.
pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut ready = Vec::new();

let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
BlockRangeState::Complete(blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
// Remove all elements from `blocks` and add them to `drained`
drained.append(blocks);
ranges.push(*start);
for (&start, range_data) in &mut self.blocks {
if start > prev {
break
}
let len = match range_data {
BlockRangeState::Complete(blocks) => {
let len = (blocks.len() as u32).into();
prev = start + len;
// Remove all elements from `blocks` and add them to `ready`
ready.append(blocks);
len
},
BlockRangeState::Queued { .. } => continue,
_ => break,
}
};
*range_data = BlockRangeState::Queued { len };
}

for r in ranges {
self.blocks.remove(&r);
if let Some(BlockData { block, .. }) = ready.first() {
self.queued_blocks
.insert(block.hash, (from, from + (ready.len() as u32).into()));
}

trace!(target: "sync", "{} blocks ready for import", ready.len());
ready
}

pub fn clear_queued(&mut self, from_hash: &B::Hash) {
match self.queued_blocks.remove(from_hash) {
None => {
debug!(target: "sync", "Can't clear unknown queued blocks from {:?}", from_hash);
},
Some((from, to)) => {
let mut block_num = from;
while block_num < to {
self.blocks.remove(&block_num);
block_num += One::one();
}
trace!(target: "sync", "Cleared blocks from {:?} to {:?}", from, to);
},
}
trace!(target: "sync", "Drained {} blocks from {:?}", drained.len(), from);
drained
}

pub fn clear_peer_download(&mut self, who: &PeerId) {
Expand Down Expand Up @@ -268,14 +300,14 @@ mod test {

bc.clear_peer_download(&peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1.clone());
assert_eq!(bc.drain(1), vec![]);
assert_eq!(bc.ready_blocks(1), vec![]);
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1, 200), Some(121..151));
bc.clear_peer_download(&peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0.clone());

assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1, 200), Some(11..41));
assert_eq!(
bc.drain(1),
bc.ready_blocks(1),
blocks[1..11]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) })
Expand All @@ -285,16 +317,16 @@ mod test {
bc.clear_peer_download(&peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0.clone());

let drained = bc.drain(12);
let ready = bc.ready_blocks(12);
assert_eq!(
drained[..30],
ready[..30],
blocks[11..41]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
drained[30..],
ready[30..],
blocks[41..81]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) })
Expand All @@ -308,17 +340,17 @@ mod test {
bc.clear_peer_download(&peer1);
bc.insert(121, blocks[121..150].to_vec(), peer1.clone());

assert_eq!(bc.drain(80), vec![]);
let drained = bc.drain(81);
assert_eq!(bc.ready_blocks(80), vec![]);
let ready = bc.ready_blocks(81);
assert_eq!(
drained[..40],
ready[..40],
blocks[81..121]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
drained[40..],
ready[40..],
blocks[121..150]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) })
Expand All @@ -344,4 +376,32 @@ mod test {
Some(100 + 128..100 + 128 + 128)
);
}

#[test]
fn no_duplicate_requests_on_fork() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
let peer = PeerId::random();

let blocks = generate_blocks(10);

// count = 5, peer_best = 50, common = 39, max_parallel = 0, max_ahead = 200
assert_eq!(bc.needed_blocks(peer.clone(), 5, 50, 39, 0, 200), Some(40..45));

// got a response on the request for `40..45`
bc.clear_peer_download(&peer);
bc.insert(40, blocks[..5].to_vec(), peer.clone());

// our "node" started on a fork, with its current best = 47, which is > common
let ready = bc.ready_blocks(48);
assert_eq!(
ready,
blocks[..5]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer.clone()) })
.collect::<Vec<_>>()
);

assert_eq!(bc.needed_blocks(peer.clone(), 5, 50, 39, 0, 200), Some(45..50));
}
}
Loading

0 comments on commit 7933f6c

Please sign in to comment.