Add url prefix convention for many compression formats (#2822)
* remove compression="infer" in xopen

* add fs protocols for bz2, lz4, xz and zstd

* test streaming gz, lz4, bz2, xz and zst

* fix test

* fix tar streaming

* temporarily remove zip and tar data_files streaming

* lewis' comments

* docs on how streaming works with chained URLs

* severo's comment

* lewis' comments
lhoestq authored Aug 23, 2021
1 parent 72ba8c3 commit 9adc7db
Showing 14 changed files with 457 additions and 167 deletions.
105 changes: 105 additions & 0 deletions docs/source/dataset_streaming.rst
@@ -164,3 +164,108 @@ It is possible to get a ``torch.utils.data.IterableDataset`` from a :class:`data
{'input_ids': tensor([[101, 11047, 10497, 7869, 2352...]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0...]]), 'attention_mask': tensor([[1, 1, 1, 1, 1...]])}
For now, only the PyTorch format is supported but support for TensorFlow and others will be added soon.


How does dataset streaming work?
--------------------------------------------------

The StreamingDownloadManager
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The standard (i.e. non-streaming) way of loading a dataset has two steps:

1. download and extract the raw data files of the dataset by using the :class:`datasets.DownloadManager`
2. process the data files to generate the Arrow file used to load the :class:`datasets.Dataset` object.

For example, in non-streaming mode a file is simply downloaded like this:

.. code-block::

   >>> from datasets import DownloadManager
   >>> url = "https://huggingface.co/datasets/lhoestq/test/resolve/main/some_text.txt"
   >>> filepath = DownloadManager().download(url)  # the file is downloaded here
   >>> print(filepath)
   '/Users/user/.cache/huggingface/datasets/downloads/16b702620cad8d485bafea59b1d2ed69e796196e6f2c73f005dee935a413aa19.ab631f60c6cb31a079ecf1ad910005a7c009ef0f1e4905b69d489fb2bd162683'
   >>> with open(filepath) as f:
   ...     print(f.read())

When you load a dataset in streaming mode, the download manager that is used instead is the :class:`datasets.StreamingDownloadManager`.
Instead of downloading and extracting all the data when you load the dataset, the work is done lazily: the file only starts to be downloaded and extracted when ``open`` is called.
This is made possible by extending ``open`` to support opening remote files via HTTP.
In each dataset script, ``open`` is replaced by our function ``xopen``, which extends ``open`` to stream data from remote files.

Here is a sample code that shows what is done under the hood:

.. code-block::

   >>> from datasets.utils.streaming_download_manager import StreamingDownloadManager, xopen
   >>> url = "https://huggingface.co/datasets/lhoestq/test/resolve/main/some_text.txt"
   >>> urlpath = StreamingDownloadManager().download(url)
   >>> print(urlpath)
   'https://huggingface.co/datasets/lhoestq/test/resolve/main/some_text.txt'
   >>> with xopen(urlpath) as f:
   ...     print(f.read())  # the file is actually downloaded here

As you can see, since it is possible to open remote files directly from a URL, the streaming download manager simply returns the URL instead of a path to a locally downloaded file.

The file is then downloaded in a streaming fashion: it is fetched progressively as you iterate over the data file.
This is made possible by ``fsspec``, a library that allows opening and iterating over remote files.
You can find more information about ``fsspec`` in `its documentation <https://filesystem-spec.readthedocs.io/>`_.
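
To give a rough idea of what happens under the hood, here is a minimal sketch that uses ``fsspec`` directly (independently of the ``datasets`` helpers, reusing the example URL above) to iterate over a remote text file; the bytes are fetched progressively as you iterate:

.. code-block::

   >>> import fsspec
   >>> url = "https://huggingface.co/datasets/lhoestq/test/resolve/main/some_text.txt"
   >>> with fsspec.open(url, "r") as f:  # the file content is not downloaded up front
   ...     for line in f:  # data is fetched progressively as you iterate
   ...         print(line)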

Compressed files and archives
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You may have noticed that, for a text file, the streaming download manager returns the exact same URL that was given as input.
However, if you use ``download_and_extract`` on a compressed file instead, the output URL will be a chained URL.
Chained URLs are used by ``fsspec`` to navigate inside remote compressed archives.

Here are some examples of chained URLs:

.. code-block::

   >>> from datasets.utils.streaming_download_manager import xopen
   >>> chained_url = "zip://combined/train.json::https://adversarialqa.github.io/data/aqa_v1.0.zip"
   >>> with xopen(chained_url) as f:
   ...     print(f.read()[:100])
   '{"data": [{"title": "Brain", "paragraphs": [{"context": "Another approach to brain function is to ex'
   >>> chained_url2 = "gzip://mkqa.jsonl::/~https://github.com/apple/ml-mkqa/raw/master/dataset/mkqa.jsonl.gz"
   >>> with xopen(chained_url2) as f:
   ...     print(f.readline()[:100])
   '{"query": "how long did it take the twin towers to be built", "answers": {"en": [{"type": "number_wi'

We also extended some functions from ``os.path`` to work with chained URLs.
For example, ``os.path.join`` is replaced by our function ``xjoin``, which extends ``os.path.join`` to work with chained URLs:

.. code-block::

   >>> from datasets.utils.streaming_download_manager import StreamingDownloadManager, xopen, xjoin
   >>> url = "https://adversarialqa.github.io/data/aqa_v1.0.zip"
   >>> archive_path = StreamingDownloadManager().download_and_extract(url)
   >>> print(archive_path)
   'zip://::https://adversarialqa.github.io/data/aqa_v1.0.zip'
   >>> filepath = xjoin(archive_path, "combined", "train.json")
   >>> print(filepath)
   'zip://combined/train.json::https://adversarialqa.github.io/data/aqa_v1.0.zip'
   >>> with xopen(filepath) as f:
   ...     print(f.read()[:100])
   '{"data": [{"title": "Brain", "paragraphs": [{"context": "Another approach to brain function is to ex'

You can also take a look at the ``fsspec`` documentation about URL chaining `here <https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining>`_.

.. note::

    Streaming data from TAR archives is currently highly inefficient and requires a lot of bandwidth. We are working on optimizing this to offer you the best performance, stay tuned!

Dataset script compatibility
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Now that you are aware of how dataset streaming works, you can make sure your dataset scripts work in streaming mode (a minimal sketch follows the lists below):

1. make sure you use ``open`` to open the data files: it is extended to work with remote files
2. if you have to deal with archives like ZIP files, make sure you use ``os.path.join`` to navigate in the archive

Currently a few Python functions or classes are not supported for dataset streaming:

- ``pathlib.Path`` and all its methods are not supported; please use ``os.path.join`` and string objects instead
- ``os.walk``, ``os.listdir`` and ``glob.glob`` are not supported yet
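
For illustration, here is a minimal sketch of a ``_generate_examples`` method that follows these rules; it is not taken from an actual dataset script, and the archive layout and field names are hypothetical:

.. code-block::

   import json
   import os

   def _generate_examples(self, archive_path):
       # sketch of a dataset builder method; in streaming mode archive_path can be
       # a chained URL such as "zip://::https://host/archive.zip"
       filepath = os.path.join(archive_path, "train.json")  # becomes xjoin when streaming
       with open(filepath) as f:  # becomes xopen when streaming
           data = json.load(f)
       for idx, example in enumerate(data["data"]):
           yield idx, {"text": example["text"]}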
17 changes: 3 additions & 14 deletions src/datasets/config.py
@@ -120,21 +120,10 @@
logger.info("Disabling Apache Beam because USE_BEAM is set to False")


USE_RAR = os.environ.get("USE_RAR", "AUTO").upper()
RARFILE_VERSION = "N/A"
RARFILE_AVAILABLE = False
if USE_RAR in ("1", "ON", "YES", "AUTO"):
    try:
        RARFILE_VERSION = version.parse(importlib_metadata.version("rarfile"))
        RARFILE_AVAILABLE = True
        logger.info("rarfile available.")
    except importlib_metadata.PackageNotFoundError:
        pass
else:
    logger.info("Disabling rarfile because USE_RAR is set to False")


# Optional compression tools
RARFILE_AVAILABLE = importlib.util.find_spec("rarfile") is not None
ZSTANDARD_AVAILABLE = importlib.util.find_spec("zstandard") is not None
LZ4_AVAILABLE = importlib.util.find_spec("lz4") is not None


# Cache location
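
These availability flags are plain ``importlib.util.find_spec`` checks. As a hedged sketch (not necessarily how the library itself consumes them), such a flag would typically be used to fail early with a helpful message before an optional compression backend is needed:

from datasets import config

def require_zstandard():
    # hypothetical helper: raise a clear error if the optional backend is missing
    if not config.ZSTANDARD_AVAILABLE:
        raise ImportError("Please install zstandard to read .zst files: pip install zstandard")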
11 changes: 10 additions & 1 deletion src/datasets/filesystems/__init__.py
@@ -1,4 +1,5 @@
import importlib
from typing import List

import fsspec

@@ -10,9 +11,17 @@
if _has_s3fs:
    from .s3filesystem import S3FileSystem  # noqa: F401

COMPRESSION_FILESYSTEMS: List[compression.BaseCompressedFileFileSystem] = [
    compression.Bz2FileSystem,
    compression.GzipFileSystem,
    compression.Lz4FileSystem,
    compression.XzFileSystem,
    compression.ZstdFileSystem,
]

# Register custom filesystems
fsspec.register_implementation(compression.gzip.GZipFileSystem.protocol, compression.gzip.GZipFileSystem)
for fs_class in COMPRESSION_FILESYSTEMS:
    fsspec.register_implementation(fs_class.protocol, fs_class)


def extract_path_from_uri(dataset_path: str) -> str:
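
Once registered, these protocols can be used in ``fsspec`` chained URLs. A small hedged sketch, where the remote file and its contents are hypothetical:

import fsspec

# "data.jsonl.bz2" is exposed as a single file named "data.jsonl" inside the bz2:// filesystem
with fsspec.open("bz2://data.jsonl::https://example.com/data.jsonl.bz2", "r") as f:
    first_line = f.readline()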
168 changes: 168 additions & 0 deletions src/datasets/filesystems/compression.py
@@ -0,0 +1,168 @@
import os
from typing import Optional

import fsspec
from fsspec.archive import AbstractArchiveFileSystem
from fsspec.utils import DEFAULT_BLOCK_SIZE


class BaseCompressedFileFileSystem(AbstractArchiveFileSystem):
    """Read contents of compressed file as a filesystem with one file inside."""

    root_marker = ""
    protocol: str = (
        None  # protocol passed in prefix to the url. ex: "gzip", for gzip://file.txt::http://foo.bar/file.txt.gz
    )
    compression: str = None  # compression type in fsspec. ex: "gzip"
    extension: str = None  # extension of the filename to strip. ex: ".gz" to get file.txt from file.txt.gz

    def __init__(
        self, fo: str = "", target_protocol: Optional[str] = None, target_options: Optional[dict] = None, **kwargs
    ):
        """
        The compressed file system can be instantiated from any compressed file.
        It reads the contents of compressed file as a filesystem with one file inside, as if it was an archive.
        The single file inside the filesystem is named after the compressed file,
        without the compression extension at the end of the filename.
        Args:
            fo (:obj:``str``): Path to compressed file. Will fetch file using ``fsspec.open()``
            mode (:obj:``str``): Currently, only 'rb' accepted
            target_protocol(:obj:``str``, optional): To override the FS protocol inferred from a URL.
            target_options (:obj:``dict``, optional): Kwargs passed when instantiating the target FS.
        """
        super().__init__(self, **kwargs)
        # always open as "rb" since fsspec can then use the TextIOWrapper to make it work for "r" mode
        self.file = fsspec.open(
            fo, mode="rb", protocol=target_protocol, compression=self.compression, **(target_options or {})
        )
        self.info = self.file.fs.info(self.file.path)
        self.compressed_name = os.path.basename(self.file.path.split("::")[0])
        self.uncompressed_name = self.compressed_name[: self.compressed_name.rindex(".")]
        self.dir_cache = None

    @classmethod
    def _strip_protocol(cls, path):
        # compressed file paths are always relative to the archive root
        return super()._strip_protocol(path).lstrip("/")

    def _get_dirs(self):
        if self.dir_cache is None:
            f = {**self.info, "name": self.uncompressed_name}
            self.dir_cache = {f["name"]: f}

    def cat(self, path: str):
        return self.file.open().read()

    def _open(
        self,
        path: str,
        mode: str = "rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        path = self._strip_protocol(path)
        if mode != "rb":
            raise ValueError(f"Tried to read with mode {mode} on file {self.file.path} opened with mode 'rb'")
        if path != self.uncompressed_name:
            raise FileNotFoundError(f"Expected file {self.uncompressed_name} but got {path}")
        return self.file.open()


class Bz2FileSystem(BaseCompressedFileFileSystem):
    """Read contents of BZ2 file as a filesystem with one file inside."""

    protocol = "bz2"
    compression = "bz2"
    extension = ".bz2"


class GzipFileSystem(BaseCompressedFileFileSystem):
    """Read contents of GZIP file as a filesystem with one file inside."""

    protocol = "gzip"
    compression = "gzip"
    extension = ".gz"


class Lz4FileSystem(BaseCompressedFileFileSystem):
    """Read contents of LZ4 file as a filesystem with one file inside."""

    protocol = "lz4"
    compression = "lz4"
    extension = ".lz4"


class XzFileSystem(BaseCompressedFileFileSystem):
    """Read contents of .xz (LZMA) file as a filesystem with one file inside."""

    protocol = "xz"
    compression = "xz"
    extension = ".xz"


class ZstdFileSystem(BaseCompressedFileFileSystem):
    """
    Read contents of zstd file as a filesystem with one file inside.
    Note that reading in binary mode with fsspec isn't supported yet:
    /~https://github.com/indygreg/python-zstandard/issues/136
    """

    protocol = "zstd"
    compression = "zstd"
    extension = ".zst"

    def __init__(
        self,
        fo: str,
        mode: str = "rb",
        target_protocol: Optional[str] = None,
        target_options: Optional[dict] = None,
        block_size: int = DEFAULT_BLOCK_SIZE,
        **kwargs,
    ):
        super().__init__(
            fo=fo,
            mode=mode,
            target_protocol=target_protocol,
            target_options=target_options,
            block_size=block_size,
            **kwargs,
        )
        # We need to wrap the zstd decompressor to avoid this error in fsspec==2021.7.0 and zstandard==0.15.2:
        #
        # File "/Users/user/.virtualenvs/hf-datasets/lib/python3.7/site-packages/fsspec/core.py", line 145, in open
        #     out.close = close
        # AttributeError: 'zstd.ZstdDecompressionReader' object attribute 'close' is read-only
        #
        # see /~https://github.com/intake/filesystem_spec/issues/725
        _enter = self.file.__enter__

        class WrappedFile:
            def __init__(self, file_):
                self._file = file_

            def __enter__(self):
                self._file.__enter__()
                return self

            def __exit__(self, *args, **kwargs):
                self._file.__exit__(*args, **kwargs)

            def __iter__(self):
                return iter(self._file)

            def __next__(self):
                return next(self._file)

            def __getattr__(self, attr):
                return getattr(self._file, attr)

        def fixed_enter(*args, **kwargs):
            return WrappedFile(_enter(*args, **kwargs))

        self.file.__enter__ = fixed_enter
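
With these filesystems registered, the streaming download manager can hand back chained URLs that use the new protocols. A hedged sketch with a hypothetical remote ``.zst`` file (the ``zstandard`` package must be installed):

from datasets.utils.streaming_download_manager import StreamingDownloadManager, xopen

url = "https://example.com/data.jsonl.zst"  # hypothetical remote file
urlpath = StreamingDownloadManager().download_and_extract(url)
# urlpath should now be a chained URL that uses the new "zstd://" protocol
with xopen(urlpath) as f:
    print(f.readline())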
1 change: 0 additions & 1 deletion src/datasets/filesystems/compression/__init__.py

This file was deleted.


1 comment on commit 9adc7db

@github-actions

PyArrow==3.0.0


Benchmark: benchmark_array_xd.json

metric read_batch_formatted_as_numpy after write_array2d read_batch_formatted_as_numpy after write_flattened_sequence read_batch_formatted_as_numpy after write_nested_sequence read_batch_unformated after write_array2d read_batch_unformated after write_flattened_sequence read_batch_unformated after write_nested_sequence read_col_formatted_as_numpy after write_array2d read_col_formatted_as_numpy after write_flattened_sequence read_col_formatted_as_numpy after write_nested_sequence read_col_unformated after write_array2d read_col_unformated after write_flattened_sequence read_col_unformated after write_nested_sequence read_formatted_as_numpy after write_array2d read_formatted_as_numpy after write_flattened_sequence read_formatted_as_numpy after write_nested_sequence read_unformated after write_array2d read_unformated after write_flattened_sequence read_unformated after write_nested_sequence write_array2d write_flattened_sequence write_nested_sequence
new / old (diff) 0.010701 / 0.011353 (-0.000652) 0.004086 / 0.011008 (-0.006922) 0.036589 / 0.038508 (-0.001919) 0.041225 / 0.023109 (0.018116) 0.359508 / 0.275898 (0.083610) 0.392124 / 0.323480 (0.068644) 0.009396 / 0.007986 (0.001410) 0.005063 / 0.004328 (0.000734) 0.010501 / 0.004250 (0.006250) 0.046559 / 0.037052 (0.009506) 0.372839 / 0.258489 (0.114350) 0.406192 / 0.293841 (0.112351) 0.027122 / 0.128546 (-0.101424) 0.008722 / 0.075646 (-0.066925) 0.298849 / 0.419271 (-0.120422) 0.053150 / 0.043533 (0.009617) 0.355580 / 0.255139 (0.100441) 0.400323 / 0.283200 (0.117123) 0.118248 / 0.141683 (-0.023435) 2.120152 / 1.452155 (0.667997) 2.155194 / 1.492716 (0.662478)

Benchmark: benchmark_getitem_100B.json

metric get_batch_of_1024_random_rows get_batch_of_1024_rows get_first_row get_last_row
new / old (diff) 0.014519 / 0.018006 (-0.003487) 0.478632 / 0.000490 (0.478142) 0.002804 / 0.000200 (0.002604) 0.000078 / 0.000054 (0.000023)

Benchmark: benchmark_indices_mapping.json

metric select shard shuffle sort train_test_split
new / old (diff) 0.043492 / 0.037411 (0.006081) 0.026967 / 0.014526 (0.012441) 0.030913 / 0.176557 (-0.145644) 0.145577 / 0.737135 (-0.591559) 0.031404 / 0.296338 (-0.264935)

Benchmark: benchmark_iterating.json

metric read 5000 read 50000 read_batch 50000 10 read_batch 50000 100 read_batch 50000 1000 read_formatted numpy 5000 read_formatted pandas 5000 read_formatted tensorflow 5000 read_formatted torch 5000 read_formatted_batch numpy 5000 10 read_formatted_batch numpy 5000 1000 shuffled read 5000 shuffled read 50000 shuffled read_batch 50000 10 shuffled read_batch 50000 100 shuffled read_batch 50000 1000 shuffled read_formatted numpy 5000 shuffled read_formatted_batch numpy 5000 10 shuffled read_formatted_batch numpy 5000 1000
new / old (diff) 0.407365 / 0.215209 (0.192156) 4.141050 / 2.077655 (2.063395) 2.077630 / 1.504120 (0.573511) 1.871158 / 1.541195 (0.329963) 1.923131 / 1.468490 (0.454641) 0.363253 / 4.584777 (-4.221524) 4.995971 / 3.745712 (1.250259) 5.623255 / 5.269862 (0.353393) 2.772526 / 4.565676 (-1.793150) 0.043042 / 0.424275 (-0.381233) 0.006017 / 0.007607 (-0.001591) 0.530965 / 0.226044 (0.304921) 5.292137 / 2.268929 (3.023209) 2.658698 / 55.444624 (-52.785926) 2.239946 / 6.876477 (-4.636531) 2.332951 / 2.142072 (0.190879) 0.509457 / 4.805227 (-4.295771) 0.116491 / 6.500664 (-6.384173) 0.061044 / 0.075469 (-0.014425)

Benchmark: benchmark_map_filter.json

metric filter map fast-tokenizer batched map identity map identity batched map no-op batched map no-op batched numpy map no-op batched pandas map no-op batched pytorch map no-op batched tensorflow
new / old (diff) 126.911067 / 1.841788 (125.069279) 14.936440 / 8.074308 (6.862132) 31.399334 / 10.191392 (21.207942) 0.872395 / 0.680424 (0.191971) 0.618952 / 0.534201 (0.084751) 0.264083 / 0.579283 (-0.315200) 0.581173 / 0.434364 (0.146809) 0.392311 / 0.540337 (-0.148026) 1.217468 / 1.386936 (-0.169468)
PyArrow==latest

Benchmark: benchmark_array_xd.json

metric read_batch_formatted_as_numpy after write_array2d read_batch_formatted_as_numpy after write_flattened_sequence read_batch_formatted_as_numpy after write_nested_sequence read_batch_unformated after write_array2d read_batch_unformated after write_flattened_sequence read_batch_unformated after write_nested_sequence read_col_formatted_as_numpy after write_array2d read_col_formatted_as_numpy after write_flattened_sequence read_col_formatted_as_numpy after write_nested_sequence read_col_unformated after write_array2d read_col_unformated after write_flattened_sequence read_col_unformated after write_nested_sequence read_formatted_as_numpy after write_array2d read_formatted_as_numpy after write_flattened_sequence read_formatted_as_numpy after write_nested_sequence read_unformated after write_array2d read_unformated after write_flattened_sequence read_unformated after write_nested_sequence write_array2d write_flattened_sequence write_nested_sequence
new / old (diff) 0.010856 / 0.011353 (-0.000497) 0.004043 / 0.011008 (-0.006965) 0.036767 / 0.038508 (-0.001741) 0.041144 / 0.023109 (0.018035) 0.338018 / 0.275898 (0.062120) 0.373891 / 0.323480 (0.050411) 0.009390 / 0.007986 (0.001404) 0.005966 / 0.004328 (0.001637) 0.010611 / 0.004250 (0.006360) 0.044464 / 0.037052 (0.007412) 0.336793 / 0.258489 (0.078304) 0.421893 / 0.293841 (0.128052) 0.027478 / 0.128546 (-0.101069) 0.008828 / 0.075646 (-0.066818) 0.299660 / 0.419271 (-0.119612) 0.053742 / 0.043533 (0.010209) 0.342950 / 0.255139 (0.087811) 0.371937 / 0.283200 (0.088738) 0.117360 / 0.141683 (-0.024323) 2.046747 / 1.452155 (0.594592) 2.121901 / 1.492716 (0.629185)

Benchmark: benchmark_getitem_100B.json

metric get_batch_of_1024_random_rows get_batch_of_1024_rows get_first_row get_last_row
new / old (diff) 0.078887 / 0.018006 (0.060881) 0.485888 / 0.000490 (0.485398) 0.053158 / 0.000200 (0.052958) 0.000534 / 0.000054 (0.000480)

Benchmark: benchmark_indices_mapping.json

metric select shard shuffle sort train_test_split
new / old (diff) 0.041436 / 0.037411 (0.004025) 0.025883 / 0.014526 (0.011357) 0.033666 / 0.176557 (-0.142891) 0.151491 / 0.737135 (-0.585645) 0.035275 / 0.296338 (-0.261063)

Benchmark: benchmark_iterating.json

metric read 5000 read 50000 read_batch 50000 10 read_batch 50000 100 read_batch 50000 1000 read_formatted numpy 5000 read_formatted pandas 5000 read_formatted tensorflow 5000 read_formatted torch 5000 read_formatted_batch numpy 5000 10 read_formatted_batch numpy 5000 1000 shuffled read 5000 shuffled read 50000 shuffled read_batch 50000 10 shuffled read_batch 50000 100 shuffled read_batch 50000 1000 shuffled read_formatted numpy 5000 shuffled read_formatted_batch numpy 5000 10 shuffled read_formatted_batch numpy 5000 1000
new / old (diff) 0.402978 / 0.215209 (0.187769) 4.059305 / 2.077655 (1.981650) 2.053137 / 1.504120 (0.549017) 1.853276 / 1.541195 (0.312081) 1.893028 / 1.468490 (0.424538) 0.362644 / 4.584777 (-4.222133) 5.221594 / 3.745712 (1.475882) 6.752943 / 5.269862 (1.483081) 3.434059 / 4.565676 (-1.131617) 0.042890 / 0.424275 (-0.381385) 0.006219 / 0.007607 (-0.001388) 0.538977 / 0.226044 (0.312932) 5.366086 / 2.268929 (3.097157) 2.600468 / 55.444624 (-52.844156) 2.199339 / 6.876477 (-4.677137) 2.231395 / 2.142072 (0.089323) 0.487271 / 4.805227 (-4.317957) 0.114549 / 6.500664 (-6.386115) 0.060380 / 0.075469 (-0.015089)

Benchmark: benchmark_map_filter.json

metric filter map fast-tokenizer batched map identity map identity batched map no-op batched map no-op batched numpy map no-op batched pandas map no-op batched pytorch map no-op batched tensorflow
new / old (diff) 123.302070 / 1.841788 (121.460283) 14.840054 / 8.074308 (6.765746) 30.843536 / 10.191392 (20.652144) 0.889644 / 0.680424 (0.209220) 0.583946 / 0.534201 (0.049745) 0.261144 / 0.579283 (-0.318139) 0.574732 / 0.434364 (0.140368) 0.357287 / 0.540337 (-0.183050) 1.222354 / 1.386936 (-0.164582)
