From 1be4e73b5b21e7e98228e97a432d3dededf3a166 Mon Sep 17 00:00:00 2001 From: Eitan Mosenkis Date: Wed, 14 Dec 2016 08:39:29 +0200 Subject: [PATCH] Add support for reading asynchronously with mio::Evented and tokio_core::Stream. --- .gitignore | 1 + .travis.yml | 12 ++-- Cargo.toml | 9 ++- README.md | 3 + examples/tokio.rs | 60 +++++++++++++++++ src/lib.rs | 166 +++++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 243 insertions(+), 8 deletions(-) create mode 100644 examples/tokio.rs diff --git a/.gitignore b/.gitignore index f857248de..5e1b1d5cf 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ *.so *.rlib *.dll +*.bk # Executables *.exe diff --git a/.travis.yml b/.travis.yml index ced5a95de..fe76b400d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: rust sudo: false rust: - - 1.5.0 + - 1.10.0 - stable - beta - nightly @@ -25,9 +25,13 @@ env: script: - | - travis-cargo build && - travis-cargo test && - travis-cargo --only stable doc + travis-cargo build -- --features=tokio && + travis-cargo test -- --features=tokio && + travis-cargo build -- --features=mio-evented && + travis-cargo test -- --features=mio-evented && + travis-cargo build -- && + travis-cargo test -- && + travis-cargo --only stable doc -- --features=tokio after_success: - travis-cargo --only stable doc-upload diff --git a/Cargo.toml b/Cargo.toml index e31d37e1d..aa71e4067 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sysfs_gpio" -version = "0.4.4" +version = "0.5.0" authors = ["Paul Osborne "] license = "MIT/Apache-2.0" repository = "/~https://github.com/rust-embedded/rust-sysfs-gpio" @@ -15,6 +15,13 @@ interrupts) GPIOs from userspace. See https://www.kernel.org/doc/Documentation/gpio/sysfs.txt """ +[features] +mio-evented = ["mio"] +tokio = ["futures", "tokio-core", "mio-evented"] + [dependencies] +futures = { version = "0.1", optional = true } nix = "0.6.0" regex = "0.1.0" +mio = { version = "0.6", optional = true } +tokio-core = { version = "0.1", optional = true } diff --git a/README.md b/README.md index eb2463c06..d75c19c87 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ More Examples: - [Blink an LED](examples/blinky.rs) - [Poll a GPIO Input](examples/poll.rs) - [Receive interrupt on GPIO Change](examples/interrupt.rs) +- [Poll several pins asynchronously with Tokio](examples/tokio.rs) - [gpio-utils Project (uses most features)](/~https://github.com/rust-embedded/gpio-utils) Features @@ -81,6 +82,8 @@ The following features are planned for the library: - [ ] Support for configuring whether a pin is active low/high - [x] Support for configuring interrupts on GPIO - [x] Support for polling on GPIO with configured interrupt +- [x] Support for asynchronous polling using `mio` or `tokio-core` (requires + enabling the `mio-evented` or `tokio` crate features, respectively) Cross Compiling --------------- diff --git a/examples/tokio.rs b/examples/tokio.rs new file mode 100644 index 000000000..b0117b6a2 --- /dev/null +++ b/examples/tokio.rs @@ -0,0 +1,60 @@ +#[cfg(feature = "tokio")] +extern crate futures; +#[cfg(feature = "tokio")] +extern crate sysfs_gpio; +#[cfg(feature = "tokio")] +extern crate tokio_core; + +#[cfg(feature = "tokio")] +use futures::{Future, Stream}; +#[cfg(feature = "tokio")] +use sysfs_gpio::{Direction, Edge, Pin}; +#[cfg(feature = "tokio")] +use std::env; +#[cfg(feature = "tokio")] +use tokio_core::reactor::Core; + +#[cfg(feature = "tokio")] +fn stream(pin_nums: Vec) -> sysfs_gpio::Result<()> { + // NOTE: this currently runs forever and as such if + // the app is stopped (Ctrl-C), no cleanup will happen + // and the GPIO will be left exported. Not much + // can be done about this as Rust signal handling isn't + // really present at the moment. Revisit later. + let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect(); + let mut l = try!(Core::new()); + let handle = l.handle(); + for &(i, ref pin) in pins.iter() { + try!(pin.export()); + try!(pin.set_direction(Direction::In)); + try!(pin.set_edge(Edge::BothEdges)); + handle.spawn(try!(pin.get_value_stream(&handle)) + .for_each(move |val| { + println!("Pin {} changed value to {}", i, val); + Ok(()) + }) + .map_err(|_| ())); + } + // Wait forever for events + loop { + l.turn(None) + } +} + +#[cfg(feature = "tokio")] +fn main() { + let pins: Vec = env::args() + .skip(1) + .map(|a| a.parse().expect("Pins must be specified as integers")) + .collect(); + if pins.is_empty() { + println!("Usage: ./tokio [pin ...]"); + } else { + stream(pins).unwrap(); + } +} + +#[cfg(not(feature = "tokio"))] +fn main() { + println!("This example requires the `tokio` feature to be enabled."); +} diff --git a/src/lib.rs b/src/lib.rs index b81cf25d5..82cc8b8e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,8 +44,22 @@ //! } //! ``` +#[cfg(feature = "tokio")] +extern crate futures; +#[cfg(feature = "mio-evented")] +extern crate mio; extern crate nix; extern crate regex; +#[cfg(feature = "tokio")] +extern crate tokio_core; + +#[cfg(feature = "tokio")] +use futures::{Async, Poll, Stream}; + +#[cfg(feature = "mio-evented")] +use mio::Evented; +#[cfg(feature = "mio-evented")] +use mio::unix::EventedFd; use nix::sys::epoll::*; use nix::unistd::close; @@ -57,6 +71,9 @@ use std::fs; use std::fs::File; use std::path::Path; +#[cfg(feature = "tokio")] +use tokio_core::reactor::{Handle, PollEvented}; + mod error; pub use error::Error; @@ -395,6 +412,44 @@ impl Pin { pub fn get_poller(&self) -> Result { PinPoller::new(self.pin_num) } + + /// Get an AsyncPinPoller object for this pin + /// + /// The async pin poller object can be used with the `mio` crate. You should probably call + /// `set_edge()` before using this. + /// + /// This method is only available when the `mio-evented` crate feature is enabled. + #[cfg(feature = "mio-evented")] + pub fn get_async_poller(&self) -> Result { + AsyncPinPoller::new(self.pin_num) + } + + /// Get a Stream of pin interrupts for this pin + /// + /// The PinStream object can be used with the `tokio-core` crate. You should probably call + /// `set_edge()` before using this. + /// + /// This method is only available when the `tokio` crate feature is enabled. + #[cfg(feature = "tokio")] + pub fn get_stream(&self, handle: &Handle) -> Result { + PinStream::init(self.clone(), handle) + } + + /// Get a Stream of pin values for this pin + /// + /// The PinStream object can be used with the `tokio-core` crate. You should probably call + /// `set_edge(Edge::BothEdges)` before using this. + /// + /// Note that the values produced are the value of the pin as soon as we get to handling the + /// interrupt in userspace. Each time this stream produces a value, a change has occurred, but + /// it could end up producing the same value multiple times if the value has changed back + /// between when the interrupt occurred and when the value was read. + /// + /// This method is only available when the `tokio` crate feature is enabled. + #[cfg(feature = "tokio")] + pub fn get_value_stream(&self, handle: &Handle) -> Result { + Ok(PinValueStream(try!(PinStream::init(self.clone(), handle)))) + } } #[derive(Debug)] @@ -445,9 +500,9 @@ impl PinPoller { /// of interrupts which may result in this call returning /// may be configured by calling `set_edge()` prior to /// making this call. This call makes use of epoll under the - /// covers. If it is desirable to poll on multiple GPIOs or - /// other event source, you will need to implement that logic - /// yourself. + /// covers. To poll on multiple GPIOs or other event sources, + /// poll asynchronously using the integration with either `mio` + /// or `tokio_core`. /// /// This function will return Some(value) of the pin if a change is /// detected or None if a timeout occurs. Note that the value provided @@ -479,3 +534,108 @@ impl Drop for PinPoller { close(self.epoll_fd).unwrap(); // panic! if close files } } + +#[cfg(feature = "mio-evented")] +#[derive(Debug)] +pub struct AsyncPinPoller { + devfile: File, +} + +#[cfg(feature = "mio-evented")] +impl AsyncPinPoller { + fn new(pin_num: u64) -> Result { + let devfile = try!(File::open(&format!("/sys/class/gpio/gpio{}/value", pin_num))); + Ok(AsyncPinPoller { devfile: devfile }) + } +} + +#[cfg(feature = "mio-evented")] +impl Evented for AsyncPinPoller { + fn register(&self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt) + -> io::Result<()> { + EventedFd(&self.devfile.as_raw_fd()).register(poll, token, interest, opts) + } + + fn reregister(&self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt) + -> io::Result<()> { + EventedFd(&self.devfile.as_raw_fd()).reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + EventedFd(&self.devfile.as_raw_fd()).deregister(poll) + } +} + +#[cfg(feature = "tokio")] +pub struct PinStream { + evented: PollEvented, + skipped_first_event: bool, +} + +#[cfg(feature = "tokio")] +impl PinStream { + pub fn init(pin: Pin, handle: &Handle) -> Result { + Ok(PinStream { + evented: try!(PollEvented::new(try!(pin.get_async_poller()), &handle)), + skipped_first_event: false, + }) + } +} + +#[cfg(feature = "tokio")] +impl Stream for PinStream { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + Ok(match self.evented.poll_read() { + Async::Ready(()) => { + self.evented.need_read(); + if self.skipped_first_event { + Async::Ready(Some(())) + } else { + self.skipped_first_event = true; + Async::NotReady + } + } + Async::NotReady => Async::NotReady, + }) + } +} + +#[cfg(feature = "tokio")] +pub struct PinValueStream(PinStream); + +#[cfg(feature = "tokio")] +impl PinValueStream { + #[inline] + fn get_value(&mut self) -> Result { + get_value_from_file(&mut self.0.evented.get_mut().devfile) + } +} + +#[cfg(feature = "tokio")] +impl Stream for PinValueStream { + type Item = u8; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.0.poll() { + Ok(Async::Ready(Some(()))) => { + let value = try!(self.get_value()); + Ok(Async::Ready(Some(value))) + } + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => Err(e), + } + } +}