From 44778682534b48266614d05f98697386d29d42b9 Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Tue, 26 Sep 2023 11:43:43 -0700 Subject: [PATCH] Refactor, address comments, more unit tests --- substrate/client/network/sync/src/lib.rs | 280 ++++++++++++++++++----- 1 file changed, 221 insertions(+), 59 deletions(-) diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 4c192ee7f1e9..8f5c004dfd81 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -423,65 +423,107 @@ pub struct PeerDownloadState { /// The range being download [start, end). range: Range>, - /// The blocks downloaded so far. - downloaded: Vec>, + /// State of the blocks downloaded so far. + downloaded: Option>, +} + +/// State of the blocks downloaded so far. +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct PeerDownloaded { + /// The downloaded blocks. + blocks: Vec>, + + /// Block number of the first block in `blocks`. + start_block: NumberFor, } impl PeerDownloadState { fn new(common_number: NumberFor, range: Range>) -> Self { - Self { common_number, range, downloaded: Vec::new() } + Self { common_number, range, downloaded: None } } - /// Handles the new blocks received from the peer. - /// Returns: - /// Ok(true): If done and no more requests need to be issued. - /// Ok(false): if more requests need to be issued. - /// Err(): On any failures. - fn handle_blocks( + /// Validates the new blocks received from the peer and prepends to the + /// already downloaded blocks. Assumes the blocks are received in descending order + /// (e.g) [50 -100), [25 - 50), etc + fn prepend_blocks( &mut self, who: &PeerId, mut new_blocks: Vec>, request: BlockRequest, - collection: &mut BlockCollection, - ) -> Result { - collection.clear_peer_download(who); - + ) -> Result<(), BadPeer> { // Validate the blocks. let start_block = match validate_blocks::(&new_blocks, who, Some(request)) { Ok(Some(start_block)) => start_block, - Ok(None) => return Ok(true), + Ok(None) => return Err(BadPeer(*who, rep::BAD_RESPONSE)), Err(err) => return Err(err), }; - // Add the new blocks to the downloaded list. - let range_len = (self.range.end - self.range.start).saturated_into::() as usize; - if (self.downloaded.len() + new_blocks.len()) > range_len { + // Validate against the current state. + let remaining = self.range.start.. + self.downloaded + .as_ref() + .map_or(self.range.end, |downloaded| downloaded.start_block); + // Start block should be in range. + if !remaining.contains(&start_block) { + return Err(BadPeer(*who, rep::BAD_RESPONSE)) + } + // The new blocks should be a suffix of the remaining range. + let remaining_len = (remaining.end - start_block).saturated_into::(); + if new_blocks.len() != remaining_len { return Err(BadPeer(*who, rep::BAD_RESPONSE)) } - let new_blocks_len = new_blocks.len(); - new_blocks.append(&mut self.downloaded); - self.downloaded.append(&mut new_blocks); - if self.range.start == (self.common_number + One::one()) && - self.downloaded.len() < range_len - { - // The download extends the common ancestor, but the response - // was partial. We want to issue further requests to fetch - // the incomplete part before importing. - trace!( - target: LOG_TARGET, - "Download state: incomplete download: \ - common = {}, range = {:?}, new_blocks = {new_blocks_len}, downloaded = {}", - self.common_number, self.range, self.downloaded.len() - ); - return Ok(false) + // Prepend the new blocks to the downloaded list. + if let Some(downloaded) = self.downloaded.as_mut() { + new_blocks.append(&mut downloaded.blocks); + downloaded.blocks.append(&mut new_blocks); + downloaded.start_block = start_block; + } else { + self.downloaded = Some(PeerDownloaded { blocks: new_blocks, start_block }); } - // Done, report the accumulated blocks to the collection. - let mut downloaded = Vec::new(); - downloaded.append(&mut self.downloaded); - collection.insert(start_block, downloaded, *who); - Ok(true) + Ok(()) + } + + /// Returns the downloaded blocks if we are done downloading the full range. + /// Returns: Some(_) if we are done downloading, None if more requests need to + /// be issued. + fn get_downloaded_blocks(&mut self) -> Option<(NumberFor, Vec>)> { + if let Some(downloaded) = self.downloaded.as_mut() { + // Consider the following scenario where we are downloading from a fork: + // common ancestor between current chain and fork = 915, current best + // queued number = 925. + // 1. Request is sent for blocks [916, 916 + 64) + // 2. Peer sends a partial responds with blocks [926, ..) + // 3. Since BlockCollection works off block numbers(and not hash), if this partial + // response is submitted to the collection, this would be considered to extend the + // current tip. Block 925 on current chain would be erroneously considered the parent + // of 926 on the fork. This would result in the partial response being imported, + // which would fail as parent not found and result in the sync failing/restarted. + // + // Instead, hold on to the partial response in this scenario, + // issue more requests to download the missing [916, 925). And submit + // to the block collection after the full range is downloaded. + let range_len = (self.range.end - self.range.start).saturated_into::(); + if self.range.start == (self.common_number + One::one()) && + downloaded.blocks.len() < range_len + { + trace!( + target: LOG_TARGET, + "Download state: incomplete download: \ + common = {}, range = {:?}, downloaded = {}", + self.common_number, self.range, downloaded.blocks.len() + ); + None + } else { + // Done, return the downloaded blocks. + let mut blocks = Vec::new(); + blocks.append(&mut downloaded.blocks); + Some((downloaded.start_block, blocks)) + } + } else { + None + } } /// Returns the next request to be issued based on the download state. @@ -491,20 +533,18 @@ impl PeerDownloadState { peer_best_number: NumberFor, peer_best_hash: &B::Hash, ) -> Option<(Range>, BlockRequest)> { - let downloaded_head = if let Some(number) = - self.downloaded.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()) - { - number + let start_block = if let Some(downloaded) = &self.downloaded { + downloaded.start_block } else { warn!( target: LOG_TARGET, "Download state: unable to get block number of the downloaded blocks: \ - common = {}, range = {:?}, downloaded = {}", - self.common_number, self.range, self.downloaded.len() + common = {}, range = {:?}", + self.common_number, self.range ); return None }; - let range = Range { start: self.range.start, end: downloaded_head }; + let range = Range { start: self.range.start, end: start_block }; // The end is not part of the range. let last = range.end.saturating_sub(One::one()); @@ -822,20 +862,21 @@ where if let Some(request) = request { match &mut peer.state { PeerSyncState::DownloadingNew(download_state) => { - match download_state.handle_blocks(who, blocks, request, &mut self.blocks) { - Ok(true) => { - // Done with the range, go back to available state. - peer.state = PeerSyncState::Available; - self.ready_blocks() - }, - Ok(false) => { - // Not done yet, leave it in DownloadingNew state. - vec![] - }, - Err(err) => { - peer.state = PeerSyncState::Available; - return Err(err) - }, + self.blocks.clear_peer_download(who); + if let Err(err) = download_state.prepend_blocks(who, blocks, request) { + peer.state = PeerSyncState::Available; + return Err(err) + } + if let Some((start_block, blocks)) = download_state.get_downloaded_blocks() + { + // Done with the range, report the blocks to the collection, + // go back to available state. + peer.state = PeerSyncState::Available; + self.blocks.insert(start_block, blocks, *who); + self.ready_blocks() + } else { + // Not done yet, leave it in DownloadingNew state to start + vec![] } }, PeerSyncState::DownloadingGap(_) => { @@ -4142,4 +4183,125 @@ mod test { sync.peer_disconnected(&peers[1]); assert_eq!(sync.pending_responses.len(), 0); } + + #[test] + fn peer_download_state() { + sp_tracing::try_init_simple(); + let mut client = Arc::new(TestClientBuilder::new().build()); + let blocks = (0..64).map(|_| build_block(&mut client, None, false)).collect::>(); + let peer_id = PeerId::random(); + + // Full response + { + let mut state = PeerDownloadState::::new(19, 20..30); + assert!(state.get_downloaded_blocks().is_none()); + + let request = BlockRequest:: { + id: 0, + fields: BlockAttributes::HEADER | BlockAttributes::BODY, + from: FromBlock::Number(29), + direction: Direction::Descending, + max: Some(10), + }; + let resp_blocks = blocks[19..29].to_vec(); + let response = create_block_response(resp_blocks.clone()); + assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_ok()); + let (start_block, downloaded) = state.get_downloaded_blocks().unwrap(); + assert_eq!(start_block, 20); + assert_eq!(downloaded.len(), 10); + } + + // Partial response, not starting after common ancestor + { + let mut state = PeerDownloadState::::new(19, 25..30); + assert!(state.get_downloaded_blocks().is_none()); + + let request = BlockRequest:: { + id: 0, + fields: BlockAttributes::HEADER | BlockAttributes::BODY, + from: FromBlock::Number(29), + direction: Direction::Descending, + max: Some(5), + }; + let resp_blocks = blocks[28..29].to_vec(); // 1 block. + let response = create_block_response(resp_blocks.clone()); + assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_ok()); + let (start_block, downloaded) = state.get_downloaded_blocks().unwrap(); + assert_eq!(start_block, 29); + assert_eq!(downloaded.len(), 1); + } + + // Partial response, starting after common ancestor + { + let mut state = PeerDownloadState::::new(19, 20..30); + assert!(state.get_downloaded_blocks().is_none()); + + // Response 1: Blocks [25 .. 29]. + let request = BlockRequest:: { + id: 0, + fields: BlockAttributes::HEADER | BlockAttributes::BODY, + from: FromBlock::Number(29), + direction: Direction::Descending, + max: Some(10), + }; + let resp_blocks = blocks[24..29].to_vec(); + let response = create_block_response(resp_blocks.clone()); + assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_ok()); + assert!(state.get_downloaded_blocks().is_none()); + + // Response 2: Blocks [20 .. 24]. + let (range, request) = state + .peer_block_request( + BlockAttributes::HEADER | BlockAttributes::BODY, + 100, + &blocks[0].hash(), + ) + .unwrap(); + assert_eq!(range.start, 20); + assert_eq!(range.end, 25); + let resp_blocks = blocks[19..24].to_vec(); + let response = create_block_response(resp_blocks.clone()); + assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_ok()); + let (start_block, downloaded) = state.get_downloaded_blocks().unwrap(); + assert_eq!(start_block, 20); + assert_eq!(downloaded.len(), 10); + } + } + + #[test] + fn peer_download_state_err() { + sp_tracing::try_init_simple(); + let mut client = Arc::new(TestClientBuilder::new().build()); + let blocks = (0..64).map(|_| build_block(&mut client, None, false)).collect::>(); + let peer_id = PeerId::random(); + + { + let mut state = PeerDownloadState::::new(19, 20..30); + assert!(state.get_downloaded_blocks().is_none()); + + // Start block should be in remaining range. + let request = BlockRequest:: { + id: 0, + fields: BlockAttributes::HEADER | BlockAttributes::BODY, + from: FromBlock::Number(40), + direction: Direction::Descending, + max: Some(1), + }; + let resp_blocks = blocks[39..40].to_vec(); + let response = create_block_response(resp_blocks.clone()); + assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_err()); + + // Blocks should be suffix of remaining range. + let request = BlockRequest:: { + id: 0, + fields: BlockAttributes::HEADER | BlockAttributes::BODY, + from: FromBlock::Number(28), + direction: Direction::Descending, + max: Some(10), + }; + let resp_blocks = blocks[23..28].to_vec(); + let response = create_block_response(resp_blocks.clone()); + assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_err()); + } + } }