Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Dec 4, 2024
1 parent bcfa7ec commit 64560d6
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 41 deletions.
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl IpcExec {
MemSlice::from_file(&file)?
},
ScanSourceRef::File(file) => MemSlice::from_file(file)?,
ScanSourceRef::Buffer(buff) => MemSlice::from_bytes(buff.clone()),
ScanSourceRef::Buffer(buff) => buff.clone(),
};

IpcReader::new(std::io::Cursor::new(memslice))
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-plan/src/plans/ir/scan_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum ScanSources {
#[cfg_attr(feature = "serde", serde(skip))]
Files(Arc<[File]>),
#[cfg_attr(feature = "serde", serde(skip))]
Buffers(Arc<[bytes::Bytes]>),
Buffers(Arc<[MemSlice]>),
}

impl Debug for ScanSources {
Expand All @@ -43,7 +43,7 @@ impl Debug for ScanSources {
pub enum ScanSourceRef<'a> {
Path(&'a Path),
File(&'a File),
Buffer(&'a bytes::Bytes),
Buffer(&'a MemSlice),
}

/// An iterator for [`ScanSources`]
Expand Down Expand Up @@ -263,7 +263,7 @@ impl ScanSourceRef<'_> {
MemSlice::from_file(&file)
},
ScanSourceRef::File(file) => MemSlice::from_file(file),
ScanSourceRef::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())),
ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
}
}

Expand All @@ -289,7 +289,7 @@ impl ScanSourceRef<'_> {
MemSlice::from_file(&file)
},
Self::File(file) => MemSlice::from_file(file),
Self::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())),
Self::Buffer(buff) => Ok((*buff).clone()),
}
}

Expand All @@ -306,7 +306,7 @@ impl ScanSourceRef<'_> {
.await
},
Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),
Self::Buffer(buff) => Ok(DynByteSource::from(MemSlice::from_bytes((*buff).clone()))),
Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-plan/src/plans/optimizer/count_star.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::path::PathBuf;

use polars_utils::mmap::MemSlice;

use super::*;

pub(super) struct CountStar;
Expand Down Expand Up @@ -68,7 +70,7 @@ fn visit_logical_plan_for_scan_paths(
IR::Union { inputs, .. } => {
enum MutableSources {
Paths(Vec<PathBuf>),
Buffers(Vec<bytes::Bytes>),
Buffers(Vec<MemSlice>),
}

let mut scan_type: Option<FileScan> = None;
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-python/src/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use polars_lazy::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::write::StatisticsOptions;
use polars_plan::plans::ScanSources;
use polars_utils::mmap::MemSlice;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::total_ord::{TotalEq, TotalHash};
use pyo3::basic::CompareOp;
Expand Down Expand Up @@ -542,7 +543,7 @@ impl<'py> FromPyObject<'py> for Wrap<ScanSources> {
enum MutableSources {
Paths(Vec<PathBuf>),
Files(Vec<File>),
Buffers(Vec<bytes::Bytes>),
Buffers(Vec<MemSlice>),
}

let num_items = list.len();
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl PyDataFrame {

let result = match get_either_file(py_f, false)? {
Py(f) => {
let buf = f.as_buffer();
let buf = std::io::Cursor::new(f.to_memslice());
py.allow_threads(move || {
ParquetReader::new(buf)
.with_projection(projection)
Expand Down
52 changes: 23 additions & 29 deletions crates/polars-python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
#[cfg(target_family = "unix")]
use std::os::fd::{FromRawFd, RawFd};
use std::path::PathBuf;
use std::sync::Arc;

use polars::io::mmap::MmapBytesReader;
use polars_error::polars_err;
use polars_io::cloud::CloudOptions;
use polars_utils::mmap::MemSlice;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyString, PyStringMethods};
Expand Down Expand Up @@ -39,38 +41,28 @@ impl PyFileLikeObject {
PyFileLikeObject { inner: object }
}

pub fn as_bytes(&self) -> bytes::Bytes {
self.as_file_buffer().into_inner().into()
}

pub fn as_buffer(&self) -> std::io::Cursor<Vec<u8>> {
let data = self.as_file_buffer().into_inner();
std::io::Cursor::new(data)
}

pub fn as_file_buffer(&self) -> Cursor<Vec<u8>> {
let buf = Python::with_gil(|py| {
pub fn to_memslice(&self) -> MemSlice {
Python::with_gil(|py| {
let bytes = self
.inner
.call_method_bound(py, "read", (), None)
.expect("no read method found");

if let Ok(bytes) = bytes.downcast_bound::<PyBytes>(py) {
return bytes.as_bytes().to_vec();
if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
}

if let Ok(bytes) = bytes.downcast_bound::<PyString>(py) {
return bytes
.to_cow()
.expect("PyString is not valid UTF-8")
.into_owned()
.into_bytes();
if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
return match b.to_cow().expect("PyString is not valid UTF-8") {
Cow::Borrowed(v) => {
MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
},
Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
};
}

panic!("Expecting to be able to downcast into bytes from read result.");
});

Cursor::new(buf)
})
}

/// Validates that the underlying
Expand Down Expand Up @@ -212,7 +204,7 @@ impl EitherRustPythonFile {

fn into_scan_source_input(self) -> PythonScanSourceInput {
match self {
EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.as_bytes()),
EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
}
}
Expand All @@ -226,7 +218,7 @@ impl EitherRustPythonFile {
}

pub enum PythonScanSourceInput {
Buffer(bytes::Bytes),
Buffer(MemSlice),
Path(PathBuf),
File(File),
}
Expand Down Expand Up @@ -328,13 +320,15 @@ pub fn get_python_scan_source_input(
write: bool,
) -> PyResult<PythonScanSourceInput> {
Python::with_gil(|py| {
let py_f = py_f.into_bound(py);
let py_f_0 = py_f;
let py_f = py_f_0.clone_ref(py).into_bound(py);

// If the pyobject is a `bytes` class
if let Ok(bytes) = py_f.downcast::<PyBytes>() {
return Ok(PythonScanSourceInput::Buffer(
bytes::Bytes::copy_from_slice(bytes.as_bytes()),
));
if let Ok(b) = py_f.downcast::<PyBytes>() {
return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
b.as_bytes(),
Arc::new(py_f_0),
)));
}

if let Ok(s) = py_f.extract::<Cow<str>>() {
Expand Down
17 changes: 14 additions & 3 deletions crates/polars-utils/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ mod private {
#[derive(Clone, Debug)]
#[allow(unused)]
enum MemSliceInner {
Bytes(bytes::Bytes),
Mmap(Arc<MMapSemaphore>),
Bytes(bytes::Bytes), // Separate because it does atomic refcounting internally
Arc(Arc<dyn std::fmt::Debug + Send + Sync>),
}

impl Deref for MemSlice {
Expand Down Expand Up @@ -97,7 +97,18 @@ mod private {
slice: unsafe {
std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref())
},
inner: MemSliceInner::Mmap(mmap),
inner: MemSliceInner::Arc(mmap),
}
}

#[inline]
pub fn from_arc<T>(slice: &[u8], arc: Arc<T>) -> Self
where
T: std::fmt::Debug + Send + Sync + 'static,
{
Self {
slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) },
inner: MemSliceInner::Arc(arc),
}
}

Expand Down

0 comments on commit 64560d6

Please sign in to comment.