Skip to content

Commit

Permalink
Allow event iterators to surface owned data (#627)
Browse files Browse the repository at this point in the history
* Allow event iterators to surface owned data

Previously, event iterators were forced to hand out references to data,
even if they could return owned data. This is not great because it requires
the replay operator to clone the data to send it downstream.

With this change, the event iterator can surface either owned or shared
data, which allows the `Rc<EventLink>` to surface owned data when it is
uniquely owned, and references when it is shared. This avoids cloning data
when there is only a single replay operator attached to an event link.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

* Cow instead of Result

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

---------

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru authored Jan 21, 2025
1 parent 09994a8 commit 2081554
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
28 changes: 18 additions & 10 deletions timely/src/dataflow/operators/core/capture/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ pub enum Event<T, C> {

/// Iterates over contained `Event<T, C>`.
///
/// The `EventIterator` trait describes types that can iterate over references to events,
/// The `EventIterator` trait describes types that can iterate over `Cow`s of events,
/// and which can be used to replay a stream into a new timely dataflow computation.
///
/// This method is not simply an iterator because of the lifetime in the result.
pub trait EventIterator<T, C> {
/// Iterates over references to `Event<T, C>` elements.
fn next(&mut self) -> Option<&Event<T, C>>;
pub trait EventIterator<T: Clone, C: Clone> {
/// Iterates over `Cow<Event<T, C>>` elements.
fn next(&mut self) -> Option<std::borrow::Cow<Event<T, C>>>;
}

/// Receives `Event<T, C>` events.
Expand All @@ -45,6 +45,7 @@ impl<T, C> EventPusher<T, C> for ::std::sync::mpsc::Sender<Event<T, C>> {
/// A linked-list event pusher and iterator.
pub mod link {

use std::borrow::Cow;
use std::rc::Rc;
use std::cell::RefCell;

Expand Down Expand Up @@ -77,13 +78,18 @@ pub mod link {
}
}

impl<T, C> EventIterator<T, C> for Rc<EventLink<T, C>> {
fn next(&mut self) -> Option<&Event<T, C>> {
impl<T: Clone, C: Clone> EventIterator<T, C> for Rc<EventLink<T, C>> {
fn next(&mut self) -> Option<Cow<Event<T, C>>> {
let is_some = self.next.borrow().is_some();
if is_some {
let next = self.next.borrow().as_ref().unwrap().clone();
*self = next;
self.event.as_ref()
if let Some(this) = Rc::get_mut(self) {
this.event.take().map(Cow::Owned)
}
else {
self.event.as_ref().map(Cow::Borrowed)
}
}
else {
None
Expand Down Expand Up @@ -121,6 +127,8 @@ pub mod link {
/// A binary event pusher and iterator.
pub mod binary {

use std::borrow::Cow;

use serde::{de::DeserializeOwned, Serialize};

use super::{Event, EventPusher, EventIterator};
Expand Down Expand Up @@ -164,10 +172,10 @@ pub mod binary {
}
}

impl<T: DeserializeOwned, C: DeserializeOwned, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
fn next(&mut self) -> Option<&Event<T, C>> {
impl<T: DeserializeOwned + Clone, C: DeserializeOwned + Clone, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
fn next(&mut self) -> Option<Cow<Event<T, C>>> {
self.decoded = ::bincode::deserialize_from(&mut self.reader).ok();
self.decoded.as_ref()
self.decoded.take().map(Cow::Owned)
}
}
}
11 changes: 9 additions & 2 deletions timely/src/dataflow/operators/core/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,18 @@ where

for event_stream in event_streams.iter_mut() {
while let Some(event) = event_stream.next() {
use std::borrow::Cow::*;
match event {
Event::Progress(vec) => {
Owned(Event::Progress(vec)) => {
progress.internals[0].extend(vec.into_iter());
},
Owned(Event::Messages(time, mut data)) => {
output.session(&time).give_container(&mut data);
}
Borrowed(Event::Progress(vec)) => {
progress.internals[0].extend(vec.iter().cloned());
},
Event::Messages(ref time, data) => {
Borrowed(Event::Messages(time, data)) => {
allocation.clone_from(data);
output.session(time).give_container(&mut allocation);
}
Expand Down

0 comments on commit 2081554

Please sign in to comment.