Dask dataframe constructor #850

Open · wants to merge 3 commits into base: main
Changes from all commits
CHANGELOG.md (5 additions, 0 deletions)

@@ -5,6 +5,11 @@ Write the date in place of the "Unreleased" in the case a new version is release

 ## Unreleased

+### Fixed
+
+- Update usage of `dask.dataframe.core` to adjust to backward-incompatible
+  changes in Dask 2025.1.0.
+
 ### Maintenance

 - Addressed DeprecationWarnings from Python and dependencies
pyproject.toml (4 additions, 0 deletions)

@@ -55,6 +55,7 @@ all = [
     "dask",
     "dask[array]",
     "dask[dataframe]",
+    "dask_expr",
     "entrypoints",
     "fastapi",
     "h5netcdf",
@@ -106,6 +107,7 @@ client = [
     "blosc2; python_version >= '3.10'",
     "dask[array]",
     "dask[dataframe]",
+    "dask_expr",
     "entrypoints",
     "httpx >=0.20.0,!=0.23.1",
     "json-merge-patch",
@@ -137,6 +139,7 @@ compression = [
 # These are needed by the client and server to transmit/receive dataframes.
 dataframe = [
     "dask[dataframe]",
+    "dask_expr",
     "pandas",
     "pyarrow",
 ]
@@ -234,6 +237,7 @@ server = [
     "cachetools",
     "canonicaljson",
     "dask",
+    "dask_expr",
     "dask[array]",
     "dask[dataframe]",
     "fastapi",
tiled/client/dataframe.py (8 additions, 11 deletions)

@@ -1,7 +1,7 @@
 from urllib.parse import parse_qs, urlparse

 import dask
-import dask.dataframe.core
+import dask_expr
 import httpx

 from ..serialization.table import deserialize_arrow, serialize_arrow
@@ -147,7 +147,7 @@ def read_partition(self, partition, columns=None):
         meta = structure.meta
         if columns is not None:
             meta = meta[columns]
-        return dask.dataframe.from_delayed(
+        return dask_expr.DataFrame.from_delayed(
             [dask.delayed(self._get_partition)(partition, columns)],
             meta=meta,
             divisions=(None,) * (1 + npartitions),
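For orientation, here is a minimal runnable sketch (not the PR's code) of the same one-partition pattern through the public `dask.dataframe.from_delayed` entry point, which needs neither `dask.dataframe.core` nor a direct `dask_expr` import. `fetch_partition` and the toy schema are hypothetical stand-ins for Tiled's `_get_partition` and `structure.meta`:

```python
# Sketch only: build a one-partition dask DataFrame from a delayed call.
import dask
import dask.dataframe as dd
import pandas as pd

# Empty frame describing the schema, in the role of structure.meta.
meta = pd.DataFrame({"x": pd.Series(dtype="int64")})

def fetch_partition(partition, columns=None):
    # Hypothetical stand-in for the client call that fetches one
    # partition from the Tiled server.
    df = pd.DataFrame({"x": [1, 2, 3]})
    return df[columns] if columns is not None else df

delayed_part = dask.delayed(fetch_partition)(0, None)
# One delayed object -> one partition; divisions needs npartitions + 1 entries.
ddf = dd.from_delayed([delayed_part], meta=meta, divisions=(None, None))
print(ddf.compute())
```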
@@ -160,22 +160,19 @@ def read(self, columns=None):
         The result will be internally partitioned with dask.
         """
         structure = self.structure()
+        meta = structure.meta
+        if columns is not None:
+            meta = meta[columns]
+
         # Build a client-side dask dataframe whose partitions pull from a
         # server-side dask array.
         name = f"remote-dask-dataframe-{self.item['links']['self']}"
         dask_tasks = {
             (name, partition): (self._get_partition, partition, columns)
             for partition in range(structure.npartitions)
         }
-        meta = structure.meta
-
-        if columns is not None:
-            meta = meta[columns]
-        ddf = dask.dataframe.core.DataFrame(
-            dask_tasks,
-            name=name,
-            meta=meta,
-            divisions=(None,) * (1 + structure.npartitions),
+        ddf = dask_expr.DataFrame.from_map(
+            dask_tasks, meta=meta, divisions=(None,) * (1 + structure.npartitions)
Inline review comment from @phofl (Jan 18, 2025):

    I think you want:

        dd.from_map(self._get_partition, list(range(structure.npartitions)), columns=columns)

    Please do not import dask_expr directly. You should import dask.dataframe as dd and use whatever is in the namespace. dask-expr was merged into dask.dataframe and will go away with the latest release.
         )
         if columns is not None:
             ddf = ddf[columns]
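To make the reviewer's suggestion concrete, a hedged, self-contained sketch using the public `dask.dataframe.from_map`; `get_partition`, the partition count, and the toy schema are illustrative stand-ins, not Tiled's actual code:

```python
# Sketch only: dd.from_map calls the fetch function once per element of the
# iterable (here, partition indices) and forwards extra keyword arguments
# (here, columns) through to that function.
import dask.dataframe as dd
import pandas as pd

npartitions = 3
meta = pd.DataFrame({"x": pd.Series(dtype="int64")})

def get_partition(partition, columns=None):
    # Hypothetical stand-in for Tiled's self._get_partition.
    df = pd.DataFrame({"x": [partition, partition]})
    return df[columns] if columns is not None else df

ddf = dd.from_map(
    get_partition,
    list(range(npartitions)),  # one task per partition index
    columns=None,              # forwarded to get_partition
    meta=meta,
    divisions=(None,) * (1 + npartitions),
)
print(ddf.npartitions)  # 3
print(ddf.compute())
```

Routing everything through the `dask.dataframe` namespace keeps the client working whether the expression backend lives in `dask_expr` or, as of Dask 2025.1.0, inside `dask.dataframe` itself.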