Skip to content

Commit

Permalink
Merge pull request #26 from emosenkis/futures
Browse files Browse the repository at this point in the history
Add support for reading asynchronously with mio::Evented and tokio_core::Stream
  • Loading branch information
posborne authored Dec 18, 2016
2 parents c85cc3b + 1be4e73 commit bd9f5b5
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*.so
*.rlib
*.dll
*.bk

# Executables
*.exe
Expand Down
12 changes: 8 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: rust
sudo: false
rust:
- 1.5.0
- 1.10.0
- stable
- beta
- nightly
Expand All @@ -25,9 +25,13 @@ env:

script:
- |
travis-cargo build &&
travis-cargo test &&
travis-cargo --only stable doc
travis-cargo build -- --features=tokio &&
travis-cargo test -- --features=tokio &&
travis-cargo build -- --features=mio-evented &&
travis-cargo test -- --features=mio-evented &&
travis-cargo build -- &&
travis-cargo test -- &&
travis-cargo --only stable doc -- --features=tokio
after_success:
- travis-cargo --only stable doc-upload
Expand Down
9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sysfs_gpio"
version = "0.4.4"
version = "0.5.0"
authors = ["Paul Osborne <osbpau@gmail.com>"]
license = "MIT/Apache-2.0"
repository = "/~https://github.com/rust-embedded/rust-sysfs-gpio"
Expand All @@ -15,6 +15,13 @@ interrupts) GPIOs from userspace.
See https://www.kernel.org/doc/Documentation/gpio/sysfs.txt
"""

[features]
mio-evented = ["mio"]
tokio = ["futures", "tokio-core", "mio-evented"]

[dependencies]
futures = { version = "0.1", optional = true }
nix = "0.6.0"
regex = "0.1.0"
mio = { version = "0.6", optional = true }
tokio-core = { version = "0.1", optional = true }
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ More Examples:
- [Blink an LED](examples/blinky.rs)
- [Poll a GPIO Input](examples/poll.rs)
- [Receive interrupt on GPIO Change](examples/interrupt.rs)
- [Poll several pins asynchronously with Tokio](examples/tokio.rs)
- [gpio-utils Project (uses most features)](/~https://github.com/rust-embedded/gpio-utils)

Features
Expand All @@ -81,6 +82,8 @@ The following features are planned for the library:
- [ ] Support for configuring whether a pin is active low/high
- [x] Support for configuring interrupts on GPIO
- [x] Support for polling on GPIO with configured interrupt
- [x] Support for asynchronous polling using `mio` or `tokio-core` (requires
enabling the `mio-evented` or `tokio` crate features, respectively)

Cross Compiling
---------------
Expand Down
60 changes: 60 additions & 0 deletions examples/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#[cfg(feature = "tokio")]
extern crate futures;
#[cfg(feature = "tokio")]
extern crate sysfs_gpio;
#[cfg(feature = "tokio")]
extern crate tokio_core;

#[cfg(feature = "tokio")]
use futures::{Future, Stream};
#[cfg(feature = "tokio")]
use sysfs_gpio::{Direction, Edge, Pin};
#[cfg(feature = "tokio")]
use std::env;
#[cfg(feature = "tokio")]
use tokio_core::reactor::Core;

#[cfg(feature = "tokio")]
fn stream(pin_nums: Vec<u64>) -> sysfs_gpio::Result<()> {
// NOTE: this currently runs forever and as such if
// the app is stopped (Ctrl-C), no cleanup will happen
// and the GPIO will be left exported. Not much
// can be done about this as Rust signal handling isn't
// really present at the moment. Revisit later.
let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect();
let mut l = try!(Core::new());
let handle = l.handle();
for &(i, ref pin) in pins.iter() {
try!(pin.export());
try!(pin.set_direction(Direction::In));
try!(pin.set_edge(Edge::BothEdges));
handle.spawn(try!(pin.get_value_stream(&handle))
.for_each(move |val| {
println!("Pin {} changed value to {}", i, val);
Ok(())
})
.map_err(|_| ()));
}
// Wait forever for events
loop {
l.turn(None)
}
}

#[cfg(feature = "tokio")]
fn main() {
let pins: Vec<u64> = env::args()
.skip(1)
.map(|a| a.parse().expect("Pins must be specified as integers"))
.collect();
if pins.is_empty() {
println!("Usage: ./tokio <pin> [pin ...]");
} else {
stream(pins).unwrap();
}
}

#[cfg(not(feature = "tokio"))]
fn main() {
println!("This example requires the `tokio` feature to be enabled.");
}
166 changes: 163 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,22 @@
//! }
//! ```
#[cfg(feature = "tokio")]
extern crate futures;
#[cfg(feature = "mio-evented")]
extern crate mio;
extern crate nix;
extern crate regex;
#[cfg(feature = "tokio")]
extern crate tokio_core;

#[cfg(feature = "tokio")]
use futures::{Async, Poll, Stream};

#[cfg(feature = "mio-evented")]
use mio::Evented;
#[cfg(feature = "mio-evented")]
use mio::unix::EventedFd;

use nix::sys::epoll::*;
use nix::unistd::close;
Expand All @@ -57,6 +71,9 @@ use std::fs;
use std::fs::File;
use std::path::Path;

#[cfg(feature = "tokio")]
use tokio_core::reactor::{Handle, PollEvented};

mod error;
pub use error::Error;

Expand Down Expand Up @@ -395,6 +412,44 @@ impl Pin {
pub fn get_poller(&self) -> Result<PinPoller> {
PinPoller::new(self.pin_num)
}

/// Get an AsyncPinPoller object for this pin
///
/// The async pin poller object can be used with the `mio` crate. You should probably call
/// `set_edge()` before using this.
///
/// This method is only available when the `mio-evented` crate feature is enabled.
#[cfg(feature = "mio-evented")]
pub fn get_async_poller(&self) -> Result<AsyncPinPoller> {
AsyncPinPoller::new(self.pin_num)
}

/// Get a Stream of pin interrupts for this pin
///
/// The PinStream object can be used with the `tokio-core` crate. You should probably call
/// `set_edge()` before using this.
///
/// This method is only available when the `tokio` crate feature is enabled.
#[cfg(feature = "tokio")]
pub fn get_stream(&self, handle: &Handle) -> Result<PinStream> {
PinStream::init(self.clone(), handle)
}

/// Get a Stream of pin values for this pin
///
/// The PinStream object can be used with the `tokio-core` crate. You should probably call
/// `set_edge(Edge::BothEdges)` before using this.
///
/// Note that the values produced are the value of the pin as soon as we get to handling the
/// interrupt in userspace. Each time this stream produces a value, a change has occurred, but
/// it could end up producing the same value multiple times if the value has changed back
/// between when the interrupt occurred and when the value was read.
///
/// This method is only available when the `tokio` crate feature is enabled.
#[cfg(feature = "tokio")]
pub fn get_value_stream(&self, handle: &Handle) -> Result<PinValueStream> {
Ok(PinValueStream(try!(PinStream::init(self.clone(), handle))))
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -445,9 +500,9 @@ impl PinPoller {
/// of interrupts which may result in this call returning
/// may be configured by calling `set_edge()` prior to
/// making this call. This call makes use of epoll under the
/// covers. If it is desirable to poll on multiple GPIOs or
/// other event source, you will need to implement that logic
/// yourself.
/// covers. To poll on multiple GPIOs or other event sources,
/// poll asynchronously using the integration with either `mio`
/// or `tokio_core`.
///
/// This function will return Some(value) of the pin if a change is
/// detected or None if a timeout occurs. Note that the value provided
Expand Down Expand Up @@ -479,3 +534,108 @@ impl Drop for PinPoller {
close(self.epoll_fd).unwrap(); // panic! if close files
}
}

#[cfg(feature = "mio-evented")]
#[derive(Debug)]
pub struct AsyncPinPoller {
devfile: File,
}

#[cfg(feature = "mio-evented")]
impl AsyncPinPoller {
fn new(pin_num: u64) -> Result<Self> {
let devfile = try!(File::open(&format!("/sys/class/gpio/gpio{}/value", pin_num)));
Ok(AsyncPinPoller { devfile: devfile })
}
}

#[cfg(feature = "mio-evented")]
impl Evented for AsyncPinPoller {
fn register(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt)
-> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).register(poll, token, interest, opts)
}

fn reregister(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt)
-> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).reregister(poll, token, interest, opts)
}

fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).deregister(poll)
}
}

#[cfg(feature = "tokio")]
pub struct PinStream {
evented: PollEvented<AsyncPinPoller>,
skipped_first_event: bool,
}

#[cfg(feature = "tokio")]
impl PinStream {
pub fn init(pin: Pin, handle: &Handle) -> Result<Self> {
Ok(PinStream {
evented: try!(PollEvented::new(try!(pin.get_async_poller()), &handle)),
skipped_first_event: false,
})
}
}

#[cfg(feature = "tokio")]
impl Stream for PinStream {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(match self.evented.poll_read() {
Async::Ready(()) => {
self.evented.need_read();
if self.skipped_first_event {
Async::Ready(Some(()))
} else {
self.skipped_first_event = true;
Async::NotReady
}
}
Async::NotReady => Async::NotReady,
})
}
}

#[cfg(feature = "tokio")]
pub struct PinValueStream(PinStream);

#[cfg(feature = "tokio")]
impl PinValueStream {
#[inline]
fn get_value(&mut self) -> Result<u8> {
get_value_from_file(&mut self.0.evented.get_mut().devfile)
}
}

#[cfg(feature = "tokio")]
impl Stream for PinValueStream {
type Item = u8;
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.0.poll() {
Ok(Async::Ready(Some(()))) => {
let value = try!(self.get_value());
Ok(Async::Ready(Some(value)))
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}

0 comments on commit bd9f5b5

Please sign in to comment.