diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py
index 1a885443b9..4ef1820a3f 100644
--- a/sdk/python/feast/feature_view.py
+++ b/sdk/python/feast/feature_view.py
@@ -20,7 +20,7 @@
 from feast import utils
 from feast.base_feature_view import BaseFeatureView
-from feast.data_source import DataSource, PushSource
+from feast.data_source import DataSource, KafkaSource, KinesisSource, PushSource
 from feast.entity import Entity
 from feast.feature import Feature
 from feast.feature_view_projection import FeatureViewProjection
@@ -61,9 +61,9 @@ class FeatureView(BaseFeatureView):
             can result in extremely computationally intensive queries.
         batch_source (optional): The batch source of data where this group of features
             is stored. This is optional ONLY if a push source is specified as the
-            stream_source, since push sources contain their own batch sources.
+            stream_source, since push sources contain their own batch sources. This is deprecated in favor of `source`.
         stream_source (optional): The stream source of data where this group of features
-            is stored.
+            is stored. This is deprecated in favor of `source`.
         schema: The schema of the feature view, including feature, timestamp, and entity
             columns.
         features: The list of features defined as part of this feature view. Each
@@ -74,6 +74,8 @@ class FeatureView(BaseFeatureView):
         tags: A dictionary of key-value pairs to store arbitrary metadata.
         owner: The owner of the feature view, typically the email of the primary
             maintainer.
+        source (optional): The source of data for this group of features. May be a stream source, or a batch source.
+            If a stream source, the source should contain a batch_source for backfills & batch materialization.
     """

     name: str
@@ -88,6 +90,7 @@ class FeatureView(BaseFeatureView):
     tags: Dict[str, str]
     owner: str
     materialization_intervals: List[Tuple[datetime, datetime]]
+    source: Optional[DataSource]

     @log_exceptions
     def __init__(
@@ -104,6 +107,7 @@ def __init__(
         description: str = "",
         owner: str = "",
         schema: Optional[List[Field]] = None,
+        source: Optional[DataSource] = None,
     ):
         """
         Creates a FeatureView object.
@@ -126,6 +130,8 @@ def __init__(
                 primary maintainer.
             schema (optional): The schema of the feature view, including feature,
                 timestamp, and entity columns.
+            source (optional): The source of data for this group of features. May be a stream source, or a batch source.
+                If a stream source, the source should contain a batch_source for backfills & batch materialization.

         Raises:
             ValueError: A field mapping conflicts with an Entity or a Feature.
@@ -163,6 +169,8 @@ def __init__(
         self.name = _name
         self.entities = _entities if _entities else [DUMMY_ENTITY_NAME]

+        self._initialize_sources(_name, batch_source, stream_source, source)
+
         if isinstance(_ttl, Duration):
             self.ttl = timedelta(seconds=int(_ttl.seconds))
             warnings.warn(
@@ -199,21 +207,6 @@ def __init__(
             # current `features` parameter only accepts feature columns.
             _features = _schema

-        if stream_source is not None and isinstance(stream_source, PushSource):
-            if stream_source.batch_source is None or not isinstance(
-                stream_source.batch_source, DataSource
-            ):
-                raise ValueError(
-                    f"A batch_source needs to be specified for feature view `{name}`"
-                )
-            self.batch_source = stream_source.batch_source
-        else:
-            if batch_source is None:
-                raise ValueError(
-                    f"A batch_source needs to be specified for feature view `{name}`"
-                )
-            self.batch_source = batch_source
-
         cols = [entity for entity in self.entities] + [
             field.name for field in _features
         ]
@@ -236,9 +229,43 @@ def __init__(
             owner=owner,
         )
         self.online = online
-        self.stream_source = stream_source
         self.materialization_intervals = []

+    def _initialize_sources(self, name, batch_source, stream_source, source):
+        if source:
+            if (
+                isinstance(source, PushSource)
+                or isinstance(source, KafkaSource)
+                or isinstance(source, KinesisSource)
+            ):
+                self.stream_source = source
+                if not source.batch_source:
+                    raise ValueError(
+                        f"A batch_source needs to be specified for stream source `{source.name}`"
+                    )
+                else:
+                    self.batch_source = source.batch_source
+            else:
+                self.stream_source = stream_source
+                self.batch_source = source
+        else:
+            warnings.warn(
+                "batch_source and stream_source have been deprecated in favor of `source`. "
+                "The deprecated fields will be removed in Feast 0.23.",
+                DeprecationWarning,
+            )
+            if stream_source is not None and isinstance(stream_source, PushSource):
+                self.stream_source = stream_source
+                self.batch_source = stream_source.batch_source
+            else:
+                if batch_source is None:
+                    raise ValueError(
+                        f"A batch_source needs to be specified for feature view `{name}`"
+                    )
+                self.stream_source = stream_source
+                self.batch_source = batch_source
+        self.source = source
+
     # Note: Python requires redefining hash in child classes that override __eq__
     def __hash__(self):
         return super().__hash__()
diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py
index 02d8baddad..a3fcbce32e 100644
--- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py
+++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py
@@ -31,7 +31,7 @@ def driver_feature_view(
         entities=["driver"],
         schema=None if infer_features else [Field(name="value", dtype=dtype)],
         ttl=timedelta(days=5),
-        batch_source=data_source,
+        source=data_source,
     )


@@ -49,7 +49,7 @@ def global_feature_view(
         if infer_features
         else [Feature(name="entityless_value", dtype=value_type)],
         ttl=timedelta(days=5),
-        batch_source=data_source,
+        source=data_source,
     )


@@ -162,7 +162,7 @@ def create_driver_hourly_stats_feature_view(source, infer_features: bool = False
             Field(name="acc_rate", dtype=Float32),
             Field(name="avg_daily_trips", dtype=Int32),
         ],
-        batch_source=source,
+        source=source,
         ttl=timedelta(hours=2),
     )
     return driver_stats_feature_view
@@ -179,7 +179,7 @@ def create_customer_daily_profile_feature_view(source, infer_features: bool = Fa
             Field(name="avg_passenger_count", dtype=Float32),
             Field(name="lifetime_trip_count", dtype=Int32),
         ],
-        batch_source=source,
+        source=source,
         ttl=timedelta(days=2),
     )
     return customer_profile_feature_view
@@ -196,7 +196,7 @@ def create_global_stats_feature_view(source, infer_features: bool = False):
             Feature(name="num_rides", dtype=ValueType.INT32),
             Feature(name="avg_ride_length", dtype=ValueType.FLOAT),
         ],
-        batch_source=source,
+        source=source,
         ttl=timedelta(days=2),
     )
     return global_stats_feature_view
@@ -209,7 +209,7 @@ def create_order_feature_view(source, infer_features: bool = False):
         schema=None
         if infer_features
         else [Field(name="order_is_success", dtype=Int32)],
-        batch_source=source,
+        source=source,
         ttl=timedelta(days=2),
     )

@@ -219,7 +219,7 @@ def create_location_stats_feature_view(source, infer_features: bool = False):
         name="location_stats",
         entities=["location_id"],
         schema=None if infer_features else [Field(name="temperature", dtype=Int32)],
-        batch_source=source,
+        source=source,
         ttl=timedelta(days=2),
     )
     return location_stats_feature_view
@@ -231,7 +231,7 @@ def create_field_mapping_feature_view(source):
         entities=[],
         # Test that Features still work for FeatureViews.
         features=[Feature(name="feature_name", dtype=ValueType.INT32)],
-        batch_source=source,
+        source=source,
         ttl=timedelta(days=2),
     )

@@ -252,5 +252,5 @@ def create_pushable_feature_view(batch_source: DataSource):
         # Test that Features still work for FeatureViews.
         features=[Feature(name="temperature", dtype=ValueType.INT32)],
         ttl=timedelta(days=2),
-        stream_source=push_source,
+        source=push_source,
     )
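
A minimal sketch of how a feature view is expected to be declared against the new `source` parameter, assuming the Feast 0.21-era API used elsewhere in this diff (`Field`, `feast.types`, string entity references); the source name, file path, and field names below are illustrative placeholders, not taken from the repository.

```python
from datetime import timedelta

from feast import FeatureView, FileSource
from feast.field import Field
from feast.types import Float32, Int32

# Illustrative batch source; the name, path, and timestamp column are placeholders.
driver_stats_source = FileSource(
    name="driver_hourly_stats_source",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

# `source=` replaces the deprecated `batch_source=` / `stream_source=` kwargs.
# A PushSource, KafkaSource, or KinesisSource that wraps a batch_source can be
# passed the same way; _initialize_sources unpacks its batch_source for
# backfills and batch materialization.
driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=["driver"],
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int32),
    ],
    ttl=timedelta(hours=2),
    source=driver_stats_source,
)
```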