Skip to content

Commit

Permalink
fix: Add PushSource proto and Python class (#2428)
Browse files Browse the repository at this point in the history
* fix: Add PushSource proto and Python class

Signed-off-by: Achal Shah <achals@gmail.com>

* tests

Signed-off-by: Achal Shah <achals@gmail.com>

* remove pdb

Signed-off-by: Achal Shah <achals@gmail.com>

* cr

Signed-off-by: Achal Shah <achals@gmail.com>

* cr

Signed-off-by: Achal Shah <achals@gmail.com>

* cr

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals authored Mar 22, 2022
1 parent 3c41f94 commit 9a4bd63
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 3 deletions.
17 changes: 14 additions & 3 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";

// Defines a Data Source that can be used to source Feature data
// Next available id: 22
// Next available id: 23
message DataSource {
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
// but they are going to be reserved for backwards compatibility.
reserved 6 to 10;

// Type of Data Source.
// Next available id: 9
// Next available id: 10
enum SourceType {
INVALID = 0;
BATCH_FILE = 1;
Expand All @@ -44,7 +44,7 @@ message DataSource {
STREAM_KINESIS = 4;
CUSTOM_SOURCE = 6;
REQUEST_SOURCE = 7;

PUSH_SOURCE = 9;
}

// Unique name of data source within the project
Expand Down Expand Up @@ -169,6 +169,16 @@ message DataSource {
map<string, feast.types.ValueType.Enum> schema = 2;
}

// Defines options for a DataSource that supports having data pushed to it. This allows data to be
// pushed to the online store on demand, for example by stream consumers.
message PushOptions {
// Mapping of feature name (key) to the value type of that feature (value).
map<string, feast.types.ValueType.Enum> schema = 1;
// Optional batch source for the push source, used for historical features and materialization.
// This is a nested DataSource message, so presence can be checked explicitly by readers.
DataSource batch_source = 2;
}


// DataSource options.
oneof options {
FileOptions file_options = 11;
Expand All @@ -179,5 +189,6 @@ message DataSource {
RequestDataOptions request_data_options = 18;
CustomSourceOptions custom_options = 16;
SnowflakeOptions snowflake_options = 19;
PushOptions push_options = 22;
}
}
72 changes: 72 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,75 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.date_partition_column = self.date_partition_column

return data_source_proto


class PushSource(DataSource):
    """
    A data source that features can be pushed to on request.

    Args:
        name: Name of the push source.
        schema: Mapping from the input feature name to its ValueType.
        batch_source: Optional DataSource attached to this push source. When it
            is set, to_proto() serializes it into the ``batch_source`` field of
            the PushOptions message.
    """

    name: str
    schema: Dict[str, ValueType]
    batch_source: Optional[DataSource]

    def __init__(
        self,
        name: str,
        schema: Dict[str, ValueType],
        batch_source: Optional[DataSource] = None,
    ):
        """Creates a PushSource object."""
        super().__init__(name)
        self.schema = schema
        self.batch_source = batch_source

    def validate(self, config: RepoConfig):
        """No validation is performed for push sources; always returns None."""
        return None

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        # Nothing is computed here: the method body is empty, so callers
        # receive None rather than an iterable.
        return None

    @staticmethod
    def from_proto(data_source: DataSourceProto):
        """Builds a PushSource from the push_options of a DataSource proto."""
        push_options = data_source.push_options

        # Copy the proto map into a plain dict, keeping the values as stored.
        schema = dict(push_options.schema.items())

        # The batch source is optional, so only decode it when it is present.
        batch_source = (
            DataSource.from_proto(push_options.batch_source)
            if push_options.HasField("batch_source")
            else None
        )

        return PushSource(
            name=data_source.name, schema=schema, batch_source=batch_source
        )

    def to_proto(self) -> DataSourceProto:
        """Serializes this PushSource into a DataSource proto of type PUSH_SOURCE."""
        schema_pb = {
            feature_name: value_type
            for feature_name, value_type in self.schema.items()
        }
        # Leave the batch source unset in the proto when none is attached.
        batch_source_proto = (
            self.batch_source.to_proto() if self.batch_source else None
        )

        push_options = DataSourceProto.PushOptions(
            schema=schema_pb, batch_source=batch_source_proto
        )
        return DataSourceProto(
            name=self.name,
            type=DataSourceProto.PUSH_SOURCE,
            push_options=push_options,
        )

    def get_table_query_string(self) -> str:
        """Not supported for push sources."""
        raise NotImplementedError

    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        """Not supported for push sources."""
        raise NotImplementedError
34 changes: 34 additions & 0 deletions sdk/python/tests/unit/test_data_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from feast.data_source import PushSource
from feast.infra.offline_stores.bigquery_source import BigQuerySource
from feast.protos.feast.types.Value_pb2 import ValueType


def test_push_no_batch():
    """A PushSource without a batch source survives a proto round trip."""
    push_source = PushSource(
        name="test", schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64}
    )
    push_source_proto = push_source.to_proto()
    # Reading a message-typed proto field never yields None, so an
    # `is not None` check always passes. Check field presence explicitly.
    assert push_source_proto.HasField("push_options")
    assert not push_source_proto.push_options.HasField("batch_source")
    push_source_unproto = PushSource.from_proto(push_source_proto)

    # The round-tripped object must match the original field by field.
    assert push_source.name == push_source_unproto.name
    assert push_source.schema == push_source_unproto.schema
    assert push_source.batch_source == push_source_unproto.batch_source


def test_push_with_batch():
    """A PushSource with a batch source keeps that source across a proto round trip."""
    push_source = PushSource(
        name="test",
        schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64},
        batch_source=BigQuerySource(table="test.test"),
    )
    push_source_proto = push_source.to_proto()
    # Reading a message-typed proto field never yields None, so an
    # `is not None` check always passes. Check field presence explicitly.
    assert push_source_proto.HasField("push_options")
    assert push_source_proto.push_options.HasField("batch_source")

    push_source_unproto = PushSource.from_proto(push_source_proto)

    assert push_source.name == push_source_unproto.name
    assert push_source.schema == push_source_unproto.schema
    # Only the batch source's name is compared after deserialization.
    assert push_source.batch_source.name == push_source_unproto.batch_source.name

0 comments on commit 9a4bd63

Please sign in to comment.