
Offline loading #1726

Merged · 24 commits · Jan 19, 2021
2 changes: 1 addition & 1 deletion src/datasets/commands/dummy_data.py
@@ -305,7 +305,7 @@ def run(self):
dataset_name=self._dataset_name,
config=config,
version=version,
is_local=True,
use_local_dummy_data=True,
load_existing_dummy_data=False,
)

157 changes: 105 additions & 52 deletions src/datasets/load.py
@@ -22,7 +22,7 @@
import os
import re
import shutil
from hashlib import sha256
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
@@ -33,36 +33,49 @@
from .features import Features
from .info import DATASET_INFOS_DICT_FILE_NAME
from .metric import Metric
from .packaged_modules import _PACKAGED_DATASETS_MODULES, hash_python_lines
from .splits import Split
from .utils.download_manager import GenerateMode
from .utils.file_utils import HF_MODULES_CACHE, DownloadConfig, cached_path, head_hf_s3, hf_bucket_url, hf_github_url
from .utils.file_utils import (
HF_MODULES_CACHE,
DownloadConfig,
cached_path,
head_hf_s3,
hf_bucket_url,
hf_github_url,
init_hf_modules,
)
from .utils.filelock import FileLock
from .utils.logging import get_logger
from .utils.version import Version


logger = get_logger(__name__)

DYNAMIC_MODULES_PATH = os.path.join(HF_MODULES_CACHE, "datasets_modules")
DATASETS_PATH = os.path.join(DYNAMIC_MODULES_PATH, "datasets")
DATASETS_MODULE = "datasets_modules.datasets"
METRICS_PATH = os.path.join(DYNAMIC_MODULES_PATH, "metrics")
METRICS_MODULE = "datasets_modules.metrics"
MODULE_NAME_FOR_DYNAMIC_MODULES = "datasets_modules"


def init_dynamic_modules():
os.makedirs(DYNAMIC_MODULES_PATH, exist_ok=True)
if not os.path.exists(os.path.join(DYNAMIC_MODULES_PATH, "__init__.py")):
with open(os.path.join(DYNAMIC_MODULES_PATH, "__init__.py"), "w"):
def init_dynamic_modules(name: str, hf_modules_cache: Optional[str] = None):
"""
Create a module with name `name` in which you can add dynamic modules
such as metrics or datasets. The module can be imported using its name.
The module is created in the HF_MODULES_CACHE directory by default (~/.cache/huggingface/modules), but this can
be overridden by specifying a path to another directory in `hf_modules_cache`.
"""
hf_modules_cache = init_hf_modules(hf_modules_cache)
dynamic_modules_path = os.path.join(hf_modules_cache, name)
os.makedirs(dynamic_modules_path, exist_ok=True)
if not os.path.exists(os.path.join(dynamic_modules_path, "__init__.py")):
with open(os.path.join(dynamic_modules_path, "__init__.py"), "w"):
pass
return dynamic_modules_path
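For reference, a minimal usage sketch of the new helper (the module name "my_modules" and the /tmp path are illustrative assumptions, not part of this PR):

import importlib

from datasets.load import init_dynamic_modules

# Create a dynamic-modules package in a custom cache directory.
dynamic_modules_path = init_dynamic_modules("my_modules", hf_modules_cache="/tmp/hf_modules")
# "/tmp/hf_modules" is now on sys.path and contains my_modules/__init__.py,
# so the new package is importable by name:
my_modules = importlib.import_module("my_modules")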


def import_main_class(module_path, dataset=True) -> Union[DatasetBuilder, Metric]:
"""Import a module at module_path and return its main class:
- a DatasetBuilder if dataset is True
- a Metric if dataset is False
"""
importlib.invalidate_caches()
module = importlib.import_module(module_path)

if dataset:
@@ -99,22 +112,7 @@ def files_to_hash(file_paths: List[str]) -> str:
for file_path in to_use_files:
with open(file_path, mode="r", encoding="utf-8") as f:
lines.extend(f.readlines())
filtered_lines = []
for line in lines:
line.replace("\n", "") # remove line breaks, white space and comments
line.replace(" ", "")
line.replace("\t", "")
line = re.sub(r"#.*", "", line)
if line:
filtered_lines.append(line)
file_str = "\n".join(filtered_lines)

# Make a hash from all this code
file_bytes = file_str.encode("utf-8")
file_hash = sha256(file_bytes)
filename = file_hash.hexdigest()

return filename
return hash_python_lines(lines)


def convert_github_url(url_path: str) -> Tuple[str, str]:
Expand Down Expand Up @@ -204,6 +202,7 @@ def prepare_module(
download_mode: Optional[GenerateMode] = None,
dataset: bool = True,
force_local_path: Optional[str] = None,
dynamic_modules_path: Optional[str] = None,
**download_kwargs,
) -> Tuple[str, str]:
r"""
@@ -217,14 +216,17 @@
path (str):
path to the dataset or metric script, can be either:
- a path to a local directory containing the dataset processing python script
- an url to a S3 directory with a dataset processing python script
- a url to a github or S3 directory with a dataset processing python script
script_version (Optional ``Union[str, datasets.Version]``): if specified, the module will be loaded from the datasets repository
at this version. By default it is set to the local version of the lib. Specifying a version that is different from
your local version of the lib might cause compatibility issues.
download_config (Optional ``datasets.DownloadConfig``): specific download configuration parameters.
dataset (bool): True if the script to load is a dataset, False if the script is a metric.
force_local_path (Optional str): Optional path to a local folder to download and prepare the script in.
Used to inspect or modify the script folder.
dynamic_modules_path (Optional str, defaults to HF_MODULES_CACHE / "datasets_modules", i.e. ~/.cache/huggingface/modules/datasets_modules):
Optional path to the directory in which the dynamic modules are saved. It must have been initialized with :obj:`init_dynamic_modules`.
By default the datasets and metrics are stored inside the `datasets_modules` module.
download_kwargs: optional attributes for DownloadConfig() which will override the attributes in download_config if supplied.

Return: Tuple[``str``, ``str``] with
@@ -233,7 +235,6 @@
- the local path to the dataset/metric file if force_local_path is given: e.g. '/User/huggingface/datasets/datasets/squad/squad.py'
2. A hash string computed from the content of the dataset loading script.
"""
init_dynamic_modules()
if download_config is None:
download_config = DownloadConfig(**download_kwargs)
download_config.extract_compressed_file = True
@@ -247,6 +248,31 @@
# Short name is name without the '.py' at the end (for the module)
short_name = name[:-3]

# first check if the module is packaged with the `datasets` package
if dataset and path in _PACKAGED_DATASETS_MODULES:
try:
head_hf_s3(path, filename=name, dataset=dataset, max_retries=download_config.max_retries)
except Exception:
logger.debug(f"Couldn't head HF s3 for packaged dataset module '{path}'. Running in offline mode.")
return _PACKAGED_DATASETS_MODULES[path]
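This early return is what makes the packaged loaders work offline: head_hf_s3 is only a best-effort availability check, and any network failure is swallowed. A hedged sketch of the resulting behavior (the exact module path string is inferred from the packaged_modules layout added below):

from datasets.load import prepare_module

# Resolving a packaged module requires no download.
module_path, module_hash = prepare_module("csv", dataset=True)
# module_path should be "datasets.packaged_modules.csv.csv" (the module's __name__),
# and module_hash is computed locally from the module source via hash_python_lines.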

# otherwise the module is added to the dynamic modules
dynamic_modules_path = (
dynamic_modules_path
if dynamic_modules_path is not None
else init_dynamic_modules(MODULE_NAME_FOR_DYNAMIC_MODULES, hf_modules_cache=HF_MODULES_CACHE)
)
module_name_for_dynamic_modules = os.path.basename(dynamic_modules_path)
datasets_modules_path = os.path.join(dynamic_modules_path, "datasets")
datasets_modules_name = module_name_for_dynamic_modules + ".datasets"
metrics_modules_path = os.path.join(dynamic_modules_path, "metric")
Review comment (Member): @lhoestq small typo here which breaks metrics loading, submitting a fix now

Review comment (Member Author): It was "metrics" with an 's'!! Good catch
metrics_modules_name = module_name_for_dynamic_modules + ".metrics"

if force_local_path is None:
main_folder_path = os.path.join(datasets_modules_path if dataset else metrics_modules_path, short_name)
else:
main_folder_path = force_local_path

# We have three ways to find the processing file:
# - if os.path.join(path, name) is a file or a remote url
# - if path is a file or a remote url
@@ -260,30 +286,50 @@
local_path = path
else:
# Try github (canonical datasets/metrics) and then S3 (users datasets/metrics)
head_hf_s3(path, filename=name, dataset=dataset, max_retries=download_config.max_retries)
script_version = str(script_version) if script_version is not None else None
file_path = hf_github_url(path=path, name=name, dataset=dataset, version=script_version)
try:
local_path = cached_path(file_path, download_config=download_config)
except FileNotFoundError:
if script_version is not None:
raise ValueError(
"Couldn't find remote file with version {} at {}\nPlease provide a valid version and a valid {} name".format(
script_version, file_path, "dataset" if dataset else "metric"
)
)
github_file_path = file_path
file_path = hf_bucket_url(path, filename=name, dataset=dataset)
try:
head_hf_s3(path, filename=name, dataset=dataset, max_retries=download_config.max_retries)
script_version = str(script_version) if script_version is not None else None
file_path = hf_github_url(path=path, name=name, dataset=dataset, version=script_version)
try:
local_path = cached_path(file_path, download_config=download_config)
except FileNotFoundError:
raise FileNotFoundError(
"Couldn't find file locally at {}, or remotely at {} or {}.\n"
'If the {} was added recently, you may need to pass script_version="master" to find '
"the loading script on the master branch.".format(
combined_path, github_file_path, file_path, "dataset" if dataset else "metric"
if script_version is not None:
raise ValueError(
"Couldn't find remote file with version {} at {}\nPlease provide a valid version and a valid {} name".format(
script_version, file_path, "dataset" if dataset else "metric"
)
)
github_file_path = file_path
file_path = hf_bucket_url(path, filename=name, dataset=dataset)
try:
local_path = cached_path(file_path, download_config=download_config)
except FileNotFoundError:
raise FileNotFoundError(
"Couldn't find file locally at {}, or remotely at {} or {}.\n"
'If the {} was added recently, you may need to pass script_version="master" to find '
"the loading script on the master branch.".format(
combined_path, github_file_path, file_path, "dataset" if dataset else "metric"
)
)
except Exception as e: # noqa: all the attempts failed, before raising the error we should check if the module already exists.
if os.path.isdir(main_folder_path):
hashes = [h for h in os.listdir(main_folder_path) if len(h) == 64]
if hashes:
# get most recent
def _get_modification_time(module_hash):
return (Path(main_folder_path) / module_hash / name).stat().st_mtime

hash = sorted(hashes, key=_get_modification_time)[-1]
module_path = ".".join(
[datasets_modules_name if dataset else metrics_modules_name, short_name, hash, short_name]
)
logger.warning(
f"Using the latest cached version of the module from {os.path.join(main_folder_path, hash)} "
f"(last modified on {time.ctime(_get_modification_time(hash))}) since it "
f"couldn't be found locally at {combined_path} or remotely ({type(e).__name__})."
)
return module_path, hash
raise
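This except branch is the heart of the offline mode: if every remote lookup fails but the script was cached by an earlier online run, the most recently modified hash folder is reused and a warning is logged instead of raising. The selection rule, restated as a standalone sketch (the helper name is illustrative):

import os
from pathlib import Path

def newest_cached_hash(main_folder_path: str, name: str) -> str:
    # Directory names of length 64 are sha256 hex digests of previously loaded scripts;
    # keep the one whose cached script file was modified most recently.
    hashes = [h for h in os.listdir(main_folder_path) if len(h) == 64]
    return max(hashes, key=lambda h: (Path(main_folder_path) / h / name).stat().st_mtime)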

# Load the module in two steps:
# 1. get the processing file on the local filesystem if it's not there (download to cache dir)
@@ -352,10 +398,8 @@ def prepare_module(
hash = files_to_hash([local_path] + [loc[1] for loc in local_imports])

if force_local_path is None:
main_folder_path = os.path.join(DATASETS_PATH if dataset else METRICS_PATH, short_name)
hash_folder_path = os.path.join(main_folder_path, hash)
else:
main_folder_path = force_local_path
hash_folder_path = force_local_path

local_file_path = os.path.join(hash_folder_path, name)
Expand Down Expand Up @@ -445,10 +489,14 @@ def prepare_module(
raise OSError(f"Error with local import at {import_path}")

if force_local_path is None:
module_path = ".".join([DATASETS_MODULE if dataset else METRICS_MODULE, short_name, hash, short_name])
module_path = ".".join(
[datasets_modules_name if dataset else metrics_modules_name, short_name, hash, short_name]
)
else:
module_path = local_file_path

# make the new module visible to the import system
importlib.invalidate_caches()
return module_path, hash
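End to end, module resolution then looks roughly like this (the dataset name and the custom cache path are illustrative):

from datasets.load import import_main_class, init_dynamic_modules, prepare_module

# Against the default cache location:
module_path, module_hash = prepare_module("squad", dataset=True)
builder_cls = import_main_class(module_path, dataset=True)

# Or against a custom, pre-initialized dynamic modules directory:
dmp = init_dynamic_modules("datasets_modules", hf_modules_cache="/tmp/hf_cache")
module_path, module_hash = prepare_module("squad", dynamic_modules_path=dmp)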


@@ -605,11 +653,16 @@ def load_dataset(
**config_kwargs,
)

# Some datasets are already processed on the HF Google Storage
# Don't try downloading from Google Storage for packaged datasets such as text, json, csv or pandas
try_from_hf_gcs = path not in _PACKAGED_DATASETS_MODULES

# Download and prepare data
builder_instance.download_and_prepare(
download_config=download_config,
download_mode=download_mode,
ignore_verifications=ignore_verifications,
try_from_hf_gcs=try_from_hf_gcs,
)

# Build dataset for splits
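Together with the packaged-modules short-circuit in prepare_module, this lets the generic loaders run without internet access. A hedged usage example (the CSV file path is illustrative):

from datasets import load_dataset

# No network needed: "csv" resolves to the module shipped with the library,
# and try_from_hf_gcs is False so no download from the HF GCS bucket is attempted.
dataset = load_dataset("csv", data_files="my_file.csv")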
34 changes: 34 additions & 0 deletions src/datasets/packaged_modules/__init__.py
@@ -0,0 +1,34 @@
import inspect
import re
from hashlib import sha256
from typing import List

from .csv import csv
from .json import json
from .pandas import pandas
from .text import text


def hash_python_lines(lines: List[str]) -> str:
filtered_lines = []
for line in lines:
# strip line breaks, whitespace and comments so cosmetic edits don't change the hash
line = line.replace("\n", "")
line = line.replace(" ", "")
line = line.replace("\t", "")
line = re.sub(r"#.*", "", line)
if line:
filtered_lines.append(line)
full_str = "\n".join(filtered_lines)

# Make a hash from all this code
full_bytes = full_str.encode("utf-8")
return sha256(full_bytes).hexdigest()


# get importable module names and hash for caching
_PACKAGED_DATASETS_MODULES = {
"csv": (csv.__name__, hash_python_lines(inspect.getsource(csv).splitlines())),
"json": (json.__name__, hash_python_lines(inspect.getsource(json).splitlines())),
"pandas": (pandas.__name__, hash_python_lines(inspect.getsource(pandas).splitlines())),
"text": (text.__name__, hash_python_lines(inspect.getsource(text).splitlines())),
}
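A small sketch of the hashing behavior: blank lines and comment-only lines do not affect the result, so cosmetic edits to a packaged module do not invalidate its cache entry.

from datasets.packaged_modules import hash_python_lines

h1 = hash_python_lines(["x = 1", "# a comment", ""])
h2 = hash_python_lines(["x = 1"])
assert h1 == h2  # both normalize to the same filtered source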
4 empty files added · 4 files renamed without changes.
30 changes: 23 additions & 7 deletions src/datasets/utils/file_utils.py
@@ -134,22 +134,33 @@
HF_MODULES_CACHE = Path(os.getenv("HF_MODULES_CACHE", default_modules_cache_path))
except (AttributeError, ImportError):
HF_MODULES_CACHE = os.getenv("HF_MODULES_CACHE", default_modules_cache_path)
sys.path.append(str(HF_MODULES_CACHE))

os.makedirs(HF_MODULES_CACHE, exist_ok=True)
if not os.path.exists(os.path.join(HF_MODULES_CACHE, "__init__.py")):
with open(os.path.join(HF_MODULES_CACHE, "__init__.py"), "w"):
pass

INCOMPLETE_SUFFIX = ".incomplete"


def is_beam_available():
return _beam_available
def init_hf_modules(hf_modules_cache: Optional[str] = None) -> str:
"""
Add hf_modules_cache to the python path.
By default hf_modules_cache='~/.cache/huggingface/modules'.
It can also be set with the environment variable HF_MODULES_CACHE.
This is used to add modules such as `datasets_modules`.
"""
hf_modules_cache = hf_modules_cache if hf_modules_cache is not None else HF_MODULES_CACHE
hf_modules_cache = str(hf_modules_cache)
if hf_modules_cache not in sys.path:
sys.path.append(hf_modules_cache)

os.makedirs(hf_modules_cache, exist_ok=True)
if not os.path.exists(os.path.join(hf_modules_cache, "__init__.py")):
with open(os.path.join(hf_modules_cache, "__init__.py"), "w"):
pass
return hf_modules_cache
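A quick sketch of the invariants this helper establishes (the /tmp path is an illustrative assumption):

import os
import sys

from datasets.utils.file_utils import init_hf_modules

path = init_hf_modules("/tmp/hf_modules")
assert path == "/tmp/hf_modules"
assert path in sys.path  # modules created inside it are importable by name
assert os.path.exists(os.path.join(path, "__init__.py"))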


@contextmanager
def temp_seed(seed: int, set_pytorch=False, set_tensorflow=False):
"""Temporarily set the random seed. This works for python numpy, pytorch and tensorflow."""
np_state = np.random.get_state()
np.random.seed(seed)

@@ -209,6 +220,10 @@ def is_tf_available():
return _tf_available


def is_beam_available():
return _beam_available


def is_rarfile_available():
return _rarfile_available

@@ -319,6 +334,7 @@ def cached_path(
ConnectionError: in case of unreachable url
and no cache on disk
ValueError: if it couldn't parse the url or filename correctly
requests.exceptions.ConnectionError: in case of internet connection issue
"""
if download_config is None:
download_config = DownloadConfig(**download_kwargs)