Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Reduce memory copy when scanning from Python objects #20142

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl IpcExec {
MemSlice::from_file(&file)?
},
ScanSourceRef::File(file) => MemSlice::from_file(file)?,
ScanSourceRef::Buffer(buff) => MemSlice::from_bytes(buff.clone()),
ScanSourceRef::Buffer(buff) => buff.clone(),
};

IpcReader::new(std::io::Cursor::new(memslice))
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-plan/src/plans/ir/scan_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum ScanSources {
#[cfg_attr(feature = "serde", serde(skip))]
Files(Arc<[File]>),
#[cfg_attr(feature = "serde", serde(skip))]
Buffers(Arc<[bytes::Bytes]>),
Buffers(Arc<[MemSlice]>),
}

impl Debug for ScanSources {
Expand All @@ -43,7 +43,7 @@ impl Debug for ScanSources {
pub enum ScanSourceRef<'a> {
Path(&'a Path),
File(&'a File),
Buffer(&'a bytes::Bytes),
Buffer(&'a MemSlice),
}

/// An iterator for [`ScanSources`]
Expand Down Expand Up @@ -263,7 +263,7 @@ impl ScanSourceRef<'_> {
MemSlice::from_file(&file)
},
ScanSourceRef::File(file) => MemSlice::from_file(file),
ScanSourceRef::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())),
ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
}
}

Expand All @@ -289,7 +289,7 @@ impl ScanSourceRef<'_> {
MemSlice::from_file(&file)
},
Self::File(file) => MemSlice::from_file(file),
Self::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())),
Self::Buffer(buff) => Ok((*buff).clone()),
}
}

Expand All @@ -306,7 +306,7 @@ impl ScanSourceRef<'_> {
.await
},
Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),
Self::Buffer(buff) => Ok(DynByteSource::from(MemSlice::from_bytes((*buff).clone()))),
Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-plan/src/plans/optimizer/count_star.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::path::PathBuf;

use polars_utils::mmap::MemSlice;

use super::*;

pub(super) struct CountStar;
Expand Down Expand Up @@ -68,7 +70,7 @@ fn visit_logical_plan_for_scan_paths(
IR::Union { inputs, .. } => {
enum MutableSources {
Paths(Vec<PathBuf>),
Buffers(Vec<bytes::Bytes>),
Buffers(Vec<MemSlice>),
}

let mut scan_type: Option<FileScan> = None;
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-python/src/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use polars_lazy::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::write::StatisticsOptions;
use polars_plan::plans::ScanSources;
use polars_utils::mmap::MemSlice;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::total_ord::{TotalEq, TotalHash};
use pyo3::basic::CompareOp;
Expand Down Expand Up @@ -542,7 +543,7 @@ impl<'py> FromPyObject<'py> for Wrap<ScanSources> {
enum MutableSources {
Paths(Vec<PathBuf>),
Files(Vec<File>),
Buffers(Vec<bytes::Bytes>),
Buffers(Vec<MemSlice>),
}

let num_items = list.len();
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl PyDataFrame {

let result = match get_either_file(py_f, false)? {
Py(f) => {
let buf = f.as_buffer();
let buf = std::io::Cursor::new(f.to_memslice());
py.allow_threads(move || {
ParquetReader::new(buf)
.with_projection(projection)
Expand Down
52 changes: 23 additions & 29 deletions crates/polars-python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
#[cfg(target_family = "unix")]
use std::os::fd::{FromRawFd, RawFd};
use std::path::PathBuf;
use std::sync::Arc;

use polars::io::mmap::MmapBytesReader;
use polars_error::polars_err;
use polars_io::cloud::CloudOptions;
use polars_utils::mmap::MemSlice;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyString, PyStringMethods};
Expand Down Expand Up @@ -39,38 +41,28 @@ impl PyFileLikeObject {
PyFileLikeObject { inner: object }
}

pub fn as_bytes(&self) -> bytes::Bytes {
self.as_file_buffer().into_inner().into()
}

pub fn as_buffer(&self) -> std::io::Cursor<Vec<u8>> {
let data = self.as_file_buffer().into_inner();
std::io::Cursor::new(data)
}

pub fn as_file_buffer(&self) -> Cursor<Vec<u8>> {
let buf = Python::with_gil(|py| {
pub fn to_memslice(&self) -> MemSlice {
Python::with_gil(|py| {
let bytes = self
.inner
.call_method_bound(py, "read", (), None)
.expect("no read method found");

if let Ok(bytes) = bytes.downcast_bound::<PyBytes>(py) {
return bytes.as_bytes().to_vec();
if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
}

if let Ok(bytes) = bytes.downcast_bound::<PyString>(py) {
return bytes
.to_cow()
.expect("PyString is not valid UTF-8")
.into_owned()
.into_bytes();
if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
return match b.to_cow().expect("PyString is not valid UTF-8") {
Cow::Borrowed(v) => {
MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
},
Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
};
}

panic!("Expecting to be able to downcast into bytes from read result.");
});

Cursor::new(buf)
})
}

/// Validates that the underlying
Expand Down Expand Up @@ -212,7 +204,7 @@ impl EitherRustPythonFile {

fn into_scan_source_input(self) -> PythonScanSourceInput {
match self {
EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.as_bytes()),
EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
}
}
Expand All @@ -226,7 +218,7 @@ impl EitherRustPythonFile {
}

pub enum PythonScanSourceInput {
Buffer(bytes::Bytes),
Buffer(MemSlice),
Path(PathBuf),
File(File),
}
Expand Down Expand Up @@ -328,13 +320,15 @@ pub fn get_python_scan_source_input(
write: bool,
) -> PyResult<PythonScanSourceInput> {
Python::with_gil(|py| {
let py_f = py_f.into_bound(py);
let py_f_0 = py_f;
let py_f = py_f_0.clone_ref(py).into_bound(py);

// If the pyobject is a `bytes` class
if let Ok(bytes) = py_f.downcast::<PyBytes>() {
return Ok(PythonScanSourceInput::Buffer(
bytes::Bytes::copy_from_slice(bytes.as_bytes()),
));
if let Ok(b) = py_f.downcast::<PyBytes>() {
return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
b.as_bytes(),
Arc::new(py_f_0),
)));
}

if let Ok(s) = py_f.extract::<Cow<str>>() {
Expand Down
17 changes: 14 additions & 3 deletions crates/polars-utils/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ mod private {
#[derive(Clone, Debug)]
#[allow(unused)]
enum MemSliceInner {
Bytes(bytes::Bytes),
Mmap(Arc<MMapSemaphore>),
Bytes(bytes::Bytes), // Separate because it does atomic refcounting internally
Arc(Arc<dyn std::fmt::Debug + Send + Sync>),
}

impl Deref for MemSlice {
Expand Down Expand Up @@ -97,7 +97,18 @@ mod private {
slice: unsafe {
std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref())
},
inner: MemSliceInner::Mmap(mmap),
inner: MemSliceInner::Arc(mmap),
}
}

/// Builds a `MemSlice` that views `slice` and stores `arc` alongside it.
///
/// The borrow of `slice` is transmuted to `'static`, so the compiler no longer
/// ties the returned value to the lifetime of the input borrow.
///
/// NOTE(review): this is a safe `fn`, yet nothing here checks that `slice`
/// actually points into memory owned by `arc`. Presumably callers must pass an
/// `arc` that keeps the bytes behind `slice` alive and unmoved — confirm, and
/// consider documenting this under `# Safety` or making the function `unsafe`.
#[inline]
pub fn from_arc<T>(slice: &[u8], arc: Arc<T>) -> Self
where
T: std::fmt::Debug + Send + Sync + 'static,
{
Self {
// SAFETY (assumed, TODO confirm): `arc` is moved into `inner` below and is
// presumed to own the memory that `slice` refers to.
slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) },
inner: MemSliceInner::Arc(arc),
}
}

Expand Down
Loading