Skip to content

Commit

Permalink
chore: Add a source field in the feature view API (#2525)
Browse files Browse the repository at this point in the history
* chore: Add a source field in the feature view API

Signed-off-by: Achal Shah <achals@gmail.com>

* fix reference

Signed-off-by: Achal Shah <achals@gmail.com>

* fix reference

Signed-off-by: Achal Shah <achals@gmail.com>

* fix reference

Signed-off-by: Achal Shah <achals@gmail.com>

* Start updating tests

Signed-off-by: Achal Shah <achals@gmail.com>

* fixes

Signed-off-by: Achal Shah <achals@gmail.com>

* simpify

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals authored Apr 12, 2022
1 parent 6a03bed commit 673d00c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 28 deletions.
65 changes: 46 additions & 19 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource, PushSource
from feast.data_source import DataSource, KafkaSource, KinesisSource, PushSource
from feast.entity import Entity
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
Expand Down Expand Up @@ -61,9 +61,9 @@ class FeatureView(BaseFeatureView):
can result in extremely computationally intensive queries.
batch_source (optional): The batch source of data where this group of features
is stored. This is optional ONLY if a push source is specified as the
stream_source, since push sources contain their own batch sources.
stream_source, since push sources contain their own batch sources. This is deprecated in favor of `source`.
stream_source (optional): The stream source of data where this group of features
is stored.
is stored. This is deprecated in favor of `source`.
schema: The schema of the feature view, including feature, timestamp, and entity
columns.
features: The list of features defined as part of this feature view. Each
Expand All @@ -74,6 +74,8 @@ class FeatureView(BaseFeatureView):
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the feature view, typically the email of the primary
maintainer.
source (optional): The source of data for this group of features. May be a stream source, or a batch source.
If a stream source, the source should contain a batch_source for backfills & batch materialization.
"""

name: str
Expand All @@ -88,6 +90,7 @@ class FeatureView(BaseFeatureView):
tags: Dict[str, str]
owner: str
materialization_intervals: List[Tuple[datetime, datetime]]
source: Optional[DataSource]

@log_exceptions
def __init__(
Expand All @@ -104,6 +107,7 @@ def __init__(
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
):
"""
Creates a FeatureView object.
Expand All @@ -126,6 +130,8 @@ def __init__(
primary maintainer.
schema (optional): The schema of the feature view, including feature, timestamp,
and entity columns.
source (optional): The source of data for this group of features. May be a stream source, or a batch source.
If a stream source, the source should contain a batch_source for backfills & batch materialization.
Raises:
ValueError: A field mapping conflicts with an Entity or a Feature.
Expand Down Expand Up @@ -163,6 +169,8 @@ def __init__(
self.name = _name
self.entities = _entities if _entities else [DUMMY_ENTITY_NAME]

self._initialize_sources(_name, batch_source, stream_source, source)

if isinstance(_ttl, Duration):
self.ttl = timedelta(seconds=int(_ttl.seconds))
warnings.warn(
Expand Down Expand Up @@ -199,21 +207,6 @@ def __init__(
# current `features` parameter only accepts feature columns.
_features = _schema

if stream_source is not None and isinstance(stream_source, PushSource):
if stream_source.batch_source is None or not isinstance(
stream_source.batch_source, DataSource
):
raise ValueError(
f"A batch_source needs to be specified for feature view `{name}`"
)
self.batch_source = stream_source.batch_source
else:
if batch_source is None:
raise ValueError(
f"A batch_source needs to be specified for feature view `{name}`"
)
self.batch_source = batch_source

cols = [entity for entity in self.entities] + [
field.name for field in _features
]
Expand All @@ -236,9 +229,43 @@ def __init__(
owner=owner,
)
self.online = online
self.stream_source = stream_source
self.materialization_intervals = []

def _initialize_sources(self, name, batch_source, stream_source, source):
if source:
if (
isinstance(source, PushSource)
or isinstance(source, KafkaSource)
or isinstance(source, KinesisSource)
):
self.stream_source = source
if not source.batch_source:
raise ValueError(
f"A batch_source needs to be specified for stream source `{source.name}`"
)
else:
self.batch_source = source.batch_source
else:
self.stream_source = stream_source
self.batch_source = source
else:
warnings.warn(
"batch_source and stream_source have been deprecated in favor or `source`."
"The deprecated fields will be removed in Feast 0.23.",
DeprecationWarning,
)
if stream_source is not None and isinstance(stream_source, PushSource):
self.stream_source = stream_source
self.batch_source = stream_source.batch_source
else:
if batch_source is None:
raise ValueError(
f"A batch_source needs to be specified for feature view `{name}`"
)
self.stream_source = stream_source
self.batch_source = batch_source
self.source = source

# Note: Python requires redefining hash in child classes that override __eq__
def __hash__(self):
return super().__hash__()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def driver_feature_view(
entities=["driver"],
schema=None if infer_features else [Field(name="value", dtype=dtype)],
ttl=timedelta(days=5),
batch_source=data_source,
source=data_source,
)


Expand All @@ -49,7 +49,7 @@ def global_feature_view(
if infer_features
else [Feature(name="entityless_value", dtype=value_type)],
ttl=timedelta(days=5),
batch_source=data_source,
source=data_source,
)


Expand Down Expand Up @@ -162,7 +162,7 @@ def create_driver_hourly_stats_feature_view(source, infer_features: bool = False
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int32),
],
batch_source=source,
source=source,
ttl=timedelta(hours=2),
)
return driver_stats_feature_view
Expand All @@ -179,7 +179,7 @@ def create_customer_daily_profile_feature_view(source, infer_features: bool = Fa
Field(name="avg_passenger_count", dtype=Float32),
Field(name="lifetime_trip_count", dtype=Int32),
],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)
return customer_profile_feature_view
Expand All @@ -196,7 +196,7 @@ def create_global_stats_feature_view(source, infer_features: bool = False):
Feature(name="num_rides", dtype=ValueType.INT32),
Feature(name="avg_ride_length", dtype=ValueType.FLOAT),
],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)
return global_stats_feature_view
Expand All @@ -209,7 +209,7 @@ def create_order_feature_view(source, infer_features: bool = False):
schema=None
if infer_features
else [Field(name="order_is_success", dtype=Int32)],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)

Expand All @@ -219,7 +219,7 @@ def create_location_stats_feature_view(source, infer_features: bool = False):
name="location_stats",
entities=["location_id"],
schema=None if infer_features else [Field(name="temperature", dtype=Int32)],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)
return location_stats_feature_view
Expand All @@ -231,7 +231,7 @@ def create_field_mapping_feature_view(source):
entities=[],
# Test that Features still work for FeatureViews.
features=[Feature(name="feature_name", dtype=ValueType.INT32)],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)

Expand All @@ -252,5 +252,5 @@ def create_pushable_feature_view(batch_source: DataSource):
# Test that Features still work for FeatureViews.
features=[Feature(name="temperature", dtype=ValueType.INT32)],
ttl=timedelta(days=2),
stream_source=push_source,
source=push_source,
)

0 comments on commit 673d00c

Please sign in to comment.