Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Simplify TransactionState #5078

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 12 additions & 26 deletions core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
scheduler_error::SchedulerError,
thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError},
transaction_priority_id::TransactionPriorityId,
transaction_state::{SanitizedTransactionTTL, TransactionState},
transaction_state::TransactionState,
transaction_state_container::StateContainer,
},
crate::banking_stage::{
Expand Down Expand Up @@ -125,7 +125,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
// we should immediately send out the batches, so this transaction may be scheduled.
if !self
.working_account_set
.check_locks(&transaction_state.transaction_ttl().transaction)
.check_locks(transaction_state.transaction())
{
self.working_account_set.clear();
num_sent += self.send_batches(&mut batches)?;
Expand Down Expand Up @@ -241,7 +241,7 @@ impl<Tx: TransactionWithMeta> GreedyScheduler<Tx> {
batch_id,
ids,
transactions,
max_ages,
max_ages: _,
},
retryable_indexes,
}) => {
Expand All @@ -253,18 +253,10 @@ impl<Tx: TransactionWithMeta> GreedyScheduler<Tx> {

// Retryable transactions should be inserted back into the container
let mut retryable_iter = retryable_indexes.into_iter().peekable();
for (index, (id, transaction, max_age)) in
izip!(ids, transactions, max_ages).enumerate()
{
for (index, (id, transaction)) in izip!(ids, transactions).enumerate() {
if let Some(retryable_index) = retryable_iter.peek() {
if *retryable_index == index {
container.retry_transaction(
id,
SanitizedTransactionTTL {
transaction,
max_age,
},
);
container.retry_transaction(id, transaction);
retryable_iter.next();
continue;
}
Expand Down Expand Up @@ -353,7 +345,7 @@ fn try_schedule_transaction<Tx: TransactionWithMeta>(
}

// Schedule the transaction if it can be.
let transaction = &transaction_state.transaction_ttl().transaction;
let transaction = transaction_state.transaction();
let account_keys = transaction.account_keys();
let write_account_locks = account_keys
.iter()
Expand All @@ -379,13 +371,13 @@ fn try_schedule_transaction<Tx: TransactionWithMeta>(
}
};

let sanitized_transaction_ttl = transaction_state.transition_to_pending();
let (transaction, max_age) = transaction_state.take_transaction_for_scheduling();
let cost = transaction_state.cost();

Ok(TransactionSchedulingInfo {
thread_id,
transaction: sanitized_transaction_ttl.transaction,
max_age: sanitized_transaction_ttl.max_age,
transaction,
max_age,
cost,
})
}
Expand All @@ -396,10 +388,7 @@ mod test {
super::*,
crate::banking_stage::{
scheduler_messages::{MaxAge, TransactionId},
transaction_scheduler::{
transaction_state::SanitizedTransactionTTL,
transaction_state_container::TransactionStateContainer,
},
transaction_scheduler::transaction_state_container::TransactionStateContainer,
},
crossbeam_channel::unbounded,
itertools::Itertools,
Expand Down Expand Up @@ -476,13 +465,10 @@ mod test {
lamports,
compute_unit_price,
);
let transaction_ttl = SanitizedTransactionTTL {
transaction,
max_age: MaxAge::MAX,
};
const TEST_TRANSACTION_COST: u64 = 5000;
container.insert_new_transaction(
transaction_ttl,
transaction,
MaxAge::MAX,
compute_unit_price,
TEST_TRANSACTION_COST,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use {
scheduler::{PreLockFilterAction, Scheduler},
scheduler_error::SchedulerError,
thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError},
transaction_state::SanitizedTransactionTTL,
},
crate::banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
Expand Down Expand Up @@ -161,8 +160,8 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
*window_budget = window_budget.saturating_sub(chunk_size);

ids.iter().for_each(|id| {
let transaction = container.get_transaction_ttl(id.id).unwrap();
txs.push(&transaction.transaction);
let transaction = container.get_transaction(id.id).unwrap();
txs.push(transaction);
});

let (_, filter_us) =
Expand All @@ -171,7 +170,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {

for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) {
if *filter_result {
let transaction = container.get_transaction_ttl(id.id).unwrap();
let transaction = container.get_transaction(id.id).unwrap();
prio_graph.insert_transaction(
*id,
Self::get_transaction_account_access(transaction),
Expand Down Expand Up @@ -366,18 +365,12 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {

// Retryable transactions should be inserted back into the container
let mut retryable_iter = retryable_indexes.into_iter().peekable();
for (index, (id, transaction, max_age)) in
for (index, (id, transaction, _max_age)) in
izip!(ids, transactions, max_ages).enumerate()
{
if let Some(retryable_index) = retryable_iter.peek() {
if *retryable_index == index {
container.retry_transaction(
id,
SanitizedTransactionTTL {
transaction,
max_age,
},
);
container.retry_transaction(id, transaction);
retryable_iter.next();
continue;
}
Expand Down Expand Up @@ -485,9 +478,8 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {

/// Gets accessed accounts (resources) for use in `PrioGraph`.
fn get_transaction_account_access(
transaction: &SanitizedTransactionTTL<impl SVMMessage>,
message: &impl SVMMessage,
) -> impl Iterator<Item = (Pubkey, AccessKind)> + '_ {
let message = &transaction.transaction;
message
.account_keys()
.iter()
Expand Down Expand Up @@ -575,7 +567,7 @@ fn try_schedule_transaction<Tx: TransactionWithMeta>(
}

// Check if this transaction conflicts with any blocked transactions
let transaction = &transaction_state.transaction_ttl().transaction;
let transaction = transaction_state.transaction();
if !blocking_locks.check_locks(transaction) {
blocking_locks.take_locks(transaction);
return Err(TransactionSchedulingError::UnschedulableConflicts);
Expand Down Expand Up @@ -609,13 +601,13 @@ fn try_schedule_transaction<Tx: TransactionWithMeta>(
}
};

let sanitized_transaction_ttl = transaction_state.transition_to_pending();
let (transaction, max_age) = transaction_state.take_transaction_for_scheduling();
let cost = transaction_state.cost();

Ok(TransactionSchedulingInfo {
thread_id,
transaction: sanitized_transaction_ttl.transaction,
max_age: sanitized_transaction_ttl.max_age,
transaction,
max_age,
cost,
})
}
Expand Down Expand Up @@ -716,13 +708,11 @@ mod tests {
lamports,
compute_unit_price,
);
let transaction_ttl = SanitizedTransactionTTL {
transaction,
max_age: MaxAge::MAX,
};

const TEST_TRANSACTION_COST: u64 = 5000;
container.insert_new_transaction(
transaction_ttl,
transaction,
MaxAge::MAX,
compute_unit_price,
TEST_TRANSACTION_COST,
);
Expand Down
31 changes: 8 additions & 23 deletions core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use {
consumer::Consumer, decision_maker::BufferedPacketsDecision,
immutable_deserialized_packet::ImmutableDeserializedPacket,
packet_deserializer::PacketDeserializer, packet_filter::MAX_ALLOWED_PRECOMPILE_SIGNATURES,
scheduler_messages::MaxAge,
transaction_scheduler::transaction_state::SanitizedTransactionTTL,
TransactionStateContainer,
scheduler_messages::MaxAge, TransactionStateContainer,
},
agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
agave_transaction_view::{
Expand Down Expand Up @@ -239,12 +237,8 @@ impl SanitizedTransactionReceiveAndBuffer {

let (priority, cost) =
calculate_priority_and_cost(&transaction, &fee_budget_limits, &working_bank);
let transaction_ttl = SanitizedTransactionTTL {
transaction,
max_age,
};

if container.insert_new_transaction(transaction_ttl, priority, cost) {
if container.insert_new_transaction(transaction, max_age, priority, cost) {
saturating_add_assign!(num_dropped_on_capacity, 1);
}
saturating_add_assign!(num_buffered, 1);
Expand Down Expand Up @@ -409,10 +403,9 @@ impl TransactionViewReceiveAndBuffer {
let mut check_results = {
let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new();
transactions.extend(transaction_priority_ids.iter().map(|priority_id| {
&container
.get_transaction_ttl(priority_id.id)
container
.get_transaction(priority_id.id)
.expect("transaction must exist")
.transaction
}));
working_bank.check_transactions::<RuntimeTransaction<_>>(
&transactions,
Expand All @@ -431,10 +424,9 @@ impl TransactionViewReceiveAndBuffer {
num_dropped_on_status_age_checks += 1;
container.remove_by_id(priority_id.id);
}
let transaction = &container
.get_transaction_ttl(priority_id.id)
.expect("transaction must exist")
.transaction;
let transaction = container
.get_transaction(priority_id.id)
.expect("transaction must exist");
if let Err(err) = Consumer::check_fee_payer_unlocked(
working_bank,
transaction,
Expand Down Expand Up @@ -595,14 +587,7 @@ impl TransactionViewReceiveAndBuffer {
let fee_budget_limits = FeeBudgetLimits::from(compute_budget_limits);
let (priority, cost) = calculate_priority_and_cost(&view, &fee_budget_limits, working_bank);

Ok(TransactionState::new(
SanitizedTransactionTTL {
transaction: view,
max_age,
},
priority,
cost,
))
Ok(TransactionState::new(view, max_age, priority, cost))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,9 @@ where
let sanitized_txs: Vec<_> = chunk
.iter()
.map(|id| {
&self
.container
.get_transaction_ttl(id.id)
self.container
.get_transaction(id.id)
.expect("transaction must exist")
.transaction
})
.collect();

Expand Down
Loading
Loading