Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DemiBuffer] Implement prepend #1367

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 123 additions & 5 deletions src/rust/runtime/memory/demibuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use ::dpdk_rs::{
rte_pktmbuf_adj,
rte_pktmbuf_clone,
rte_pktmbuf_free,
rte_pktmbuf_prepend,
rte_pktmbuf_trim,
};
use ::std::{
Expand Down Expand Up @@ -348,7 +349,17 @@ impl DemiBuffer {
let (metadata_buf, buffer): (&mut MaybeUninit<MetaData>, &mut [MaybeUninit<u8>]) =
allocate_metadata_data(capacity);

Self::new_from_parts(metadata_buf, buffer.as_mut_ptr(), capacity, None)
Self::new_from_parts(metadata_buf, buffer.as_mut_ptr(), 0, capacity, None)
}

/// Creates a new heap-allocated DemiBuffer with reserved headroom. The application cannot use this space but it
/// is reserved for Demikernel to use for headers.
pub fn new_with_headroom(capacity: u16, headroom: u16) -> Self {
// Allocate some memory off the heap.
let (metadata_buf, buffer): (&mut MaybeUninit<MetaData>, &mut [MaybeUninit<u8>]) =
allocate_metadata_data(capacity + headroom);

Self::new_from_parts(metadata_buf, buffer.as_mut_ptr(), headroom, capacity, None)
}

/// Create a new buffer using a buffer from the specified [`BufferPool`]. If the pool is empty, this method returns
Expand All @@ -374,6 +385,7 @@ impl DemiBuffer {
Some(Self::new_from_parts(
metadata_buf,
buffer.as_mut_ptr(),
0,
buffer.len() as u16,
Some(pool),
))
Expand All @@ -383,28 +395,28 @@ impl DemiBuffer {
fn new_from_parts(
metadata_buf: &mut MaybeUninit<MetaData>,
buf_addr: *mut MaybeUninit<u8>,
headroom: u16,
capacity: u16,
pool: Option<Rc<MemoryPool>>,
) -> Self {
let buf_addr: *mut u8 = if capacity > 0 {
let buf_addr: *mut u8 = if capacity + headroom > 0 {
// TODO: casting the MaybeUninit away can cause UB (when deref'd). Change the exposed data type from
// DemiBuffer to better expose un/initialized values.
buf_addr.cast()
} else {
ptr::null_mut()
};

let metadata: NonNull<MetaData> = NonNull::from(metadata_buf.write(MetaData::new(DemiMetaData {
buf_addr,
data_off: 0,
data_off: headroom,
refcnt: 1,
nb_segs: 1,
ol_flags: 0,
pkt_len: capacity as u32,
// Note: this is not consistent with DPDK behavior: presumably, zero bytes of data are initialized at this
// point
data_len: capacity,
buf_len: capacity,
buf_len: (capacity + headroom),
next: None,
pool,
})));
Expand All @@ -426,6 +438,56 @@ impl DemiBuffer {
slice.try_into()
}

/// Allocates a new DemiBuffer with some headroom and copies the slice contents.
pub fn from_slice_with_headroom(slice: &[u8], headroom: usize) -> Result<Self, Fail> {
// Check size of the slice to ensure a single DemiBuffer can hold it.
let size: u16 = if slice.len() + headroom < u16::MAX as usize {
(slice.len() + headroom) as u16
} else {
return Err(Fail::new(libc::EINVAL, "slice is larger than a DemiBuffer can hold"));
};

// Allocate some memory off the heap.
let (temp, buffer): (&mut MaybeUninit<MetaData>, &mut [MaybeUninit<u8>]) = allocate_metadata_data(size);

// Point buf_addr at the newly allocated data space (if any).
let buf_addr: *mut u8 = if size == 0 {
// No direct data, so don't point buf_addr at anything.
null_mut()
} else {
let buf_addr: *mut u8 = buffer.as_mut_ptr().cast();

// Copy the data from the slice into the DemiBuffer.
// Safety: This is safe, as the src/dst argument pointers are valid for reads/writes of `size` bytes,
// are aligned (trivial for u8 pointers), and the regions they specify do not overlap one another.
unsafe { ptr::copy_nonoverlapping(slice.as_ptr(), buf_addr.add(headroom), size as usize - headroom) };
buf_addr
};

// Set field values as appropriate.
let metadata: NonNull<MetaData> = NonNull::from(temp.write(MetaData::new(DemiMetaData {
buf_addr,
data_off: headroom as u16,
refcnt: 1,
nb_segs: 1,
ol_flags: 0,
pkt_len: size as u32 - headroom as u32,
data_len: size - headroom as u16,
buf_len: size as u16,
next: None,
pool: None,
})));

// Embed the buffer type into the lower bits of the pointer.
let tagged: NonNull<MetaData> = metadata.with_addr(metadata.addr() | Tag::Heap);

// Return the new DemiBuffer.
Ok(DemiBuffer {
tagged_ptr: tagged,
_phantom: PhantomData,
})
}

/// Creates a `DemiBuffer` from a raw pointer.
pub unsafe fn from_raw(token: NonNull<u8>) -> Self {
DemiBuffer {
Expand Down Expand Up @@ -546,6 +608,40 @@ impl DemiBuffer {
Ok(())
}

/// Prepends `nbytes` bytes to the begining of the `DemiBuffer`.
pub fn prepend(&mut self, nbytes: usize) -> Result<(), Fail> {
match self.get_tag() {
Tag::Heap => {
let metadata: &mut MetaData = self.as_metadata();
if nbytes > metadata.data_off as usize {
return Err(Fail::new(
libc::EINVAL,
&format!(
"insufficient headroom: headroom={:?} requested={:?}",
metadata.data_off, nbytes
),
));
}
// The above check against data_len also means that nbytes is <= u16::MAX. So these casts are safe.
metadata.data_off -= nbytes as u16;
metadata.pkt_len += nbytes as u32;
metadata.data_len += nbytes as u16;
},
#[cfg(feature = "libdpdk")]
Tag::Dpdk => {
let mbuf: *mut rte_mbuf = unsafe {
// Safety: rte_pktmbuf_prepend does both sanity and headroom space checks.
rte_pktmbuf_prepend(self.as_mbuf(), nbytes as u16) as *mut rte_mbuf
};

if mbuf.is_null() {
return Err(Fail::new(libc::EINVAL, "tried to prepend more bytes than are allowed"));
}
},
}
Ok(())
}

///
/// **Description**
///
Expand Down Expand Up @@ -1181,6 +1277,28 @@ mod tests {
crate::ensure_eq!(buf.trim(7).is_ok(), true);
crate::ensure_eq!(buf.len(), 28);

// Verify bad requests actually fail.
crate::ensure_eq!(buf.adjust(30).is_err(), true);
crate::ensure_eq!(buf.trim(30).is_err(), true);
crate::ensure_eq!(buf.prepend(30).is_err(), true);

Ok(())
}

#[test]
fn headroom() -> Result<()> {
let mut buf: DemiBuffer = DemiBuffer::new_with_headroom(0, 42);
crate::ensure_eq!(buf.is_heap_allocated(), true);
crate::ensure_eq!(buf.len(), 0);

// Add 7 bytes to the beginning of the data area. Length should now be 7.
crate::ensure_eq!(buf.prepend(7).is_ok(), true);
crate::ensure_eq!(buf.len(), 7);

// Remove 7 bytes from the end of the data area. Length should now be 0.
crate::ensure_eq!(buf.trim(7).is_ok(), true);
crate::ensure_eq!(buf.len(), 0);

// Verify bad requests actually fail.
crate::ensure_eq!(buf.adjust(30).is_err(), true);
crate::ensure_eq!(buf.trim(30).is_err(), true);
Expand Down