From 5eb24c6a1be149a83b9a5f926f841c670ed38e80 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Thu, 13 Apr 2023 15:46:16 +0300 Subject: [PATCH 01/24] Replace `RollingSessionWindow` with `RuntimeInfo` - initial commit --- node/core/approval-voting/src/import.rs | 95 +++---- node/core/approval-voting/src/lib.rs | 329 +++++++++++++++++------- 2 files changed, 288 insertions(+), 136 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index f3a571a7133d..2be583fb4107 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -40,10 +40,7 @@ use polkadot_node_subsystem::{ }, overseer, RuntimeApiError, SubsystemError, SubsystemResult, }; -use polkadot_node_subsystem_util::{ - determine_new_blocks, - rolling_session_window::{RollingSessionWindow, SessionWindowUpdate}, -}; +use polkadot_node_subsystem_util::determine_new_blocks; use polkadot_primitives::{ BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog, CoreIndex, GroupIndex, Hash, Header, SessionIndex, @@ -62,6 +59,7 @@ use crate::{ criteria::{AssignmentCriteria, OurAssignment}, persisted_entries::CandidateEntry, time::{slot_number_to_tick, Tick}, + SessionInfoProvider, }; use super::{State, LOG_TARGET}; @@ -78,7 +76,7 @@ struct ImportedBlockInfo { } struct ImportedBlockInfoEnv<'a> { - session_window: &'a Option, + runtime_info: &'a mut Option, // this is required just for the `earliest_session()` assignment_criteria: &'a (dyn AssignmentCriteria + Send + Sync), keystore: &'a LocalKeystore, } @@ -160,11 +158,7 @@ async fn imported_block_info( return Err(ImportedBlockInfoError::FutureCancelled("SessionIndexForChild", error)), }; - if env - .session_window - .as_ref() - .map_or(true, |s| session_index < s.earliest_session()) - { + if env.runtime_info.as_ref().map_or(true, |s| session_index < s.earliest_session) { gum::debug!( target: LOG_TARGET, "Block {} is from ancient session {}. Skipping", @@ -212,10 +206,21 @@ async fn imported_block_info( } }; - let session_info = match env.session_window.as_ref().and_then(|s| s.session_info(session_index)) - { - Some(s) => s, - None => { + // todo: refactor this + let session_info = if let Some(session_info_provider) = env.runtime_info { + session_info_provider + .runtime_info + .get_session_info_by_index(ctx.sender(), block_hash, session_index) + .await + .map_err(|_| ImportedBlockInfoError::SessionInfoUnavailable) + } else { + gum::debug!(target: LOG_TARGET, "SessionInfoProvider unavailable for block {}", block_hash,); + return Err(ImportedBlockInfoError::SessionInfoUnavailable) + }; + + let session_info = match session_info { + Ok(extended_session_info) => &extended_session_info.session_info, + Err(_) => { gum::debug!(target: LOG_TARGET, "Session info unavailable for block {}", block_hash,); return Err(ImportedBlockInfoError::SessionInfoUnavailable) @@ -360,25 +365,24 @@ pub(crate) async fn handle_new_head( }; // Update session info based on most recent head. - match state.cache_session_info_for_head(ctx, head).await { - Err(e) => { - gum::debug!( - target: LOG_TARGET, - ?head, - ?e, - "Could not cache session info when processing head.", - ); - return Ok(Vec::new()) - }, - Ok(Some(a @ SessionWindowUpdate::Advanced { .. })) => { - gum::info!( - target: LOG_TARGET, - update = ?a, - "Advanced session window for approvals", - ); - }, - Ok(_) => {}, - } + state.cache_session_info_for_head(ctx, head).await; + // Err(e) => { + // gum::debug!( + // target: LOG_TARGET, + // ?head, + // ?e, + // "Could not cache session info when processing head.", + // ); + // return Ok(Vec::new()) + // }, + // Ok(Some(a @ SessionWindowUpdate::Advanced { .. })) => { + // gum::info!( + // target: LOG_TARGET, + // update = ?a, + // "Advanced session window for approvals", + // ); + // }, + // Ok(_) => {},s // If we've just started the node and are far behind, // import at most `MAX_HEADS_LOOK_BACK` blocks. @@ -407,7 +411,7 @@ pub(crate) async fn handle_new_head( let mut imported_blocks_and_info = Vec::with_capacity(new_blocks.len()); for (block_hash, block_header) in new_blocks.into_iter().rev() { let env = ImportedBlockInfoEnv { - session_window: &state.session_window, + runtime_info: &mut state.session_info, assignment_criteria: &*state.assignment_criteria, keystore: &state.keystore, }; @@ -461,11 +465,16 @@ pub(crate) async fn handle_new_head( force_approve, } = imported_block_info; - let session_info = state - .session_window - .as_ref() - .and_then(|s| s.session_info(session_index)) - .expect("imported_block_info requires session info to be available; qed"); + // todo: refactor this + let session_info = &state + .session_info + .as_mut() + .map(|s| &mut s.runtime_info) + .expect("imported_block_info requires session info to be available; qed") + .get_session_info_by_index(ctx.sender(), head, session_index) + .await + .map_err(|e| SubsystemError::FromOrigin { origin: "", source: e.into() })? + .session_info; let (block_tick, no_show_duration) = { let block_tick = slot_number_to_tick(state.slot_duration_millis, slot); @@ -782,7 +791,7 @@ pub(crate) mod tests { let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - session_window: &Some(session_window), + runtime_info: &Some(session_window), assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -888,7 +897,7 @@ pub(crate) mod tests { let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - session_window: &Some(session_window), + runtime_info: &Some(session_window), assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -987,7 +996,7 @@ pub(crate) mod tests { let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - session_window: &session_window, + runtime_info: &session_window, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -1083,7 +1092,7 @@ pub(crate) mod tests { let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - session_window: &session_window, + runtime_info: &session_window, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 6a7ae10bfa2a..9c6f71c78f16 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -27,7 +27,7 @@ use polkadot_node_primitives::{ approval::{ BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote, }, - ValidationResult, + ValidationResult, DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ errors::RecoveryError, @@ -42,11 +42,10 @@ use polkadot_node_subsystem::{ SubsystemSender, }; use polkadot_node_subsystem_util::{ + self, database::Database, metrics::{self, prometheus}, - rolling_session_window::{ - DatabaseParams, RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable, - }, + runtime::{Config as RuntimeInfoConfig, RuntimeInfo}, TimeoutExt, }; use polkadot_primitives::{ @@ -638,70 +637,176 @@ impl CurrentlyCheckingSet { } } +// TODO: better name +struct SessionInfoProvider { + runtime_info: RuntimeInfo, + last_consecutive_cached_session: SessionIndex, + earliest_session: SessionIndex, +} + struct State { - session_window: Option, + session_info: Option, keystore: Arc, slot_duration_millis: u64, clock: Box, assignment_criteria: Box, - // Require for `RollingSessionWindow`. - db_config: DatabaseConfig, - db: Arc, spans: HashMap, } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] impl State { - fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> { - self.session_window.as_ref().and_then(|w| w.session_info(i)) + async fn session_info( + &mut self, + sender: &mut Sender, + relay_parent: Hash, + session_index: SessionIndex, + ) -> Option<&SessionInfo> + where + Sender: SubsystemSender, + { + match &mut self.session_info { + Some(session_info_provider) => match session_info_provider + .runtime_info + .get_session_info_by_index(sender, relay_parent, session_index) + .await + { + Ok(extended_info) => Some(&extended_info.session_info), + Err(_) => { + // todo log error + None + }, + }, + + None => None, + } } - /// Bring `session_window` up to date. - pub async fn cache_session_info_for_head( - &mut self, - ctx: &mut Context, - head: Hash, - ) -> Result, SessionsUnavailable> + /// If `head` is in a new session - cache it + pub async fn cache_session_info_for_head(&mut self, ctx: &mut Context, head: Hash) where ::Sender: Sized + Send, { - let session_window = self.session_window.take(); - match session_window { + match self.session_info.take() { None => { - let sender = ctx.sender().clone(); - self.session_window = Some( - RollingSessionWindow::new( - sender, - head, - DatabaseParams { - db: self.db.clone(), - db_column: self.db_config.col_session_data, + let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); + + let head_session_idx = + match runtime_info.get_session_index_for_child(ctx.sender(), head).await { + Ok(session_idx) => session_idx, + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?head, + ?err, + "Error getting session index for head. Won't cache any sessions" + ); + return }, - ) - .await?, - ); - Ok(None) + }; + + let mut gap_in_cache = false; + let mut last_consecutive_cached_session = 0; + // TODO: fix this -> start of the windoow should be no less than the last finalized block + for idx in head_session_idx.saturating_sub(DISPUTE_WINDOW.get())..=head_session_idx + { + if let Err(err) = + runtime_info.get_session_info_by_index(ctx.sender(), head, idx).await + { + gap_in_cache = true; + gum::debug!( + target: LOG_TARGET, + ?err, + session = idx, + "Can cache session. Moving on." + ); + continue + } + + if !gap_in_cache { + last_consecutive_cached_session = idx; + } + } + + // TODO: fix this + let earliest_session = 0; + self.session_info = Some(SessionInfoProvider { + runtime_info, + last_consecutive_cached_session, + earliest_session, + }); }, - Some(mut session_window) => { - let r = session_window - .cache_session_info_for_head(ctx.sender(), head) + Some(mut session_info_provider) => { + let head_session_idx = match session_info_provider + .runtime_info + .get_session_index_for_child(ctx.sender(), head) .await - .map(Option::Some); - self.session_window = Some(session_window); - r + { + Ok(session_idx) => session_idx, + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?head, + ?err, + "Error getting session index for head. Won't cache any sessions" + ); + return + }, + }; + + let mut gap_in_cache = false; + if head_session_idx > session_info_provider.last_consecutive_cached_session { + for idx in + session_info_provider.last_consecutive_cached_session + 1..=head_session_idx + { + if let Err(err) = session_info_provider + .runtime_info + .get_session_info_by_index(ctx.sender(), head, idx) + .await + { + gap_in_cache = true; + gum::debug!( + target: LOG_TARGET, + ?err, + session = idx, + "Can cache session. Moving on." + ); + continue + } + + if !gap_in_cache { + session_info_provider.last_consecutive_cached_session = idx; + } + } + } + + // TODO: update `earliest_session` + + self.session_info = Some(session_info_provider); }, } } // Compute the required tranches for approval for this block and candidate combo. // Fails if there is no approval entry for the block under the candidate or no candidate entry // under the block, or if the session is out of bounds. - fn approval_status<'a, 'b>( - &'a self, + async fn approval_status( + &'a mut self, + sender: &mut Sender, block_entry: &'a BlockEntry, candidate_entry: &'b CandidateEntry, - ) -> Option<(&'b ApprovalEntry, ApprovalStatus)> { - let session_info = match self.session_info(block_entry.session()) { - Some(s) => s, + ) -> Option<(&'b ApprovalEntry, ApprovalStatus)> + where + Sender: SubsystemSender, + { + // We can't borrow the session here. Only get copies of what's needed. + let (no_show_slots, needed_approvals) = match self + .session_info(sender, block_entry.parent_hash(), block_entry.session()) + .await + { + Some(s) => (s.no_show_slots, s.needed_approvals), None => { gum::warn!( target: LOG_TARGET, @@ -715,10 +820,8 @@ impl State { let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot()); let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot()); - let no_show_duration = slot_number_to_tick( - self.slot_duration_millis, - Slot::from(u64::from(session_info.no_show_slots)), - ); + let no_show_duration = + slot_number_to_tick(self.slot_duration_millis, Slot::from(u64::from(no_show_slots))); if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) { let required_tranches = approval_checking::tranches_to_approve( @@ -727,7 +830,7 @@ impl State { tranche_now, block_tick, no_show_duration, - session_info.needed_approvals as _, + needed_approvals as _, ); let status = ApprovalStatus { required_tranches, block_tick, tranche_now }; @@ -779,13 +882,11 @@ where } let mut state = State { - session_window: None, + session_info: None, keystore: subsystem.keystore, slot_duration_millis: subsystem.slot_duration_millis, clock, assignment_criteria, - db_config: subsystem.db_config, - db: subsystem.db, spans: HashMap::new(), }; @@ -811,12 +912,13 @@ where (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { subsystem.metrics.on_wakeup(); process_wakeup( + &mut ctx, &mut state, &mut overlayed_db, woken_block, woken_candidate, &subsystem.metrics, - )? + ).await? } next_msg = ctx.recv().fuse() => { let mut actions = handle_from_overseer( @@ -1260,15 +1362,16 @@ async fn handle_from_overseer( FromOrchestra::Communication { msg } => match msg { ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => { let (check_outcome, actions) = - check_and_import_assignment(state, db, a, claimed_core)?; + check_and_import_assignment(ctx.sender(), state, db, a, claimed_core).await?; let _ = res.send(check_outcome); actions }, ApprovalVotingMessage::CheckAndImportApproval(a, res) => - check_and_import_approval(state, db, metrics, a, |r| { + check_and_import_approval(ctx.sender(), state, db, metrics, a, |r| { let _ = res.send(r); - })? + }) + .await? .0, ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => { let mut approved_ancestor_span = state @@ -1738,12 +1841,16 @@ fn schedule_wakeup_action( maybe_action } -fn check_and_import_assignment( - state: &State, +async fn check_and_import_assignment( + sender: &mut Sender, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, assignment: IndirectAssignmentCert, candidate_index: CandidateIndex, -) -> SubsystemResult<(AssignmentCheckResult, Vec)> { +) -> SubsystemResult<(AssignmentCheckResult, Vec)> +where + Sender: SubsystemSender, +{ let tick_now = state.clock.tick_now(); let mut check_and_import_assignment_span = state @@ -1766,7 +1873,10 @@ fn check_and_import_assignment( )), }; - let session_info = match state.session_info(block_entry.session()) { + let session_info = match state + .session_info(sender, block_entry.parent_hash(), block_entry.session()) + .await + { Some(s) => s, None => return Ok(( @@ -1822,10 +1932,11 @@ fn check_and_import_assignment( )), }; + let config = &criteria::Config::from(session_info); let res = state.assignment_criteria.check_assignment_cert( claimed_core_index, assignment.validator, - &criteria::Config::from(session_info), + config, block_entry.relay_vrf_story(), &assignment.cert, approval_entry.backing_group(), @@ -1877,7 +1988,9 @@ fn check_and_import_assignment( let mut actions = Vec::new(); // We've imported a new approval, so we need to schedule a wake-up for when that might no-show. - if let Some((approval_entry, status)) = state.approval_status(&block_entry, &candidate_entry) { + if let Some((approval_entry, status)) = + state.approval_status(sender, &block_entry, &candidate_entry).await + { actions.extend(schedule_wakeup_action( approval_entry, block_entry.block_hash(), @@ -1895,13 +2008,17 @@ fn check_and_import_assignment( Ok((res, actions)) } -fn check_and_import_approval( - state: &State, +async fn check_and_import_approval( + sender: &mut Sender, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, metrics: &Metrics, approval: IndirectSignedApprovalVote, with_response: impl FnOnce(ApprovalCheckResult) -> T, -) -> SubsystemResult<(Vec, T)> { +) -> SubsystemResult<(Vec, T)> +where + Sender: SubsystemSender, +{ macro_rules! respond_early { ($e: expr) => {{ let t = with_response($e); @@ -1927,14 +2044,15 @@ fn check_and_import_approval( }, }; - let session_info = match state.session_info(block_entry.session()) { - Some(s) => s, - None => { - respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownSessionIndex( - block_entry.session() - ),)) - }, - }; + let session_info = + match state.session_info(sender, approval.block_hash, block_entry.session()).await { + Some(s) => s, + None => { + respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownSessionIndex( + block_entry.session() + ),)) + }, + }; let approved_candidate_hash = match block_entry.candidate(approval.candidate_index as usize) { Some((_, h)) => *h, @@ -2008,6 +2126,7 @@ fn check_and_import_approval( ); let actions = advance_approval_state( + sender, state, db, &metrics, @@ -2015,11 +2134,15 @@ fn check_and_import_approval( approved_candidate_hash, candidate_entry, ApprovalStateTransition::RemoteApproval(approval.validator), - ); + ) + .await; Ok((actions, t)) } +// TODO? +// fn get_session_info(..) {} + #[derive(Debug)] enum ApprovalStateTransition { RemoteApproval(ValidatorIndex), @@ -2048,15 +2171,19 @@ impl ApprovalStateTransition { // Advance the approval state, either by importing an approval vote which is already checked to be valid and corresponding to an assigned // validator on the candidate and block, or by noting that there are no further wakeups or tranches needed. This updates the block entry and candidate entry as // necessary and schedules any further wakeups. -fn advance_approval_state( - state: &State, +async fn advance_approval_state( + sender: &mut Sender, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, metrics: &Metrics, mut block_entry: BlockEntry, candidate_hash: CandidateHash, mut candidate_entry: CandidateEntry, transition: ApprovalStateTransition, -) -> Vec { +) -> Vec +where + Sender: SubsystemSender, +{ let validator_index = transition.validator_index(); let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v)); @@ -2087,7 +2214,7 @@ fn advance_approval_state( let tick_now = state.clock.tick_now(); let (is_approved, status) = if let Some((approval_entry, status)) = - state.approval_status(&block_entry, &candidate_entry) + state.approval_status(sender, &block_entry, &candidate_entry).await { let check = approval_checking::check_approval( &candidate_entry, @@ -2217,8 +2344,10 @@ fn should_trigger_assignment( } } -fn process_wakeup( - state: &State, +#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] +async fn process_wakeup( + ctx: &mut Context, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, relay_block: Hash, candidate_hash: CandidateHash, @@ -2243,7 +2372,12 @@ fn process_wakeup( _ => return Ok(Vec::new()), }; - let session_info = match state.session_info(block_entry.session()) { + let slot_duration_millis = state.slot_duration_millis; + let tranche_now = state.clock.tranche_now(slot_duration_millis, block_entry.slot()); + let session_info = match state + .session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session()) + .await + { Some(i) => i, None => { gum::warn!( @@ -2257,12 +2391,12 @@ fn process_wakeup( }, }; - let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot()); + let block_tick = slot_number_to_tick(slot_duration_millis, block_entry.slot()); let no_show_duration = slot_number_to_tick( - state.slot_duration_millis, + slot_duration_millis, Slot::from(u64::from(session_info.no_show_slots)), ); - let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot()); + span.add_uint_tag("tranche", tranche_now as u64); gum::trace!( target: LOG_TARGET, @@ -2353,15 +2487,19 @@ fn process_wakeup( // we need to check for that and advance the state on-disk. // // Note that this function also schedules a wakeup as necessary. - actions.extend(advance_approval_state( - state, - db, - metrics, - block_entry, - candidate_hash, - candidate_entry, - ApprovalStateTransition::WakeupProcessed, - )); + actions.extend( + advance_approval_state( + ctx.sender(), + state, + db, + metrics, + block_entry, + candidate_hash, + candidate_entry, + ApprovalStateTransition::WakeupProcessed, + ) + .await, + ); Ok(actions) } @@ -2612,7 +2750,10 @@ async fn issue_approval( }; issue_approval_span.add_int_tag("candidate_index", candidate_index as i64); - let session_info = match state.session_info(block_entry.session()) { + let session_info = match state + .session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session()) + .await + { Some(s) => s, None => { gum::warn!( @@ -2658,7 +2799,7 @@ async fn issue_approval( }; let validator_pubkey = match session_info.validators.get(validator_index) { - Some(p) => p, + Some(p) => p.clone(), // todo: remove this None => { gum::warn!( target: LOG_TARGET, @@ -2697,6 +2838,7 @@ async fn issue_approval( ); let actions = advance_approval_state( + ctx.sender(), state, db, metrics, @@ -2704,7 +2846,8 @@ async fn issue_approval( candidate_hash, candidate_entry, ApprovalStateTransition::LocalApproval(validator_index as _, sig.clone()), - ); + ) + .await; metrics.on_approval_produced(); From b0da049a71e8a2fb13f7535e5a392c898f941afd Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Thu, 20 Apr 2023 14:00:49 +0300 Subject: [PATCH 02/24] Fix tests in import --- node/core/approval-voting/src/import.rs | 121 +++++++++++++++++++----- node/subsystem-util/src/runtime/mod.rs | 15 +++ 2 files changed, 112 insertions(+), 24 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 2be583fb4107..b7bcde4c5def 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -618,13 +618,19 @@ pub(crate) async fn handle_new_head( #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::approval_db::v1::DbBackend; + use crate::{approval_db::v1::DbBackend, RuntimeInfo, RuntimeInfoConfig}; use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; - use polkadot_node_primitives::approval::{VrfSignature, VrfTranscript}; + use polkadot_node_primitives::{ + approval::{VrfSignature, VrfTranscript}, + DISPUTE_WINDOW, + }; use polkadot_node_subsystem::messages::{AllMessages, ApprovalVotingMessage}; use polkadot_node_subsystem_test_helpers::make_subsystem_context; - use polkadot_node_subsystem_util::database::Database; + use polkadot_node_subsystem_util::{ + database::Database, + runtime::{ExtendedSessionInfo, ValidatorInfo}, + }; use polkadot_primitives::{Id as ParaId, IndexedVec, SessionInfo, ValidatorId, ValidatorIndex}; pub(crate) use sp_consensus_babe::{ digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest}, @@ -633,7 +639,7 @@ pub(crate) mod tests { use sp_core::{crypto::VrfSigner, testing::TaskExecutor}; use sp_keyring::sr25519::Keyring as Sr25519Keyring; pub(crate) use sp_runtime::{Digest, DigestItem}; - use std::{pin::Pin, sync::Arc}; + use std::{num::NonZeroUsize, pin::Pin, sync::Arc}; use crate::{approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry}; @@ -658,24 +664,40 @@ pub(crate) mod tests { } fn blank_state() -> State { - let db = kvdb_memorydb::create(NUM_COLUMNS); - let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); - let db: Arc = Arc::new(db); State { - session_window: None, + session_info: None, keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria), - db, - db_config: TEST_CONFIG, spans: HashMap::new(), } } - fn single_session_state(index: SessionIndex, info: SessionInfo) -> State { + fn single_session_state(index: SessionIndex, info: SessionInfo, relay_parent: Hash) -> State { + let runtime_info = RuntimeInfo::new_with_cache( + RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }, + vec![( + index, + relay_parent, + ExtendedSessionInfo { + session_info: info, + validator_info: ValidatorInfo { our_group: None, our_index: None }, + }, + )], + ); + + //TODO: is it okay to use index for earliest_session and last_consecutive_cached_session? State { - session_window: Some(RollingSessionWindow::with_session_info(index, vec![info])), + session_info: Some(SessionInfoProvider { + earliest_session: index, + last_consecutive_cached_session: index, + runtime_info, + }), ..blank_state() } } @@ -785,13 +807,30 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let session_window = - RollingSessionWindow::with_session_info(session, vec![session_info]); + let session_info_provider = SessionInfoProvider { + runtime_info: RuntimeInfo::new_with_cache( + RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }, + vec![( + session, + hash, + ExtendedSessionInfo { + session_info, + validator_info: ValidatorInfo { our_index: None, our_group: None }, + }, + )], + ), + last_consecutive_cached_session: session, //TODO: ?? + earliest_session: session, //TODO: ?? + }; let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &Some(session_window), + runtime_info: &mut Some(session_info_provider), assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -891,13 +930,30 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let session_window = - RollingSessionWindow::with_session_info(session, vec![session_info]); + let session_info_provider = SessionInfoProvider { + runtime_info: RuntimeInfo::new_with_cache( + RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }, + vec![( + session, + hash, + ExtendedSessionInfo { + session_info, + validator_info: ValidatorInfo { our_index: None, our_group: None }, + }, + )], + ), + last_consecutive_cached_session: session, //TODO: ?? + earliest_session: session, //TODO: ?? + }; let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &Some(session_window), + runtime_info: &mut Some(session_info_provider), assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -991,12 +1047,12 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let session_window = None; + let mut runtime_info = None; let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &session_window, + runtime_info: &mut runtime_info, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -1086,13 +1142,30 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let session_window = - Some(RollingSessionWindow::with_session_info(session, vec![session_info])); + let session_info_provider = SessionInfoProvider { + runtime_info: RuntimeInfo::new_with_cache( + RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }, + vec![( + session, + hash, + ExtendedSessionInfo { + session_info, + validator_info: ValidatorInfo { our_index: None, our_group: None }, + }, + )], + ), + last_consecutive_cached_session: session, //TODO: ?? + earliest_session: session, //TODO: ?? + }; let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &session_window, + runtime_info: &mut Some(session_info_provider), assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -1229,7 +1302,7 @@ pub(crate) mod tests { .map(|(r, c, g)| CandidateEvent::CandidateIncluded(r, Vec::new().into(), c, g)) .collect::>(); - let mut state = single_session_state(session, session_info); + let mut state = single_session_state(session, session_info, parent_hash); overlay_db.write_block_entry( v1::BlockEntry { block_hash: parent_hash, diff --git a/node/subsystem-util/src/runtime/mod.rs b/node/subsystem-util/src/runtime/mod.rs index 6e06b99bbe03..87122d024fd2 100644 --- a/node/subsystem-util/src/runtime/mod.rs +++ b/node/subsystem-util/src/runtime/mod.rs @@ -118,6 +118,21 @@ impl RuntimeInfo { } } + /// Create an instance with pre-populated cache. Used only for testing + // TODO: why doesn't compile + // #[cfg(test)] + pub fn new_with_cache( + cfg: Config, + data: Vec<(SessionIndex, Hash, ExtendedSessionInfo)>, + ) -> Self { + let mut r = Self::new_with_config(cfg); + for (idx, parent, session) in data { + r.session_index_cache.put(parent, idx); + r.session_info_cache.put(idx, session); + } + r + } + /// Returns the session index expected at any child of the `parent` block. /// This does not return the session index for the `parent` block. pub async fn get_session_index_for_child( From 93e6863364b657249b8c644049e79d76dd77b39e Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Thu, 20 Apr 2023 14:43:19 +0300 Subject: [PATCH 03/24] Fix the rest of the tests --- node/core/approval-voting/src/tests.rs | 31 -------------------------- 1 file changed, 31 deletions(-) diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index 9ec0c89d4baa..059ab09bc705 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -815,37 +815,6 @@ async fn import_block( ); if !fork { - assert_matches!( - overseer_recv(overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(number)); - } - ); - - assert_matches!( - overseer_recv(overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, number); - let _ = s_tx.send(Ok(Some(hashes[number as usize].0))); - } - ); - - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hashes[number as usize].0); - let _ = s_tx.send(Ok(number.into())); - } - ); - assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi( From a7b74ccbfd520146765e20cca9641f7e4ae8e213 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Thu, 20 Apr 2023 21:59:48 +0300 Subject: [PATCH 04/24] Remove dead code --- node/core/approval-voting/src/import.rs | 17 ----------------- node/subsystem-util/src/runtime/mod.rs | 2 -- 2 files changed, 19 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index b7bcde4c5def..0fbd2f7c5b06 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -366,23 +366,6 @@ pub(crate) async fn handle_new_head( // Update session info based on most recent head. state.cache_session_info_for_head(ctx, head).await; - // Err(e) => { - // gum::debug!( - // target: LOG_TARGET, - // ?head, - // ?e, - // "Could not cache session info when processing head.", - // ); - // return Ok(Vec::new()) - // }, - // Ok(Some(a @ SessionWindowUpdate::Advanced { .. })) => { - // gum::info!( - // target: LOG_TARGET, - // update = ?a, - // "Advanced session window for approvals", - // ); - // }, - // Ok(_) => {},s // If we've just started the node and are far behind, // import at most `MAX_HEADS_LOOK_BACK` blocks. diff --git a/node/subsystem-util/src/runtime/mod.rs b/node/subsystem-util/src/runtime/mod.rs index 87122d024fd2..468ba94449e7 100644 --- a/node/subsystem-util/src/runtime/mod.rs +++ b/node/subsystem-util/src/runtime/mod.rs @@ -119,8 +119,6 @@ impl RuntimeInfo { } /// Create an instance with pre-populated cache. Used only for testing - // TODO: why doesn't compile - // #[cfg(test)] pub fn new_with_cache( cfg: Config, data: Vec<(SessionIndex, Hash, ExtendedSessionInfo)>, From 0f84b42bb460f96957cef7d30c3a675382be30df Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 21 Apr 2023 09:56:03 +0300 Subject: [PATCH 05/24] Fix todos --- node/core/approval-voting/src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 9c6f71c78f16..b3deb7d54677 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -637,7 +637,6 @@ impl CurrentlyCheckingSet { } } -// TODO: better name struct SessionInfoProvider { runtime_info: RuntimeInfo, last_consecutive_cached_session: SessionIndex, @@ -672,7 +671,12 @@ impl State { { Ok(extended_info) => Some(&extended_info.session_info), Err(_) => { - // todo log error + gum::error!( + target: LOG_TARGET, + session: session_index, + ?relay_parent, + "Can't get SessionInfo" + ); None }, }, From 0e486d487ab5731f8c7ff3a11b3c985927a57f34 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 21 Apr 2023 11:30:02 +0300 Subject: [PATCH 06/24] Simplify session caching --- node/core/approval-voting/src/import.rs | 19 ++++---- node/core/approval-voting/src/lib.rs | 64 ++++++++++++++----------- 2 files changed, 44 insertions(+), 39 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 0fbd2f7c5b06..870cc52309e2 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -158,7 +158,7 @@ async fn imported_block_info( return Err(ImportedBlockInfoError::FutureCancelled("SessionIndexForChild", error)), }; - if env.runtime_info.as_ref().map_or(true, |s| session_index < s.earliest_session) { + if env.runtime_info.as_ref().map_or(true, |s| session_index < s.earliest_session()) { gum::debug!( target: LOG_TARGET, "Block {} is from ancient session {}. Skipping", @@ -674,11 +674,10 @@ pub(crate) mod tests { )], ); - //TODO: is it okay to use index for earliest_session and last_consecutive_cached_session? State { session_info: Some(SessionInfoProvider { - earliest_session: index, - last_consecutive_cached_session: index, + highest_session_seen: index, + gaps_in_cache: false, runtime_info, }), ..blank_state() @@ -806,8 +805,8 @@ pub(crate) mod tests { }, )], ), - last_consecutive_cached_session: session, //TODO: ?? - earliest_session: session, //TODO: ?? + gaps_in_cache: false, + highest_session_seen: session, }; let header = header.clone(); @@ -929,8 +928,8 @@ pub(crate) mod tests { }, )], ), - last_consecutive_cached_session: session, //TODO: ?? - earliest_session: session, //TODO: ?? + gaps_in_cache: false, + highest_session_seen: session, }; let header = header.clone(); @@ -1141,8 +1140,8 @@ pub(crate) mod tests { }, )], ), - last_consecutive_cached_session: session, //TODO: ?? - earliest_session: session, //TODO: ?? + gaps_in_cache: false, + highest_session_seen: session, }; let header = header.clone(); diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index b3deb7d54677..fd76e2063cdb 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -639,11 +639,18 @@ impl CurrentlyCheckingSet { struct SessionInfoProvider { runtime_info: RuntimeInfo, - last_consecutive_cached_session: SessionIndex, - earliest_session: SessionIndex, + gaps_in_cache: bool, + highest_session_seen: SessionIndex, +} + +impl SessionInfoProvider { + fn earliest_session(&self) -> SessionIndex { + self.highest_session_seen.saturating_sub(DISPUTE_WINDOW.get() - 1) + } } struct State { + // `None` on start-up. Gets initialized/updated on leaf update session_info: Option, keystore: Arc, slot_duration_millis: u64, @@ -673,7 +680,7 @@ impl State { Err(_) => { gum::error!( target: LOG_TARGET, - session: session_index, + session = session_index, ?relay_parent, "Can't get SessionInfo" ); @@ -712,35 +719,28 @@ impl State { }, }; - let mut gap_in_cache = false; - let mut last_consecutive_cached_session = 0; - // TODO: fix this -> start of the windoow should be no less than the last finalized block - for idx in head_session_idx.saturating_sub(DISPUTE_WINDOW.get())..=head_session_idx + let mut gaps_in_cache = false; + for idx in + head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)..=head_session_idx { if let Err(err) = runtime_info.get_session_info_by_index(ctx.sender(), head, idx).await { - gap_in_cache = true; + gaps_in_cache = true; gum::debug!( target: LOG_TARGET, ?err, session = idx, - "Can cache session. Moving on." + "Can't cache session. Moving on." ); continue } - - if !gap_in_cache { - last_consecutive_cached_session = idx; - } } - // TODO: fix this - let earliest_session = 0; self.session_info = Some(SessionInfoProvider { runtime_info, - last_consecutive_cached_session, - earliest_session, + gaps_in_cache, + highest_session_seen: head_session_idx, }); }, Some(mut session_info_provider) => { @@ -761,17 +761,23 @@ impl State { }, }; - let mut gap_in_cache = false; - if head_session_idx > session_info_provider.last_consecutive_cached_session { - for idx in - session_info_provider.last_consecutive_cached_session + 1..=head_session_idx - { + let mut gaps_in_cache = false; + if session_info_provider.gaps_in_cache || + head_session_idx > session_info_provider.highest_session_seen + { + let lower_bound = if session_info_provider.gaps_in_cache { + head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) + } else { + session_info_provider.highest_session_seen + 1 + }; + + for idx in lower_bound..=head_session_idx { if let Err(err) = session_info_provider .runtime_info .get_session_info_by_index(ctx.sender(), head, idx) .await { - gap_in_cache = true; + gaps_in_cache = true; gum::debug!( target: LOG_TARGET, ?err, @@ -780,14 +786,14 @@ impl State { ); continue } - - if !gap_in_cache { - session_info_provider.last_consecutive_cached_session = idx; - } } - } - // TODO: update `earliest_session` + session_info_provider = SessionInfoProvider { + runtime_info: session_info_provider.runtime_info, + gaps_in_cache, + highest_session_seen: head_session_idx, + }; + } self.session_info = Some(session_info_provider); }, From f6a9d98539fbf3b127a8247efb6316d48fafdbb5 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 21 Apr 2023 11:52:45 +0300 Subject: [PATCH 07/24] Comments for `SessionInfoProvider` --- node/core/approval-voting/src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index fd76e2063cdb..ce4c9e44033b 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -637,9 +637,14 @@ impl CurrentlyCheckingSet { } } +// Wraps `RuntimeInfo` and some metadata. On each new leaf `SessionInfo` is +// cached. `RuntimeInfo` keeps the last `DISPUTE_WINDOW` number of sessions. struct SessionInfoProvider { + // `RuntimeInfo` caches sessions internally. runtime_info: RuntimeInfo, + // Will be true if an error had occurred during the last session caching attempt gaps_in_cache: bool, + // Highest session index seen so far. Also used to calculate the earliest one. highest_session_seen: SessionIndex, } From 2918b42b69376a6af3139e865b055f2db2493595 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 21 Apr 2023 16:26:26 +0300 Subject: [PATCH 08/24] Separate `SessionInfoProvider` from `State` --- node/core/approval-voting/src/import.rs | 45 +++-- node/core/approval-voting/src/lib.rs | 208 ++++++++++++++++-------- 2 files changed, 165 insertions(+), 88 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 870cc52309e2..4b894824e647 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -330,6 +330,7 @@ pub(crate) async fn handle_new_head( ctx: &mut Context, state: &mut State, db: &mut OverlayedBackend<'_, B>, + session_info_provider: &mut Option, head: Hash, finalized_number: &Option, ) -> SubsystemResult> { @@ -365,7 +366,7 @@ pub(crate) async fn handle_new_head( }; // Update session info based on most recent head. - state.cache_session_info_for_head(ctx, head).await; + state.cache_session_info_for_head(ctx, head, session_info_provider).await; // If we've just started the node and are far behind, // import at most `MAX_HEADS_LOOK_BACK` blocks. @@ -394,7 +395,7 @@ pub(crate) async fn handle_new_head( let mut imported_blocks_and_info = Vec::with_capacity(new_blocks.len()); for (block_hash, block_header) in new_blocks.into_iter().rev() { let env = ImportedBlockInfoEnv { - runtime_info: &mut state.session_info, + runtime_info: session_info_provider, assignment_criteria: &*state.assignment_criteria, keystore: &state.keystore, }; @@ -449,8 +450,7 @@ pub(crate) async fn handle_new_head( } = imported_block_info; // todo: refactor this - let session_info = &state - .session_info + let session_info = &session_info_provider .as_mut() .map(|s| &mut s.runtime_info) .expect("imported_block_info requires session info to be available; qed") @@ -648,7 +648,6 @@ pub(crate) mod tests { fn blank_state() -> State { State { - session_info: None, keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, clock: Box::new(MockClock::default()), @@ -657,7 +656,11 @@ pub(crate) mod tests { } } - fn single_session_state(index: SessionIndex, info: SessionInfo, relay_parent: Hash) -> State { + fn single_session_state( + index: SessionIndex, + info: SessionInfo, + relay_parent: Hash, + ) -> (State, SessionInfoProvider) { let runtime_info = RuntimeInfo::new_with_cache( RuntimeInfoConfig { keystore: None, @@ -674,14 +677,12 @@ pub(crate) mod tests { )], ); - State { - session_info: Some(SessionInfoProvider { - highest_session_seen: index, - gaps_in_cache: false, - runtime_info, - }), - ..blank_state() - } + let state = blank_state(); + + let session_info_provider = + SessionInfoProvider { runtime_info, gaps_in_cache: false, highest_session_seen: index }; + + (state, session_info_provider) } struct MockAssignmentCriteria; @@ -1284,7 +1285,8 @@ pub(crate) mod tests { .map(|(r, c, g)| CandidateEvent::CandidateIncluded(r, Vec::new().into(), c, g)) .collect::>(); - let mut state = single_session_state(session, session_info, parent_hash); + let (mut state, session_info_provider) = + single_session_state(session, session_info, parent_hash); overlay_db.write_block_entry( v1::BlockEntry { block_hash: parent_hash, @@ -1306,9 +1308,16 @@ pub(crate) mod tests { let test_fut = { Box::pin(async move { let mut overlay_db = OverlayedBackend::new(&db); - let result = handle_new_head(&mut ctx, &mut state, &mut overlay_db, hash, &Some(1)) - .await - .unwrap(); + let result = handle_new_head( + &mut ctx, + &mut state, + &mut overlay_db, + &mut Some(session_info_provider), + hash, + &Some(1), + ) + .await + .unwrap(); let write_ops = overlay_db.into_write_ops(); db.write(write_ops).unwrap(); diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index ce4c9e44033b..1d22b5e97efd 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -655,8 +655,6 @@ impl SessionInfoProvider { } struct State { - // `None` on start-up. Gets initialized/updated on leaf update - session_info: Option, keystore: Arc, slot_duration_millis: u64, clock: Box, @@ -666,43 +664,16 @@ struct State { #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] impl State { - async fn session_info( - &mut self, - sender: &mut Sender, - relay_parent: Hash, - session_index: SessionIndex, - ) -> Option<&SessionInfo> - where - Sender: SubsystemSender, - { - match &mut self.session_info { - Some(session_info_provider) => match session_info_provider - .runtime_info - .get_session_info_by_index(sender, relay_parent, session_index) - .await - { - Ok(extended_info) => Some(&extended_info.session_info), - Err(_) => { - gum::error!( - target: LOG_TARGET, - session = session_index, - ?relay_parent, - "Can't get SessionInfo" - ); - None - }, - }, - - None => None, - } - } - /// If `head` is in a new session - cache it - pub async fn cache_session_info_for_head(&mut self, ctx: &mut Context, head: Hash) - where + pub async fn cache_session_info_for_head( + &mut self, + ctx: &mut Context, + head: Hash, + session_info: &mut Option, + ) where ::Sender: Sized + Send, { - match self.session_info.take() { + match session_info.take() { None => { let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, @@ -742,7 +713,7 @@ impl State { } } - self.session_info = Some(SessionInfoProvider { + *session_info = Some(SessionInfoProvider { runtime_info, gaps_in_cache, highest_session_seen: head_session_idx, @@ -800,16 +771,18 @@ impl State { }; } - self.session_info = Some(session_info_provider); + *session_info = Some(session_info_provider); }, } } + // Compute the required tranches for approval for this block and candidate combo. // Fails if there is no approval entry for the block under the candidate or no candidate entry // under the block, or if the session is out of bounds. async fn approval_status( &'a mut self, sender: &mut Sender, + session_info_provider: &'a mut Option, block_entry: &'a BlockEntry, candidate_entry: &'b CandidateEntry, ) -> Option<(&'b ApprovalEntry, ApprovalStatus)> @@ -817,9 +790,13 @@ impl State { Sender: SubsystemSender, { // We can't borrow the session here. Only get copies of what's needed. - let (no_show_slots, needed_approvals) = match self - .session_info(sender, block_entry.parent_hash(), block_entry.session()) - .await + let (no_show_slots, needed_approvals) = match session_info( + session_info_provider, + sender, + block_entry.parent_hash(), + block_entry.session(), + ) + .await { Some(s) => (s.no_show_slots, s.needed_approvals), None => { @@ -897,7 +874,6 @@ where } let mut state = State { - session_info: None, keystore: subsystem.keystore, slot_duration_millis: subsystem.slot_duration_millis, clock, @@ -905,6 +881,9 @@ where spans: HashMap::new(), }; + // `None` on start-up. Gets initialized/updated on leaf update + let mut session_info_provider: Option = None; + let mut wakeups = Wakeups::default(); let mut currently_checking_set = CurrentlyCheckingSet::default(); let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE); @@ -930,6 +909,7 @@ where &mut ctx, &mut state, &mut overlayed_db, + &mut session_info_provider, woken_block, woken_candidate, &subsystem.metrics, @@ -940,6 +920,7 @@ where &mut ctx, &mut state, &mut overlayed_db, + &mut session_info_provider, &subsystem.metrics, next_msg?, &mut last_finalized_height, @@ -990,6 +971,7 @@ where &mut ctx, &mut state, &mut overlayed_db, + &mut session_info_provider, &subsystem.metrics, &mut wakeups, &mut currently_checking_set, @@ -1036,6 +1018,7 @@ async fn handle_actions( ctx: &mut Context, state: &mut State, overlayed_db: &mut OverlayedBackend<'_, impl Backend>, + session_info_provider: &mut Option, metrics: &Metrics, wakeups: &mut Wakeups, currently_checking_set: &mut CurrentlyCheckingSet, @@ -1066,6 +1049,7 @@ async fn handle_actions( ctx, state, overlayed_db, + session_info_provider, metrics, candidate_hash, approval_request, @@ -1293,6 +1277,7 @@ async fn handle_from_overseer( ctx: &mut Context, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, + session_info_provider: &mut Option, metrics: &Metrics, x: FromOrchestra, last_finalized_height: &mut Option, @@ -1306,7 +1291,16 @@ async fn handle_from_overseer( let approval_voting_span = jaeger::PerLeafSpan::new(activated.span, "approval-voting"); state.spans.insert(head, approval_voting_span); - match import::handle_new_head(ctx, state, db, head, &*last_finalized_height).await { + match import::handle_new_head( + ctx, + state, + db, + session_info_provider, + head, + &*last_finalized_height, + ) + .await + { Err(e) => return Err(SubsystemError::with_origin("db", e)), Ok(block_imported_candidates) => { // Schedule wakeups for all imported candidates. @@ -1376,16 +1370,31 @@ async fn handle_from_overseer( }, FromOrchestra::Communication { msg } => match msg { ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => { - let (check_outcome, actions) = - check_and_import_assignment(ctx.sender(), state, db, a, claimed_core).await?; + let (check_outcome, actions) = check_and_import_assignment( + ctx.sender(), + state, + db, + session_info_provider, + a, + claimed_core, + ) + .await?; let _ = res.send(check_outcome); actions }, ApprovalVotingMessage::CheckAndImportApproval(a, res) => - check_and_import_approval(ctx.sender(), state, db, metrics, a, |r| { - let _ = res.send(r); - }) + check_and_import_approval( + ctx.sender(), + state, + db, + session_info_provider, + metrics, + a, + |r| { + let _ = res.send(r); + }, + ) .await? .0, ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => { @@ -1860,6 +1869,7 @@ async fn check_and_import_assignment( sender: &mut Sender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, + session_info_provider: &mut Option, assignment: IndirectAssignmentCert, candidate_index: CandidateIndex, ) -> SubsystemResult<(AssignmentCheckResult, Vec)> @@ -1888,9 +1898,13 @@ where )), }; - let session_info = match state - .session_info(sender, block_entry.parent_hash(), block_entry.session()) - .await + let session_info = match session_info( + session_info_provider, + sender, + block_entry.parent_hash(), + block_entry.session(), + ) + .await { Some(s) => s, None => @@ -2003,8 +2017,9 @@ where let mut actions = Vec::new(); // We've imported a new approval, so we need to schedule a wake-up for when that might no-show. - if let Some((approval_entry, status)) = - state.approval_status(sender, &block_entry, &candidate_entry).await + if let Some((approval_entry, status)) = state + .approval_status(sender, session_info_provider, &block_entry, &candidate_entry) + .await { actions.extend(schedule_wakeup_action( approval_entry, @@ -2027,6 +2042,7 @@ async fn check_and_import_approval( sender: &mut Sender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, + session_info_provider: &mut Option, metrics: &Metrics, approval: IndirectSignedApprovalVote, with_response: impl FnOnce(ApprovalCheckResult) -> T, @@ -2059,15 +2075,21 @@ where }, }; - let session_info = - match state.session_info(sender, approval.block_hash, block_entry.session()).await { - Some(s) => s, - None => { - respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownSessionIndex( - block_entry.session() - ),)) - }, - }; + let session_info = match session_info( + session_info_provider, + sender, + approval.block_hash, + block_entry.session(), + ) + .await + { + Some(s) => s, + None => { + respond_early!(ApprovalCheckResult::Bad(ApprovalCheckError::UnknownSessionIndex( + block_entry.session() + ),)) + }, + }; let approved_candidate_hash = match block_entry.candidate(approval.candidate_index as usize) { Some((_, h)) => *h, @@ -2144,6 +2166,7 @@ where sender, state, db, + session_info_provider, &metrics, block_entry, approved_candidate_hash, @@ -2190,6 +2213,7 @@ async fn advance_approval_state( sender: &mut Sender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, + session_info_provider: &mut Option, metrics: &Metrics, mut block_entry: BlockEntry, candidate_hash: CandidateHash, @@ -2228,8 +2252,9 @@ where let tick_now = state.clock.tick_now(); - let (is_approved, status) = if let Some((approval_entry, status)) = - state.approval_status(sender, &block_entry, &candidate_entry).await + let (is_approved, status) = if let Some((approval_entry, status)) = state + .approval_status(sender, session_info_provider, &block_entry, &candidate_entry) + .await { let check = approval_checking::check_approval( &candidate_entry, @@ -2364,6 +2389,7 @@ async fn process_wakeup( ctx: &mut Context, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, + session_info_provider: &mut Option, relay_block: Hash, candidate_hash: CandidateHash, metrics: &Metrics, @@ -2389,9 +2415,13 @@ async fn process_wakeup( let slot_duration_millis = state.slot_duration_millis; let tranche_now = state.clock.tranche_now(slot_duration_millis, block_entry.slot()); - let session_info = match state - .session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session()) - .await + let session_info = match session_info( + session_info_provider, + ctx.sender(), + block_entry.parent_hash(), + block_entry.session(), + ) + .await { Some(i) => i, None => { @@ -2507,6 +2537,7 @@ async fn process_wakeup( ctx.sender(), state, db, + session_info_provider, metrics, block_entry, candidate_hash, @@ -2724,6 +2755,7 @@ async fn issue_approval( ctx: &mut Context, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, + session_info_provider: &mut Option, metrics: &Metrics, candidate_hash: CandidateHash, ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest, @@ -2765,9 +2797,13 @@ async fn issue_approval( }; issue_approval_span.add_int_tag("candidate_index", candidate_index as i64); - let session_info = match state - .session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session()) - .await + let session_info = match session_info( + session_info_provider, + ctx.sender(), + block_entry.parent_hash(), + block_entry.session(), + ) + .await { Some(s) => s, None => { @@ -2856,6 +2892,7 @@ async fn issue_approval( ctx.sender(), state, db, + session_info_provider, metrics, block_entry, candidate_hash, @@ -2918,3 +2955,34 @@ fn issue_local_invalid_statement( false, )); } + +async fn session_info<'a, Sender>( + session_info_provider: &'a mut Option, + sender: &mut Sender, + relay_parent: Hash, + session_index: SessionIndex, +) -> Option<&'a SessionInfo> +where + Sender: SubsystemSender, +{ + match session_info_provider { + Some(session_info_provider) => match session_info_provider + .runtime_info + .get_session_info_by_index(sender, relay_parent, session_index) + .await + { + Ok(extended_info) => Some(&extended_info.session_info), + Err(_) => { + gum::error!( + target: LOG_TARGET, + session = session_index, + ?relay_parent, + "Can't get SessionInfo" + ); + None + }, + }, + + None => None, + } +} From 837f952b84110a09aa36f2ad0295d88cab441536 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 21 Apr 2023 16:42:44 +0300 Subject: [PATCH 09/24] `cache_session_info_for_head` becomes freestanding function --- node/core/approval-voting/src/import.rs | 3 +- node/core/approval-voting/src/lib.rs | 220 ++++++++++++------------ 2 files changed, 110 insertions(+), 113 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 4b894824e647..73e58430392b 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -56,6 +56,7 @@ use std::collections::HashMap; use super::approval_db::v1; use crate::{ backend::{Backend, OverlayedBackend}, + cache_session_info_for_head, criteria::{AssignmentCriteria, OurAssignment}, persisted_entries::CandidateEntry, time::{slot_number_to_tick, Tick}, @@ -366,7 +367,7 @@ pub(crate) async fn handle_new_head( }; // Update session info based on most recent head. - state.cache_session_info_for_head(ctx, head, session_info_provider).await; + cache_session_info_for_head(ctx.sender(), head, session_info_provider).await; // If we've just started the node and are far behind, // import at most `MAX_HEADS_LOOK_BACK` blocks. diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 1d22b5e97efd..70efd2b45389 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -664,118 +664,6 @@ struct State { #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] impl State { - /// If `head` is in a new session - cache it - pub async fn cache_session_info_for_head( - &mut self, - ctx: &mut Context, - head: Hash, - session_info: &mut Option, - ) where - ::Sender: Sized + Send, - { - match session_info.take() { - None => { - let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), - }); - - let head_session_idx = - match runtime_info.get_session_index_for_child(ctx.sender(), head).await { - Ok(session_idx) => session_idx, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - ?head, - ?err, - "Error getting session index for head. Won't cache any sessions" - ); - return - }, - }; - - let mut gaps_in_cache = false; - for idx in - head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)..=head_session_idx - { - if let Err(err) = - runtime_info.get_session_info_by_index(ctx.sender(), head, idx).await - { - gaps_in_cache = true; - gum::debug!( - target: LOG_TARGET, - ?err, - session = idx, - "Can't cache session. Moving on." - ); - continue - } - } - - *session_info = Some(SessionInfoProvider { - runtime_info, - gaps_in_cache, - highest_session_seen: head_session_idx, - }); - }, - Some(mut session_info_provider) => { - let head_session_idx = match session_info_provider - .runtime_info - .get_session_index_for_child(ctx.sender(), head) - .await - { - Ok(session_idx) => session_idx, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - ?head, - ?err, - "Error getting session index for head. Won't cache any sessions" - ); - return - }, - }; - - let mut gaps_in_cache = false; - if session_info_provider.gaps_in_cache || - head_session_idx > session_info_provider.highest_session_seen - { - let lower_bound = if session_info_provider.gaps_in_cache { - head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) - } else { - session_info_provider.highest_session_seen + 1 - }; - - for idx in lower_bound..=head_session_idx { - if let Err(err) = session_info_provider - .runtime_info - .get_session_info_by_index(ctx.sender(), head, idx) - .await - { - gaps_in_cache = true; - gum::debug!( - target: LOG_TARGET, - ?err, - session = idx, - "Can cache session. Moving on." - ); - continue - } - } - - session_info_provider = SessionInfoProvider { - runtime_info: session_info_provider.runtime_info, - gaps_in_cache, - highest_session_seen: head_session_idx, - }; - } - - *session_info = Some(session_info_provider); - }, - } - } - // Compute the required tranches for approval for this block and candidate combo. // Fails if there is no approval entry for the block under the candidate or no candidate entry // under the block, or if the session is out of bounds. @@ -2986,3 +2874,111 @@ where None => None, } } + +/// If `head` is in a new session - cache it +async fn cache_session_info_for_head( + sender: &mut Sender, + head: Hash, + session_info: &mut Option, +) where + Sender: SubsystemSender, +{ + match session_info.take() { + None => { + let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); + + let head_session_idx = + match runtime_info.get_session_index_for_child(sender, head).await { + Ok(session_idx) => session_idx, + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?head, + ?err, + "Error getting session index for head. Won't cache any sessions" + ); + return + }, + }; + + let mut gaps_in_cache = false; + for idx in head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)..=head_session_idx + { + if let Err(err) = runtime_info.get_session_info_by_index(sender, head, idx).await { + gaps_in_cache = true; + gum::debug!( + target: LOG_TARGET, + ?err, + session = idx, + "Can't cache session. Moving on." + ); + continue + } + } + + *session_info = Some(SessionInfoProvider { + runtime_info, + gaps_in_cache, + highest_session_seen: head_session_idx, + }); + }, + Some(mut session_info_provider) => { + let head_session_idx = match session_info_provider + .runtime_info + .get_session_index_for_child(sender, head) + .await + { + Ok(session_idx) => session_idx, + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?head, + ?err, + "Error getting session index for head. Won't cache any sessions" + ); + return + }, + }; + + let mut gaps_in_cache = false; + if session_info_provider.gaps_in_cache || + head_session_idx > session_info_provider.highest_session_seen + { + let lower_bound = if session_info_provider.gaps_in_cache { + head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) + } else { + session_info_provider.highest_session_seen + 1 + }; + + for idx in lower_bound..=head_session_idx { + if let Err(err) = session_info_provider + .runtime_info + .get_session_info_by_index(sender, head, idx) + .await + { + gaps_in_cache = true; + gum::debug!( + target: LOG_TARGET, + ?err, + session = idx, + "Can cache session. Moving on." + ); + continue + } + } + + session_info_provider = SessionInfoProvider { + runtime_info: session_info_provider.runtime_info, + gaps_in_cache, + highest_session_seen: head_session_idx, + }; + } + + *session_info = Some(session_info_provider); + }, + } +} From 6f609f3eb52619ed7ec51e3c4ce6c92b76b370c9 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 21 Apr 2023 16:49:20 +0300 Subject: [PATCH 10/24] Remove unneeded `mut` usage --- node/core/approval-voting/src/import.rs | 6 +++--- node/core/approval-voting/src/lib.rs | 20 ++++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 73e58430392b..540375a6c028 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -329,7 +329,7 @@ pub struct BlockImportedCandidates { #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] pub(crate) async fn handle_new_head( ctx: &mut Context, - state: &mut State, + state: &State, db: &mut OverlayedBackend<'_, B>, session_info_provider: &mut Option, head: Hash, @@ -1286,7 +1286,7 @@ pub(crate) mod tests { .map(|(r, c, g)| CandidateEvent::CandidateIncluded(r, Vec::new().into(), c, g)) .collect::>(); - let (mut state, session_info_provider) = + let (state, session_info_provider) = single_session_state(session, session_info, parent_hash); overlay_db.write_block_entry( v1::BlockEntry { @@ -1311,7 +1311,7 @@ pub(crate) mod tests { let mut overlay_db = OverlayedBackend::new(&db); let result = handle_new_head( &mut ctx, - &mut state, + &state, &mut overlay_db, &mut Some(session_info_provider), hash, diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 70efd2b45389..19e9c23f443a 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -668,7 +668,7 @@ impl State { // Fails if there is no approval entry for the block under the candidate or no candidate entry // under the block, or if the session is out of bounds. async fn approval_status( - &'a mut self, + &'a self, sender: &mut Sender, session_info_provider: &'a mut Option, block_entry: &'a BlockEntry, @@ -795,7 +795,7 @@ where subsystem.metrics.on_wakeup(); process_wakeup( &mut ctx, - &mut state, + &state, &mut overlayed_db, &mut session_info_provider, woken_block, @@ -857,7 +857,7 @@ where if handle_actions( &mut ctx, - &mut state, + &state, &mut overlayed_db, &mut session_info_provider, &subsystem.metrics, @@ -904,7 +904,7 @@ where #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] async fn handle_actions( ctx: &mut Context, - state: &mut State, + state: &State, overlayed_db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut Option, metrics: &Metrics, @@ -1051,7 +1051,7 @@ async fn handle_actions( fn distribution_messages_for_activation( db: &OverlayedBackend<'_, impl Backend>, - state: &mut State, + state: &State, ) -> SubsystemResult> { let all_blocks: Vec = db.load_all_blocks()?; @@ -1755,7 +1755,7 @@ fn schedule_wakeup_action( async fn check_and_import_assignment( sender: &mut Sender, - state: &mut State, + state: &State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut Option, assignment: IndirectAssignmentCert, @@ -1928,7 +1928,7 @@ where async fn check_and_import_approval( sender: &mut Sender, - state: &mut State, + state: &State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut Option, metrics: &Metrics, @@ -2099,7 +2099,7 @@ impl ApprovalStateTransition { // necessary and schedules any further wakeups. async fn advance_approval_state( sender: &mut Sender, - state: &mut State, + state: &State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut Option, metrics: &Metrics, @@ -2275,7 +2275,7 @@ fn should_trigger_assignment( #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] async fn process_wakeup( ctx: &mut Context, - state: &mut State, + state: &State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut Option, relay_block: Hash, @@ -2641,7 +2641,7 @@ async fn launch_approval( #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] async fn issue_approval( ctx: &mut Context, - state: &mut State, + state: &State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut Option, metrics: &Metrics, From 82a0ebc0aeb0a0b5a70abaec80262ef827eaf102 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 24 Apr 2023 11:19:02 +0300 Subject: [PATCH 11/24] fn session_info -> fn get_session_info() to avoid name clashes. The function also tries to initialize `SessionInfoProvider` --- node/core/approval-voting/src/lib.rs | 64 +++++++++++++++++----------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 19e9c23f443a..dd69931550be 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -678,7 +678,7 @@ impl State { Sender: SubsystemSender, { // We can't borrow the session here. Only get copies of what's needed. - let (no_show_slots, needed_approvals) = match session_info( + let (no_show_slots, needed_approvals) = match get_session_info( session_info_provider, sender, block_entry.parent_hash(), @@ -1786,7 +1786,7 @@ where )), }; - let session_info = match session_info( + let session_info = match get_session_info( session_info_provider, sender, block_entry.parent_hash(), @@ -1963,7 +1963,7 @@ where }, }; - let session_info = match session_info( + let session_info = match get_session_info( session_info_provider, sender, approval.block_hash, @@ -2303,7 +2303,7 @@ async fn process_wakeup( let slot_duration_millis = state.slot_duration_millis; let tranche_now = state.clock.tranche_now(slot_duration_millis, block_entry.slot()); - let session_info = match session_info( + let session_info = match get_session_info( session_info_provider, ctx.sender(), block_entry.parent_hash(), @@ -2685,7 +2685,7 @@ async fn issue_approval( }; issue_approval_span.add_int_tag("candidate_index", candidate_index as i64); - let session_info = match session_info( + let session_info = match get_session_info( session_info_provider, ctx.sender(), block_entry.parent_hash(), @@ -2844,7 +2844,7 @@ fn issue_local_invalid_statement( )); } -async fn session_info<'a, Sender>( +async fn get_session_info<'a, Sender>( session_info_provider: &'a mut Option, sender: &mut Sender, relay_parent: Hash, @@ -2853,25 +2853,41 @@ async fn session_info<'a, Sender>( where Sender: SubsystemSender, { - match session_info_provider { - Some(session_info_provider) => match session_info_provider - .runtime_info - .get_session_info_by_index(sender, relay_parent, session_index) - .await - { - Ok(extended_info) => Some(&extended_info.session_info), - Err(_) => { - gum::error!( - target: LOG_TARGET, - session = session_index, - ?relay_parent, - "Can't get SessionInfo" - ); - None - }, - }, + // If `SessionInfoProvider` is not initialized - try to initialize it first + if session_info_provider.is_none() { + cache_session_info_for_head(sender, relay_parent, session_info_provider).await; + } + + if session_info_provider.is_none() { + // `cache_session_info_for_head` should initialize `session_info_provider` + // no matter what so log an error if it is still `None` + gum::error!( + target: LOG_TARGET, + session = session_index, + ?relay_parent, + "SessionInfoProvider is not initialized after caching sessions" + ); + return None + } - None => None, + // And then process the request + match session_info_provider + .as_mut() + .expect("Checked that session_info_provider is Some on the line above. qed.") + .runtime_info + .get_session_info_by_index(sender, relay_parent, session_index) + .await + { + Ok(extended_info) => Some(&extended_info.session_info), + Err(_) => { + gum::error!( + target: LOG_TARGET, + session = session_index, + ?relay_parent, + "Can't get SessionInfo" + ); + None + }, } } From dca728782f378d9cd036b7ac40565d57c6d07f28 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 24 Apr 2023 11:40:47 +0300 Subject: [PATCH 12/24] Fix SessionInfo retrieval --- node/core/approval-voting/src/import.rs | 62 +++++++++++++------------ 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 540375a6c028..10e4c9f1579e 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -58,6 +58,7 @@ use crate::{ backend::{Backend, OverlayedBackend}, cache_session_info_for_head, criteria::{AssignmentCriteria, OurAssignment}, + get_session_info, persisted_entries::CandidateEntry, time::{slot_number_to_tick, Tick}, SessionInfoProvider, @@ -207,26 +208,19 @@ async fn imported_block_info( } }; - // todo: refactor this - let session_info = if let Some(session_info_provider) = env.runtime_info { - session_info_provider - .runtime_info - .get_session_info_by_index(ctx.sender(), block_hash, session_index) - .await - .map_err(|_| ImportedBlockInfoError::SessionInfoUnavailable) - } else { - gum::debug!(target: LOG_TARGET, "SessionInfoProvider unavailable for block {}", block_hash,); - return Err(ImportedBlockInfoError::SessionInfoUnavailable) - }; - - let session_info = match session_info { - Ok(extended_session_info) => &extended_session_info.session_info, - Err(_) => { - gum::debug!(target: LOG_TARGET, "Session info unavailable for block {}", block_hash,); - - return Err(ImportedBlockInfoError::SessionInfoUnavailable) - }, - }; + let session_info = + match get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await { + Some(session_info) => session_info, + None => { + gum::error!( + target: LOG_TARGET, + relay_parent = ?block_hash, + session = session_index, + "Session info unavailable" + ); + return Err(ImportedBlockInfoError::SessionInfoUnavailable) + }, + }; let (assignments, slot, relay_vrf_story) = { let unsafe_vrf = approval_types::babe_unsafe_vrf_info(&block_header); @@ -450,15 +444,25 @@ pub(crate) async fn handle_new_head( force_approve, } = imported_block_info; - // todo: refactor this - let session_info = &session_info_provider - .as_mut() - .map(|s| &mut s.runtime_info) - .expect("imported_block_info requires session info to be available; qed") - .get_session_info_by_index(ctx.sender(), head, session_index) - .await - .map_err(|e| SubsystemError::FromOrigin { origin: "", source: e.into() })? - .session_info; + let session_info = match get_session_info( + session_info_provider, + ctx.sender(), + head, + session_index, + ) + .await + { + Some(session_info) => session_info, + None => { + gum::error!( + target: LOG_TARGET, + session = session_index, + ?head, + "Can't get session info for the new head" + ); + return Ok(Vec::new()) + }, + }; let (block_tick, no_show_duration) = { let block_tick = slot_number_to_tick(state.slot_duration_millis, slot); From 360b96d67f425fb0d0ae937268fdfa370d4ddb2a Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 24 Apr 2023 14:07:31 +0300 Subject: [PATCH 13/24] Code cleanup --- node/core/approval-voting/src/lib.rs | 32 +++++++++++----------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index dd69931550be..c0e8b75fddab 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -677,8 +677,7 @@ impl State { where Sender: SubsystemSender, { - // We can't borrow the session here. Only get copies of what's needed. - let (no_show_slots, needed_approvals) = match get_session_info( + let session_info = match get_session_info( session_info_provider, sender, block_entry.parent_hash(), @@ -686,7 +685,7 @@ impl State { ) .await { - Some(s) => (s.no_show_slots, s.needed_approvals), + Some(s) => s, None => { gum::warn!( target: LOG_TARGET, @@ -700,8 +699,10 @@ impl State { let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot()); let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot()); - let no_show_duration = - slot_number_to_tick(self.slot_duration_millis, Slot::from(u64::from(no_show_slots))); + let no_show_duration = slot_number_to_tick( + self.slot_duration_millis, + Slot::from(u64::from(session_info.no_show_slots)), + ); if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) { let required_tranches = approval_checking::tranches_to_approve( @@ -710,7 +711,7 @@ impl State { tranche_now, block_tick, no_show_duration, - needed_approvals as _, + session_info.needed_approvals as _, ); let status = ApprovalStatus { required_tranches, block_tick, tranche_now }; @@ -771,7 +772,6 @@ where // `None` on start-up. Gets initialized/updated on leaf update let mut session_info_provider: Option = None; - let mut wakeups = Wakeups::default(); let mut currently_checking_set = CurrentlyCheckingSet::default(); let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE); @@ -1849,11 +1849,10 @@ where )), }; - let config = &criteria::Config::from(session_info); let res = state.assignment_criteria.check_assignment_cert( claimed_core_index, assignment.validator, - config, + &criteria::Config::from(session_info), block_entry.relay_vrf_story(), &assignment.cert, approval_entry.backing_group(), @@ -2066,9 +2065,6 @@ where Ok((actions, t)) } -// TODO? -// fn get_session_info(..) {} - #[derive(Debug)] enum ApprovalStateTransition { RemoteApproval(ValidatorIndex), @@ -2301,8 +2297,6 @@ async fn process_wakeup( _ => return Ok(Vec::new()), }; - let slot_duration_millis = state.slot_duration_millis; - let tranche_now = state.clock.tranche_now(slot_duration_millis, block_entry.slot()); let session_info = match get_session_info( session_info_provider, ctx.sender(), @@ -2324,12 +2318,12 @@ async fn process_wakeup( }, }; - let block_tick = slot_number_to_tick(slot_duration_millis, block_entry.slot()); + let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot()); let no_show_duration = slot_number_to_tick( - slot_duration_millis, + state.slot_duration_millis, Slot::from(u64::from(session_info.no_show_slots)), ); - + let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot()); span.add_uint_tag("tranche", tranche_now as u64); gum::trace!( target: LOG_TARGET, @@ -2738,7 +2732,7 @@ async fn issue_approval( }; let validator_pubkey = match session_info.validators.get(validator_index) { - Some(p) => p.clone(), // todo: remove this + Some(p) => p, None => { gum::warn!( target: LOG_TARGET, @@ -2884,7 +2878,7 @@ where target: LOG_TARGET, session = session_index, ?relay_parent, - "Can't get SessionInfo" + "Can't obtain SessionInfo" ); None }, From caa25293ef736b5d1cf5e4d5cdf3f8e16cad7dba Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 25 Apr 2023 16:46:02 +0300 Subject: [PATCH 14/24] Don't wrap `SessionInfoProvider` in an `Option` --- node/core/approval-voting/src/import.rs | 75 +++-- node/core/approval-voting/src/lib.rs | 355 ++++++++++-------------- 2 files changed, 188 insertions(+), 242 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 10e4c9f1579e..ec03a312ba39 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -56,9 +56,7 @@ use std::collections::HashMap; use super::approval_db::v1; use crate::{ backend::{Backend, OverlayedBackend}, - cache_session_info_for_head, criteria::{AssignmentCriteria, OurAssignment}, - get_session_info, persisted_entries::CandidateEntry, time::{slot_number_to_tick, Tick}, SessionInfoProvider, @@ -78,7 +76,7 @@ struct ImportedBlockInfo { } struct ImportedBlockInfoEnv<'a> { - runtime_info: &'a mut Option, // this is required just for the `earliest_session()` + runtime_info: &'a mut SessionInfoProvider, // this is required just for the `earliest_session()` assignment_criteria: &'a (dyn AssignmentCriteria + Send + Sync), keystore: &'a LocalKeystore, } @@ -160,7 +158,7 @@ async fn imported_block_info( return Err(ImportedBlockInfoError::FutureCancelled("SessionIndexForChild", error)), }; - if env.runtime_info.as_ref().map_or(true, |s| session_index < s.earliest_session()) { + if env.runtime_info.earliest_session().map_or(true, |s| session_index < s) { gum::debug!( target: LOG_TARGET, "Block {} is from ancient session {}. Skipping", @@ -209,7 +207,7 @@ async fn imported_block_info( }; let session_info = - match get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await { + match env.runtime_info.get_session_info(ctx.sender(), block_hash, session_index).await { Some(session_info) => session_info, None => { gum::error!( @@ -325,7 +323,7 @@ pub(crate) async fn handle_new_head( ctx: &mut Context, state: &State, db: &mut OverlayedBackend<'_, B>, - session_info_provider: &mut Option, + session_info_provider: &mut SessionInfoProvider, head: Hash, finalized_number: &Option, ) -> SubsystemResult> { @@ -361,7 +359,7 @@ pub(crate) async fn handle_new_head( }; // Update session info based on most recent head. - cache_session_info_for_head(ctx.sender(), head, session_info_provider).await; + session_info_provider.cache_session_info_for_head(ctx.sender(), head).await; // If we've just started the node and are far behind, // import at most `MAX_HEADS_LOOK_BACK` blocks. @@ -444,25 +442,19 @@ pub(crate) async fn handle_new_head( force_approve, } = imported_block_info; - let session_info = match get_session_info( - session_info_provider, - ctx.sender(), - head, - session_index, - ) - .await - { - Some(session_info) => session_info, - None => { - gum::error!( - target: LOG_TARGET, - session = session_index, - ?head, - "Can't get session info for the new head" - ); - return Ok(Vec::new()) - }, - }; + let session_info = + match session_info_provider.get_session_info(ctx.sender(), head, session_index).await { + Some(session_info) => session_info, + None => { + gum::error!( + target: LOG_TARGET, + session = session_index, + ?head, + "Can't get session info for the new head" + ); + return Ok(Vec::new()) + }, + }; let (block_tick, no_show_duration) = { let block_tick = slot_number_to_tick(state.slot_duration_millis, slot); @@ -684,8 +676,11 @@ pub(crate) mod tests { let state = blank_state(); - let session_info_provider = - SessionInfoProvider { runtime_info, gaps_in_cache: false, highest_session_seen: index }; + let session_info_provider = SessionInfoProvider { + runtime_info, + gaps_in_cache: false, + highest_session_seen: Some(index), + }; (state, session_info_provider) } @@ -795,7 +790,7 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let session_info_provider = SessionInfoProvider { + let mut session_info_provider = SessionInfoProvider { runtime_info: RuntimeInfo::new_with_cache( RuntimeInfoConfig { keystore: None, @@ -812,13 +807,13 @@ pub(crate) mod tests { )], ), gaps_in_cache: false, - highest_session_seen: session, + highest_session_seen: Some(session), }; let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &mut Some(session_info_provider), + runtime_info: &mut session_info_provider, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -918,7 +913,7 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let session_info_provider = SessionInfoProvider { + let mut session_info_provider = SessionInfoProvider { runtime_info: RuntimeInfo::new_with_cache( RuntimeInfoConfig { keystore: None, @@ -935,13 +930,13 @@ pub(crate) mod tests { )], ), gaps_in_cache: false, - highest_session_seen: session, + highest_session_seen: Some(session), }; let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &mut Some(session_info_provider), + runtime_info: &mut session_info_provider, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -1035,7 +1030,7 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let mut runtime_info = None; + let mut runtime_info = SessionInfoProvider::new(); let header = header.clone(); Box::pin(async move { @@ -1130,7 +1125,7 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let session_info_provider = SessionInfoProvider { + let mut session_info_provider = SessionInfoProvider { runtime_info: RuntimeInfo::new_with_cache( RuntimeInfoConfig { keystore: None, @@ -1147,13 +1142,13 @@ pub(crate) mod tests { )], ), gaps_in_cache: false, - highest_session_seen: session, + highest_session_seen: Some(session), }; let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &mut Some(session_info_provider), + runtime_info: &mut session_info_provider, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -1290,7 +1285,7 @@ pub(crate) mod tests { .map(|(r, c, g)| CandidateEvent::CandidateIncluded(r, Vec::new().into(), c, g)) .collect::>(); - let (state, session_info_provider) = + let (state, mut session_info_provider) = single_session_state(session, session_info, parent_hash); overlay_db.write_block_entry( v1::BlockEntry { @@ -1317,7 +1312,7 @@ pub(crate) mod tests { &mut ctx, &state, &mut overlay_db, - &mut Some(session_info_provider), + &mut session_info_provider, hash, &Some(1), ) diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index c0e8b75fddab..7d36ee16545f 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -645,12 +645,138 @@ struct SessionInfoProvider { // Will be true if an error had occurred during the last session caching attempt gaps_in_cache: bool, // Highest session index seen so far. Also used to calculate the earliest one. - highest_session_seen: SessionIndex, + highest_session_seen: Option, } impl SessionInfoProvider { - fn earliest_session(&self) -> SessionIndex { - self.highest_session_seen.saturating_sub(DISPUTE_WINDOW.get() - 1) + fn new() -> Self { + SessionInfoProvider { + runtime_info: RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }), + gaps_in_cache: false, + highest_session_seen: None, + } + } + + fn earliest_session(&self) -> Option { + self.highest_session_seen.map(|s| s.saturating_sub(DISPUTE_WINDOW.get() - 1)) + } + + async fn cache_session_info_for_head(&mut self, sender: &mut Sender, head: Hash) + where + Sender: SubsystemSender, + { + match self.highest_session_seen { + None => { + let head_session_idx = + match self.runtime_info.get_session_index_for_child(sender, head).await { + Ok(session_idx) => session_idx, + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?head, + ?err, + "Error getting session index for head. Won't cache any sessions" + ); + return + }, + }; + + for idx in + head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)..=head_session_idx + { + if let Err(err) = + self.runtime_info.get_session_info_by_index(sender, head, idx).await + { + self.gaps_in_cache = true; + gum::debug!( + target: LOG_TARGET, + ?err, + session = idx, + "Can't cache session. Moving on." + ); + continue + } + } + + self.highest_session_seen = Some(head_session_idx); + }, + Some(highest_session_seen) => { + let head_session_idx = + match self.runtime_info.get_session_index_for_child(sender, head).await { + Ok(session_idx) => session_idx, + Err(err) => { + gum::debug!( + target: LOG_TARGET, + ?head, + ?err, + "Error getting session index for head. Won't cache any sessions" + ); + return + }, + }; + + if self.gaps_in_cache || head_session_idx > highest_session_seen { + let lower_bound = if self.gaps_in_cache { + head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) + } else { + highest_session_seen + 1 + }; + + for idx in lower_bound..=head_session_idx { + if let Err(err) = + self.runtime_info.get_session_info_by_index(sender, head, idx).await + { + self.gaps_in_cache = true; + gum::debug!( + target: LOG_TARGET, + ?err, + session = idx, + "Can cache session. Moving on." + ); + continue + } + } + + self.highest_session_seen = Some(head_session_idx); + } + }, + } + } + + async fn get_session_info<'a, Sender>( + &'a mut self, + sender: &mut Sender, + relay_parent: Hash, + session_index: SessionIndex, + ) -> Option<&'a SessionInfo> + where + Sender: SubsystemSender, + { + // If this session is new - perform caching + if self.highest_session_seen.map_or(true, |s| session_index > s) { + self.cache_session_info_for_head(sender, relay_parent).await; + } + + match self + .runtime_info + .get_session_info_by_index(sender, relay_parent, session_index) + .await + { + Ok(extended_info) => Some(&extended_info.session_info), + Err(_) => { + gum::error!( + target: LOG_TARGET, + session = session_index, + ?relay_parent, + "Can't obtain SessionInfo" + ); + None + }, + } } } @@ -670,20 +796,16 @@ impl State { async fn approval_status( &'a self, sender: &mut Sender, - session_info_provider: &'a mut Option, + session_info_provider: &'a mut SessionInfoProvider, block_entry: &'a BlockEntry, candidate_entry: &'b CandidateEntry, ) -> Option<(&'b ApprovalEntry, ApprovalStatus)> where Sender: SubsystemSender, { - let session_info = match get_session_info( - session_info_provider, - sender, - block_entry.parent_hash(), - block_entry.session(), - ) - .await + let session_info = match session_info_provider + .get_session_info(sender, block_entry.parent_hash(), block_entry.session()) + .await { Some(s) => s, None => { @@ -771,7 +893,7 @@ where }; // `None` on start-up. Gets initialized/updated on leaf update - let mut session_info_provider: Option = None; + let mut session_info_provider = SessionInfoProvider::new(); let mut wakeups = Wakeups::default(); let mut currently_checking_set = CurrentlyCheckingSet::default(); let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE); @@ -906,7 +1028,7 @@ async fn handle_actions( ctx: &mut Context, state: &State, overlayed_db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut Option, + session_info_provider: &mut SessionInfoProvider, metrics: &Metrics, wakeups: &mut Wakeups, currently_checking_set: &mut CurrentlyCheckingSet, @@ -1165,7 +1287,7 @@ async fn handle_from_overseer( ctx: &mut Context, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut Option, + session_info_provider: &mut SessionInfoProvider, metrics: &Metrics, x: FromOrchestra, last_finalized_height: &mut Option, @@ -1757,7 +1879,7 @@ async fn check_and_import_assignment( sender: &mut Sender, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut Option, + session_info_provider: &mut SessionInfoProvider, assignment: IndirectAssignmentCert, candidate_index: CandidateIndex, ) -> SubsystemResult<(AssignmentCheckResult, Vec)> @@ -1786,13 +1908,9 @@ where )), }; - let session_info = match get_session_info( - session_info_provider, - sender, - block_entry.parent_hash(), - block_entry.session(), - ) - .await + let session_info = match session_info_provider + .get_session_info(sender, block_entry.parent_hash(), block_entry.session()) + .await { Some(s) => s, None => @@ -1929,7 +2047,7 @@ async fn check_and_import_approval( sender: &mut Sender, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut Option, + session_info_provider: &mut SessionInfoProvider, metrics: &Metrics, approval: IndirectSignedApprovalVote, with_response: impl FnOnce(ApprovalCheckResult) -> T, @@ -1962,13 +2080,9 @@ where }, }; - let session_info = match get_session_info( - session_info_provider, - sender, - approval.block_hash, - block_entry.session(), - ) - .await + let session_info = match session_info_provider + .get_session_info(sender, approval.block_hash, block_entry.session()) + .await { Some(s) => s, None => { @@ -2097,7 +2211,7 @@ async fn advance_approval_state( sender: &mut Sender, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut Option, + session_info_provider: &mut SessionInfoProvider, metrics: &Metrics, mut block_entry: BlockEntry, candidate_hash: CandidateHash, @@ -2273,7 +2387,7 @@ async fn process_wakeup( ctx: &mut Context, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut Option, + session_info_provider: &mut SessionInfoProvider, relay_block: Hash, candidate_hash: CandidateHash, metrics: &Metrics, @@ -2297,13 +2411,9 @@ async fn process_wakeup( _ => return Ok(Vec::new()), }; - let session_info = match get_session_info( - session_info_provider, - ctx.sender(), - block_entry.parent_hash(), - block_entry.session(), - ) - .await + let session_info = match session_info_provider + .get_session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session()) + .await { Some(i) => i, None => { @@ -2637,7 +2747,7 @@ async fn issue_approval( ctx: &mut Context, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut Option, + session_info_provider: &mut SessionInfoProvider, metrics: &Metrics, candidate_hash: CandidateHash, ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest, @@ -2679,13 +2789,9 @@ async fn issue_approval( }; issue_approval_span.add_int_tag("candidate_index", candidate_index as i64); - let session_info = match get_session_info( - session_info_provider, - ctx.sender(), - block_entry.parent_hash(), - block_entry.session(), - ) - .await + let session_info = match session_info_provider + .get_session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session()) + .await { Some(s) => s, None => { @@ -2837,158 +2943,3 @@ fn issue_local_invalid_statement( false, )); } - -async fn get_session_info<'a, Sender>( - session_info_provider: &'a mut Option, - sender: &mut Sender, - relay_parent: Hash, - session_index: SessionIndex, -) -> Option<&'a SessionInfo> -where - Sender: SubsystemSender, -{ - // If `SessionInfoProvider` is not initialized - try to initialize it first - if session_info_provider.is_none() { - cache_session_info_for_head(sender, relay_parent, session_info_provider).await; - } - - if session_info_provider.is_none() { - // `cache_session_info_for_head` should initialize `session_info_provider` - // no matter what so log an error if it is still `None` - gum::error!( - target: LOG_TARGET, - session = session_index, - ?relay_parent, - "SessionInfoProvider is not initialized after caching sessions" - ); - return None - } - - // And then process the request - match session_info_provider - .as_mut() - .expect("Checked that session_info_provider is Some on the line above. qed.") - .runtime_info - .get_session_info_by_index(sender, relay_parent, session_index) - .await - { - Ok(extended_info) => Some(&extended_info.session_info), - Err(_) => { - gum::error!( - target: LOG_TARGET, - session = session_index, - ?relay_parent, - "Can't obtain SessionInfo" - ); - None - }, - } -} - -/// If `head` is in a new session - cache it -async fn cache_session_info_for_head( - sender: &mut Sender, - head: Hash, - session_info: &mut Option, -) where - Sender: SubsystemSender, -{ - match session_info.take() { - None => { - let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), - }); - - let head_session_idx = - match runtime_info.get_session_index_for_child(sender, head).await { - Ok(session_idx) => session_idx, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - ?head, - ?err, - "Error getting session index for head. Won't cache any sessions" - ); - return - }, - }; - - let mut gaps_in_cache = false; - for idx in head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)..=head_session_idx - { - if let Err(err) = runtime_info.get_session_info_by_index(sender, head, idx).await { - gaps_in_cache = true; - gum::debug!( - target: LOG_TARGET, - ?err, - session = idx, - "Can't cache session. Moving on." - ); - continue - } - } - - *session_info = Some(SessionInfoProvider { - runtime_info, - gaps_in_cache, - highest_session_seen: head_session_idx, - }); - }, - Some(mut session_info_provider) => { - let head_session_idx = match session_info_provider - .runtime_info - .get_session_index_for_child(sender, head) - .await - { - Ok(session_idx) => session_idx, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - ?head, - ?err, - "Error getting session index for head. Won't cache any sessions" - ); - return - }, - }; - - let mut gaps_in_cache = false; - if session_info_provider.gaps_in_cache || - head_session_idx > session_info_provider.highest_session_seen - { - let lower_bound = if session_info_provider.gaps_in_cache { - head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) - } else { - session_info_provider.highest_session_seen + 1 - }; - - for idx in lower_bound..=head_session_idx { - if let Err(err) = session_info_provider - .runtime_info - .get_session_info_by_index(sender, head, idx) - .await - { - gaps_in_cache = true; - gum::debug!( - target: LOG_TARGET, - ?err, - session = idx, - "Can cache session. Moving on." - ); - continue - } - } - - session_info_provider = SessionInfoProvider { - runtime_info: session_info_provider.runtime_info, - gaps_in_cache, - highest_session_seen: head_session_idx, - }; - } - - *session_info = Some(session_info_provider); - }, - } -} From 7665ecdf0638258a89fc4ec2ebcba68f30a3f8ee Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov <610755+tdimitrov@users.noreply.github.com> Date: Thu, 27 Apr 2023 16:08:46 +0300 Subject: [PATCH 15/24] Remove `earliest_session()` --- node/core/approval-voting/src/import.rs | 31 ++++++++++++++----------- node/core/approval-voting/src/lib.rs | 6 +---- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index ec03a312ba39..be52b99c1210 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -76,7 +76,7 @@ struct ImportedBlockInfo { } struct ImportedBlockInfoEnv<'a> { - runtime_info: &'a mut SessionInfoProvider, // this is required just for the `earliest_session()` + runtime_info: &'a mut SessionInfoProvider, assignment_criteria: &'a (dyn AssignmentCriteria + Send + Sync), keystore: &'a LocalKeystore, } @@ -94,8 +94,8 @@ enum ImportedBlockInfoError { #[error(transparent)] ApprovalError(approval_types::ApprovalError), - #[error("block is from an ancient session")] - BlockFromAncientSession, + #[error("block is already finalized")] + BlockAlreadyFinalized, #[error("session info unavailable")] SessionInfoUnavailable, @@ -111,6 +111,7 @@ async fn imported_block_info( env: ImportedBlockInfoEnv<'_>, block_hash: Hash, block_header: &Header, + last_finalized_height: &Option, ) -> Result { // Ignore any runtime API errors - that means these blocks are old and finalized. // Only unfinalized blocks factor into the approval voting process. @@ -158,15 +159,17 @@ async fn imported_block_info( return Err(ImportedBlockInfoError::FutureCancelled("SessionIndexForChild", error)), }; - if env.runtime_info.earliest_session().map_or(true, |s| session_index < s) { + // If we can't determine if the block is finalized or not - try processing it + if last_finalized_height.map_or(false, |finalized| block_header.number < finalized) { gum::debug!( target: LOG_TARGET, - "Block {} is from ancient session {}. Skipping", + session = session_index, + finalized = ?last_finalized_height, + "Block {} is either finalized or last finalized height is unknown. Skipping", block_hash, - session_index ); - return Err(ImportedBlockInfoError::BlockFromAncientSession) + return Err(ImportedBlockInfoError::BlockAlreadyFinalized) } session_index @@ -393,7 +396,7 @@ pub(crate) async fn handle_new_head( keystore: &state.keystore, }; - match imported_block_info(ctx, env, block_hash, &block_header).await { + match imported_block_info(ctx, env, block_hash, &block_header, finalized_number).await { Ok(i) => imported_blocks_and_info.push((block_hash, block_header, i)), Err(error) => { // It's possible that we've lost a race with finality. @@ -818,7 +821,8 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = imported_block_info(&mut ctx, env, hash, &header).await.unwrap(); + let info = + imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap(); assert_eq!(info.included_candidates, included_candidates); assert_eq!(info.session_index, session); @@ -941,7 +945,7 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = imported_block_info(&mut ctx, env, hash, &header).await; + let info = imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await; assert_matches!(info, Err(ImportedBlockInfoError::VrfInfoUnavailable)); }) @@ -1040,9 +1044,9 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = imported_block_info(&mut ctx, env, hash, &header).await; + let info = imported_block_info(&mut ctx, env, hash, &header, &Some(6)).await; - assert_matches!(info, Err(ImportedBlockInfoError::BlockFromAncientSession)); + assert_matches!(info, Err(ImportedBlockInfoError::BlockAlreadyFinalized)); }) }; @@ -1153,7 +1157,8 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = imported_block_info(&mut ctx, env, hash, &header).await.unwrap(); + let info = + imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap(); assert_eq!(info.included_candidates, included_candidates); assert_eq!(info.session_index, session); diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 7d36ee16545f..db6324d0cd4f 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -661,10 +661,6 @@ impl SessionInfoProvider { } } - fn earliest_session(&self) -> Option { - self.highest_session_seen.map(|s| s.saturating_sub(DISPUTE_WINDOW.get() - 1)) - } - async fn cache_session_info_for_head(&mut self, sender: &mut Sender, head: Hash) where Sender: SubsystemSender, @@ -1307,7 +1303,7 @@ async fn handle_from_overseer( db, session_info_provider, head, - &*last_finalized_height, + last_finalized_height, ) .await { From 3f2b85c5f4709f7c01808b5390caa874870f653f Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov <610755+tdimitrov@users.noreply.github.com> Date: Fri, 28 Apr 2023 11:19:52 +0300 Subject: [PATCH 16/24] Remove pre-caching -> wip --- node/core/approval-voting/src/import.rs | 161 ++++++++--------- node/core/approval-voting/src/lib.rs | 229 ++++++++---------------- 2 files changed, 144 insertions(+), 246 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index be52b99c1210..d0f6df421376 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -40,7 +40,7 @@ use polkadot_node_subsystem::{ }, overseer, RuntimeApiError, SubsystemError, SubsystemResult, }; -use polkadot_node_subsystem_util::determine_new_blocks; +use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo}; use polkadot_primitives::{ BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog, CoreIndex, GroupIndex, Hash, Header, SessionIndex, @@ -57,9 +57,9 @@ use super::approval_db::v1; use crate::{ backend::{Backend, OverlayedBackend}, criteria::{AssignmentCriteria, OurAssignment}, + get_session_info, persisted_entries::CandidateEntry, time::{slot_number_to_tick, Tick}, - SessionInfoProvider, }; use super::{State, LOG_TARGET}; @@ -76,7 +76,7 @@ struct ImportedBlockInfo { } struct ImportedBlockInfoEnv<'a> { - runtime_info: &'a mut SessionInfoProvider, + runtime_info: &'a mut RuntimeInfo, assignment_criteria: &'a (dyn AssignmentCriteria + Send + Sync), keystore: &'a LocalKeystore, } @@ -210,7 +210,7 @@ async fn imported_block_info( }; let session_info = - match env.runtime_info.get_session_info(ctx.sender(), block_hash, session_index).await { + match get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await { Some(session_info) => session_info, None => { gum::error!( @@ -326,7 +326,7 @@ pub(crate) async fn handle_new_head( ctx: &mut Context, state: &State, db: &mut OverlayedBackend<'_, B>, - session_info_provider: &mut SessionInfoProvider, + session_info_provider: &mut RuntimeInfo, head: Hash, finalized_number: &Option, ) -> SubsystemResult> { @@ -361,9 +361,6 @@ pub(crate) async fn handle_new_head( } }; - // Update session info based on most recent head. - session_info_provider.cache_session_info_for_head(ctx.sender(), head).await; - // If we've just started the node and are far behind, // import at most `MAX_HEADS_LOOK_BACK` blocks. let lower_bound_number = header.number.saturating_sub(MAX_HEADS_LOOK_BACK); @@ -445,19 +442,25 @@ pub(crate) async fn handle_new_head( force_approve, } = imported_block_info; - let session_info = - match session_info_provider.get_session_info(ctx.sender(), head, session_index).await { - Some(session_info) => session_info, - None => { - gum::error!( - target: LOG_TARGET, - session = session_index, - ?head, - "Can't get session info for the new head" - ); - return Ok(Vec::new()) - }, - }; + let session_info = match get_session_info( + session_info_provider, + ctx.sender(), + head, + session_index, + ) + .await + { + Some(session_info) => session_info, + None => { + gum::error!( + target: LOG_TARGET, + session = session_index, + ?head, + "Can't get session info for the new head" + ); + return Ok(Vec::new()) + }, + }; let (block_tick, no_show_duration) = { let block_tick = slot_number_to_tick(state.slot_duration_millis, slot); @@ -660,7 +663,7 @@ pub(crate) mod tests { index: SessionIndex, info: SessionInfo, relay_parent: Hash, - ) -> (State, SessionInfoProvider) { + ) -> (State, RuntimeInfo) { let runtime_info = RuntimeInfo::new_with_cache( RuntimeInfoConfig { keystore: None, @@ -677,15 +680,7 @@ pub(crate) mod tests { )], ); - let state = blank_state(); - - let session_info_provider = SessionInfoProvider { - runtime_info, - gaps_in_cache: false, - highest_session_seen: Some(index), - }; - - (state, session_info_provider) + (blank_state(), runtime_info) } struct MockAssignmentCriteria; @@ -793,25 +788,21 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let mut session_info_provider = SessionInfoProvider { - runtime_info: RuntimeInfo::new_with_cache( - RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + let mut session_info_provider = RuntimeInfo::new_with_cache( + RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }, + vec![( + session, + hash, + ExtendedSessionInfo { + session_info, + validator_info: ValidatorInfo { our_index: None, our_group: None }, }, - vec![( - session, - hash, - ExtendedSessionInfo { - session_info, - validator_info: ValidatorInfo { our_index: None, our_group: None }, - }, - )], - ), - gaps_in_cache: false, - highest_session_seen: Some(session), - }; + )], + ); let header = header.clone(); Box::pin(async move { @@ -917,25 +908,21 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let mut session_info_provider = SessionInfoProvider { - runtime_info: RuntimeInfo::new_with_cache( - RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + let mut session_info_provider = RuntimeInfo::new_with_cache( + RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }, + vec![( + session, + hash, + ExtendedSessionInfo { + session_info, + validator_info: ValidatorInfo { our_index: None, our_group: None }, }, - vec![( - session, - hash, - ExtendedSessionInfo { - session_info, - validator_info: ValidatorInfo { our_index: None, our_group: None }, - }, - )], - ), - gaps_in_cache: false, - highest_session_seen: Some(session), - }; + )], + ); let header = header.clone(); Box::pin(async move { @@ -1034,7 +1021,11 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let mut runtime_info = SessionInfoProvider::new(); + let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); let header = header.clone(); Box::pin(async move { @@ -1129,25 +1120,21 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let mut session_info_provider = SessionInfoProvider { - runtime_info: RuntimeInfo::new_with_cache( - RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + let mut session_info_provider = RuntimeInfo::new_with_cache( + RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }, + vec![( + session, + hash, + ExtendedSessionInfo { + session_info, + validator_info: ValidatorInfo { our_index: None, our_group: None }, }, - vec![( - session, - hash, - ExtendedSessionInfo { - session_info, - validator_info: ValidatorInfo { our_index: None, our_group: None }, - }, - )], - ), - gaps_in_cache: false, - highest_session_seen: Some(session), - }; + )], + ); let header = header.clone(); Box::pin(async move { @@ -1291,7 +1278,7 @@ pub(crate) mod tests { .collect::>(); let (state, mut session_info_provider) = - single_session_state(session, session_info, parent_hash); + single_session_state(session, session_info.clone(), parent_hash); overlay_db.write_block_entry( v1::BlockEntry { block_hash: parent_hash, diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index db6324d0cd4f..f61dae4293c9 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -637,142 +637,29 @@ impl CurrentlyCheckingSet { } } -// Wraps `RuntimeInfo` and some metadata. On each new leaf `SessionInfo` is -// cached. `RuntimeInfo` keeps the last `DISPUTE_WINDOW` number of sessions. -struct SessionInfoProvider { - // `RuntimeInfo` caches sessions internally. - runtime_info: RuntimeInfo, - // Will be true if an error had occurred during the last session caching attempt - gaps_in_cache: bool, - // Highest session index seen so far. Also used to calculate the earliest one. - highest_session_seen: Option, -} - -impl SessionInfoProvider { - fn new() -> Self { - SessionInfoProvider { - runtime_info: RuntimeInfo::new_with_config(RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), - }), - gaps_in_cache: false, - highest_session_seen: None, - } - } - - async fn cache_session_info_for_head(&mut self, sender: &mut Sender, head: Hash) - where - Sender: SubsystemSender, - { - match self.highest_session_seen { - None => { - let head_session_idx = - match self.runtime_info.get_session_index_for_child(sender, head).await { - Ok(session_idx) => session_idx, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - ?head, - ?err, - "Error getting session index for head. Won't cache any sessions" - ); - return - }, - }; - - for idx in - head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)..=head_session_idx - { - if let Err(err) = - self.runtime_info.get_session_info_by_index(sender, head, idx).await - { - self.gaps_in_cache = true; - gum::debug!( - target: LOG_TARGET, - ?err, - session = idx, - "Can't cache session. Moving on." - ); - continue - } - } - - self.highest_session_seen = Some(head_session_idx); - }, - Some(highest_session_seen) => { - let head_session_idx = - match self.runtime_info.get_session_index_for_child(sender, head).await { - Ok(session_idx) => session_idx, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - ?head, - ?err, - "Error getting session index for head. Won't cache any sessions" - ); - return - }, - }; - - if self.gaps_in_cache || head_session_idx > highest_session_seen { - let lower_bound = if self.gaps_in_cache { - head_session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) - } else { - highest_session_seen + 1 - }; - - for idx in lower_bound..=head_session_idx { - if let Err(err) = - self.runtime_info.get_session_info_by_index(sender, head, idx).await - { - self.gaps_in_cache = true; - gum::debug!( - target: LOG_TARGET, - ?err, - session = idx, - "Can cache session. Moving on." - ); - continue - } - } - - self.highest_session_seen = Some(head_session_idx); - } - }, - } - } - - async fn get_session_info<'a, Sender>( - &'a mut self, - sender: &mut Sender, - relay_parent: Hash, - session_index: SessionIndex, - ) -> Option<&'a SessionInfo> - where - Sender: SubsystemSender, +async fn get_session_info<'a, Sender>( + runtime_info: &'a mut RuntimeInfo, + sender: &mut Sender, + relay_parent: Hash, + session_index: SessionIndex, +) -> Option<&'a SessionInfo> +where + Sender: SubsystemSender, +{ + match runtime_info + .get_session_info_by_index(sender, relay_parent, session_index) + .await { - // If this session is new - perform caching - if self.highest_session_seen.map_or(true, |s| session_index > s) { - self.cache_session_info_for_head(sender, relay_parent).await; - } - - match self - .runtime_info - .get_session_info_by_index(sender, relay_parent, session_index) - .await - { - Ok(extended_info) => Some(&extended_info.session_info), - Err(_) => { - gum::error!( - target: LOG_TARGET, - session = session_index, - ?relay_parent, - "Can't obtain SessionInfo" - ); - None - }, - } + Ok(extended_info) => Some(&extended_info.session_info), + Err(_) => { + gum::error!( + target: LOG_TARGET, + session = session_index, + ?relay_parent, + "Can't obtain SessionInfo" + ); + None + }, } } @@ -792,16 +679,20 @@ impl State { async fn approval_status( &'a self, sender: &mut Sender, - session_info_provider: &'a mut SessionInfoProvider, + session_info_provider: &'a mut RuntimeInfo, block_entry: &'a BlockEntry, candidate_entry: &'b CandidateEntry, ) -> Option<(&'b ApprovalEntry, ApprovalStatus)> where Sender: SubsystemSender, { - let session_info = match session_info_provider - .get_session_info(sender, block_entry.parent_hash(), block_entry.session()) - .await + let session_info = match get_session_info( + session_info_provider, + sender, + block_entry.parent_hash(), + block_entry.session(), + ) + .await { Some(s) => s, None => { @@ -889,7 +780,11 @@ where }; // `None` on start-up. Gets initialized/updated on leaf update - let mut session_info_provider = SessionInfoProvider::new(); + let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); let mut wakeups = Wakeups::default(); let mut currently_checking_set = CurrentlyCheckingSet::default(); let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE); @@ -1024,7 +919,7 @@ async fn handle_actions( ctx: &mut Context, state: &State, overlayed_db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut SessionInfoProvider, + session_info_provider: &mut RuntimeInfo, metrics: &Metrics, wakeups: &mut Wakeups, currently_checking_set: &mut CurrentlyCheckingSet, @@ -1283,7 +1178,7 @@ async fn handle_from_overseer( ctx: &mut Context, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut SessionInfoProvider, + session_info_provider: &mut RuntimeInfo, metrics: &Metrics, x: FromOrchestra, last_finalized_height: &mut Option, @@ -1875,7 +1770,7 @@ async fn check_and_import_assignment( sender: &mut Sender, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut SessionInfoProvider, + session_info_provider: &mut RuntimeInfo, assignment: IndirectAssignmentCert, candidate_index: CandidateIndex, ) -> SubsystemResult<(AssignmentCheckResult, Vec)> @@ -1904,9 +1799,13 @@ where )), }; - let session_info = match session_info_provider - .get_session_info(sender, block_entry.parent_hash(), block_entry.session()) - .await + let session_info = match get_session_info( + session_info_provider, + sender, + block_entry.parent_hash(), + block_entry.session(), + ) + .await { Some(s) => s, None => @@ -2043,7 +1942,7 @@ async fn check_and_import_approval( sender: &mut Sender, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut SessionInfoProvider, + session_info_provider: &mut RuntimeInfo, metrics: &Metrics, approval: IndirectSignedApprovalVote, with_response: impl FnOnce(ApprovalCheckResult) -> T, @@ -2076,9 +1975,13 @@ where }, }; - let session_info = match session_info_provider - .get_session_info(sender, approval.block_hash, block_entry.session()) - .await + let session_info = match get_session_info( + session_info_provider, + sender, + approval.block_hash, + block_entry.session(), + ) + .await { Some(s) => s, None => { @@ -2207,7 +2110,7 @@ async fn advance_approval_state( sender: &mut Sender, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut SessionInfoProvider, + session_info_provider: &mut RuntimeInfo, metrics: &Metrics, mut block_entry: BlockEntry, candidate_hash: CandidateHash, @@ -2383,7 +2286,7 @@ async fn process_wakeup( ctx: &mut Context, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut SessionInfoProvider, + session_info_provider: &mut RuntimeInfo, relay_block: Hash, candidate_hash: CandidateHash, metrics: &Metrics, @@ -2407,9 +2310,13 @@ async fn process_wakeup( _ => return Ok(Vec::new()), }; - let session_info = match session_info_provider - .get_session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session()) - .await + let session_info = match get_session_info( + session_info_provider, + ctx.sender(), + block_entry.parent_hash(), + block_entry.session(), + ) + .await { Some(i) => i, None => { @@ -2743,7 +2650,7 @@ async fn issue_approval( ctx: &mut Context, state: &State, db: &mut OverlayedBackend<'_, impl Backend>, - session_info_provider: &mut SessionInfoProvider, + session_info_provider: &mut RuntimeInfo, metrics: &Metrics, candidate_hash: CandidateHash, ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest, @@ -2785,9 +2692,13 @@ async fn issue_approval( }; issue_approval_span.add_int_tag("candidate_index", candidate_index as i64); - let session_info = match session_info_provider - .get_session_info(ctx.sender(), block_entry.parent_hash(), block_entry.session()) - .await + let session_info = match get_session_info( + session_info_provider, + ctx.sender(), + block_entry.parent_hash(), + block_entry.session(), + ) + .await { Some(s) => s, None => { From 19e9588d1af849ab553f175b782e68983e3f8927 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 2 May 2023 14:44:04 +0300 Subject: [PATCH 17/24] Fix some tests and code cleanup --- node/core/approval-voting/src/import.rs | 161 ++++++++++++------------ node/core/approval-voting/src/tests.rs | 43 +++---- node/subsystem-util/src/runtime/mod.rs | 13 -- 3 files changed, 93 insertions(+), 124 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index d0f6df421376..6e5996df39fa 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -613,10 +613,7 @@ pub(crate) mod tests { }; use polkadot_node_subsystem::messages::{AllMessages, ApprovalVotingMessage}; use polkadot_node_subsystem_test_helpers::make_subsystem_context; - use polkadot_node_subsystem_util::{ - database::Database, - runtime::{ExtendedSessionInfo, ValidatorInfo}, - }; + use polkadot_node_subsystem_util::database::Database; use polkadot_primitives::{Id as ParaId, IndexedVec, SessionInfo, ValidatorId, ValidatorIndex}; pub(crate) use sp_consensus_babe::{ digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest}, @@ -659,26 +656,12 @@ pub(crate) mod tests { } } - fn single_session_state( - index: SessionIndex, - info: SessionInfo, - relay_parent: Hash, - ) -> (State, RuntimeInfo) { - let runtime_info = RuntimeInfo::new_with_cache( - RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), - }, - vec![( - index, - relay_parent, - ExtendedSessionInfo { - session_info: info, - validator_info: ValidatorInfo { our_group: None, our_index: None }, - }, - )], - ); + fn single_session_state() -> (State, RuntimeInfo) { + let runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); (blank_state(), runtime_info) } @@ -788,21 +771,11 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let mut session_info_provider = RuntimeInfo::new_with_cache( - RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), - }, - vec![( - session, - hash, - ExtendedSessionInfo { - session_info, - validator_info: ValidatorInfo { our_index: None, our_group: None }, - }, - )], - ); + let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); let header = header.clone(); Box::pin(async move { @@ -867,6 +840,20 @@ pub(crate) mod tests { })); } ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionInfo(idx, si_tx), + ) + ) => { + assert_eq!(session, idx); + assert_eq!(req_block_hash, hash); + si_tx.send(Ok(Some(session_info.clone()))).unwrap(); + } + ); }); futures::executor::block_on(futures::future::join(test_fut, aux_fut)); @@ -908,21 +895,11 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let mut session_info_provider = RuntimeInfo::new_with_cache( - RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), - }, - vec![( - session, - hash, - ExtendedSessionInfo { - session_info, - validator_info: ValidatorInfo { our_index: None, our_group: None }, - }, - )], - ); + let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); let header = header.clone(); Box::pin(async move { @@ -981,6 +958,20 @@ pub(crate) mod tests { })); } ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionInfo(idx, si_tx), + ) + ) => { + assert_eq!(session, idx); + assert_eq!(req_block_hash, hash); + si_tx.send(Ok(Some(session_info.clone()))).unwrap(); + } + ); }); futures::executor::block_on(futures::future::join(test_fut, aux_fut)); @@ -1120,21 +1111,11 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let mut session_info_provider = RuntimeInfo::new_with_cache( - RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), - }, - vec![( - session, - hash, - ExtendedSessionInfo { - session_info, - validator_info: ValidatorInfo { our_index: None, our_group: None }, - }, - )], - ); + let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); let header = header.clone(); Box::pin(async move { @@ -1199,6 +1180,20 @@ pub(crate) mod tests { })); } ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionInfo(idx, si_tx), + ) + ) => { + assert_eq!(session, idx); + assert_eq!(req_block_hash, hash); + si_tx.send(Ok(Some(session_info.clone()))).unwrap(); + } + ); }); futures::executor::block_on(futures::future::join(test_fut, aux_fut)); @@ -1277,8 +1272,7 @@ pub(crate) mod tests { .map(|(r, c, g)| CandidateEvent::CandidateIncluded(r, Vec::new().into(), c, g)) .collect::>(); - let (state, mut session_info_provider) = - single_session_state(session, session_info.clone(), parent_hash); + let (state, mut session_info_provider) = single_session_state(); overlay_db.write_block_entry( v1::BlockEntry { block_hash: parent_hash, @@ -1343,17 +1337,6 @@ pub(crate) mod tests { } ); - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(c_tx), - )) => { - assert_eq!(h, hash); - let _ = c_tx.send(Ok(session)); - } - ); - // determine_new_blocks exits early as the parent_hash is in the DB assert_matches!( @@ -1399,6 +1382,20 @@ pub(crate) mod tests { } ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionInfo(idx, si_tx), + ) + ) => { + assert_eq!(session, idx); + assert_eq!(req_block_hash, hash); + si_tx.send(Ok(Some(session_info.clone()))).unwrap(); + } + ); + assert_matches!( handle.recv().await, AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks( diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index 059ab09bc705..8c1fcf08985f 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -799,36 +799,7 @@ async fn import_block( h_tx.send(Ok(Some(new_header.clone()))).unwrap(); } ); - - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - req_block_hash, - RuntimeApiRequest::SessionIndexForChild(s_tx) - ) - ) => { - let hash = &hashes[number as usize]; - assert_eq!(req_block_hash, hash.0); - s_tx.send(Ok(number.into())).unwrap(); - } - ); - if !fork { - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - req_block_hash, - RuntimeApiRequest::SessionInfo(idx, si_tx), - ) - ) => { - assert_eq!(number, idx); - assert_eq!(req_block_hash, *new_head); - si_tx.send(Ok(Some(session_info.clone()))).unwrap(); - } - ); - let mut _ancestry_step = 0; if gap { assert_matches!( @@ -929,6 +900,20 @@ async fn import_block( } ); } else { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionInfo(idx, si_tx), + ) + ) => { + // assert_eq!(session, idx); + // assert_eq!(req_block_hash, hashes[(number-1) as usize].0); + si_tx.send(Ok(Some(session_info.clone()))).unwrap(); + } + ); + assert_matches!( overseer_recv(overseer).await, AllMessages::ApprovalDistribution( diff --git a/node/subsystem-util/src/runtime/mod.rs b/node/subsystem-util/src/runtime/mod.rs index 468ba94449e7..6e06b99bbe03 100644 --- a/node/subsystem-util/src/runtime/mod.rs +++ b/node/subsystem-util/src/runtime/mod.rs @@ -118,19 +118,6 @@ impl RuntimeInfo { } } - /// Create an instance with pre-populated cache. Used only for testing - pub fn new_with_cache( - cfg: Config, - data: Vec<(SessionIndex, Hash, ExtendedSessionInfo)>, - ) -> Self { - let mut r = Self::new_with_config(cfg); - for (idx, parent, session) in data { - r.session_index_cache.put(parent, idx); - r.session_info_cache.put(idx, session); - } - r - } - /// Returns the session index expected at any child of the `parent` block. /// This does not return the session index for the `parent` block. pub async fn get_session_index_for_child( From 70d136f32837bdd9b9f1ec77c57eb1d4ad120bb7 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 2 May 2023 18:33:33 +0300 Subject: [PATCH 18/24] Fix all tests --- node/core/approval-voting/src/tests.rs | 28 ++++++++++++++------------ 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index 8c1fcf08985f..cd1a267af3ed 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -900,19 +900,21 @@ async fn import_block( } ); } else { - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - req_block_hash, - RuntimeApiRequest::SessionInfo(idx, si_tx), - ) - ) => { - // assert_eq!(session, idx); - // assert_eq!(req_block_hash, hashes[(number-1) as usize].0); - si_tx.send(Ok(Some(session_info.clone()))).unwrap(); - } - ); + if !fork { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionInfo(idx, si_tx), + ) + ) => { + // assert_eq!(session, idx); + // assert_eq!(req_block_hash, hashes[(number-1) as usize].0); + si_tx.send(Ok(Some(session_info.clone()))).unwrap(); + } + ); + } assert_matches!( overseer_recv(overseer).await, From 6e348c958f62da8b92647e8667dcc1793d1b19ea Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 2 May 2023 20:45:30 +0300 Subject: [PATCH 19/24] Fixes in tests --- node/core/approval-voting/src/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index cd1a267af3ed..9e8295710da0 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -909,8 +909,8 @@ async fn import_block( RuntimeApiRequest::SessionInfo(idx, si_tx), ) ) => { - // assert_eq!(session, idx); - // assert_eq!(req_block_hash, hashes[(number-1) as usize].0); + // Make sure all SessionInfo calls are not made for the leaf (but for its relay parent) + assert_ne!(req_block_hash, hashes[(number-1) as usize].0); si_tx.send(Ok(Some(session_info.clone()))).unwrap(); } ); From 4fa4403322eabc68c576b6863bc73de07062004a Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 2 May 2023 20:55:29 +0300 Subject: [PATCH 20/24] Fix comments, variable names and small style changes --- node/core/approval-voting/src/import.rs | 29 +++++++++++++------------ node/core/approval-voting/src/tests.rs | 1 + 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 6e5996df39fa..38666f0ee49b 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -159,7 +159,7 @@ async fn imported_block_info( return Err(ImportedBlockInfoError::FutureCancelled("SessionIndexForChild", error)), }; - // If we can't determine if the block is finalized or not - try processing it + // We can't determine if the block is finalized or not - try processing it if last_finalized_height.map_or(false, |finalized| block_header.number < finalized) { gum::debug!( target: LOG_TARGET, @@ -657,13 +657,14 @@ pub(crate) mod tests { } fn single_session_state() -> (State, RuntimeInfo) { - let runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), - }); - - (blank_state(), runtime_info) + ( + blank_state(), + RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }), + ) } struct MockAssignmentCriteria; @@ -771,7 +772,7 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) .expect("DISPUTE_WINDOW can't be 0; qed."), @@ -780,7 +781,7 @@ pub(crate) mod tests { let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &mut session_info_provider, + runtime_info: &mut runtime_info, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -895,7 +896,7 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) .expect("DISPUTE_WINDOW can't be 0; qed."), @@ -904,7 +905,7 @@ pub(crate) mod tests { let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &mut session_info_provider, + runtime_info: &mut runtime_info, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; @@ -1111,7 +1112,7 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) .expect("DISPUTE_WINDOW can't be 0; qed."), @@ -1120,7 +1121,7 @@ pub(crate) mod tests { let header = header.clone(); Box::pin(async move { let env = ImportedBlockInfoEnv { - runtime_info: &mut session_info_provider, + runtime_info: &mut runtime_info, assignment_criteria: &MockAssignmentCriteria, keystore: &LocalKeystore::in_memory(), }; diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index 9e8295710da0..034098392b31 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -901,6 +901,7 @@ async fn import_block( ); } else { if !fork { + // SessionInfo won't be called for forks - it's already cached assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi( From dc0542acf8341909c4969170c08c5aa338a0ea7c Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 2 May 2023 22:51:03 +0300 Subject: [PATCH 21/24] Fix a warning --- node/core/approval-voting/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index 034098392b31..d7e19a8c09f3 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -907,7 +907,7 @@ async fn import_block( AllMessages::RuntimeApi( RuntimeApiMessage::Request( req_block_hash, - RuntimeApiRequest::SessionInfo(idx, si_tx), + RuntimeApiRequest::SessionInfo(_, si_tx), ) ) => { // Make sure all SessionInfo calls are not made for the leaf (but for its relay parent) From 233478e1987886c58e8f9030a6fa221d43035f82 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 5 May 2023 11:18:27 +0300 Subject: [PATCH 22/24] impl From for NonZeroUsize --- node/core/approval-voting/src/import.rs | 17 ++++++----------- node/core/approval-voting/src/lib.rs | 3 +-- node/primitives/src/lib.rs | 8 +++++++- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 38666f0ee49b..8de52e648cc7 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -622,7 +622,7 @@ pub(crate) mod tests { use sp_core::{crypto::VrfSigner, testing::TaskExecutor}; use sp_keyring::sr25519::Keyring as Sr25519Keyring; pub(crate) use sp_runtime::{Digest, DigestItem}; - use std::{num::NonZeroUsize, pin::Pin, sync::Arc}; + use std::{pin::Pin, sync::Arc}; use crate::{approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry}; @@ -661,8 +661,7 @@ pub(crate) mod tests { blank_state(), RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + session_cache_lru_size: DISPUTE_WINDOW.into(), }), ) } @@ -774,8 +773,7 @@ pub(crate) mod tests { let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + session_cache_lru_size: DISPUTE_WINDOW.into(), }); let header = header.clone(); @@ -898,8 +896,7 @@ pub(crate) mod tests { let test_fut = { let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + session_cache_lru_size: DISPUTE_WINDOW.into(), }); let header = header.clone(); @@ -1015,8 +1012,7 @@ pub(crate) mod tests { let test_fut = { let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + session_cache_lru_size: DISPUTE_WINDOW.into(), }); let header = header.clone(); @@ -1114,8 +1110,7 @@ pub(crate) mod tests { let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + session_cache_lru_size: DISPUTE_WINDOW.into(), }); let header = header.clone(); diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index f61dae4293c9..043ecd923c27 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -782,8 +782,7 @@ where // `None` on start-up. Gets initialized/updated on leaf update let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, - session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) - .expect("DISPUTE_WINDOW can't be 0; qed."), + session_cache_lru_size: DISPUTE_WINDOW.into(), }); let mut wakeups = Wakeups::default(); let mut currently_checking_set = CurrentlyCheckingSet::default(); diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 18e043be9c4d..1177dbc17caa 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -22,7 +22,7 @@ #![deny(missing_docs)] -use std::pin::Pin; +use std::{num::NonZeroUsize, pin::Pin}; use bounded_vec::BoundedVec; use futures::Future; @@ -143,6 +143,12 @@ impl SessionWindowSize { } } +impl From for NonZeroUsize { + fn from(value: SessionWindowSize) -> Self { + NonZeroUsize::new(value.get() as usize).expect("SessionWindowSize can't be 0. qed.") + } +} + /// The cumulative weight of a block in a fork-choice rule. pub type BlockWeight = u32; From d27220ad881ac7a58806504fde9954c50a010a32 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Sat, 6 May 2023 09:52:40 +0300 Subject: [PATCH 23/24] Fix logging for `get_session_info` - remove redundant logs and decrease log level to DEBUG --- node/core/approval-voting/src/import.rs | 20 ++--------------- node/core/approval-voting/src/lib.rs | 29 +++---------------------- 2 files changed, 5 insertions(+), 44 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 8de52e648cc7..d65a7ddb100f 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -212,15 +212,7 @@ async fn imported_block_info( let session_info = match get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await { Some(session_info) => session_info, - None => { - gum::error!( - target: LOG_TARGET, - relay_parent = ?block_hash, - session = session_index, - "Session info unavailable" - ); - return Err(ImportedBlockInfoError::SessionInfoUnavailable) - }, + None => return Err(ImportedBlockInfoError::SessionInfoUnavailable), }; let (assignments, slot, relay_vrf_story) = { @@ -451,15 +443,7 @@ pub(crate) async fn handle_new_head( .await { Some(session_info) => session_info, - None => { - gum::error!( - target: LOG_TARGET, - session = session_index, - ?head, - "Can't get session info for the new head" - ); - return Ok(Vec::new()) - }, + None => return Ok(Vec::new()), }; let (block_tick, no_show_duration) = { diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 043ecd923c27..18b8746ca317 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -652,7 +652,7 @@ where { Ok(extended_info) => Some(&extended_info.session_info), Err(_) => { - gum::error!( + gum::debug!( target: LOG_TARGET, session = session_index, ?relay_parent, @@ -695,14 +695,7 @@ impl State { .await { Some(s) => s, - None => { - gum::warn!( - target: LOG_TARGET, - "Unknown session info for {}", - block_entry.session() - ); - return None - }, + None => return None, }; let block_hash = block_entry.block_hash(); @@ -2318,16 +2311,7 @@ async fn process_wakeup( .await { Some(i) => i, - None => { - gum::warn!( - target: LOG_TARGET, - "Missing session info for live block {} in session {}", - relay_block, - block_entry.session(), - ); - - return Ok(Vec::new()) - }, + None => return Ok(Vec::new()), }; let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot()); @@ -2701,13 +2685,6 @@ async fn issue_approval( { Some(s) => s, None => { - gum::warn!( - target: LOG_TARGET, - "Missing session info for live block {} in session {}", - block_hash, - block_entry.session(), - ); - metrics.on_approval_error(); return Ok(Vec::new()) }, From 2ed7dfd7c0964f102db9634daa3ba69acb80d4b1 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 9 May 2023 11:55:20 +0300 Subject: [PATCH 24/24] Code review feedback --- node/core/approval-voting/src/import.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 67ba48e70702..1ea2687a0246 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -209,11 +209,9 @@ async fn imported_block_info( } }; - let session_info = - match get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await { - Some(session_info) => session_info, - None => return Err(ImportedBlockInfoError::SessionInfoUnavailable), - }; + let session_info = get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index) + .await + .ok_or(ImportedBlockInfoError::SessionInfoUnavailable)?; let (assignments, slot, relay_vrf_story) = { let unsafe_vrf = approval_types::babe_unsafe_vrf_info(&block_header);