Skip to content

Commit

Permalink
fix: Improve the code related to on-demand-featureview. (#4203)
Browse files Browse the repository at this point in the history
* fix: Improve the code related to on-demand-featureview.

Signed-off-by: Shuchu Han <shuchu.han@gmail.com>

* fix: add the pyarrow.substrait import.

Signed-off-by: Shuchu Han <shuchu.han@gmail.com>

---------

Signed-off-by: Shuchu Han <shuchu.han@gmail.com>
  • Loading branch information
shuchu authored May 15, 2024
1 parent 0e42150 commit d91d7e0
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 109 deletions.
90 changes: 45 additions & 45 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import warnings
from datetime import datetime
from types import FunctionType
from typing import Any, Dict, List, Optional, Type, Union
from typing import Any, Optional, Union

import dill
import pandas as pd
Expand Down Expand Up @@ -62,24 +62,24 @@ class OnDemandFeatureView(BaseFeatureView):
"""

name: str
features: List[Field]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
features: list[Field]
source_feature_view_projections: dict[str, FeatureViewProjection]
source_request_sources: dict[str, RequestSource]
feature_transformation: Union[
PandasTransformation, PythonTransformation, SubstraitTransformation
]
mode: str
description: str
tags: Dict[str, str]
tags: dict[str, str]
owner: str

@log_exceptions # noqa: C901
def __init__( # noqa: C901
self,
*,
name: str,
schema: List[Field],
sources: List[
schema: list[Field],
sources: list[
Union[
FeatureView,
RequestSource,
Expand All @@ -93,7 +93,7 @@ def __init__( # noqa: C901
],
mode: str = "pandas",
description: str = "",
tags: Optional[Dict[str, str]] = None,
tags: Optional[dict[str, str]] = None,
owner: str = "",
):
"""
Expand Down Expand Up @@ -124,32 +124,31 @@ def __init__( # noqa: C901
owner=owner,
)

if mode not in {"python", "pandas", "substrait"}:
raise Exception(
f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait."
self.mode = mode.lower()

if self.mode not in {"python", "pandas", "substrait"}:
raise ValueError(
f"Unknown mode {self.mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait."
)
else:
self.mode = mode

if not feature_transformation:
if udf:
warnings.warn(
"udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.",
DeprecationWarning,
)
# Note: inspecting the return signature won't work with isinstance, so this is the best alternative.
if mode == "pandas":
if self.mode == "pandas":
feature_transformation = PandasTransformation(udf, udf_string)
elif mode == "python":
elif self.mode == "python":
feature_transformation = PythonTransformation(udf, udf_string)
else:
pass
else:
raise Exception(
raise ValueError(
"OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments"
)

self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
self.source_request_sources: Dict[str, RequestSource] = {}
self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
self.source_request_sources: dict[str, RequestSource] = {}
for odfv_source in sources:
if isinstance(odfv_source, RequestSource):
self.source_request_sources[odfv_source.name] = odfv_source
Expand All @@ -163,7 +162,7 @@ def __init__( # noqa: C901
self.feature_transformation = feature_transformation

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
def proto_class(self) -> type[OnDemandFeatureViewProto]:
return OnDemandFeatureViewProto

def __copy__(self):
Expand Down Expand Up @@ -336,7 +335,7 @@ def from_proto(
user_defined_function_proto=backwards_compatible_udf,
)
else:
raise Exception("At least one transformation type needs to be provided")
raise ValueError("At least one transformation type needs to be provided")

on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
Expand Down Expand Up @@ -372,18 +371,18 @@ def from_proto(

return on_demand_feature_view_obj

def get_request_data_schema(self) -> Dict[str, ValueType]:
schema: Dict[str, ValueType] = {}
def get_request_data_schema(self) -> dict[str, ValueType]:
schema: dict[str, ValueType] = {}
for request_source in self.source_request_sources.values():
if isinstance(request_source.schema, List):
if isinstance(request_source.schema, list):
new_schema = {}
for field in request_source.schema:
new_schema[field.name] = field.dtype.to_value_type()
schema.update(new_schema)
elif isinstance(request_source.schema, Dict):
elif isinstance(request_source.schema, dict):
schema.update(request_source.schema)
else:
raise Exception(
raise TypeError(
f"Request source schema is not correct type: ${str(type(request_source.schema))}"
)
return schema
Expand All @@ -401,7 +400,10 @@ def transform_ibis(
if not isinstance(ibis_table, Table):
raise TypeError("transform_ibis only accepts ibis.expr.types.Table")

assert type(self.feature_transformation) == SubstraitTransformation
if not isinstance(self.feature_transformation, SubstraitTransformation):
raise TypeError(
"The feature_transformation is not SubstraitTransformation type while calling transform_ibis()."
)

columns_to_cleanup = []
for source_fv_projection in self.source_feature_view_projections.values():
Expand All @@ -423,7 +425,7 @@ def transform_ibis(

transformed_table = transformed_table.drop(*columns_to_cleanup)

rename_columns: Dict[str, str] = {}
rename_columns: dict[str, str] = {}
for feature in self.features:
short_name = feature.name
long_name = self._get_projected_feature_name(feature.name)
Expand Down Expand Up @@ -454,11 +456,9 @@ def transform_arrow(
pa_table = pa_table.append_column(
feature.name, pa_table[full_feature_ref]
)
# pa_table[feature.name] = pa_table[full_feature_ref]
columns_to_cleanup.append(feature.name)
elif feature.name in pa_table.column_names:
# Make sure the full feature name is always present
# pa_table[full_feature_ref] = pa_table[feature.name]
pa_table = pa_table.append_column(
full_feature_ref, pa_table[feature.name]
)
Expand All @@ -469,7 +469,7 @@ def transform_arrow(
)

# Work out whether the correct column names are used.
rename_columns: Dict[str, str] = {}
rename_columns: dict[str, str] = {}
for feature in self.features:
short_name = feature.name
long_name = self._get_projected_feature_name(feature.name)
Expand All @@ -494,12 +494,12 @@ def transform_arrow(

def transform_dict(
self,
feature_dict: Dict[str, Any], # type: ignore
) -> Dict[str, Any]:
feature_dict: dict[str, Any], # type: ignore
) -> dict[str, Any]:
# We need a mapping from the full feature name to the short name and back to do the renaming.
# The simplest thing to do is to make the full reference, copy the columns with the short reference,
# and rerun.
columns_to_cleanup: List[str] = []
columns_to_cleanup: list[str] = []
for source_fv_projection in self.source_feature_view_projections.values():
for feature in source_fv_projection.features:
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
Expand All @@ -512,7 +512,7 @@ def transform_dict(
feature_dict[full_feature_ref] = feature_dict[feature.name]
columns_to_cleanup.append(str(full_feature_ref))

output_dict: Dict[str, Any] = self.feature_transformation.transform(
output_dict: dict[str, Any] = self.feature_transformation.transform(
feature_dict
)
for feature_name in columns_to_cleanup:
Expand Down Expand Up @@ -542,8 +542,8 @@ def infer_features(self) -> None:
f"Could not infer Features for the feature view '{self.name}'.",
)

def _construct_random_input(self) -> Dict[str, List[Any]]:
rand_dict_value: Dict[ValueType, List[Any]] = {
def _construct_random_input(self) -> dict[str, list[Any]]:
rand_dict_value: dict[ValueType, list[Any]] = {
ValueType.BYTES: [str.encode("hello world")],
ValueType.STRING: ["hello world"],
ValueType.INT32: [1],
Expand Down Expand Up @@ -582,11 +582,11 @@ def _construct_random_input(self) -> Dict[str, List[Any]]:
@staticmethod
def get_requested_odfvs(
feature_refs, project, registry
) -> List["OnDemandFeatureView"]:
) -> list["OnDemandFeatureView"]:
all_on_demand_feature_views = registry.list_on_demand_feature_views(
project, allow_cache=True
)
requested_on_demand_feature_views: List[OnDemandFeatureView] = []
requested_on_demand_feature_views: list[OnDemandFeatureView] = []
for odfv in all_on_demand_feature_views:
for feature in odfv.features:
if f"{odfv.name}:{feature.name}" in feature_refs:
Expand All @@ -597,8 +597,8 @@ def get_requested_odfvs(

def on_demand_feature_view(
*,
schema: List[Field],
sources: List[
schema: list[Field],
sources: list[
Union[
FeatureView,
RequestSource,
Expand All @@ -607,7 +607,7 @@ def on_demand_feature_view(
],
mode: str = "pandas",
description: str = "",
tags: Optional[Dict[str, str]] = None,
tags: Optional[dict[str, str]] = None,
owner: str = "",
):
"""
Expand Down Expand Up @@ -643,9 +643,9 @@ def decorator(user_function):
)
transformation = PandasTransformation(user_function, udf_string)
elif mode == "python":
if return_annotation not in (inspect._empty, Dict[str, Any]):
if return_annotation not in (inspect._empty, dict[str, Any]):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]"
f"return signature for {user_function} is {return_annotation} but should be dict[str, Any]"
)
transformation = PythonTransformation(user_function, udf_string)
elif mode == "substrait":
Expand Down
31 changes: 6 additions & 25 deletions sdk/python/feast/transformation/pandas_transformation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from types import FunctionType
from typing import Any, Dict, List
from typing import Any

import dill
import pandas as pd
Expand Down Expand Up @@ -28,35 +28,16 @@ def __init__(self, udf: FunctionType, udf_string: str = ""):
self.udf_string = udf_string

def transform_arrow(
self, pa_table: pyarrow.Table, features: List[Field]
self, pa_table: pyarrow.Table, features: list[Field]
) -> pyarrow.Table:
if not isinstance(pa_table, pyarrow.Table):
raise TypeError(
f"pa_table should be type pyarrow.Table but got {type(pa_table).__name__}"
)
output_df = self.udf.__call__(pa_table.to_pandas())
output_df = pyarrow.Table.from_pandas(output_df)
if not isinstance(output_df, pyarrow.Table):
raise TypeError(
f"output_df should be type pyarrow.Table but got {type(output_df).__name__}"
)
return output_df
output_df_pandas = self.udf.__call__(pa_table.to_pandas())
return pyarrow.Table.from_pandas(output_df_pandas)

def transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
if not isinstance(input_df, pd.DataFrame):
raise TypeError(
f"input_df should be type pd.DataFrame but got {type(input_df).__name__}"
)
output_df = self.udf.__call__(input_df)
if not isinstance(output_df, pd.DataFrame):
raise TypeError(
f"output_df should be type pd.DataFrame but got {type(output_df).__name__}"
)
return output_df
return self.udf.__call__(input_df)

def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
df = pd.DataFrame.from_dict(random_input)

output_df: pd.DataFrame = self.transform(df)

return [
Expand Down
20 changes: 6 additions & 14 deletions sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from types import FunctionType
from typing import Any, Dict, List
from typing import Any

import dill
import pyarrow
Expand All @@ -26,27 +26,19 @@ def __init__(self, udf: FunctionType, udf_string: str = ""):
self.udf_string = udf_string

def transform_arrow(
self, pa_table: pyarrow.Table, features: List[Field]
self, pa_table: pyarrow.Table, features: list[Field]
) -> pyarrow.Table:
raise Exception(
'OnDemandFeatureView mode "python" not supported for offline processing.'
'OnDemandFeatureView with mode "python" does not support offline processing.'
)

def transform(self, input_dict: Dict) -> Dict:
if not isinstance(input_dict, Dict):
raise TypeError(
f"input_dict should be type Dict[str, Any] but got {type(input_dict).__name__}"
)
def transform(self, input_dict: dict) -> dict:
# Ensuring that the inputs are included as well
output_dict = self.udf.__call__(input_dict)
if not isinstance(output_dict, Dict):
raise TypeError(
f"output_dict should be type Dict[str, Any] but got {type(output_dict).__name__}"
)
return {**input_dict, **output_dict}

def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
output_dict: Dict[str, List[Any]] = self.transform(random_input)
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
output_dict: dict[str, list[Any]] = self.transform(random_input)

return [
Field(
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/transformation/substrait_transformation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from types import FunctionType
from typing import Any, Dict, List
from typing import Any

import dill
import pandas as pd
Expand Down Expand Up @@ -42,7 +42,7 @@ def transform_ibis(self, table):
return self.ibis_function(table)

def transform_arrow(
self, pa_table: pyarrow.Table, features: List[Field] = []
self, pa_table: pyarrow.Table, features: list[Field] = []
) -> pyarrow.Table:
def table_provider(names, schema: pyarrow.Schema):
return pa_table.select(schema.names)
Expand All @@ -56,7 +56,7 @@ def table_provider(names, schema: pyarrow.Schema):

return table

def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
df = pd.DataFrame.from_dict(random_input)
output_df: pd.DataFrame = self.transform(df)

Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/unit/infra/test_inference_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame:
],
mode="python",
)
def python_native_test_view(input_dict: Dict[str, Any]) -> Dict[str, Any]:
output_dict: Dict[str, Any] = {
def python_native_test_view(input_dict: dict[str, Any]) -> dict[str, Any]:
output_dict: dict[str, Any] = {
"output": input_dict["some_date"],
"object_output": str(input_dict["some_date"]),
}
Expand Down
Loading

0 comments on commit d91d7e0

Please sign in to comment.