Merge pull request #827 from teozkr/chore/infallible-scheduler
Remove `scheduler::Error`
nightkr authored Feb 14, 2022
2 parents d71c445 + 77ac8a7 commit 4eb2d54
Showing 7 changed files with 39 additions and 55 deletions.
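The practical effect of this commit: the scheduler stream (and the controller runner built on it) becomes infallible, yielding `T` directly instead of `Result<T, scheduler::Error>`, because tokio-util 0.7's `DelayQueue::poll_expired` no longer returns a `Result` (visible in the scheduler.rs hunks below). A minimal consumption sketch, assuming a kube-runtime build with this commit applied; the channel plumbing and message value are illustrative:

use futures::{channel::mpsc, SinkExt, StreamExt};
use kube_runtime::scheduler::{scheduler, ScheduleRequest};
use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::unbounded();
    // Box::pin so StreamExt::next can be called on the !Unpin scheduler
    let mut sched = Box::pin(scheduler(rx));
    tx.send(ScheduleRequest {
        message: "reconcile-me",
        run_at: Instant::now(),
    })
    .await
    .unwrap();
    drop(tx); // close the request channel so the stream can terminate
    // Infallible after this commit: next() yields the message itself,
    // where it previously yielded a Result that had to be unwrapped.
    while let Some(msg) = sched.next().await {
        println!("dequeued {msg}");
    }
}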
9 changes: 8 additions & 1 deletion deny.toml
@@ -68,4 +68,11 @@ allow-git = []

 [bans]
 multiple-versions = "deny"
-skip = []
+
+[[bans.skip]]
+# The following dependencies are still working on upgrading to 0.7:
+# /~https://github.com/hyperium/h2/pull/603
+# /~https://github.com/tower-rs/tower/pull/638
+# /~https://github.com/tower-rs/tower-http/pull/221
+name = "tokio-util"
+version = "0.6"
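For context: cargo-deny's `multiple-versions = "deny"` setting fails the check whenever two versions of the same crate coexist in the dependency graph. The new `[[bans.skip]]` entry exempts `tokio-util` 0.6 from that rule until the h2, tower, and tower-http PRs linked above land on 0.7, at which point the entry can be removed.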
2 changes: 1 addition & 1 deletion examples/Cargo.toml
@@ -24,7 +24,7 @@ latest = ["k8s-openapi/v1_22"]
 deprecated = ["kube/deprecated-crd-v1beta1", "k8s-openapi/v1_21"]
 
 [dev-dependencies]
-tokio-util = "0.6.8"
+tokio-util = "0.7.0"
 assert-json-diff = "2.0.1"
 validator = { version = "0.14.0", features = ["derive"] }
 anyhow = "1.0.44"
2 changes: 1 addition & 1 deletion kube-client/Cargo.toml
@@ -58,7 +58,7 @@ bytes = { version = "1.1.0", optional = true }
 tokio = { version = "1.14.0", features = ["time", "signal", "sync"], optional = true }
 kube-core = { path = "../kube-core", version = "^0.69.0"}
 jsonpath_lib = { version = "0.3.0", optional = true }
-tokio-util = { version = "0.6.8", optional = true, features = ["io", "codec"] }
+tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] }
 hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] }
 hyper-tls = { version = "0.5.0", optional = true }
 hyper-rustls = { version = "0.23.0", optional = true }
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
@@ -29,7 +29,7 @@ ahash = "0.7"
 parking_lot = "0.11"
 pin-project = "1.0.2"
 tokio = { version = "1.14.0", features = ["time"] }
-tokio-util = { version = "0.6.8", features = ["time"] }
+tokio-util = { version = "0.7.0", features = ["time"] }
 tracing = "0.1.29"
 json-patch = "0.2.6"
 serde_json = "1.0.68"
6 changes: 1 addition & 5 deletions kube-runtime/src/controller/mod.rs
@@ -7,7 +7,7 @@ use crate::{
         store::{Store, Writer},
         ObjectRef,
     },
-    scheduler::{self, scheduler, ScheduleRequest},
+    scheduler::{scheduler, ScheduleRequest},
     utils::{
         try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle,
         KubeRuntimeStreamExt, StreamBackoff,

@@ -43,8 +43,6 @@ pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error::Error + 'static> {
     ObjectNotFound(ObjectRef<DynamicObject>),
     #[error("reconciler for object {1} failed")]
     ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
-    #[error("scheduler dequeue failed")]
-    SchedulerDequeueFailed(#[source] scheduler::Error),
     #[error("event queue error")]
     QueueError(#[source] QueueErr),
 }

@@ -279,8 +277,6 @@ where
                     .right_future(),
             }
         })
-        .map_err(Error::SchedulerDequeueFailed)
-        .map(|res| res.and_then(|x| x))
         .on_complete(async { tracing::debug!("applier runner terminated") })
     },
 )
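With `SchedulerDequeueFailed` removed, downstream code matching on the applier's error type loses one arm. A hedged sketch of consumer-side handling after this commit; the function is illustrative, and the wildcard arm is there in case the enum has variants outside the hunk shown above:

use kube_runtime::controller::Error;

// Illustrative only: logs the variants visible in the hunk above.
fn log_applier_error<ReconcilerErr, QueueErr>(err: &Error<ReconcilerErr, QueueErr>)
where
    ReconcilerErr: std::error::Error + 'static,
    QueueErr: std::error::Error + 'static,
{
    match err {
        Error::ObjectNotFound(obj) => eprintln!("object {} not found", obj),
        Error::ReconcilerFailed(source, obj) => eprintln!("reconciler for {} failed: {}", obj, source),
        Error::QueueError(source) => eprintln!("event queue error: {}", source),
        // No SchedulerDequeueFailed arm needed any more; catch anything else.
        _ => eprintln!("other controller error: {}", err),
    }
}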
18 changes: 8 additions & 10 deletions kube-runtime/src/controller/runner.rs
@@ -1,5 +1,5 @@
 use super::future_hash_map::FutureHashMap;
-use crate::scheduler::{self, ScheduleRequest, Scheduler};
+use crate::scheduler::{ScheduleRequest, Scheduler};
 use futures::{Future, Stream, StreamExt};
 use pin_project::pin_project;
 use std::{

@@ -43,14 +43,14 @@ where
     F: Future + Unpin,
     MkF: FnMut(&T) -> F,
 {
-    type Item = scheduler::Result<F::Output>;
+    type Item = F::Output;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.project();
         let slots = this.slots;
         let scheduler = &mut this.scheduler;
         let has_active_slots = match slots.poll_next_unpin(cx) {
-            Poll::Ready(Some(result)) => return Poll::Ready(Some(Ok(result))),
+            Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
             Poll::Ready(None) => false,
             Poll::Pending => true,
         };

@@ -63,15 +63,14 @@ where
             .hold_unless(|msg| !slots.contains_key(msg))
             .poll_next_unpin(cx);
         match next_msg_poll {
-            Poll::Ready(Some(Ok(msg))) => {
+            Poll::Ready(Some(msg)) => {
                 let msg_fut = (this.run_msg)(&msg);
                 assert!(
                     slots.insert(msg, msg_fut).is_none(),
                     "Runner tried to replace a running future.. please report this as a kube-rs bug!"
                 );
                 cx.waker().wake_by_ref();
             }
-            Poll::Ready(Some(Err(err))) => break Poll::Ready(Some(Err(err))),
             Poll::Ready(None) => {
                 break if has_active_slots {
                     // We're done listening for new messages, but still have some that

@@ -93,7 +92,7 @@ mod tests {
     use crate::scheduler::{scheduler, ScheduleRequest};
     use futures::{
         channel::{mpsc, oneshot},
-        future, poll, SinkExt, TryStreamExt,
+        future, poll, SinkExt, StreamExt,
     };
     use std::{cell::RefCell, time::Duration};
     use tokio::{

@@ -118,7 +117,7 @@
                 drop(mutex_ref);
             })
         })
-        .try_for_each(|_| async { Ok(()) }),
+        .for_each(|_| async {}),
     );
     sched_tx
         .send(ScheduleRequest {

@@ -135,15 +134,14 @@
         })
         .await
         .unwrap();
-    let ((), run) = future::join(
+    future::join(
         async {
             tokio::time::sleep(Duration::from_secs(5)).await;
             drop(sched_tx);
         },
         runner,
     )
     .await;
-    run.unwrap();
     // Validate that we actually ran both requests
     assert_eq!(count, 2);
 }

@@ -158,7 +156,7 @@
     let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
     // Start a background task that starts listening /before/ we enqueue the message
     // We can't just use Stream::poll_next(), since that bypasses the waker system
-    Handle::current().spawn(async move { result_tx.send(runner.try_next().await.unwrap()).unwrap() });
+    Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
     // Ensure that the background task actually gets to initiate properly, and starts polling the runner
     yield_now().await;
     sched_tx
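`Runner` itself is crate-internal, but the `hold_unless` gating it relies on in `poll_next` above (so messages whose slot is still busy stay queued) is public scheduler API, and after this commit it is infallible too. A sketch under those assumptions; the message values and hold predicate are illustrative:

use futures::{channel::mpsc, pin_mut, SinkExt, StreamExt};
use kube_runtime::scheduler::{scheduler, ScheduleRequest};
use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::unbounded();
    let sched = scheduler(rx);
    pin_mut!(sched);
    for msg in [1_u8, 2] {
        tx.send(ScheduleRequest { message: msg, run_at: Instant::now() })
            .await
            .unwrap();
    }
    // Hold message 1 back, as the Runner does for messages with a busy slot;
    // the stream yields 2 first and keeps 1 pending instead of dropping it.
    assert_eq!(sched.as_mut().hold_unless(|x| *x != 1).next().await.unwrap(), 2);
    // Without the hold, the pending message is released immediately.
    assert_eq!(sched.as_mut().next().await.unwrap(), 1);
}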
55 changes: 19 additions & 36 deletions kube-runtime/src/scheduler.rs
@@ -8,17 +8,9 @@ use std::{
     pin::Pin,
     task::{Context, Poll},
 };
-use thiserror::Error;
-use tokio::time::{self, Instant};
+use tokio::time::Instant;
 use tokio_util::time::delay_queue::{self, DelayQueue};
 
-#[derive(Debug, Error)]
-pub enum Error {
-    #[error("timer failure: {0}")]
-    TimerError(#[source] time::error::Error),
-}
-pub type Result<T, E = Error> = std::result::Result<T, E>;
-
 /// A request to re-emit `message` at a given `Instant` (`run_at`).
 #[derive(Debug)]
 pub struct ScheduleRequest<T> {

@@ -95,24 +87,23 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
         &mut self,
         cx: &mut Context<'_>,
         can_take_message: impl Fn(&T) -> bool,
-    ) -> Poll<Result<T, time::error::Error>> {
+    ) -> Poll<T> {
         if let Some(msg) = self.pending.iter().find(|msg| can_take_message(*msg)).cloned() {
-            return Poll::Ready(Ok(self.pending.take(&msg).unwrap()));
+            return Poll::Ready(self.pending.take(&msg).unwrap());
         }
 
         loop {
             match self.queue.poll_expired(cx) {
-                Poll::Ready(Some(Ok(msg))) => {
+                Poll::Ready(Some(msg)) => {
                     let msg = msg.into_inner();
                     self.scheduled.remove(&msg).expect(
                         "Expired message was popped from the Scheduler queue, but was not in the metadata map",
                     );
                     if can_take_message(&msg) {
-                        break Poll::Ready(Ok(msg));
+                        break Poll::Ready(msg);
                     }
                     self.pending.insert(msg);
                 }
-                Poll::Ready(Some(Err(err))) => break Poll::Ready(Err(err)),
                 Poll::Ready(None) | Poll::Pending => break Poll::Pending,
             }
         }

@@ -131,7 +122,7 @@
     R: Stream<Item = ScheduleRequest<T>>,
     C: Fn(&T) -> bool + Unpin,
 {
-    type Item = Result<T>;
+    type Item = T;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();

@@ -147,7 +138,7 @@
         }
 
         match scheduler.poll_pop_queue_message(cx, &can_take_message) {
-            Poll::Ready(expired) => Poll::Ready(Some(expired.map_err(Error::TimerError))),
+            Poll::Ready(expired) => Poll::Ready(Some(expired)),
             Poll::Pending => Poll::Pending,
         }
     }

@@ -187,7 +178,7 @@ where
     T: Eq + Hash + Clone,
     R: Stream<Item = ScheduleRequest<T>>,
 {
-    type Item = Result<T>;
+    type Item = T;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Pin::new(&mut self.hold_unless(|_| true)).poll_next(cx)

@@ -239,9 +230,7 @@
         assert!(poll!(scheduler.as_mut().hold_unless(|_| false).next()).is_pending());
         assert!(scheduler.contains_pending(&1));
         assert_eq!(
-            unwrap_poll(poll!(scheduler.as_mut().hold_unless(|_| true).next()))
-                .unwrap()
-                .unwrap(),
+            unwrap_poll(poll!(scheduler.as_mut().hold_unless(|_| true).next())).unwrap(),
             1_u8
         );
         assert!(!scheduler.contains_pending(&1));

@@ -272,7 +261,7 @@
             drop(tx);
         },
         async {
-            assert_eq!(scheduler.next().await.unwrap().unwrap(), 1);
+            assert_eq!(scheduler.next().await.unwrap(), 1);
             assert!(scheduler.next().await.is_none())
         },
     )

@@ -295,13 +284,7 @@
         .on_complete(sleep(Duration::from_secs(2))),
     ));
     assert_eq!(
-        scheduler
-            .as_mut()
-            .hold_unless(|x| *x != 1)
-            .next()
-            .await
-            .unwrap()
-            .unwrap(),
+        scheduler.as_mut().hold_unless(|x| *x != 1).next().await.unwrap(),
         2
     );
 }

@@ -325,10 +308,10 @@
     pin_mut!(scheduler);
     assert!(poll!(scheduler.next()).is_pending());
     advance(Duration::from_secs(2)).await;
-    assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), 1);
+    assert_eq!(scheduler.next().now_or_never().unwrap().unwrap(), 1);
     assert!(poll!(scheduler.next()).is_pending());
     advance(Duration::from_secs(2)).await;
-    assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), 2);
+    assert_eq!(scheduler.next().now_or_never().unwrap().unwrap(), 2);
     // Stream has terminated
     assert!(scheduler.next().await.is_none());
 }

@@ -352,7 +335,7 @@
     pin_mut!(scheduler);
     assert!(poll!(scheduler.next()).is_pending());
     advance(Duration::from_secs(2)).await;
-    scheduler.next().now_or_never().unwrap().unwrap().unwrap();
+    scheduler.next().now_or_never().unwrap().unwrap();
     // Stream has terminated
     assert!(scheduler.next().await.is_none());
 }

@@ -376,7 +359,7 @@
     pin_mut!(scheduler);
     assert!(poll!(scheduler.next()).is_pending());
     advance(Duration::from_secs(2)).await;
-    scheduler.next().now_or_never().unwrap().unwrap().unwrap();
+    scheduler.next().now_or_never().unwrap().unwrap();
     // Stream has terminated
     assert!(scheduler.next().await.is_none());
 }

@@ -395,7 +378,7 @@
         .unwrap();
     assert!(poll!(scheduler.next()).is_pending());
     advance(Duration::from_secs(2)).await;
-    scheduler.next().now_or_never().unwrap().unwrap().unwrap();
+    scheduler.next().now_or_never().unwrap().unwrap();
     assert!(poll!(scheduler.next()).is_pending());
     schedule_tx
         .send(ScheduleRequest {

@@ -406,7 +389,7 @@
         .unwrap();
     assert!(poll!(scheduler.next()).is_pending());
     advance(Duration::from_secs(2)).await;
-    scheduler.next().now_or_never().unwrap().unwrap().unwrap();
+    scheduler.next().now_or_never().unwrap().unwrap();
     assert!(poll!(scheduler.next()).is_pending());
 }
 }
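The updated tests above all follow the same paused-clock pattern, now with one fewer `.unwrap()`. A condensed sketch of that pattern, assuming tokio's `test-util` feature (for `pause`/`advance`) and a current-thread runtime:

use futures::{channel::mpsc, pin_mut, poll, FutureExt, SinkExt, StreamExt};
use kube_runtime::scheduler::{scheduler, ScheduleRequest};
use std::time::Duration;
use tokio::time::{advance, pause, Instant};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    pause(); // freeze the clock so advance() controls timer expiry
    let (mut tx, rx) = mpsc::unbounded();
    let sched = scheduler(rx);
    pin_mut!(sched);
    tx.send(ScheduleRequest {
        message: 1_u8,
        run_at: Instant::now() + Duration::from_secs(1),
    })
    .await
    .unwrap();
    assert!(poll!(sched.next()).is_pending()); // not due yet
    advance(Duration::from_secs(2)).await; // fire the DelayQueue timer
    // One unwrap for now_or_never, one for the stream Option; the old
    // third unwrap for scheduler::Error is gone.
    assert_eq!(sched.next().now_or_never().unwrap().unwrap(), 1);
}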
