Skip to content

Commit

Permalink
Simplify and Refactor pre_lock_filter (#4980)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Feb 27, 2025
1 parent 1ecd4ea commit 64a8871
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 54 deletions.
25 changes: 11 additions & 14 deletions core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
prio_graph_scheduler::{
Batches, PrioGraphScheduler, TransactionSchedulingError, TransactionSchedulingInfo,
},
scheduler::{Scheduler, SchedulingSummary},
scheduler::{PreLockFilterAction, Scheduler, SchedulingSummary},
scheduler_error::SchedulerError,
thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError},
transaction_priority_id::TransactionPriorityId,
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
&mut self,
container: &mut S,
_pre_graph_filter: impl Fn(&[&Tx], &mut [bool]),
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let target_cu_per_thread = self.config.target_scheduled_cus / num_threads as u64;
Expand All @@ -99,7 +99,6 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
}

// Track metrics on filter.
let mut num_filtered_out: usize = 0;
let mut num_scanned: usize = 0;
let mut num_scheduled: usize = 0;
let mut num_sent: usize = 0;
Expand Down Expand Up @@ -148,10 +147,6 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
)
},
) {
Err(TransactionSchedulingError::Filtered) => {
num_filtered_out += 1;
container.remove_by_id(id.id);
}
Err(TransactionSchedulingError::UnschedulableConflicts)
| Err(TransactionSchedulingError::UnschedulableThread) => {
num_unschedulable += 1;
Expand Down Expand Up @@ -207,7 +202,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
Ok(SchedulingSummary {
num_scheduled,
num_unschedulable,
num_filtered_out,
num_filtered_out: 0,
filter_time_us: 0,
})
}
Expand Down Expand Up @@ -348,17 +343,17 @@ impl<Tx: TransactionWithMeta> GreedyScheduler<Tx> {

fn try_schedule_transaction<Tx: TransactionWithMeta>(
transaction_state: &mut TransactionState<Tx>,
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
account_locks: &mut ThreadAwareAccountLocks,
schedulable_threads: ThreadSet,
thread_selector: impl Fn(ThreadSet) -> ThreadId,
) -> Result<TransactionSchedulingInfo<Tx>, TransactionSchedulingError> {
let transaction = &transaction_state.transaction_ttl().transaction;
if !pre_lock_filter(transaction) {
return Err(TransactionSchedulingError::Filtered);
match pre_lock_filter(transaction_state) {
PreLockFilterAction::AttemptToSchedule => {}
}

// Schedule the transaction if it can be.
let transaction = &transaction_state.transaction_ttl().transaction;
let account_keys = transaction.account_keys();
let write_account_locks = account_keys
.iter()
Expand Down Expand Up @@ -518,8 +513,10 @@ mod test {
results.fill(true);
}

fn test_pre_lock_filter(_tx: &RuntimeTransaction<SanitizedTransaction>) -> bool {
true
fn test_pre_lock_filter(
_tx: &TransactionState<RuntimeTransaction<SanitizedTransaction>>,
) -> PreLockFilterAction {
PreLockFilterAction::AttemptToSchedule
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::{
in_flight_tracker::InFlightTracker,
scheduler::Scheduler,
scheduler::{PreLockFilterAction, Scheduler},
scheduler_error::SchedulerError,
thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError},
transaction_state::SanitizedTransactionTTL,
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
&mut self,
container: &mut S,
pre_graph_filter: impl Fn(&[&Tx], &mut [bool]),
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let max_cu_per_thread = self.config.max_scheduled_cus / num_threads as u64;
Expand Down Expand Up @@ -233,9 +233,6 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
);

match maybe_schedule_info {
Err(TransactionSchedulingError::Filtered) => {
container.remove_by_id(id.id);
}
Err(TransactionSchedulingError::UnschedulableConflicts)
| Err(TransactionSchedulingError::UnschedulableThread) => {
unschedulable_ids.push(id);
Expand Down Expand Up @@ -558,8 +555,6 @@ pub(crate) struct TransactionSchedulingInfo<Tx> {

/// Error type for reasons a transaction could not be scheduled.
pub(crate) enum TransactionSchedulingError {
/// Transaction was filtered out before locking.
Filtered,
/// Transaction cannot be scheduled due to conflicts, or
/// higher priority conflicting transactions are unschedulable.
UnschedulableConflicts,
Expand All @@ -569,18 +564,18 @@ pub(crate) enum TransactionSchedulingError {

fn try_schedule_transaction<Tx: TransactionWithMeta>(
transaction_state: &mut TransactionState<Tx>,
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
blocking_locks: &mut ReadWriteAccountSet,
account_locks: &mut ThreadAwareAccountLocks,
num_threads: usize,
thread_selector: impl Fn(ThreadSet) -> ThreadId,
) -> Result<TransactionSchedulingInfo<Tx>, TransactionSchedulingError> {
let transaction = &transaction_state.transaction_ttl().transaction;
if !pre_lock_filter(transaction) {
return Err(TransactionSchedulingError::Filtered);
match pre_lock_filter(transaction_state) {
PreLockFilterAction::AttemptToSchedule => {}
}

// Check if this transaction conflicts with any blocked transactions
let transaction = &transaction_state.transaction_ttl().transaction;
if !blocking_locks.check_locks(transaction) {
blocking_locks.take_locks(transaction);
return Err(TransactionSchedulingError::UnschedulableConflicts);
Expand Down Expand Up @@ -758,8 +753,10 @@ mod tests {
results.fill(true);
}

fn test_pre_lock_filter(_tx: &RuntimeTransaction<SanitizedTransaction>) -> bool {
true
fn test_pre_lock_filter(
_tx: &TransactionState<RuntimeTransaction<SanitizedTransaction>>,
) -> PreLockFilterAction {
PreLockFilterAction::AttemptToSchedule
}

#[test]
Expand Down Expand Up @@ -912,29 +909,6 @@ mod tests {
assert_eq!(collect_work(&work_receivers[1]).1, [vec![4], vec![5]]);
}

#[test]
fn test_schedule_pre_lock_filter() {
let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1);
let pubkey = Pubkey::new_unique();
let keypair = Keypair::new();
let mut container = create_container([
(&Keypair::new(), &[pubkey], 1, 1),
(&keypair, &[pubkey], 1, 2),
(&Keypair::new(), &[pubkey], 1, 3),
]);

// 2nd transaction should be filtered out and dropped before locking.
let pre_lock_filter = |tx: &RuntimeTransaction<SanitizedTransaction>| {
tx.message().fee_payer() != &keypair.pubkey()
};
let scheduling_summary = scheduler
.schedule(&mut container, test_pre_graph_filter, pre_lock_filter)
.unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![2], vec![0]]);
}

#[test]
fn test_schedule_over_full_container() {
let (mut scheduler, _work_receivers, _finished_work_sender) = create_test_frame(1);
Expand Down
13 changes: 11 additions & 2 deletions core/src/banking_stage/transaction_scheduler/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use {
super::{scheduler_error::SchedulerError, transaction_state_container::StateContainer},
super::{
scheduler_error::SchedulerError, transaction_state::TransactionState,
transaction_state_container::StateContainer,
},
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
};

Expand All @@ -11,7 +14,7 @@ pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
&mut self,
container: &mut S,
pre_graph_filter: impl Fn(&[&Tx], &mut [bool]),
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
) -> Result<SchedulingSummary, SchedulerError>;

/// Receive completed batches of transactions without blocking.
Expand All @@ -22,6 +25,12 @@ pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
) -> Result<(usize, usize), SchedulerError>;
}

/// Action to be taken by pre-lock filter.
pub(crate) enum PreLockFilterAction {
/// Attempt to schedule the transaction.
AttemptToSchedule,
}

/// Metrics from scheduling transactions.
#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct SchedulingSummary {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use {
super::{
receive_and_buffer::ReceiveAndBuffer,
scheduler::Scheduler,
scheduler::{PreLockFilterAction, Scheduler},
scheduler_error::SchedulerError,
scheduler_metrics::{
SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics,
Expand Down Expand Up @@ -142,7 +142,7 @@ where
MAX_PROCESSING_AGE,
)
},
|_| true // no pre-lock filter for now
|_| PreLockFilterAction::AttemptToSchedule // no pre-lock filter for now
)?);

self.count_metrics.update(|count_metrics| {
Expand Down

0 comments on commit 64a8871

Please sign in to comment.