From 2b6f1d0945e8dbf13d01e045f87c5e58546b4af6 Mon Sep 17 00:00:00 2001 From: Tornike Gurgenidze Date: Sat, 20 Apr 2024 20:48:34 +0400 Subject: [PATCH] feat: Add delta format to `FileSource`, add support for it in ibis/duckdb (#4123) --- protos/feast/core/DataFormat.proto | 4 + sdk/python/feast/data_format.py | 14 ++++ .../feast/infra/offline_stores/file_source.py | 39 ++++++---- sdk/python/feast/infra/offline_stores/ibis.py | 74 +++++++++++++------ .../requirements/py3.10-ci-requirements.txt | 47 ++++++------ .../requirements/py3.10-requirements.txt | 12 +-- .../requirements/py3.9-ci-requirements.txt | 47 ++++++------ .../requirements/py3.9-requirements.txt | 12 +-- .../feature_repos/repo_configuration.py | 2 + .../universal/data_sources/file.py | 53 ++++++++++++- setup.py | 7 +- 11 files changed, 215 insertions(+), 96 deletions(-) diff --git a/protos/feast/core/DataFormat.proto b/protos/feast/core/DataFormat.proto index c453e5e4c8..0a32089b0f 100644 --- a/protos/feast/core/DataFormat.proto +++ b/protos/feast/core/DataFormat.proto @@ -27,8 +27,12 @@ message FileFormat { // Defines options for the Parquet data format message ParquetFormat {} + // Defines options for delta data format + message DeltaFormat {} + oneof format { ParquetFormat parquet_format = 1; + DeltaFormat delta_format = 2; } } diff --git a/sdk/python/feast/data_format.py b/sdk/python/feast/data_format.py index 8f3b195e3e..301dfb8130 100644 --- a/sdk/python/feast/data_format.py +++ b/sdk/python/feast/data_format.py @@ -43,6 +43,8 @@ def from_proto(cls, proto): fmt = proto.WhichOneof("format") if fmt == "parquet_format": return ParquetFormat() + elif fmt == "delta_format": + return DeltaFormat() if fmt is None: return None raise NotImplementedError(f"FileFormat is unsupported: {fmt}") @@ -66,6 +68,18 @@ def __str__(self): return "parquet" +class DeltaFormat(FileFormat): + """ + Defines delta data format + """ + + def to_proto(self): + return FileFormatProto(delta_format=FileFormatProto.DeltaFormat()) + + def __str__(self): + return "delta" + + class StreamFormat(ABC): """ Defines an abtracts streaming data format used to encode feature data in streams diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 2672cf78bf..596f3464a9 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -8,7 +8,7 @@ from typeguard import typechecked from feast import type_map -from feast.data_format import FileFormat, ParquetFormat +from feast.data_format import DeltaFormat, FileFormat, ParquetFormat from feast.data_source import DataSource from feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto @@ -157,24 +157,31 @@ def get_table_column_names_and_types( filesystem, path = FileSource.create_filesystem_and_path( self.path, self.file_options.s3_endpoint_override ) - # Adding support for different file format path - # based on S3 filesystem - if filesystem is None: - kwargs = ( - {"use_legacy_dataset": False} - if version.parse(pyarrow.__version__) < version.parse("15.0.0") - else {} - ) - schema = ParquetDataset(path, **kwargs).schema - if hasattr(schema, "names") and hasattr(schema, "types"): - # Newer versions of pyarrow doesn't have this method, - # but this field is good enough. - pass + # TODO why None check necessary + if self.file_format is None or isinstance(self.file_format, ParquetFormat): + if filesystem is None: + kwargs = ( + {"use_legacy_dataset": False} + if version.parse(pyarrow.__version__) < version.parse("15.0.0") + else {} + ) + + schema = ParquetDataset(path, **kwargs).schema + if hasattr(schema, "names") and hasattr(schema, "types"): + # Newer versions of pyarrow doesn't have this method, + # but this field is good enough. + pass + else: + schema = schema.to_arrow_schema() else: - schema = schema.to_arrow_schema() + schema = ParquetDataset(path, filesystem=filesystem).schema + elif isinstance(self.file_format, DeltaFormat): + from deltalake import DeltaTable + + schema = DeltaTable(self.path).schema().to_pyarrow() else: - schema = ParquetDataset(path, filesystem=filesystem).schema + raise Exception(f"Unknown FileFormat -> {self.file_format}") return zip(schema.names, map(str, schema.types)) diff --git a/sdk/python/feast/infra/offline_stores/ibis.py b/sdk/python/feast/infra/offline_stores/ibis.py index 6e0729d6d1..de025ca006 100644 --- a/sdk/python/feast/infra/offline_stores/ibis.py +++ b/sdk/python/feast/infra/offline_stores/ibis.py @@ -13,6 +13,7 @@ from ibis.expr.types import Table from pytz import utc +from feast.data_format import DeltaFormat, ParquetFormat from feast.data_source import DataSource from feast.errors import SavedDatasetLocationAlreadyExists from feast.feature_logging import LoggingConfig, LoggingSource @@ -105,6 +106,15 @@ def _generate_row_id( return entity_table + @staticmethod + def _read_data_source(data_source: DataSource) -> Table: + assert isinstance(data_source, FileSource) + + if isinstance(data_source.file_format, ParquetFormat): + return ibis.read_parquet(data_source.path) + elif isinstance(data_source.file_format, DeltaFormat): + return ibis.read_delta(data_source.path) + @staticmethod def get_historical_features( config: RepoConfig, @@ -137,7 +147,9 @@ def get_historical_features( def read_fv( feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool ) -> Tuple: - fv_table: Table = ibis.read_parquet(feature_view.batch_source.name) + fv_table: Table = IbisOfflineStore._read_data_source( + feature_view.batch_source + ) for old_name, new_name in feature_view.batch_source.field_mapping.items(): if old_name in fv_table.columns: @@ -227,7 +239,7 @@ def pull_all_from_table_or_query( start_date = start_date.astimezone(tz=utc) end_date = end_date.astimezone(tz=utc) - table = ibis.read_parquet(data_source.path) + table = IbisOfflineStore._read_data_source(data_source) table = table.select(*fields) @@ -260,10 +272,9 @@ def write_logged_features( destination = logging_config.destination assert isinstance(destination, FileLoggingDestination) - if isinstance(data, Path): - table = ibis.read_parquet(data) - else: - table = ibis.memtable(data) + table = ( + ibis.read_parquet(data) if isinstance(data, Path) else ibis.memtable(data) + ) if destination.partition_by: kwargs = {"partition_by": destination.partition_by} @@ -294,12 +305,21 @@ def offline_write_batch( ) file_options = feature_view.batch_source.file_options - prev_table = ibis.read_parquet(file_options.uri).to_pyarrow() - if table.schema != prev_table.schema: - table = table.cast(prev_table.schema) - new_table = pyarrow.concat_tables([table, prev_table]) - ibis.memtable(new_table).to_parquet(file_options.uri) + if isinstance(feature_view.batch_source.file_format, ParquetFormat): + prev_table = ibis.read_parquet(file_options.uri).to_pyarrow() + if table.schema != prev_table.schema: + table = table.cast(prev_table.schema) + new_table = pyarrow.concat_tables([table, prev_table]) + + ibis.memtable(new_table).to_parquet(file_options.uri) + elif isinstance(feature_view.batch_source.file_format, DeltaFormat): + from deltalake import DeltaTable + + prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow() + if table.schema != prev_schema: + table = table.cast(prev_schema) + ibis.memtable(table).to_delta(file_options.uri, mode="append") class IbisRetrievalJob(RetrievalJob): @@ -338,20 +358,28 @@ def persist( if not allow_overwrite and os.path.exists(storage.file_options.uri): raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri) - filesystem, path = FileSource.create_filesystem_and_path( - storage.file_options.uri, - storage.file_options.s3_endpoint_override, - ) - - if path.endswith(".parquet"): - pyarrow.parquet.write_table( - self.to_arrow(), where=path, filesystem=filesystem + if isinstance(storage.file_options.file_format, ParquetFormat): + filesystem, path = FileSource.create_filesystem_and_path( + storage.file_options.uri, + storage.file_options.s3_endpoint_override, ) - else: - # otherwise assume destination is directory - pyarrow.parquet.write_to_dataset( - self.to_arrow(), root_path=path, filesystem=filesystem + + if path.endswith(".parquet"): + pyarrow.parquet.write_table( + self.to_arrow(), where=path, filesystem=filesystem + ) + else: + # otherwise assume destination is directory + pyarrow.parquet.write_to_dataset( + self.to_arrow(), root_path=path, filesystem=filesystem + ) + elif isinstance(storage.file_options.file_format, DeltaFormat): + mode = ( + "overwrite" + if allow_overwrite and os.path.exists(storage.file_options.uri) + else "error" ) + self.table.to_delta(storage.file_options.uri, mode=mode) @property def metadata(self) -> Optional[RetrievalMetadata]: diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 2d91e5cf41..15dadfd874 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -59,11 +59,11 @@ bidict==0.23.1 # via ibis-framework bleach==6.1.0 # via nbconvert -boto3==1.34.85 +boto3==1.34.88 # via # feast (setup.py) # moto -botocore==1.34.85 +botocore==1.34.88 # via # boto3 # moto @@ -134,11 +134,11 @@ cryptography==42.0.5 # snowflake-connector-python # types-pyopenssl # types-redis -dask[array,dataframe]==2024.4.1 +dask[array,dataframe]==2024.4.2 # via # dask-expr # feast (setup.py) -dask-expr==1.0.11 +dask-expr==1.0.12 # via dask db-dtypes==1.2.0 # via google-cloud-bigquery @@ -148,6 +148,8 @@ decorator==5.1.1 # via ipython defusedxml==0.7.1 # via nbconvert +deltalake==0.16.4 + # via feast (setup.py) dill==0.3.8 # via feast (setup.py) distlib==0.3.8 @@ -158,7 +160,7 @@ docker==7.0.0 # testcontainers docutils==0.19 # via sphinx -duckdb==0.10.1 +duckdb==0.10.2 # via # duckdb-engine # ibis-framework @@ -166,7 +168,7 @@ duckdb-engine==0.11.5 # via ibis-framework entrypoints==0.4 # via altair -exceptiongroup==1.2.0 +exceptiongroup==1.2.1 # via # anyio # ipython @@ -175,7 +177,7 @@ execnet==2.1.1 # via pytest-xdist executing==2.0.1 # via stack-data -fastapi==0.110.1 +fastapi==0.110.2 # via feast (setup.py) fastjsonschema==2.19.1 # via nbformat @@ -263,7 +265,7 @@ greenlet==3.0.3 # via sqlalchemy grpc-google-iam-v1==0.13.0 # via google-cloud-bigtable -grpcio==1.62.1 +grpcio==1.62.2 # via # feast (setup.py) # google-api-core @@ -275,15 +277,15 @@ grpcio==1.62.1 # grpcio-status # grpcio-testing # grpcio-tools -grpcio-health-checking==1.62.1 +grpcio-health-checking==1.62.2 # via feast (setup.py) -grpcio-reflection==1.62.1 +grpcio-reflection==1.62.2 # via feast (setup.py) -grpcio-status==1.62.1 +grpcio-status==1.62.2 # via google-api-core -grpcio-testing==1.62.1 +grpcio-testing==1.62.2 # via feast (setup.py) -grpcio-tools==1.62.1 +grpcio-tools==1.62.2 # via feast (setup.py) gunicorn==22.0.0 ; platform_system != "Windows" # via feast (setup.py) @@ -482,13 +484,13 @@ nest-asyncio==1.6.0 # via ipykernel nodeenv==1.8.0 # via pre-commit -notebook==7.1.2 +notebook==7.1.3 # via great-expectations notebook-shim==0.2.4 # via # jupyterlab # notebook -numpy==1.24.4 +numpy==1.26.4 # via # altair # dask @@ -615,12 +617,15 @@ pyarrow==15.0.2 # via # dask-expr # db-dtypes + # deltalake # feast (setup.py) # google-cloud-bigquery # ibis-framework # snowflake-connector-python pyarrow-hotfix==0.6 - # via ibis-framework + # via + # deltalake + # ibis-framework pyasn1==0.6.0 # via # pyasn1-modules @@ -692,7 +697,7 @@ pytest-ordering==0.6 # via feast (setup.py) pytest-timeout==1.4.2 # via feast (setup.py) -pytest-xdist==3.5.0 +pytest-xdist==3.6.0 # via feast (setup.py) python-dateutil==2.9.0.post0 # via @@ -728,7 +733,7 @@ pyyaml==6.0.1 # pre-commit # responses # uvicorn -pyzmq==26.0.0 +pyzmq==26.0.2 # via # ipykernel # jupyter-client @@ -785,7 +790,7 @@ rsa==4.9 # via google-auth ruamel-yaml==0.17.17 # via great-expectations -ruff==0.3.7 +ruff==0.4.1 # via feast (setup.py) s3transfer==0.10.1 # via boto3 @@ -812,7 +817,7 @@ sniffio==1.3.1 # httpx snowballstemmer==2.2.0 # via sphinx -snowflake-connector-python[pandas]==3.8.1 +snowflake-connector-python[pandas]==3.9.0 # via feast (setup.py) sortedcontainers==2.4.0 # via snowflake-connector-python @@ -895,7 +900,7 @@ tqdm==4.66.2 # via # feast (setup.py) # great-expectations -traitlets==5.14.2 +traitlets==5.14.3 # via # comm # ipykernel diff --git a/sdk/python/requirements/py3.10-requirements.txt b/sdk/python/requirements/py3.10-requirements.txt index 5961b29b61..2cfe62c55b 100644 --- a/sdk/python/requirements/py3.10-requirements.txt +++ b/sdk/python/requirements/py3.10-requirements.txt @@ -34,17 +34,17 @@ cloudpickle==3.0.0 # via dask colorama==0.4.6 # via feast (setup.py) -dask[array,dataframe]==2024.4.1 +dask[array,dataframe]==2024.4.2 # via # dask-expr # feast (setup.py) -dask-expr==1.0.11 +dask-expr==1.0.12 # via dask dill==0.3.8 # via feast (setup.py) -exceptiongroup==1.2.0 +exceptiongroup==1.2.1 # via anyio -fastapi==0.110.1 +fastapi==0.110.2 # via feast (setup.py) fissix==21.11.13 # via bowler @@ -84,7 +84,7 @@ mypy-extensions==1.0.0 # via mypy mypy-protobuf==3.6.0 # via feast (setup.py) -numpy==1.24.4 +numpy==1.26.4 # via # dask # feast (setup.py) @@ -164,7 +164,7 @@ tqdm==4.66.2 # via feast (setup.py) typeguard==4.2.1 # via feast (setup.py) -types-protobuf==4.25.0.20240417 +types-protobuf==5.26.0.20240420 # via mypy-protobuf typing-extensions==4.11.0 # via diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index d42b7ed5b9..8de9151e4e 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -59,11 +59,11 @@ bidict==0.23.1 # via ibis-framework bleach==6.1.0 # via nbconvert -boto3==1.34.85 +boto3==1.34.88 # via # feast (setup.py) # moto -botocore==1.34.85 +botocore==1.34.88 # via # boto3 # moto @@ -134,11 +134,11 @@ cryptography==42.0.5 # snowflake-connector-python # types-pyopenssl # types-redis -dask[array,dataframe]==2024.4.1 +dask[array,dataframe]==2024.4.2 # via # dask-expr # feast (setup.py) -dask-expr==1.0.11 +dask-expr==1.0.12 # via dask db-dtypes==1.2.0 # via google-cloud-bigquery @@ -148,6 +148,8 @@ decorator==5.1.1 # via ipython defusedxml==0.7.1 # via nbconvert +deltalake==0.16.4 + # via feast (setup.py) dill==0.3.8 # via feast (setup.py) distlib==0.3.8 @@ -158,7 +160,7 @@ docker==7.0.0 # testcontainers docutils==0.19 # via sphinx -duckdb==0.10.1 +duckdb==0.10.2 # via # duckdb-engine # ibis-framework @@ -166,7 +168,7 @@ duckdb-engine==0.11.5 # via ibis-framework entrypoints==0.4 # via altair -exceptiongroup==1.2.0 +exceptiongroup==1.2.1 # via # anyio # ipython @@ -175,7 +177,7 @@ execnet==2.1.1 # via pytest-xdist executing==2.0.1 # via stack-data -fastapi==0.110.1 +fastapi==0.110.2 # via feast (setup.py) fastjsonschema==2.19.1 # via nbformat @@ -263,7 +265,7 @@ greenlet==3.0.3 # via sqlalchemy grpc-google-iam-v1==0.13.0 # via google-cloud-bigtable -grpcio==1.62.1 +grpcio==1.62.2 # via # feast (setup.py) # google-api-core @@ -275,15 +277,15 @@ grpcio==1.62.1 # grpcio-status # grpcio-testing # grpcio-tools -grpcio-health-checking==1.62.1 +grpcio-health-checking==1.62.2 # via feast (setup.py) -grpcio-reflection==1.62.1 +grpcio-reflection==1.62.2 # via feast (setup.py) -grpcio-status==1.62.1 +grpcio-status==1.62.2 # via google-api-core -grpcio-testing==1.62.1 +grpcio-testing==1.62.2 # via feast (setup.py) -grpcio-tools==1.62.1 +grpcio-tools==1.62.2 # via feast (setup.py) gunicorn==22.0.0 ; platform_system != "Windows" # via feast (setup.py) @@ -491,13 +493,13 @@ nest-asyncio==1.6.0 # via ipykernel nodeenv==1.8.0 # via pre-commit -notebook==7.1.2 +notebook==7.1.3 # via great-expectations notebook-shim==0.2.4 # via # jupyterlab # notebook -numpy==1.24.4 +numpy==1.26.4 # via # altair # dask @@ -624,12 +626,15 @@ pyarrow==15.0.2 # via # dask-expr # db-dtypes + # deltalake # feast (setup.py) # google-cloud-bigquery # ibis-framework # snowflake-connector-python pyarrow-hotfix==0.6 - # via ibis-framework + # via + # deltalake + # ibis-framework pyasn1==0.6.0 # via # pyasn1-modules @@ -701,7 +706,7 @@ pytest-ordering==0.6 # via feast (setup.py) pytest-timeout==1.4.2 # via feast (setup.py) -pytest-xdist==3.5.0 +pytest-xdist==3.6.0 # via feast (setup.py) python-dateutil==2.9.0.post0 # via @@ -737,7 +742,7 @@ pyyaml==6.0.1 # pre-commit # responses # uvicorn -pyzmq==26.0.0 +pyzmq==26.0.2 # via # ipykernel # jupyter-client @@ -796,7 +801,7 @@ ruamel-yaml==0.17.17 # via great-expectations ruamel-yaml-clib==0.2.8 # via ruamel-yaml -ruff==0.3.7 +ruff==0.4.1 # via feast (setup.py) s3transfer==0.10.1 # via boto3 @@ -823,7 +828,7 @@ sniffio==1.3.1 # httpx snowballstemmer==2.2.0 # via sphinx -snowflake-connector-python[pandas]==3.8.1 +snowflake-connector-python[pandas]==3.9.0 # via feast (setup.py) sortedcontainers==2.4.0 # via snowflake-connector-python @@ -906,7 +911,7 @@ tqdm==4.66.2 # via # feast (setup.py) # great-expectations -traitlets==5.14.2 +traitlets==5.14.3 # via # comm # ipykernel diff --git a/sdk/python/requirements/py3.9-requirements.txt b/sdk/python/requirements/py3.9-requirements.txt index b5d040c561..472f3e90b9 100644 --- a/sdk/python/requirements/py3.9-requirements.txt +++ b/sdk/python/requirements/py3.9-requirements.txt @@ -34,17 +34,17 @@ cloudpickle==3.0.0 # via dask colorama==0.4.6 # via feast (setup.py) -dask[array,dataframe]==2024.4.1 +dask[array,dataframe]==2024.4.2 # via # dask-expr # feast (setup.py) -dask-expr==1.0.11 +dask-expr==1.0.12 # via dask dill==0.3.8 # via feast (setup.py) -exceptiongroup==1.2.0 +exceptiongroup==1.2.1 # via anyio -fastapi==0.110.1 +fastapi==0.110.2 # via feast (setup.py) fissix==21.11.13 # via bowler @@ -86,7 +86,7 @@ mypy-extensions==1.0.0 # via mypy mypy-protobuf==3.6.0 # via feast (setup.py) -numpy==1.24.4 +numpy==1.26.4 # via # dask # feast (setup.py) @@ -166,7 +166,7 @@ tqdm==4.66.2 # via feast (setup.py) typeguard==4.2.1 # via feast (setup.py) -types-protobuf==4.25.0.20240417 +types-protobuf==5.26.0.20240420 # via mypy-protobuf typing-extensions==4.11.0 # via diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index a57a017699..096744f547 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -32,6 +32,7 @@ ) from tests.integration.feature_repos.universal.data_sources.file import ( DuckDBDataSourceCreator, + DuckDBDeltaDataSourceCreator, FileDataSourceCreator, ) from tests.integration.feature_repos.universal.data_sources.redshift import ( @@ -118,6 +119,7 @@ AVAILABLE_OFFLINE_STORES: List[Tuple[str, Type[DataSourceCreator]]] = [ ("local", FileDataSourceCreator), ("local", DuckDBDataSourceCreator), + ("local", DuckDBDeltaDataSourceCreator), ] AVAILABLE_ONLINE_STORES: Dict[ diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 6d4baa19ed..9cdc91a6c8 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -12,7 +12,7 @@ from testcontainers.core.waiting_utils import wait_for_logs from feast import FileSource -from feast.data_format import ParquetFormat +from feast.data_format import DeltaFormat, ParquetFormat from feast.data_source import DataSource from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.duckdb import DuckDBOfflineStoreConfig @@ -30,11 +30,13 @@ class FileDataSourceCreator(DataSourceCreator): files: List[Any] dirs: List[Any] + keep: List[Any] def __init__(self, project_name: str, *args, **kwargs): super().__init__(project_name) self.files = [] self.dirs = [] + self.keep = [] def create_data_source( self, @@ -89,6 +91,49 @@ def teardown(self): shutil.rmtree(d) +class DeltaFileSourceCreator(FileDataSourceCreator): + def create_data_source( + self, + df: pd.DataFrame, + destination_name: str, + created_timestamp_column="created_ts", + field_mapping: Optional[Dict[str, str]] = None, + timestamp_field: Optional[str] = "ts", + ) -> DataSource: + from deltalake.writer import write_deltalake + + destination_name = self.get_prefixed_table_name(destination_name) + + delta_path = tempfile.TemporaryDirectory( + prefix=f"{self.project_name}_{destination_name}" + ) + + self.keep.append(delta_path) + + write_deltalake(delta_path.name, df) + + return FileSource( + file_format=DeltaFormat(), + path=delta_path.name, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping or {"ts_1": "ts"}, + ) + + def create_saved_dataset_destination(self) -> SavedDatasetFileStorage: + d = tempfile.mkdtemp(prefix=self.project_name) + self.keep.append(d) + return SavedDatasetFileStorage( + path=d, file_format=DeltaFormat(), s3_endpoint_override=None + ) + + # LoggingDestination is parquet-only + def create_logged_features_destination(self) -> LoggingDestination: + d = tempfile.mkdtemp(prefix=self.project_name) + self.keep.append(d) + return FileLoggingDestination(path=d) + + class FileParquetDatasetSourceCreator(FileDataSourceCreator): def create_data_source( self, @@ -222,3 +267,9 @@ class DuckDBDataSourceCreator(FileDataSourceCreator): def create_offline_store_config(self): self.duckdb_offline_store_config = DuckDBOfflineStoreConfig() return self.duckdb_offline_store_config + + +class DuckDBDeltaDataSourceCreator(DeltaFileSourceCreator): + def create_offline_store_config(self): + self.duckdb_offline_store_config = DuckDBOfflineStoreConfig() + return self.duckdb_offline_store_config diff --git a/setup.py b/setup.py index ae75484805..dcc616f120 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,6 @@ "mmh3", "numpy>=1.22,<2", "pandas>=1.4.3,<3", - # Higher than 4.23.4 seems to cause a seg fault "protobuf>=4.24.0,<5.0.0", "pyarrow>=4", "pydantic>=2.0.0", @@ -150,6 +149,8 @@ DUCKDB_REQUIRED = ["ibis-framework[duckdb]"] +DELTA_REQUIRED = ["deltalake"] + CI_REQUIRED = ( [ "build", @@ -210,6 +211,7 @@ + IBIS_REQUIRED + GRPCIO_REQUIRED + DUCKDB_REQUIRED + + DELTA_REQUIRED ) DOCS_REQUIRED = CI_REQUIRED @@ -374,7 +376,8 @@ def run(self): "rockset": ROCKSET_REQUIRED, "ibis": IBIS_REQUIRED, "duckdb": DUCKDB_REQUIRED, - "ikv": IKV_REQUIRED + "ikv": IKV_REQUIRED, + "delta": DELTA_REQUIRED, }, include_package_data=True, license="Apache",