Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for reading asynchronously with mio::Evented and tokio_core::Stream #26

Merged
merged 1 commit into from
Dec 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*.so
*.rlib
*.dll
*.bk

# Executables
*.exe
Expand Down
12 changes: 8 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: rust
sudo: false
rust:
- 1.5.0
- 1.10.0
- stable
- beta
- nightly
Expand All @@ -25,9 +25,13 @@ env:

script:
- |
travis-cargo build &&
travis-cargo test &&
travis-cargo --only stable doc
travis-cargo build -- --features=tokio &&
travis-cargo test -- --features=tokio &&
travis-cargo build -- --features=mio-evented &&
travis-cargo test -- --features=mio-evented &&
travis-cargo build -- &&
travis-cargo test -- &&
travis-cargo --only stable doc -- --features=tokio

after_success:
- travis-cargo --only stable doc-upload
Expand Down
9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sysfs_gpio"
version = "0.4.4"
version = "0.5.0"
authors = ["Paul Osborne <osbpau@gmail.com>"]
license = "MIT/Apache-2.0"
repository = "/~https://github.com/rust-embedded/rust-sysfs-gpio"
Expand All @@ -15,6 +15,13 @@ interrupts) GPIOs from userspace.
See https://www.kernel.org/doc/Documentation/gpio/sysfs.txt
"""

[features]
mio-evented = ["mio"]
tokio = ["futures", "tokio-core", "mio-evented"]

[dependencies]
futures = { version = "0.1", optional = true }
nix = "0.6.0"
regex = "0.1.0"
mio = { version = "0.6", optional = true }
tokio-core = { version = "0.1", optional = true }
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ More Examples:
- [Blink an LED](examples/blinky.rs)
- [Poll a GPIO Input](examples/poll.rs)
- [Receive interrupt on GPIO Change](examples/interrupt.rs)
- [Poll several pins asynchronously with Tokio](examples/tokio.rs)
- [gpio-utils Project (uses most features)](/~https://github.com/rust-embedded/gpio-utils)

Features
Expand All @@ -81,6 +82,8 @@ The following features are planned for the library:
- [ ] Support for configuring whether a pin is active low/high
- [x] Support for configuring interrupts on GPIO
- [x] Support for polling on GPIO with configured interrupt
- [x] Support for asynchronous polling using `mio` or `tokio-core` (requires
enabling the `mio-evented` or `tokio` crate features, respectively)

Cross Compiling
---------------
Expand Down
60 changes: 60 additions & 0 deletions examples/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#[cfg(feature = "tokio")]
extern crate futures;
#[cfg(feature = "tokio")]
extern crate sysfs_gpio;
#[cfg(feature = "tokio")]
extern crate tokio_core;

#[cfg(feature = "tokio")]
use futures::{Future, Stream};
#[cfg(feature = "tokio")]
use sysfs_gpio::{Direction, Edge, Pin};
#[cfg(feature = "tokio")]
use std::env;
#[cfg(feature = "tokio")]
use tokio_core::reactor::Core;

#[cfg(feature = "tokio")]
fn stream(pin_nums: Vec<u64>) -> sysfs_gpio::Result<()> {
// NOTE: this currently runs forever and as such if
// the app is stopped (Ctrl-C), no cleanup will happen
// and the GPIO will be left exported. Not much
// can be done about this as Rust signal handling isn't
// really present at the moment. Revisit later.
let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect();
let mut l = try!(Core::new());
let handle = l.handle();
for &(i, ref pin) in pins.iter() {
try!(pin.export());
try!(pin.set_direction(Direction::In));
try!(pin.set_edge(Edge::BothEdges));
handle.spawn(try!(pin.get_value_stream(&handle))
.for_each(move |val| {
println!("Pin {} changed value to {}", i, val);
Ok(())
})
.map_err(|_| ()));
}
// Wait forever for events
loop {
l.turn(None)
}
}

#[cfg(feature = "tokio")]
fn main() {
let pins: Vec<u64> = env::args()
.skip(1)
.map(|a| a.parse().expect("Pins must be specified as integers"))
.collect();
if pins.is_empty() {
println!("Usage: ./tokio <pin> [pin ...]");
} else {
stream(pins).unwrap();
}
}

#[cfg(not(feature = "tokio"))]
fn main() {
println!("This example requires the `tokio` feature to be enabled.");
}
166 changes: 163 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,22 @@
//! }
//! ```

#[cfg(feature = "tokio")]
extern crate futures;
#[cfg(feature = "mio-evented")]
extern crate mio;
extern crate nix;
extern crate regex;
#[cfg(feature = "tokio")]
extern crate tokio_core;

#[cfg(feature = "tokio")]
use futures::{Async, Poll, Stream};

#[cfg(feature = "mio-evented")]
use mio::Evented;
#[cfg(feature = "mio-evented")]
use mio::unix::EventedFd;

use nix::sys::epoll::*;
use nix::unistd::close;
Expand All @@ -57,6 +71,9 @@ use std::fs;
use std::fs::File;
use std::path::Path;

#[cfg(feature = "tokio")]
use tokio_core::reactor::{Handle, PollEvented};

mod error;
pub use error::Error;

Expand Down Expand Up @@ -395,6 +412,44 @@ impl Pin {
pub fn get_poller(&self) -> Result<PinPoller> {
PinPoller::new(self.pin_num)
}

/// Get an AsyncPinPoller object for this pin
///
/// The async pin poller object can be used with the `mio` crate. You should probably call
/// `set_edge()` before using this.
///
/// This method is only available when the `mio-evented` crate feature is enabled.
#[cfg(feature = "mio-evented")]
pub fn get_async_poller(&self) -> Result<AsyncPinPoller> {
AsyncPinPoller::new(self.pin_num)
}

/// Get a Stream of pin interrupts for this pin
///
/// The PinStream object can be used with the `tokio-core` crate. You should probably call
/// `set_edge()` before using this.
///
/// This method is only available when the `tokio` crate feature is enabled.
#[cfg(feature = "tokio")]
pub fn get_stream(&self, handle: &Handle) -> Result<PinStream> {
PinStream::init(self.clone(), handle)
}

/// Get a Stream of pin values for this pin
///
/// The PinStream object can be used with the `tokio-core` crate. You should probably call
/// `set_edge(Edge::BothEdges)` before using this.
///
/// Note that the values produced are the value of the pin as soon as we get to handling the
/// interrupt in userspace. Each time this stream produces a value, a change has occurred, but
/// it could end up producing the same value multiple times if the value has changed back
/// between when the interrupt occurred and when the value was read.
///
/// This method is only available when the `tokio` crate feature is enabled.
#[cfg(feature = "tokio")]
pub fn get_value_stream(&self, handle: &Handle) -> Result<PinValueStream> {
Ok(PinValueStream(try!(PinStream::init(self.clone(), handle))))
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -445,9 +500,9 @@ impl PinPoller {
/// of interrupts which may result in this call returning
/// may be configured by calling `set_edge()` prior to
/// making this call. This call makes use of epoll under the
/// covers. If it is desirable to poll on multiple GPIOs or
/// other event source, you will need to implement that logic
/// yourself.
/// covers. To poll on multiple GPIOs or other event sources,
/// poll asynchronously using the integration with either `mio`
/// or `tokio_core`.
///
/// This function will return Some(value) of the pin if a change is
/// detected or None if a timeout occurs. Note that the value provided
Expand Down Expand Up @@ -479,3 +534,108 @@ impl Drop for PinPoller {
close(self.epoll_fd).unwrap(); // panic! if close files
}
}

#[cfg(feature = "mio-evented")]
#[derive(Debug)]
pub struct AsyncPinPoller {
devfile: File,
}

#[cfg(feature = "mio-evented")]
impl AsyncPinPoller {
fn new(pin_num: u64) -> Result<Self> {
let devfile = try!(File::open(&format!("/sys/class/gpio/gpio{}/value", pin_num)));
Ok(AsyncPinPoller { devfile: devfile })
}
}

#[cfg(feature = "mio-evented")]
impl Evented for AsyncPinPoller {
fn register(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt)
-> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).register(poll, token, interest, opts)
}

fn reregister(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt)
-> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).reregister(poll, token, interest, opts)
}

fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).deregister(poll)
}
}

#[cfg(feature = "tokio")]
pub struct PinStream {
evented: PollEvented<AsyncPinPoller>,
skipped_first_event: bool,
}

#[cfg(feature = "tokio")]
impl PinStream {
pub fn init(pin: Pin, handle: &Handle) -> Result<Self> {
Ok(PinStream {
evented: try!(PollEvented::new(try!(pin.get_async_poller()), &handle)),
skipped_first_event: false,
})
}
}

#[cfg(feature = "tokio")]
impl Stream for PinStream {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(match self.evented.poll_read() {
Async::Ready(()) => {
self.evented.need_read();
if self.skipped_first_event {
Async::Ready(Some(()))
} else {
self.skipped_first_event = true;
Async::NotReady
}
}
Async::NotReady => Async::NotReady,
})
}
}

#[cfg(feature = "tokio")]
pub struct PinValueStream(PinStream);

#[cfg(feature = "tokio")]
impl PinValueStream {
#[inline]
fn get_value(&mut self) -> Result<u8> {
get_value_from_file(&mut self.0.evented.get_mut().devfile)
}
}

#[cfg(feature = "tokio")]
impl Stream for PinValueStream {
type Item = u8;
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.0.poll() {
Ok(Async::Ready(Some(()))) => {
let value = try!(self.get_value());
Ok(Async::Ready(Some(value)))
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}