diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 05f9c77e03..217e53d060 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -63,13 +63,13 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri joinKeyTypes := make(map[string]int32) - for viewName, _ := range viewNames { + for viewName := range viewNames { view, err := s.fs.GetFeatureView(viewName, true) if err != nil { // skip on demand feature views continue } - for entityName, _ := range view.Entities { + for entityName := range view.Entities { entity := entitiesByName[entityName] joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) } @@ -98,7 +98,7 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN // skip on demand feature views continue } - for entityName, _ := range view.Entities { + for entityName := range view.Entities { entity := entitiesByName[entityName] joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) } diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index 6425085e61..93503468c4 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -26,7 +26,7 @@ import "feast/core/DataFormat.proto"; import "feast/types/Value.proto"; // Defines a Data Source that can be used source Feature data -// Next available id: 26 +// Next available id: 27 message DataSource { // Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not, // but they are going to be reserved for backwards compatibility. @@ -82,6 +82,10 @@ message DataSource { // first party sources as well. string data_source_class_type = 17; + // Optional batch source for streaming sources for historical features and materialization. + DataSource batch_source = 26; + + // Defines options for DataSource that sources features from a file message FileOptions { FileFormat file_format = 1; @@ -128,6 +132,7 @@ message DataSource { // Defines the stream data format encoding feature/entity data in Kafka messages. StreamFormat message_format = 3; + } // Defines options for DataSource that sources features from Kinesis records. @@ -199,8 +204,6 @@ message DataSource { message PushOptions { // Mapping of feature name to type map schema = 1; - // Optional batch source for the push source for historical features and materialization. - DataSource batch_source = 2; } diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 4dde5a4faa..46df1088db 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -360,6 +360,7 @@ def __init__( tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", timestamp_field: Optional[str] = "", + batch_source: Optional[DataSource] = None, ): super().__init__( event_timestamp_column=event_timestamp_column, @@ -372,6 +373,7 @@ def __init__( name=name, timestamp_field=timestamp_field, ) + self.batch_source = batch_source self.kafka_options = KafkaOptions( bootstrap_servers=bootstrap_servers, message_format=message_format, @@ -411,6 +413,7 @@ def from_proto(data_source: DataSourceProto): description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, + batch_source=DataSource.from_proto(data_source.batch_source), ) def to_proto(self) -> DataSourceProto: @@ -427,6 +430,8 @@ def to_proto(self) -> DataSourceProto: data_source_proto.timestamp_field = self.timestamp_field data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column + if self.batch_source: + data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto()) return data_source_proto @staticmethod @@ -546,6 +551,7 @@ def from_proto(data_source: DataSourceProto): description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, + batch_source=DataSource.from_proto(data_source.batch_source), ) @staticmethod @@ -569,6 +575,7 @@ def __init__( tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", timestamp_field: Optional[str] = "", + batch_source: Optional[DataSource] = None, ): super().__init__( name=name, @@ -581,6 +588,7 @@ def __init__( owner=owner, timestamp_field=timestamp_field, ) + self.batch_source = batch_source self.kinesis_options = KinesisOptions( record_format=record_format, region=region, stream_name=stream_name ) @@ -618,6 +626,8 @@ def to_proto(self) -> DataSourceProto: data_source_proto.timestamp_field = self.timestamp_field data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column + if self.batch_source: + data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto()) return data_source_proto @@ -634,6 +644,7 @@ class PushSource(DataSource): def __init__( self, + *, name: str, schema: Dict[str, ValueType], batch_source: DataSource, @@ -693,8 +704,8 @@ def from_proto(data_source: DataSourceProto): for key, val in schema_pb.items(): schema[key] = ValueType(val) - assert data_source.push_options.HasField("batch_source") - batch_source = DataSource.from_proto(data_source.push_options.batch_source) + assert data_source.HasField("batch_source") + batch_source = DataSource.from_proto(data_source.batch_source) return PushSource( name=data_source.name, @@ -714,9 +725,7 @@ def to_proto(self) -> DataSourceProto: if self.batch_source: batch_source_proto = self.batch_source.to_proto() - options = DataSourceProto.PushOptions( - schema=schema_pb, batch_source=batch_source_proto - ) + options = DataSourceProto.PushOptions(schema=schema_pb,) data_source_proto = DataSourceProto( name=self.name, type=DataSourceProto.PUSH_SOURCE, @@ -725,6 +734,7 @@ def to_proto(self) -> DataSourceProto: description=self.description, tags=self.tags, owner=self.owner, + batch_source=batch_source_proto, ) return data_source_proto diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 6e8e44c0e3..f32089b3b9 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -12,8 +12,8 @@ def test_push_with_batch(): batch_source=BigQuerySource(table="test.test"), ) push_source_proto = push_source.to_proto() + assert push_source_proto.HasField("batch_source") assert push_source_proto.push_options is not None - assert push_source_proto.push_options.HasField("batch_source") push_source_unproto = PushSource.from_proto(push_source_proto)