Skip to content

Commit

Permalink
added backoff mechanism to inbound bridge queue
Browse files Browse the repository at this point in the history
  • Loading branch information
svyatonik committed Jul 13, 2023
1 parent a283fb8 commit e19c82a
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 23 deletions.
1 change: 1 addition & 0 deletions bin/runtime-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Common types/functions that may be used by runtimes of all bridged chains.
#![cfg_attr(not(feature = "std"), no_std)]
#![recursion_limit = "1024"]

use crate::messages_call_ext::MessagesCallSubType;
use pallet_bridge_grandpa::CallSubType as GrandpaCallSubType;
Expand Down
30 changes: 26 additions & 4 deletions bin/runtime-common/src/messages_call_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.

use bp_messages::{ChainWithMessages, InboundLaneData, LaneId, MessageNonce};
use bp_messages::{
target_chain::MessageDispatch, ChainWithMessages, InboundLaneData, LaneId, MessageNonce,
};
use bp_runtime::AccountIdOf;
use frame_support::{dispatch::CallableCallFor, traits::IsSubType, RuntimeDebug};
use pallet_bridge_messages::{BridgedChainOf, Config, Pallet};
Expand Down Expand Up @@ -71,7 +73,13 @@ impl ReceiveMessagesProofInfo {
///
/// - or there are no bundled messages, but the inbound lane is blocked by too many unconfirmed
/// messages and/or unrewarded relayers.
fn is_obsolete(&self) -> bool {
fn is_obsolete(&self, is_dispatcher_active: bool) -> bool {
// TODO: maybe rename method to `is_accepted`, because it isn't about **obsolete** messages
// anymore if dispatcher is inactive, we don't accept any delivery transactions
if !is_dispatcher_active {
return true
}

// transactions with zero bundled nonces are not allowed, unless they're message
// delivery transactions, which brings reward confirmations required to unblock
// the lane
Expand Down Expand Up @@ -266,7 +274,9 @@ impl<

fn check_obsolete_call(&self) -> TransactionValidity {
match self.call_info() {
Some(CallInfo::ReceiveMessagesProof(proof_info)) if proof_info.is_obsolete() => {
Some(CallInfo::ReceiveMessagesProof(proof_info))
if proof_info.is_obsolete(T::MessageDispatch::is_active()) =>
{
log::trace!(
target: pallet_bridge_messages::LOG_TARGET,
"Rejecting obsolete messages delivery transaction: {:?}",
Expand Down Expand Up @@ -315,7 +325,7 @@ mod tests {
use super::*;
use crate::{
messages_call_ext::MessagesCallSubType,
mock::{BridgedUnderlyingChain, TestRuntime, ThisChainRuntimeCall},
mock::{BridgedUnderlyingChain, DummyMessageDispatch, TestRuntime, ThisChainRuntimeCall},
};
use bp_messages::{
source_chain::FromBridgedChainMessagesDeliveryProof,
Expand Down Expand Up @@ -447,6 +457,18 @@ mod tests {
});
}

#[test]
fn extension_reject_call_when_dispatcher_is_inactive() {
run_test(|| {
// when current best delivered is message#10 and we're trying to deliver message 11..=15
// => tx is accepted
deliver_message_10();

DummyMessageDispatch::deactivate();
assert!(!validate_message_delivery(11, 15));
});
}

#[test]
fn extension_rejects_empty_delivery_with_rewards_confirmations_if_there_are_free_relayer_and_message_slots(
) {
Expand Down
6 changes: 6 additions & 0 deletions bin/runtime-common/src/messages_xcm_extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ impl<BlobDispatcher: DispatchBlob, Weights: MessagesPalletWeights> MessageDispat
type DispatchPayload = XcmAsPlainPayload;
type DispatchLevelResult = XcmBlobMessageDispatchResult;

fn is_active() -> bool {
// TODO: extend blob dispatcher with some queue-related methods + emulate queue
// at Rialto/Millau + proper implementation for HRMP/UMP queues
true
}

fn dispatch_weight(message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
match message.data.payload {
Ok(ref payload) => {
Expand Down
37 changes: 34 additions & 3 deletions bin/runtime-common/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
use crate::messages_xcm_extension::XcmAsPlainPayload;

use bp_header_chain::ChainWithGrandpa;
use bp_messages::{target_chain::ForbidInboundMessages, ChainWithMessages, LaneId, MessageNonce};
use bp_messages::{
target_chain::{DispatchMessage, MessageDispatch},
ChainWithMessages, LaneId, MessageNonce,
};
use bp_parachains::SingleParaStoredHeaderDataBuilder;
use bp_relayers::PayRewardFromAccount;
use bp_runtime::{Chain, ChainId, Parachain};
use bp_runtime::{messages::MessageDispatchResult, Chain, ChainId, Parachain};
use frame_support::{
parameter_types,
weights::{ConstantMultiplier, IdentityFee, RuntimeDbWeight, Weight},
Expand Down Expand Up @@ -234,7 +237,7 @@ impl pallet_bridge_messages::Config for TestRuntime {
ConstU64<100_000>,
>;

type MessageDispatch = ForbidInboundMessages<Vec<u8>>;
type MessageDispatch = DummyMessageDispatch;
type ThisChain = ThisUnderlyingChain;
type BridgedChain = BridgedUnderlyingChain;
type BridgedHeaderChain = BridgeGrandpa;
Expand All @@ -248,6 +251,34 @@ impl pallet_bridge_relayers::Config for TestRuntime {
type WeightInfo = ();
}

/// Dummy message dispatcher.
pub struct DummyMessageDispatch;

impl DummyMessageDispatch {
pub fn deactivate() {
frame_support::storage::unhashed::put(&b"inactive"[..], &false);
}
}

impl MessageDispatch for DummyMessageDispatch {
type DispatchPayload = Vec<u8>;
type DispatchLevelResult = ();

fn is_active() -> bool {
frame_support::storage::unhashed::take::<bool>(&b"inactive"[..]) != Some(false)
}

fn dispatch_weight(_message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
Weight::zero()
}

fn dispatch(
_: DispatchMessage<Self::DispatchPayload>,
) -> MessageDispatchResult<Self::DispatchLevelResult> {
MessageDispatchResult { unspent_weight: Weight::zero(), dispatch_level_result: () }
}
}

/// Underlying chain of `ThisChain`.
pub struct ThisUnderlyingChain;

Expand Down
32 changes: 26 additions & 6 deletions modules/messages/src/lanes_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::{
};

use bp_messages::{
ChainWithMessages, InboundLaneData, LaneId, LaneState, MessageKey, MessageNonce,
MessagePayload, OutboundLaneData, VerificationError,
target_chain::MessageDispatch, ChainWithMessages, InboundLaneData, LaneId, LaneState,
MessageKey, MessageNonce, MessagePayload, OutboundLaneData, VerificationError,
};
use bp_runtime::AccountIdOf;
use codec::{Decode, Encode, MaxEncodedLen};
Expand Down Expand Up @@ -145,10 +145,30 @@ impl<T: Config<I>, I: 'static> RuntimeInboundLaneStorage<T, I> {
) -> Result<RuntimeInboundLaneStorage<T, I>, LanesManagerError> {
let cached_data =
InboundLanes::<T, I>::get(lane_id).ok_or(LanesManagerError::UnknownInboundLane)?;
ensure!(
!check_active || cached_data.state.is_active(),
LanesManagerError::ClosedInboundLane
);

if check_active {
// check that the lane is not explicitly closed
ensure!(cached_data.state.is_active(), LanesManagerError::ClosedInboundLane);
// TODO: /~https://github.com/paritytech/parity-bridges-common/issues/2006 think of it
// apart from the explicit closure, the lane may be unable to receive any messages.
// Right now we do an additional check here, but it may be done later (e.g. by
// explicitly closing the lane and reopening it from
// `pallet-xcm-bridge-hub::on-initialize`)
//
// The fact that we only check it here, means that the `MessageDispatch` may switch
// to inactive state during some message dispatch in the middle of message delivery
// transaction. But we treat result of `MessageDispatch::is_active()` as a hint, so
// we know that it won't drop messages - just it experiences problems with processing.
// This would allow us to check that in our signed extensions, and invalidate
// transaction early, thus avoiding losing honest relayers funds. This problem should
// gone with relayers coordination protocol.
//
// There's a limit on number of messages in the message delivery transaction, so even
// if we dispatch (enqueue) some additional messages, we'll know the maximal queue
// length;
ensure!(T::MessageDispatch::is_active(), LanesManagerError::ClosedInboundLane);
}

Ok(RuntimeInboundLaneStorage {
lane_id,
cached_data: cached_data.into(),
Expand Down
23 changes: 17 additions & 6 deletions modules/messages/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,6 @@ impl pallet_bridge_grandpa::Config for TestRuntime {
type WeightInfo = pallet_bridge_grandpa::weights::BridgeWeight<TestRuntime>;
}

parameter_types! {
pub const MaxMessagesToPruneAtOnce: u64 = 10;
pub const TestBridgedChainId: bp_runtime::ChainId = *b"test";
}

/// weights of messages pallet calls we use in tests.
pub type TestWeightInfo = ();

Expand Down Expand Up @@ -386,10 +381,23 @@ impl DeliveryConfirmationPayments<AccountId> for TestDeliveryConfirmationPayment
#[derive(Debug)]
pub struct TestMessageDispatch;

impl TestMessageDispatch {
pub fn emulate_enqueued_message() {
let dispatched =
frame_support::storage::unhashed::get_or_default::<MessageNonce>(&b"dispatched"[..]);
frame_support::storage::unhashed::put(&b"dispatched"[..], &(dispatched + 1));
}
}

impl MessageDispatch for TestMessageDispatch {
type DispatchPayload = TestPayload;
type DispatchLevelResult = TestDispatchLevelResult;

fn is_active() -> bool {
frame_support::storage::unhashed::get_or_default::<MessageNonce>(&b"dispatched"[..]) <=
BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX
}

fn dispatch_weight(message: &mut DispatchMessage<TestPayload>) -> Weight {
match message.data.payload.as_ref() {
Ok(payload) => payload.declared_weight,
Expand All @@ -401,7 +409,10 @@ impl MessageDispatch for TestMessageDispatch {
message: DispatchMessage<TestPayload>,
) -> MessageDispatchResult<TestDispatchLevelResult> {
match message.data.payload.as_ref() {
Ok(payload) => payload.dispatch_result.clone(),
Ok(payload) => {
Self::emulate_enqueued_message();
payload.dispatch_result.clone()
},
Err(_) => dispatch_result(0),
}
}
Expand Down
89 changes: 85 additions & 4 deletions modules/messages/src/tests/pallet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use crate::{

use bp_messages::{
source_chain::FromBridgedChainMessagesDeliveryProof,
target_chain::FromBridgedChainMessagesProof, BridgeMessagesCall, ChainWithMessages,
DeliveredMessages, InboundLaneData, InboundMessageDetails, LaneId, LaneState, MessageKey,
MessageNonce, MessagesOperatingMode, OutboundLaneData, OutboundMessageDetails,
UnrewardedRelayer, UnrewardedRelayersState, VerificationError,
target_chain::{FromBridgedChainMessagesProof, MessageDispatch},
BridgeMessagesCall, ChainWithMessages, DeliveredMessages, InboundLaneData,
InboundMessageDetails, LaneId, LaneState, MessageKey, MessageNonce, MessagesOperatingMode,
OutboundLaneData, OutboundMessageDetails, UnrewardedRelayer, UnrewardedRelayersState,
VerificationError,
};
use bp_runtime::{BasicOperatingMode, PreComputedSize, Size};
use bp_test_utils::generate_owned_bridge_module_tests;
Expand Down Expand Up @@ -322,6 +323,86 @@ fn receive_messages_proof_updates_confirmed_message_nonce() {
});
}

#[test]
fn receive_messages_proof_fails_when_dispatcher_is_inactive() {
run_test(|| {
// "enqueue" enough (to deactivate dispatcher) messages at dispatcher
let latest_received_nonce = BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX + 1;
for _ in 1..=latest_received_nonce {
TestMessageDispatch::emulate_enqueued_message();
}
assert!(!TestMessageDispatch::is_active());
InboundLanes::<TestRuntime, ()>::insert(
test_lane_id(),
InboundLaneData {
state: LaneState::Opened,
last_confirmed_nonce: latest_received_nonce,
relayers: vec![].into(),
},
);

// try to delvier next message - it should fail because dispatcher is in "suspended" state
// at the beginning of the call
let messages_proof =
prepare_messages_proof(vec![message(latest_received_nonce + 1, REGULAR_PAYLOAD)], None);
assert_noop!(
Pallet::<TestRuntime>::receive_messages_proof(
RuntimeOrigin::signed(1),
TEST_RELAYER_A,
messages_proof,
1,
REGULAR_PAYLOAD.declared_weight,
),
Error::<TestRuntime, ()>::LanesManager(LanesManagerError::ClosedInboundLane)
);
assert!(!TestMessageDispatch::is_active());
});
}

#[test]
fn receive_messages_succeeds_when_dispatcher_becomes_inactive_in_the_middle_of_transaction() {
run_test(|| {
// "enqueue" enough (to deactivate dispatcher) messages at dispatcher
let latest_received_nonce = BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX / 2;
for _ in 1..=latest_received_nonce {
TestMessageDispatch::emulate_enqueued_message();
}
assert!(TestMessageDispatch::is_active());
InboundLanes::<TestRuntime, ()>::insert(
test_lane_id(),
InboundLaneData {
state: LaneState::Opened,
last_confirmed_nonce: latest_received_nonce,
relayers: vec![].into(),
},
);

// try to delvier next `BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX` messages
// - it will lead to dispatcher deactivation, but the transaction shall not fail and all
// messages must be delivered
let messages_begin = latest_received_nonce + 1;
let messages_end =
messages_begin + BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX;
let messages_range = messages_begin..messages_end;
let messages_count = BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX;
assert_ok!(Pallet::<TestRuntime>::receive_messages_proof(
RuntimeOrigin::signed(1),
TEST_RELAYER_A,
prepare_messages_proof(
messages_range.map(|nonce| message(nonce, REGULAR_PAYLOAD)).collect(),
None,
),
messages_count as _,
REGULAR_PAYLOAD.declared_weight * messages_count,
),);
assert_eq!(
inbound_unrewarded_relayers_state(test_lane_id()).last_delivered_nonce,
messages_end - 1,
);
assert!(!TestMessageDispatch::is_active());
});
}

#[test]
fn receive_messages_proof_does_not_accept_message_if_dispatch_weight_is_not_enough() {
run_test(|| {
Expand Down
10 changes: 10 additions & 0 deletions primitives/messages/src/target_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ pub trait MessageDispatch {
/// Fine-grained result of single message dispatch (for better diagnostic purposes)
type DispatchLevelResult: Clone + sp_std::fmt::Debug + Eq;

/// Returns `true` if dispatcher is ready to accept additional messages. The `false` should
/// be treated as a hint by both dispatcher and its consumers - i.e. dispatcher shall not
/// simply drop messages if it returns `false`. The consumer may still call the `dispatch`
/// if dispatcher has returned `false`.
fn is_active() -> bool;

/// Estimate dispatch weight.
///
/// This function must return correct upper bound of dispatch weight. The return value
Expand Down Expand Up @@ -168,6 +174,10 @@ impl<DispatchPayload: Decode> MessageDispatch for ForbidInboundMessages<Dispatch
type DispatchPayload = DispatchPayload;
type DispatchLevelResult = ();

fn is_active() -> bool {
false
}

fn dispatch_weight(_message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
Weight::MAX
}
Expand Down

0 comments on commit e19c82a

Please sign in to comment.