Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AioCb::from_boxed_slice #582

Merged
merged 1 commit into from
Apr 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

### Added
- Added `AioCb::from_boxed_slice`
([#582](/~https://github.com/nix-rust/nix/pull/582)
- Added `nix::unistd::{openat, fstatat, readlink, readlinkat}`
([#551](/~https://github.com/nix-rust/nix/pull/551))

Expand Down
57 changes: 49 additions & 8 deletions src/sys/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::io::Write;
use std::io::stderr;
use std::marker::PhantomData;
use std::mem;
use std::rc::Rc;
use std::ptr::{null, null_mut};
use sys::signal::*;
use sys::time::TimeSpec;
Expand Down Expand Up @@ -61,6 +62,17 @@ pub enum AioCancelStat {
AioAllDone = libc::AIO_ALLDONE,
}

/// Private type used by nix to keep buffers from Drop'ing while the kernel has
/// a pointer to them.
#[derive(Clone, Debug)]
enum Keeper<'a> {
none,
/// Keeps a reference to a Boxed slice
boxed(Rc<Box<[u8]>>),
/// Keeps a reference to a slice
phantom(PhantomData<&'a mut [u8]>)
}

/// The basic structure used by all aio functions. Each `aiocb` represents one
/// I/O request.
pub struct AioCb<'a> {
Expand All @@ -69,7 +81,8 @@ pub struct AioCb<'a> {
mutable: bool,
/// Could this `AioCb` potentially have any in-kernel state?
in_progress: bool,
phantom: PhantomData<&'a mut [u8]>
/// Used to keep buffers from Drop'ing
keeper: Keeper<'a>
}

impl<'a> AioCb<'a> {
Expand All @@ -89,7 +102,7 @@ impl<'a> AioCb<'a> {
a.aio_buf = null_mut();

let aiocb = AioCb { aiocb: a, mutable: false, in_progress: false,
phantom: PhantomData};
keeper: Keeper::none};
aiocb
}

Expand All @@ -106,17 +119,43 @@ impl<'a> AioCb<'a> {
/// which operation to use for this individual aiocb
pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8],
prio: ::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb {
opcode: LioOpcode) -> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
// casting an immutable buffer to a mutable pointer looks unsafe, but
// technically its only unsafe to dereference it, not to create it.
a.aio_buf = buf.as_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb { aiocb: a, mutable: true, in_progress: false,
phantom: PhantomData};
keeper: Keeper::phantom(PhantomData)};
aiocb
}

/// Constructs a new `AioCb`.
///
/// Unlike `from_mut_slice`, this method returns a structure suitable for
/// placement on the heap.
///
/// * `fd` File descriptor. Required for all aio functions.
/// * `offs` File offset
/// * `buf` A shared memory buffer on the heap
/// * `prio` If POSIX Prioritized IO is supported, then the operation will
/// be prioritized at the process's priority level minus `prio`
/// * `sigev_notify` Determines how you will be notified of event
/// completion.
/// * `opcode` This field is only used for `lio_listio`. It determines
/// which operation to use for this individual aiocb
pub fn from_boxed_slice(fd: RawFd, offs: off_t, buf: Rc<Box<[u8]>>,
prio: ::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
a.aio_buf = buf.as_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb{ aiocb: a, mutable: true, in_progress: false,
keeper: Keeper::boxed(buf)};
aiocb
}

Expand All @@ -139,12 +178,15 @@ impl<'a> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
// casting an immutable buffer to a mutable pointer looks unsafe,
// but technically its only unsafe to dereference it, not to create
// it.
a.aio_buf = buf.as_ptr() as *mut c_void;
assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb { aiocb: a, mutable: false, in_progress: false,
phantom: PhantomData};
keeper: Keeper::none};
aiocb
}

Expand Down Expand Up @@ -284,7 +326,6 @@ impl<'a> Debug for AioCb<'a> {
.field("aio_sigevent", &SigEvent::from(&self.aiocb.aio_sigevent))
.field("mutable", &self.mutable)
.field("in_progress", &self.in_progress)
.field("phantom", &self.phantom)
.finish()
}
}
Expand Down
80 changes: 54 additions & 26 deletions test/sys/test_aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use nix::sys::aio::*;
use nix::sys::signal::*;
use nix::sys::time::{TimeSpec, TimeValLike};
use std::io::{Write, Read, Seek, SeekFrom};
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time};
Expand All @@ -25,12 +27,12 @@ fn poll_aio(mut aiocb: &mut AioCb) -> Result<()> {
// AioCancelStat value.
#[test]
fn test_cancel() {
let mut wbuf = "CDEF".to_string().into_bytes();
let wbuf: &'static [u8] = b"CDEF";

let f = tempfile().unwrap();
let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
0, //offset
&mut wbuf,
&wbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
Expand All @@ -49,12 +51,12 @@ fn test_cancel() {
// Tests using aio_cancel_all for all outstanding IOs.
#[test]
fn test_aio_cancel_all() {
let mut wbuf = "CDEF".to_string().into_bytes();
let wbuf: &'static [u8] = b"CDEF";

let f = tempfile().unwrap();
let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut aiocb = AioCb::from_slice(f.as_raw_fd(),
0, //offset
&mut wbuf,
&wbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
Expand Down Expand Up @@ -90,7 +92,7 @@ fn test_aio_suspend() {
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF";
let timeout = TimeSpec::seconds(10);
let mut rbuf = vec![0; 4];
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
let mut f = tempfile().unwrap();
f.write(INITIAL).unwrap();

Expand All @@ -101,9 +103,9 @@ fn test_aio_suspend() {
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut rcb = AioCb::from_boxed_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
Expand All @@ -128,6 +130,31 @@ fn test_aio_suspend() {
// for completion
#[test]
fn test_read() {
const INITIAL: &'static [u8] = b"abcdef123456";
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
const EXPECT: &'static [u8] = b"cdef";
let mut f = tempfile().unwrap();
f.write(INITIAL).unwrap();
{
let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
2, //offset
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.read().unwrap();

let err = poll_aio(&mut aiocb);
assert!(err == Ok(()));
assert!(aiocb.aio_return().unwrap() as usize == EXPECT.len());
}

assert!(EXPECT == rbuf.deref().deref());
}

// Tests from_mut_slice
#[test]
fn test_read_into_mut_slice() {
const INITIAL: &'static [u8] = b"abcdef123456";
let mut rbuf = vec![0; 4];
const EXPECT: &'static [u8] = b"cdef";
Expand All @@ -154,7 +181,7 @@ fn test_read() {
#[test]
#[should_panic(expected = "Can't read into an immutable buffer")]
fn test_read_immutable_buffer() {
let rbuf = vec![0; 4];
let rbuf: &'static [u8] = b"CDEF";
let f = tempfile().unwrap();
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
Expand All @@ -165,28 +192,29 @@ fn test_read_immutable_buffer() {
aiocb.read().unwrap();
}


// Test a simple aio operation with no completion notification. We must poll
// for completion. Unlike test_aio_read, this test uses AioCb::from_slice
#[test]
fn test_write() {
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF"; //"CDEF".to_string().into_bytes();
let wbuf = "CDEF".to_string().into_bytes();
let mut rbuf = Vec::new();
const EXPECT: &'static [u8] = b"abCDEF123456";

let mut f = tempfile().unwrap();
f.write(INITIAL).unwrap();
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
&WBUF,
&wbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.write().unwrap();

let err = poll_aio(&mut aiocb);
assert!(err == Ok(()));
assert!(aiocb.aio_return().unwrap() as usize == WBUF.len());
assert!(aiocb.aio_return().unwrap() as usize == wbuf.len());

f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf).unwrap();
Expand Down Expand Up @@ -249,7 +277,7 @@ fn test_write_sigev_signal() {
fn test_lio_listio_wait() {
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
let mut rbuf2 = Vec::new();
const EXPECT: &'static [u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
Expand All @@ -264,9 +292,9 @@ fn test_lio_listio_wait() {
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut rcb = AioCb::from_boxed_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
Expand All @@ -276,7 +304,7 @@ fn test_lio_listio_wait() {
assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == WBUF.len());
}
assert!(rbuf == b"3456");
assert!(rbuf.deref().deref() == b"3456");

f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
Expand All @@ -291,7 +319,7 @@ fn test_lio_listio_wait() {
fn test_lio_listio_nowait() {
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
let mut rbuf2 = Vec::new();
const EXPECT: &'static [u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
Expand All @@ -306,9 +334,9 @@ fn test_lio_listio_nowait() {
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut rcb = AioCb::from_boxed_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
Expand All @@ -320,7 +348,7 @@ fn test_lio_listio_nowait() {
assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == WBUF.len());
}
assert!(rbuf == b"3456");
assert!(rbuf.deref().deref() == b"3456");

f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
Expand All @@ -336,7 +364,7 @@ fn test_lio_listio_signal() {
let _ = SIGUSR2_MTX.lock().expect("Mutex got poisoned by another test");
const INITIAL: &'static [u8] = b"abcdef123456";
const WBUF: &'static [u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rbuf = Rc::new(vec![0; 4].into_boxed_slice());
let mut rbuf2 = Vec::new();
const EXPECT: &'static [u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
Expand All @@ -356,9 +384,9 @@ fn test_lio_listio_signal() {
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);

let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
let mut rcb = AioCb::from_boxed_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
rbuf.clone(),
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
Expand All @@ -373,7 +401,7 @@ fn test_lio_listio_signal() {
assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
assert!(rcb.aio_return().unwrap() as usize == WBUF.len());
}
assert!(rbuf == b"3456");
assert!(rbuf.deref().deref() == b"3456");

f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
Expand All @@ -386,7 +414,7 @@ fn test_lio_listio_signal() {
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[should_panic(expected = "Can't read into an immutable buffer")]
fn test_lio_listio_read_immutable() {
let rbuf = vec![0; 4];
let rbuf: &'static [u8] = b"abcd";
let f = tempfile().unwrap();


Expand Down