fix: Support passing batch source to streaming sources for backfills (#2523)

* fix: Support passing batch source to streaming sources for backfills

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>
achals authored Apr 11, 2022
1 parent c22fa2c commit 90db1d1
Showing 4 changed files with 25 additions and 12 deletions.
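In effect, this commit moves the optional batch_source from PushOptions onto the DataSource proto itself and threads it through the Python KafkaSource and KinesisSource constructors, so a streaming source can declare which batch source to use for backfills. A minimal usage sketch follows; the topic argument, the import paths, and the FileSource/JsonFormat details are assumptions about the surrounding Feast SDK of this era, not shown in this diff:

    # Sketch only: pair a Kafka stream with a batch source for backfills.
    # Imports, `topic`, and the FileSource/JsonFormat arguments are assumed.
    from feast import FileSource
    from feast.data_format import JsonFormat
    from feast.data_source import KafkaSource

    batch = FileSource(
        path="data/driver_stats.parquet",
        timestamp_field="event_timestamp",
    )

    stream = KafkaSource(
        name="driver_stats_stream",
        bootstrap_servers="localhost:9092",
        topic="driver_stats",
        message_format=JsonFormat(schema_json="driver_id integer, event_timestamp timestamp"),
        timestamp_field="event_timestamp",
        batch_source=batch,  # new in this commit: used for historical retrieval and materialization
    )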
6 changes: 3 additions & 3 deletions go/embedded/online_features.go
@@ -63,13 +63,13 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri

joinKeyTypes := make(map[string]int32)

- for viewName, _ := range viewNames {
+ for viewName := range viewNames {
view, err := s.fs.GetFeatureView(viewName, true)
if err != nil {
// skip on demand feature views
continue
}
- for entityName, _ := range view.Entities {
+ for entityName := range view.Entities {
entity := entitiesByName[entityName]
joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number())
}
@@ -98,7 +98,7 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN
// skip on demand feature views
continue
}
- for entityName, _ := range view.Entities {
+ for entityName := range view.Entities {
entity := entitiesByName[entityName]
joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number())
}
9 changes: 6 additions & 3 deletions protos/feast/core/DataSource.proto
@@ -26,7 +26,7 @@ import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";

// Defines a Data Source that can be used source Feature data
- // Next available id: 26
+ // Next available id: 27
message DataSource {
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
// but they are going to be reserved for backwards compatibility.
@@ -82,6 +82,10 @@ message DataSource {
// first party sources as well.
string data_source_class_type = 17;

+ // Optional batch source for streaming sources for historical features and materialization.
+ DataSource batch_source = 26;


// Defines options for DataSource that sources features from a file
message FileOptions {
FileFormat file_format = 1;
@@ -128,6 +132,7 @@ message DataSource {

// Defines the stream data format encoding feature/entity data in Kafka messages.
StreamFormat message_format = 3;

}

// Defines options for DataSource that sources features from Kinesis records.
@@ -199,8 +204,6 @@ message DataSource {
message PushOptions {
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 1;
- // Optional batch source for the push source for historical features and materialization.
- DataSource batch_source = 2;
}


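The net proto change: PushOptions loses its nested batch_source (field 2), and DataSource gains a top-level batch_source at field 26, which is why the "Next available id" comment is bumped to 27. Any streaming source, not just a push source, can now carry one. A short sketch of what that means for code reading the generated protos; the attribute names come from this diff, the setup around them is assumed:

    # `data_source_proto` is assumed to be a deserialized feast.core DataSource message.
    # Before this commit, the batch source was nested under push options:
    #     batch = data_source_proto.push_options.batch_source
    # After it, the field is a top-level singular message on DataSource:
    if data_source_proto.HasField("batch_source"):
        batch = data_source_proto.batch_source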
20 changes: 15 additions & 5 deletions sdk/python/feast/data_source.py
@@ -360,6 +360,7 @@ def __init__(
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
+ batch_source: Optional[DataSource] = None,
):
super().__init__(
event_timestamp_column=event_timestamp_column,
@@ -372,6 +373,7 @@ def __init__(
name=name,
timestamp_field=timestamp_field,
)
+ self.batch_source = batch_source
self.kafka_options = KafkaOptions(
bootstrap_servers=bootstrap_servers,
message_format=message_format,
@@ -411,6 +413,7 @@ def from_proto(data_source: DataSourceProto):
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
+ batch_source=DataSource.from_proto(data_source.batch_source),
)

def to_proto(self) -> DataSourceProto:
@@ -427,6 +430,8 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column
+ if self.batch_source:
+     data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto())
return data_source_proto

@staticmethod
@@ -546,6 +551,7 @@ def from_proto(data_source: DataSourceProto):
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
+ batch_source=DataSource.from_proto(data_source.batch_source),
)

@staticmethod
@@ -569,6 +575,7 @@ def __init__(
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
+ batch_source: Optional[DataSource] = None,
):
super().__init__(
name=name,
@@ -581,6 +588,7 @@ def __init__(
owner=owner,
timestamp_field=timestamp_field,
)
+ self.batch_source = batch_source
self.kinesis_options = KinesisOptions(
record_format=record_format, region=region, stream_name=stream_name
)
@@ -618,6 +626,8 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column
+ if self.batch_source:
+     data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto())

return data_source_proto

@@ -634,6 +644,7 @@ class PushSource(DataSource):

def __init__(
self,
+ *,
name: str,
schema: Dict[str, ValueType],
batch_source: DataSource,
@@ -693,8 +704,8 @@ def from_proto(data_source: DataSourceProto):
for key, val in schema_pb.items():
schema[key] = ValueType(val)

- assert data_source.push_options.HasField("batch_source")
- batch_source = DataSource.from_proto(data_source.push_options.batch_source)
+ assert data_source.HasField("batch_source")
+ batch_source = DataSource.from_proto(data_source.batch_source)

return PushSource(
name=data_source.name,
@@ -714,9 +725,7 @@ def to_proto(self) -> DataSourceProto:
if self.batch_source:
batch_source_proto = self.batch_source.to_proto()

- options = DataSourceProto.PushOptions(
-     schema=schema_pb, batch_source=batch_source_proto
- )
+ options = DataSourceProto.PushOptions(schema=schema_pb,)
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.PUSH_SOURCE,
@@ -725,6 +734,7 @@ def to_proto(self) -> DataSourceProto:
description=self.description,
tags=self.tags,
owner=self.owner,
+ batch_source=batch_source_proto,
)

return data_source_proto
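Summing up the SDK changes: KafkaSource and KinesisSource now accept, store, and serialize an optional batch_source; PushSource writes its batch source to the new top-level proto field instead of PushOptions; and PushSource.__init__ becomes keyword-only via the added *,. A round-trip sketch mirroring the updated unit test below; BigQuerySource(table="test.test") is taken from that test, the schema value is illustrative:

    # Round-trip sketch: the batch source now survives to_proto()/from_proto()
    # at the DataSource level rather than inside PushOptions.
    from feast import BigQuerySource, ValueType
    from feast.data_source import PushSource

    push = PushSource(
        name="push_source",                     # arguments are keyword-only after the added `*,`
        schema={"driver_id": ValueType.INT64},  # illustrative schema
        batch_source=BigQuerySource(table="test.test"),
    )

    proto = push.to_proto()
    assert proto.HasField("batch_source")       # top level now, not proto.push_options
    restored = PushSource.from_proto(proto)
    assert restored.batch_source is not None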
2 changes: 1 addition & 1 deletion sdk/python/tests/unit/test_data_sources.py
@@ -12,8 +12,8 @@ def test_push_with_batch():
batch_source=BigQuerySource(table="test.test"),
)
push_source_proto = push_source.to_proto()
+ assert push_source_proto.HasField("batch_source")
assert push_source_proto.push_options is not None
- assert push_source_proto.push_options.HasField("batch_source")

push_source_unproto = PushSource.from_proto(push_source_proto)

