Skip to content

Commit

Permalink
Reorganized Activator as Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 28, 2024
1 parent d8d4d42 commit 4e20406
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 41 deletions.
3 changes: 2 additions & 1 deletion timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,11 @@ where
self.propagate_pointstamps();

{ // Enqueue active children; scoped to let borrow drop.
use crate::scheduling::activate::Scheduler;
let temp_active = &mut self.temp_active;
self.activations
.borrow_mut()
.for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
.extensions(&self.path[..], temp_active);
}

// Schedule child operators.
Expand Down
66 changes: 41 additions & 25 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,45 @@ use std::time::{Duration, Instant};
use std::cmp::Reverse;
use crossbeam_channel::{Sender, Receiver};

/// Methods required to act as a timely scheduler.
/// Methods required to act as a scheduler for timely operators.
///
/// The core methods are the activation of "paths", sequences of integers, and
/// the enumeration of active paths by prefix. A scheduler may delay the report
/// of a path indefinitely, but it should report at least one extension for the
/// empty path `&[]` or risk parking the worker thread without a certain unpark.
/// Operators are described by "paths" of integers, indicating the path along
/// a tree of regions, arriving at the operator. Each path is either "idle"
/// or "active", where the latter indicates that someone has requested that the
/// operator be scheduled by the worker. Operators go from idle to active when
/// the `activate(path)` method is called, and from active to idle when the path
/// is returned through a call to `extensions(path, _)`.
///
/// There is no known harm to "spurious wake-ups" where a not-active path is
/// returned through `extensions()`.
/// The worker will continually probe for extensions to the root empty path `[]`,
/// and then follow all returned addresses, recursively. A scheduler need not
/// schedule all active paths, but it should return *some* active path when the
/// worker probes the empty path, or the worker may put the thread to sleep.
///
/// There is no known harm to scheduling an idle path.
/// The worker may speculatively schedule paths of its own accord.
pub trait Scheduler {
/// Mark a path as immediately schedulable.
///
/// The scheduler is not required to immediately schedule the path, but it
/// should not signal that it has no work until the path has been scheduled.
fn activate(&mut self, path: &[usize]);
/// Populates `dest` with next identifiers on active extensions of `path`.
///
/// This method is where a scheduler is allowed to exercise some discretion,
/// in that it does not need to present *all* extensions, but it can instead
/// present only those that the runtime should schedule.
fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
/// present only those that the runtime should immediately schedule.
///
/// The worker *will* schedule all extensions before probing new prefixes.
/// The scheduler is invited to rely on this, and to schedule in "batches",
/// where the next time the worker probes for extensions to the empty path
/// then all addresses in the batch have certainly been scheduled.
fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap<Reverse<usize>>);
}

// Trait objects can be schedulers too.
impl Scheduler for Box<dyn Scheduler> {
fn activate(&mut self, path: &[usize]) { (**self).activate(path) }
fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) { (**self).extensions(path, dest) }
fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap<Reverse<usize>>) { (**self).extensions(path, dest) }
}

/// Allocation-free activation tracker.
Expand Down Expand Up @@ -93,7 +108,7 @@ impl Activations {
}

/// Discards the current active set and presents the next active set.
pub fn advance(&mut self) {
fn advance(&mut self) {

// Drain inter-thread activations.
while let Ok(path) = self.rx.try_recv() {
Expand Down Expand Up @@ -128,15 +143,15 @@ impl Activations {
self.clean = self.bounds.len();
}

/// Maps a function across activated paths.
pub fn map_active(&self, logic: impl Fn(&[usize])) {
for (offset, length) in self.bounds.iter() {
logic(&self.slices[*offset .. (*offset + *length)]);
}
}

/// Sets as active any symbols that follow `path`.
pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {
fn for_extensions(&mut self, path: &[usize], mut action: impl FnMut(usize)) {

// Each call for the root path is a moment where the worker has reset.
// This relies on a worker implementation that follows the scheduling
// instructions perfectly; if any offered paths are not explored, oops.
if path.is_empty() {
self.advance();
}

let position =
self.bounds[..self.clean]
Expand Down Expand Up @@ -209,13 +224,14 @@ impl Activations {
std::thread::park();
}
}
}

/// True iff there are no immediate activations.
///
/// Used by others to guard work done in anticipation of potentially parking.
/// An alternate method name could be `would_park`.
pub fn is_idle(&self) -> bool {
self.bounds.is_empty() && self.timer.is_none()
impl Scheduler for Activations {
fn activate(&mut self, path: &[usize]) {
self.activate(path);
}
fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap<Reverse<usize>>) {
self.for_extensions(path, |index| dest.push(Reverse(index)));
}
}

Expand Down
34 changes: 19 additions & 15 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::cmp::Reverse;
use std::any::Any;
use std::str::FromStr;
use std::time::{Instant, Duration};
use std::collections::HashMap;
use std::collections::{HashMap, BinaryHeap};
use std::collections::hash_map::Entry;
use std::sync::Arc;

Expand Down Expand Up @@ -221,7 +222,7 @@ pub struct Worker<A: Allocate> {
logging: Option<Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>>,

activations: Rc<RefCell<Activations>>,
active_dataflows: Vec<usize>,
active_dataflows: BinaryHeap<Reverse<usize>>,

// Temporary storage for channel identifiers during dataflow construction.
// These are then associated with a dataflow once constructed.
Expand Down Expand Up @@ -355,12 +356,20 @@ impl<A: Allocate> Worker<A> {
}
}

// Organize activations.
self.activations
.borrow_mut()
.advance();
// Commence a new round of scheduling, starting with dataflows.
// We probe the scheduler for active prefixes, where an empty response
// indicates that the scheduler has no work for us at the moment.
{ // Scoped to let borrow of `self.active_dataflows` drop.
use crate::scheduling::activate::Scheduler;
let active_dataflows = &mut self.active_dataflows;
self.activations
.borrow_mut()
.extensions(&[], active_dataflows);
}

// If no dataflows are active, there is nothing to do. Consider parking.
if self.active_dataflows.is_empty() {

if self.activations.borrow().is_idle() {
// If the timeout is zero, don't bother trying to park.
// More generally, we could put some threshold in here.
if timeout != Some(Duration::new(0, 0)) {
Expand All @@ -378,15 +387,10 @@ impl<A: Allocate> Worker<A> {
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
}
}
else { // Schedule active dataflows.

let active_dataflows = &mut self.active_dataflows;
self.activations
.borrow_mut()
.for_extensions(&[], |index| active_dataflows.push(index));
else { // Schedule all active dataflows.

let mut dataflows = self.dataflows.borrow_mut();
for index in active_dataflows.drain(..) {
for Reverse(index) in self.active_dataflows.drain() {
// Step dataflow if it exists, remove if not incomplete.
if let Entry::Occupied(mut entry) = dataflows.entry(index) {
// TODO: This is a moment at which a scheduling decision is being made.
Expand Down Expand Up @@ -725,7 +729,7 @@ impl<A: Allocate> Clone for Worker<A> {
dataflow_counter: self.dataflow_counter.clone(),
logging: self.logging.clone(),
activations: self.activations.clone(),
active_dataflows: Vec::new(),
active_dataflows: Default::default(),
temp_channel_ids: self.temp_channel_ids.clone(),
}
}
Expand Down

0 comments on commit 4e20406

Please sign in to comment.