diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto
index 1200c1b9be..099ba32d92 100644
--- a/protos/feast/core/DataSource.proto
+++ b/protos/feast/core/DataSource.proto
@@ -26,6 +26,10 @@ import "feast/core/DataFormat.proto";
 // Defines a Data Source that can be used source Feature data
 message DataSource {
+  // Field indexes should *not* be reused. It is unclear whether fields 6-10 were used previously,
+  // so they are reserved for backwards compatibility.
+  reserved 6 to 10;
+
   // Type of Data Source.
   enum SourceType {
     INVALID = 0;
@@ -34,6 +38,7 @@ message DataSource {
     STREAM_KAFKA = 3;
     STREAM_KINESIS = 4;
     BATCH_REDSHIFT = 5;
+    CUSTOM_SOURCE = 6;
   }
   SourceType type = 1;
@@ -51,6 +56,10 @@ message DataSource {
   // Must specify creation timestamp column name
   string created_timestamp_column = 5;
 
+  // An internal field that represents the Python class for the data source object that this proto represents.
+  // This should be set by feast, and not by users.
+  string data_source_class_type = 17;
+
   // Defines options for DataSource that sources features from a file
   message FileOptions {
     FileFormat file_format = 1;
@@ -111,6 +120,13 @@ message DataSource {
     string query = 2;
   }
 
+  // Defines configuration for custom third-party data sources.
+  message CustomSourceOptions {
+    // Serialized configuration information for the data source. The implementer of the custom data source is
+    // responsible for serializing and deserializing data from these bytes.
+    bytes configuration = 1;
+  }
+
   // DataSource options.
   oneof options {
     FileOptions file_options = 11;
@@ -118,5 +134,6 @@ message DataSource {
     KafkaOptions kafka_options = 13;
     KinesisOptions kinesis_options = 14;
     RedshiftOptions redshift_options = 15;
+    CustomSourceOptions custom_options = 16;
   }
 }
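The new `CustomSourceOptions.configuration` field is opaque bytes, so a third-party source can encode its settings however it likes. A minimal sketch of the intended round trip, assuming JSON-encoded settings (the settings dict is illustrative, not part of this diff):

```python
import json

from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

# Hypothetical settings for a third-party source; the implementer owns the format.
settings = {"account": "my_account", "table": "features"}

proto = DataSourceProto(
    type=DataSourceProto.CUSTOM_SOURCE,
    custom_options=DataSourceProto.CustomSourceOptions(
        configuration=json.dumps(settings).encode("utf8")
    ),
)

# Deserialization is symmetric: the custom class decodes the bytes it wrote.
decoded = json.loads(proto.custom_options.configuration.decode("utf8"))
assert decoded == settings
```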
 from feast.feature_table import FeatureTable
diff --git a/sdk/python/feast/.DS_Store b/sdk/python/feast/.DS_Store
new file mode 100644
index 0000000000..aefa87c191
Binary files /dev/null and b/sdk/python/feast/.DS_Store differ
diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py
index 6f1cb58451..43d7aa939b 100644
--- a/sdk/python/feast/__init__.py
+++ b/sdk/python/feast/__init__.py
@@ -2,15 +2,12 @@
 
 from pkg_resources import DistributionNotFound, get_distribution
 
+from feast.infra.offline_stores.bigquery import BigQuerySource
+from feast.infra.offline_stores.file import FileSource
+from feast.infra.offline_stores.redshift import RedshiftSource
+
 from .client import Client
-from .data_source import (
-    BigQuerySource,
-    FileSource,
-    KafkaSource,
-    KinesisSource,
-    RedshiftSource,
-    SourceType,
-)
+from .data_source import KafkaSource, KinesisSource, SourceType
 from .entity import Entity
 from .feature import Feature
 from .feature_store import FeatureStore
@@ -32,10 +29,9 @@
     pass
 
 __all__ = [
+    "BigQuerySource",
     "Client",
     "Entity",
-    "BigQuerySource",
-    "FileSource",
     "KafkaSource",
     "KinesisSource",
     "RedshiftSource",
@@ -46,4 +42,5 @@
     "RepoConfig",
     "SourceType",
     "ValueType",
+    "FileSource",
 ]
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 10d1859fb8..52da77fd5f 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -20,10 +20,10 @@
 import grpc
 import pandas as pd
 
+from feast import BigQuerySource, FileSource
 from feast.config import Config
 from feast.constants import ConfigOptions as opt
 from feast.data_format import ParquetFormat
-from feast.data_source import BigQuerySource, FileSource
 from feast.entity import Entity
 from feast.feature import Feature, FeatureRef, _build_feature_references
 from feast.feature_table import FeatureTable
diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py
index 30b192f6ed..6886fa0c26 100644
--- a/sdk/python/feast/data_source.py
+++ b/sdk/python/feast/data_source.py
@@ -14,15 +14,13 @@
 
 import enum
+from abc import ABC, abstractmethod
 from typing import Callable, Dict, Iterable, Optional, Tuple
 
-from pyarrow.parquet import ParquetFile
-
 from feast import type_map
-from feast.data_format import FileFormat, StreamFormat
-from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError
+from feast.data_format import StreamFormat
 from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
-from feast.repo_config import RepoConfig
+from feast.repo_config import RepoConfig, get_data_source_class_from_type
 from feast.value_type import ValueType
 
@@ -38,151 +36,6 @@ class SourceType(enum.Enum):
     STREAM_KINESIS = 4
 
 
-class FileOptions:
-    """
-    DataSource File options used to source features from a file
-    """
-
-    def __init__(
-        self, file_format: Optional[FileFormat], file_url: Optional[str],
-    ):
-        self._file_format = file_format
-        self._file_url = file_url
-
-    @property
-    def file_format(self):
-        """
-        Returns the file format of this file
-        """
-        return self._file_format
-
-    @file_format.setter
-    def file_format(self, file_format):
-        """
-        Sets the file format of this file
-        """
-        self._file_format = file_format
-
-    @property
-    def file_url(self):
-        """
-        Returns the file url of this file
-        """
-        return self._file_url
-
-    @file_url.setter
-    def file_url(self, file_url):
-        """
-        Sets the file url of this file
-        """
-        self._file_url = file_url
-
-    @classmethod
-    def from_proto(cls, file_options_proto: DataSourceProto.FileOptions):
-        """
-        Creates a FileOptions from a protobuf representation of a file option
-
-        args:
-            file_options_proto: a protobuf representation of a datasource
-
-        Returns:
-            Returns a FileOptions object based on the file_options protobuf
-        """
-        file_options = cls(
-            file_format=FileFormat.from_proto(file_options_proto.file_format),
-            file_url=file_options_proto.file_url,
-        )
-        return file_options
-
-    def to_proto(self) -> DataSourceProto.FileOptions:
-        """
-        Converts an FileOptionsProto object to its protobuf representation.
-
-        Returns:
-            FileOptionsProto protobuf
-        """
-
-        file_options_proto = DataSourceProto.FileOptions(
-            file_format=(
-                None if self.file_format is None else self.file_format.to_proto()
-            ),
-            file_url=self.file_url,
-        )
-
-        return file_options_proto
-
-
-class BigQueryOptions:
-    """
-    DataSource BigQuery options used to source features from BigQuery query
-    """
-
-    def __init__(self, table_ref: Optional[str], query: Optional[str]):
-        self._table_ref = table_ref
-        self._query = query
-
-    @property
-    def query(self):
-        """
-        Returns the BigQuery SQL query referenced by this source
-        """
-        return self._query
-
-    @query.setter
-    def query(self, query):
-        """
-        Sets the BigQuery SQL query referenced by this source
-        """
-        self._query = query
-
-    @property
-    def table_ref(self):
-        """
-        Returns the table ref of this BQ table
-        """
-        return self._table_ref
-
-    @table_ref.setter
-    def table_ref(self, table_ref):
-        """
-        Sets the table ref of this BQ table
-        """
-        self._table_ref = table_ref
-
-    @classmethod
-    def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
-        """
-        Creates a BigQueryOptions from a protobuf representation of a BigQuery option
-
-        Args:
-            bigquery_options_proto: A protobuf representation of a DataSource
-
-        Returns:
-            Returns a BigQueryOptions object based on the bigquery_options protobuf
-        """
-
-        bigquery_options = cls(
-            table_ref=bigquery_options_proto.table_ref,
-            query=bigquery_options_proto.query,
-        )
-
-        return bigquery_options
-
-    def to_proto(self) -> DataSourceProto.BigQueryOptions:
-        """
-        Converts an BigQueryOptionsProto object to its protobuf representation.
-
-        Returns:
-            BigQueryOptionsProto protobuf
-        """
-
-        bigquery_options_proto = DataSourceProto.BigQueryOptions(
-            table_ref=self.table_ref, query=self.query,
-        )
-
-        return bigquery_options_proto
-
-
 class KafkaOptions:
     """
     DataSource Kafka options used to source features from Kafka messages
@@ -365,7 +218,7 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
 
         return kinesis_options_proto
 
 
-class DataSource:
+class DataSource(ABC):
     """
     DataSource that can be used source features
     """
@@ -453,77 +306,48 @@ def date_partition_column(self, date_partition_column):
         self._date_partition_column = date_partition_column
 
     @staticmethod
-    def from_proto(data_source):
+    @abstractmethod
+    def from_proto(data_source: DataSourceProto):
         """
         Convert data source config in FeatureTable spec to a DataSource class object.
         """
+        if data_source.data_source_class_type:
+            cls = get_data_source_class_from_type(data_source.data_source_class_type)
+            return cls.from_proto(data_source)
+
         if data_source.file_options.file_format and data_source.file_options.file_url:
-            data_source_obj = FileSource(
-                field_mapping=data_source.field_mapping,
-                file_format=FileFormat.from_proto(data_source.file_options.file_format),
-                path=data_source.file_options.file_url,
-                event_timestamp_column=data_source.event_timestamp_column,
-                created_timestamp_column=data_source.created_timestamp_column,
-                date_partition_column=data_source.date_partition_column,
-            )
+            from feast.infra.offline_stores.file import FileSource
+
+            data_source_obj = FileSource.from_proto(data_source)
         elif (
             data_source.bigquery_options.table_ref or data_source.bigquery_options.query
         ):
-            data_source_obj = BigQuerySource(
-                field_mapping=data_source.field_mapping,
-                table_ref=data_source.bigquery_options.table_ref,
-                event_timestamp_column=data_source.event_timestamp_column,
-                created_timestamp_column=data_source.created_timestamp_column,
-                date_partition_column=data_source.date_partition_column,
-                query=data_source.bigquery_options.query,
-            )
+            from feast.infra.offline_stores.bigquery import BigQuerySource
+
+            data_source_obj = BigQuerySource.from_proto(data_source)
         elif data_source.redshift_options.table or data_source.redshift_options.query:
-            data_source_obj = RedshiftSource(
-                field_mapping=data_source.field_mapping,
-                table=data_source.redshift_options.table,
-                event_timestamp_column=data_source.event_timestamp_column,
-                created_timestamp_column=data_source.created_timestamp_column,
-                date_partition_column=data_source.date_partition_column,
-                query=data_source.redshift_options.query,
-            )
+            from feast.infra.offline_stores.redshift import RedshiftSource
+
+            data_source_obj = RedshiftSource.from_proto(data_source)
         elif (
             data_source.kafka_options.bootstrap_servers
             and data_source.kafka_options.topic
             and data_source.kafka_options.message_format
         ):
-            data_source_obj = KafkaSource(
-                field_mapping=data_source.field_mapping,
-                bootstrap_servers=data_source.kafka_options.bootstrap_servers,
-                message_format=StreamFormat.from_proto(
-                    data_source.kafka_options.message_format
-                ),
-                topic=data_source.kafka_options.topic,
-                event_timestamp_column=data_source.event_timestamp_column,
-                created_timestamp_column=data_source.created_timestamp_column,
-                date_partition_column=data_source.date_partition_column,
-            )
+            data_source_obj = KafkaSource.from_proto(data_source)
         elif (
             data_source.kinesis_options.record_format
            and data_source.kinesis_options.region
            and data_source.kinesis_options.stream_name
        ):
-            data_source_obj = KinesisSource(
-                field_mapping=data_source.field_mapping,
-                record_format=StreamFormat.from_proto(
-                    data_source.kinesis_options.record_format
-                ),
-                region=data_source.kinesis_options.region,
-                stream_name=data_source.kinesis_options.stream_name,
-                event_timestamp_column=data_source.event_timestamp_column,
-                created_timestamp_column=data_source.created_timestamp_column,
-                date_partition_column=data_source.date_partition_column,
-            )
+            data_source_obj = KinesisSource.from_proto(data_source)
         else:
             raise ValueError("Could not identify the source type being added")
 
         return data_source_obj
 
+    @abstractmethod
     def to_proto(self) -> DataSourceProto:
         """
         Converts an DataSourceProto object to its protobuf representation.
@@ -537,6 +361,7 @@ def validate(self, config: RepoConfig):
         raise NotImplementedError
 
     @staticmethod
+    @abstractmethod
     def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
         """
         Get the callable method that returns Feast type given the raw column type
@@ -552,229 +377,6 @@ def get_table_column_names_and_types(
         raise NotImplementedError
 
 
-class FileSource(DataSource):
-    def __init__(
-        self,
-        event_timestamp_column: Optional[str] = "",
-        file_url: Optional[str] = None,
-        path: Optional[str] = None,
-        file_format: FileFormat = None,
-        created_timestamp_column: Optional[str] = "",
-        field_mapping: Optional[Dict[str, str]] = None,
-        date_partition_column: Optional[str] = "",
-    ):
-        """Create a FileSource from a file containing feature data. Only Parquet format supported.
-
-        Args:
-
-            path: File path to file containing feature data. Must contain an event_timestamp column, entity columns and
-                feature columns.
-            event_timestamp_column: Event timestamp column used for point in time joins of feature values.
-            created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
-            file_url: [Deprecated] Please see path
-            file_format (optional): Explicitly set the file format. Allows Feast to bypass inferring the file format.
-            field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
-                or view. Only used for feature columns, not entities or timestamp columns.
-
-        Examples:
-            >>> FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")
-        """
-        if path is None and file_url is None:
-            raise ValueError(
-                'No "path" argument provided. Please set "path" to the location of your file source.'
-            )
-        if file_url:
-            from warnings import warn
-
-            warn(
-                'Argument "file_url" is being deprecated. Please use the "path" argument.'
-            )
-        else:
-            file_url = path
-
-        self._file_options = FileOptions(file_format=file_format, file_url=file_url)
-
-        super().__init__(
-            event_timestamp_column,
-            created_timestamp_column,
-            field_mapping,
-            date_partition_column,
-        )
-
-    def __eq__(self, other):
-        if not isinstance(other, FileSource):
-            raise TypeError("Comparisons should only involve FileSource class objects.")
-
-        return (
-            self.file_options.file_url == other.file_options.file_url
-            and self.file_options.file_format == other.file_options.file_format
-            and self.event_timestamp_column == other.event_timestamp_column
-            and self.created_timestamp_column == other.created_timestamp_column
-            and self.field_mapping == other.field_mapping
-        )
-
-    @property
-    def file_options(self):
-        """
-        Returns the file options of this data source
-        """
-        return self._file_options
-
-    @file_options.setter
-    def file_options(self, file_options):
-        """
-        Sets the file options of this data source
-        """
-        self._file_options = file_options
-
-    @property
-    def path(self):
-        """
-        Returns the file path of this feature data source
-        """
-        return self._file_options.file_url
-
-    def to_proto(self) -> DataSourceProto:
-        data_source_proto = DataSourceProto(
-            type=DataSourceProto.BATCH_FILE,
-            field_mapping=self.field_mapping,
-            file_options=self.file_options.to_proto(),
-        )
-
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
-        data_source_proto.created_timestamp_column = self.created_timestamp_column
-        data_source_proto.date_partition_column = self.date_partition_column
-
-        return data_source_proto
-
-    def validate(self, config: RepoConfig):
-        # TODO: validate a FileSource
-        pass
-
-    @staticmethod
-    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
-        return type_map.pa_to_feast_value_type
-
-    def get_table_column_names_and_types(
-        self, config: RepoConfig
-    ) -> Iterable[Tuple[str, str]]:
-        schema = ParquetFile(self.path).schema_arrow
-        return zip(schema.names, map(str, schema.types))
-
-
-class BigQuerySource(DataSource):
-    def __init__(
-        self,
-        event_timestamp_column: Optional[str] = "",
-        table_ref: Optional[str] = None,
-        created_timestamp_column: Optional[str] = "",
-        field_mapping: Optional[Dict[str, str]] = None,
-        date_partition_column: Optional[str] = "",
-        query: Optional[str] = None,
-    ):
-        self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)
-
-        super().__init__(
-            event_timestamp_column,
-            created_timestamp_column,
-            field_mapping,
-            date_partition_column,
-        )
-
-    def __eq__(self, other):
-        if not isinstance(other, BigQuerySource):
-            raise TypeError(
-                "Comparisons should only involve BigQuerySource class objects."
-            )
-
-        return (
-            self.bigquery_options.table_ref == other.bigquery_options.table_ref
-            and self.bigquery_options.query == other.bigquery_options.query
-            and self.event_timestamp_column == other.event_timestamp_column
-            and self.created_timestamp_column == other.created_timestamp_column
-            and self.field_mapping == other.field_mapping
-        )
-
-    @property
-    def table_ref(self):
-        return self._bigquery_options.table_ref
-
-    @property
-    def query(self):
-        return self._bigquery_options.query
-
-    @property
-    def bigquery_options(self):
-        """
-        Returns the bigquery options of this data source
-        """
-        return self._bigquery_options
-
-    @bigquery_options.setter
-    def bigquery_options(self, bigquery_options):
-        """
-        Sets the bigquery options of this data source
-        """
-        self._bigquery_options = bigquery_options
-
-    def to_proto(self) -> DataSourceProto:
-        data_source_proto = DataSourceProto(
-            type=DataSourceProto.BATCH_BIGQUERY,
-            field_mapping=self.field_mapping,
-            bigquery_options=self.bigquery_options.to_proto(),
-        )
-
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
-        data_source_proto.created_timestamp_column = self.created_timestamp_column
-        data_source_proto.date_partition_column = self.date_partition_column
-
-        return data_source_proto
-
-    def validate(self, config: RepoConfig):
-        if not self.query:
-            from google.api_core.exceptions import NotFound
-            from google.cloud import bigquery
-
-            client = bigquery.Client()
-            try:
-                client.get_table(self.table_ref)
-            except NotFound:
-                raise DataSourceNotFoundException(self.table_ref)
-
-    def get_table_query_string(self) -> str:
-        """Returns a string that can directly be used to reference this table in SQL"""
-        if self.table_ref:
-            return f"`{self.table_ref}`"
-        else:
-            return f"({self.query})"
-
-    @staticmethod
-    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
-        return type_map.bq_to_feast_value_type
-
-    def get_table_column_names_and_types(
-        self, config: RepoConfig
-    ) -> Iterable[Tuple[str, str]]:
-        from google.cloud import bigquery
-
-        client = bigquery.Client()
-        if self.table_ref is not None:
-            table_schema = client.get_table(self.table_ref).schema
-            if not isinstance(table_schema[0], bigquery.schema.SchemaField):
-                raise TypeError("Could not parse BigQuery table schema.")
-
-            name_type_pairs = [(field.name, field.field_type) for field in table_schema]
-        else:
-            bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1"
-            queryRes = client.query(bq_columns_query).result()
-            name_type_pairs = [
-                (schema_field.name, schema_field.field_type)
-                for schema_field in queryRes.schema
-            ]
-
-        return name_type_pairs
-
-
 class KafkaSource(DataSource):
     def __init__(
         self,
@@ -783,7 +385,7 @@ def __init__(
         message_format: StreamFormat,
         topic: str,
         created_timestamp_column: Optional[str] = "",
-        field_mapping: Optional[Dict[str, str]] = dict(),
+        field_mapping: Optional[Dict[str, str]] = None,
         date_partition_column: Optional[str] = "",
     ):
         super().__init__(
@@ -828,6 +430,20 @@ def kafka_options(self, kafka_options):
         """
         self._kafka_options = kafka_options
 
+    @staticmethod
+    def from_proto(data_source: DataSourceProto):
+        return KafkaSource(
+            field_mapping=dict(data_source.field_mapping),
+            bootstrap_servers=data_source.kafka_options.bootstrap_servers,
+            message_format=StreamFormat.from_proto(
+                data_source.kafka_options.message_format
+            ),
+            topic=data_source.kafka_options.topic,
+            event_timestamp_column=data_source.event_timestamp_column,
+            created_timestamp_column=data_source.created_timestamp_column,
+            date_partition_column=data_source.date_partition_column,
+        )
+
     def to_proto(self) -> DataSourceProto:
         data_source_proto = DataSourceProto(
             type=DataSourceProto.STREAM_KAFKA,
@@ -841,8 +457,30 @@ def to_proto(self) -> DataSourceProto:
 
         return data_source_proto
 
+    @staticmethod
+    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
+        raise NotImplementedError
+
 
 class KinesisSource(DataSource):
+    @staticmethod
+    def from_proto(data_source: DataSourceProto):
+        return KinesisSource(
+            field_mapping=dict(data_source.field_mapping),
+            record_format=StreamFormat.from_proto(
+                data_source.kinesis_options.record_format
+            ),
+            region=data_source.kinesis_options.region,
+            stream_name=data_source.kinesis_options.stream_name,
+            event_timestamp_column=data_source.event_timestamp_column,
+            created_timestamp_column=data_source.created_timestamp_column,
+            date_partition_column=data_source.date_partition_column,
+        )
+
+    @staticmethod
+    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
+        pass
+
     def __init__(
         self,
         event_timestamp_column: str,
@@ -850,7 +488,7 @@ def __init__(
         record_format: StreamFormat,
         region: str,
         stream_name: str,
-        field_mapping: Optional[Dict[str, str]] = dict(),
+        field_mapping: Optional[Dict[str, str]] = None,
         date_partition_column: Optional[str] = "",
     ):
         super().__init__(
@@ -907,204 +545,3 @@ def to_proto(self) -> DataSourceProto:
         data_source_proto.date_partition_column = self.date_partition_column
 
         return data_source_proto
-
-
-class RedshiftOptions:
-    """
-    DataSource Redshift options used to source features from Redshift query
-    """
-
-    def __init__(self, table: Optional[str], query: Optional[str]):
-        self._table = table
-        self._query = query
-
-    @property
-    def query(self):
-        """
-        Returns the Redshift SQL query referenced by this source
-        """
-        return self._query
-
-    @query.setter
-    def query(self, query):
-        """
-        Sets the Redshift SQL query referenced by this source
-        """
-        self._query = query
-
-    @property
-    def table(self):
-        """
-        Returns the table name of this Redshift table
-        """
-        return self._table
-
-    @table.setter
-    def table(self, table_name):
-        """
-        Sets the table ref of this Redshift table
-        """
-        self._table = table_name
-
-    @classmethod
-    def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions):
-        """
-        Creates a RedshiftOptions from a protobuf representation of a Redshift option
-
-        Args:
-            redshift_options_proto: A protobuf representation of a DataSource
-
-        Returns:
-            Returns a RedshiftOptions object based on the redshift_options protobuf
-        """
-
-        redshift_options = cls(
-            table=redshift_options_proto.table, query=redshift_options_proto.query,
-        )
-
-        return redshift_options
-
-    def to_proto(self) -> DataSourceProto.RedshiftOptions:
-        """
-        Converts an RedshiftOptionsProto object to its protobuf representation.
-
-        Returns:
-            RedshiftOptionsProto protobuf
-        """
-
-        redshift_options_proto = DataSourceProto.RedshiftOptions(
-            table=self.table, query=self.query,
-        )
-
-        return redshift_options_proto
-
-
-class RedshiftSource(DataSource):
-    def __init__(
-        self,
-        event_timestamp_column: Optional[str] = "",
-        table: Optional[str] = None,
-        created_timestamp_column: Optional[str] = "",
-        field_mapping: Optional[Dict[str, str]] = None,
-        date_partition_column: Optional[str] = "",
-        query: Optional[str] = None,
-    ):
-        super().__init__(
-            event_timestamp_column,
-            created_timestamp_column,
-            field_mapping,
-            date_partition_column,
-        )
-
-        self._redshift_options = RedshiftOptions(table=table, query=query)
-
-    def __eq__(self, other):
-        if not isinstance(other, RedshiftSource):
-            raise TypeError(
-                "Comparisons should only involve RedshiftSource class objects."
-            )
-
-        return (
-            self.redshift_options.table == other.redshift_options.table
-            and self.redshift_options.query == other.redshift_options.query
-            and self.event_timestamp_column == other.event_timestamp_column
-            and self.created_timestamp_column == other.created_timestamp_column
-            and self.field_mapping == other.field_mapping
-        )
-
-    @property
-    def table(self):
-        return self._redshift_options.table
-
-    @property
-    def query(self):
-        return self._redshift_options.query
-
-    @property
-    def redshift_options(self):
-        """
-        Returns the Redshift options of this data source
-        """
-        return self._redshift_options
-
-    @redshift_options.setter
-    def redshift_options(self, _redshift_options):
-        """
-        Sets the Redshift options of this data source
-        """
-        self._redshift_options = _redshift_options
-
-    def to_proto(self) -> DataSourceProto:
-        data_source_proto = DataSourceProto(
-            type=DataSourceProto.BATCH_REDSHIFT,
-            field_mapping=self.field_mapping,
-            redshift_options=self.redshift_options.to_proto(),
-        )
-
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
-        data_source_proto.created_timestamp_column = self.created_timestamp_column
-        data_source_proto.date_partition_column = self.date_partition_column
-
-        return data_source_proto
-
-    def validate(self, config: RepoConfig):
-        # As long as the query gets successfully executed, or the table exists,
-        # the data source is validated. We don't need the results though.
-        # TODO: uncomment this
-        # self.get_table_column_names_and_types(config)
-        print("Validate", self.get_table_column_names_and_types(config))
-
-    def get_table_query_string(self) -> str:
-        """Returns a string that can directly be used to reference this table in SQL"""
-        if self.table:
-            return f'"{self.table}"'
-        else:
-            return f"({self.query})"
-
-    @staticmethod
-    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
-        return type_map.redshift_to_feast_value_type
-
-    def get_table_column_names_and_types(
-        self, config: RepoConfig
-    ) -> Iterable[Tuple[str, str]]:
-        from botocore.exceptions import ClientError
-
-        from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig
-        from feast.infra.utils import aws_utils
-
-        assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)
-
-        client = aws_utils.get_redshift_data_client(config.offline_store.region)
-
-        if self.table is not None:
-            try:
-                table = client.describe_table(
-                    ClusterIdentifier=config.offline_store.cluster_id,
-                    Database=config.offline_store.database,
-                    DbUser=config.offline_store.user,
-                    Table=self.table,
-                )
-            except ClientError as e:
-                if e.response["Error"]["Code"] == "ValidationException":
-                    raise RedshiftCredentialsError() from e
-                raise
-
-            # The API returns valid JSON with empty column list when the table doesn't exist
-            if len(table["ColumnList"]) == 0:
-                raise DataSourceNotFoundException(self.table)
-
-            columns = table["ColumnList"]
-        else:
-            statement_id = aws_utils.execute_redshift_statement(
-                client,
-                config.offline_store.cluster_id,
-                config.offline_store.database,
-                config.offline_store.user,
-                f"SELECT * FROM ({self.query}) LIMIT 1",
-            )
-            columns = aws_utils.get_redshift_statement_result(client, statement_id)[
-                "ColumnMetadata"
-            ]
-
-        return [(column["name"], column["typeName"].upper()) for column in columns]
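With `DataSource` now abstract, a plugin source only has to implement the abstract hooks, and it round-trips through the registry via `data_source_class_type`. A rough sketch of a minimal custom source under this interface (the class name and its JSON config layout are hypothetical; it passes `DataSource.__init__`'s remaining arguments explicitly rather than relying on defaults):

```python
import json
from typing import Callable, Iterable, Optional, Tuple

from feast.data_source import DataSource
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig
from feast.value_type import ValueType


class MyCustomSource(DataSource):
    """Hypothetical third-party source that stores its settings as JSON bytes."""

    def __init__(self, table: str, event_timestamp_column: Optional[str] = ""):
        self._table = table
        # created_timestamp_column, field_mapping, date_partition_column unused here.
        super().__init__(event_timestamp_column, "", None, "")

    @staticmethod
    def from_proto(data_source: DataSourceProto):
        # Decode the bytes this class wrote in to_proto().
        config = json.loads(data_source.custom_options.configuration)
        return MyCustomSource(
            table=config["table"],
            event_timestamp_column=data_source.event_timestamp_column,
        )

    def to_proto(self) -> DataSourceProto:
        proto = DataSourceProto(
            type=DataSourceProto.CUSTOM_SOURCE,
            custom_options=DataSourceProto.CustomSourceOptions(
                configuration=json.dumps({"table": self._table}).encode("utf8")
            ),
        )
        proto.event_timestamp_column = self.event_timestamp_column
        return proto

    def validate(self, config: RepoConfig):
        pass  # e.g. check that self._table actually exists

    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        raise NotImplementedError

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        raise NotImplementedError
```

The class itself never sets `data_source_class_type`; as the next two files show, `FeatureTable.to_proto` and `FeatureView.to_proto` stamp it, and `DataSource.from_proto` uses it to re-import the class without the core SDK knowing about it.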
diff --git a/sdk/python/feast/feature_table.py b/sdk/python/feast/feature_table.py
index 84463baa72..2b09fea8bd 100644
--- a/sdk/python/feast/feature_table.py
+++ b/sdk/python/feast/feature_table.py
@@ -20,13 +20,7 @@
 from google.protobuf.json_format import MessageToDict, MessageToJson
 from google.protobuf.timestamp_pb2 import Timestamp
 
-from feast.data_source import (
-    BigQuerySource,
-    DataSource,
-    FileSource,
-    KafkaSource,
-    KinesisSource,
-)
+from feast.data_source import DataSource, KafkaSource, KinesisSource
 from feast.feature import Feature
 from feast.loaders import yaml as feast_yaml
 from feast.protos.feast.core.FeatureTable_pb2 import FeatureTable as FeatureTableProto
@@ -49,7 +43,7 @@ def __init__(
         name: str,
         entities: List[str],
         features: List[Feature],
-        batch_source: Union[BigQuerySource, FileSource] = None,
+        batch_source: DataSource = None,
         stream_source: Optional[Union[KafkaSource, KinesisSource]] = None,
         max_age: Optional[Duration] = None,
         labels: Optional[MutableMapping[str, str]] = None,
@@ -147,7 +141,7 @@ def batch_source(self):
         return self._batch_source
 
     @batch_source.setter
-    def batch_source(self, batch_source: Union[BigQuerySource, FileSource]):
+    def batch_source(self, batch_source: DataSource):
         """
         Sets the batch source of this feature table
         """
@@ -317,6 +311,14 @@ def to_proto(self) -> FeatureTableProto:
             last_updated_timestamp=self.last_updated_timestamp,
         )
 
+        batch_source_proto = self.batch_source.to_proto()
+        batch_source_proto.data_source_class_type = f"{self.batch_source.__class__.__module__}.{self.batch_source.__class__.__name__}"
+
+        stream_source_proto = None
+        if self.stream_source:
+            stream_source_proto = self.stream_source.to_proto()
+            stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}"
+
         spec = FeatureTableSpecProto(
             name=self.name,
             entities=self.entities,
@@ -326,16 +328,8 @@ def to_proto(self) -> FeatureTableProto:
             ],
             labels=self.labels,
             max_age=self.max_age,
-            batch_source=(
-                self.batch_source.to_proto()
-                if issubclass(type(self.batch_source), DataSource)
-                else self.batch_source
-            ),
-            stream_source=(
-                self.stream_source.to_proto()
-                if issubclass(type(self.stream_source), DataSource)
-                else self.stream_source
-            ),
+            batch_source=batch_source_proto,
+            stream_source=stream_source_proto,
         )
 
         return FeatureTableProto(spec=spec, meta=meta)
diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py
index 9728ad2092..b1c8a47902 100644
--- a/sdk/python/feast/feature_view.py
+++ b/sdk/python/feast/feature_view.py
@@ -182,6 +182,16 @@ def to_proto(self) -> FeatureViewProto:
             ttl_duration = Duration()
             ttl_duration.FromTimedelta(self.ttl)
 
+        batch_source_proto = self.input.to_proto()
+        batch_source_proto.data_source_class_type = (
+            f"{self.input.__class__.__module__}.{self.input.__class__.__name__}"
+        )
+
+        stream_source_proto = None
+        if self.stream_source:
+            stream_source_proto = self.stream_source.to_proto()
+            stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}"
+
         spec = FeatureViewSpecProto(
             name=self.name,
             entities=self.entities,
@@ -189,12 +199,8 @@ def to_proto(self) -> FeatureViewProto:
             tags=self.tags,
             ttl=(ttl_duration if ttl_duration is not None else None),
             online=self.online,
-            batch_source=self.input.to_proto(),
-            stream_source=(
-                self.stream_source.to_proto()
-                if self.stream_source is not None
-                else None
-            ),
+            batch_source=batch_source_proto,
+            stream_source=stream_source_proto,
        )
 
         return FeatureViewProto(spec=spec, meta=meta)
diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py
index 28b764fd80..721c34fb1a 100644
--- a/sdk/python/feast/inference.py
+++ b/sdk/python/feast/inference.py
@@ -1,8 +1,8 @@
 import re
 from typing import List
 
-from feast import Entity
-from feast.data_source import BigQuerySource, DataSource, FileSource, RedshiftSource
+from feast import BigQuerySource, Entity, FileSource, RedshiftSource
+from feast.data_source import DataSource
 from feast.errors import RegistryInferenceFailure
 from feast.feature_view import FeatureView
 from feast.repo_config import RepoConfig
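Both `to_proto` implementations write the same fully qualified class path into the proto. The snippet below just illustrates what gets stamped, using the relocated `FileSource` as an example (the parquet path is illustrative):

```python
from feast import FileSource

source = FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")

# The value FeatureTable.to_proto / FeatureView.to_proto store in
# DataSource.data_source_class_type:
class_type = f"{source.__class__.__module__}.{source.__class__.__name__}"
assert class_type == "feast.infra.offline_stores.file.FileSource"
```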
diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 06aaec5a7b..9cea722c95 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -2,7 +2,7 @@
 import uuid
 from dataclasses import asdict, dataclass
 from datetime import date, datetime, timedelta
-from typing import List, Optional, Set, Union
+from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
 
 import pandas
 import pyarrow
@@ -12,11 +12,12 @@
 from pydantic.typing import Literal
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
 
-from feast import errors
-from feast.data_source import BigQuerySource, DataSource
+from feast import errors, type_map
+from feast.data_source import DataSource
 from feast.errors import (
     BigQueryJobCancelled,
     BigQueryJobStillRunning,
+    DataSourceNotFoundException,
     FeastProviderLoginError,
 )
 from feast.feature_view import FeatureView
@@ -25,8 +26,10 @@
     DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
     _get_requested_feature_views_to_features_dict,
 )
+from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.registry import Registry
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
+from feast.value_type import ValueType
 
 try:
     from google.api_core.exceptions import NotFound
@@ -693,3 +696,201 @@ def _get_bigquery_client(project: Optional[str] = None):
     ) USING (entity_row_unique_id)
 {% endfor %}
 """
+
+
+class BigQuerySource(DataSource):
+    def __init__(
+        self,
+        event_timestamp_column: Optional[str] = "",
+        table_ref: Optional[str] = None,
+        created_timestamp_column: Optional[str] = "",
+        field_mapping: Optional[Dict[str, str]] = None,
+        date_partition_column: Optional[str] = "",
+        query: Optional[str] = None,
+    ):
+        self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)
+
+        super().__init__(
+            event_timestamp_column,
+            created_timestamp_column,
+            field_mapping,
+            date_partition_column,
+        )
+
+    def __eq__(self, other):
+        if not isinstance(other, BigQuerySource):
+            raise TypeError(
+                "Comparisons should only involve BigQuerySource class objects."
+            )
+
+        return (
+            self.bigquery_options.table_ref == other.bigquery_options.table_ref
+            and self.bigquery_options.query == other.bigquery_options.query
+            and self.event_timestamp_column == other.event_timestamp_column
+            and self.created_timestamp_column == other.created_timestamp_column
+            and self.field_mapping == other.field_mapping
+        )
+
+    @property
+    def table_ref(self):
+        return self._bigquery_options.table_ref
+
+    @property
+    def query(self):
+        return self._bigquery_options.query
+
+    @property
+    def bigquery_options(self):
+        """
+        Returns the bigquery options of this data source
+        """
+        return self._bigquery_options
+
+    @bigquery_options.setter
+    def bigquery_options(self, bigquery_options):
+        """
+        Sets the bigquery options of this data source
+        """
+        self._bigquery_options = bigquery_options
+
+    @staticmethod
+    def from_proto(data_source: DataSourceProto):
+
+        assert data_source.HasField("bigquery_options")
+
+        return BigQuerySource(
+            field_mapping=dict(data_source.field_mapping),
+            table_ref=data_source.bigquery_options.table_ref,
+            event_timestamp_column=data_source.event_timestamp_column,
+            created_timestamp_column=data_source.created_timestamp_column,
+            date_partition_column=data_source.date_partition_column,
+            query=data_source.bigquery_options.query,
+        )
+
+    def to_proto(self) -> DataSourceProto:
+        data_source_proto = DataSourceProto(
+            type=DataSourceProto.BATCH_BIGQUERY,
+            field_mapping=self.field_mapping,
+            bigquery_options=self.bigquery_options.to_proto(),
+        )
+
+        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.created_timestamp_column = self.created_timestamp_column
+        data_source_proto.date_partition_column = self.date_partition_column
+
+        return data_source_proto
+
+    def validate(self, config: RepoConfig):
+        if not self.query:
+            from google.api_core.exceptions import NotFound
+            from google.cloud import bigquery
+
+            client = bigquery.Client()
+            try:
+                client.get_table(self.table_ref)
+            except NotFound:
+                raise DataSourceNotFoundException(self.table_ref)
+
+    def get_table_query_string(self) -> str:
+        """Returns a string that can directly be used to reference this table in SQL"""
+        if self.table_ref:
+            return f"`{self.table_ref}`"
+        else:
+            return f"({self.query})"
+
+    @staticmethod
+    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
+        return type_map.bq_to_feast_value_type
+
+    def get_table_column_names_and_types(
+        self, config: RepoConfig
+    ) -> Iterable[Tuple[str, str]]:
+        from google.cloud import bigquery
+
+        client = bigquery.Client()
+        if self.table_ref is not None:
+            table_schema = client.get_table(self.table_ref).schema
+            if not isinstance(table_schema[0], bigquery.schema.SchemaField):
+                raise TypeError("Could not parse BigQuery table schema.")
+
+            name_type_pairs = [(field.name, field.field_type) for field in table_schema]
+        else:
+            bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1"
+            queryRes = client.query(bq_columns_query).result()
+            name_type_pairs = [
+                (schema_field.name, schema_field.field_type)
+                for schema_field in queryRes.schema
+            ]
+
+        return name_type_pairs
+
+
+class BigQueryOptions:
+    """
+    DataSource BigQuery options used to source features from BigQuery query
+    """
+
+    def __init__(self, table_ref: Optional[str], query: Optional[str]):
+        self._table_ref = table_ref
+        self._query = query
+
+    @property
+    def query(self):
+        """
+        Returns the BigQuery SQL query referenced by this source
+        """
+        return self._query
+
+    @query.setter
+    def query(self, query):
+        """
+        Sets the BigQuery SQL query referenced by this source
+        """
+        self._query = query
+
+    @property
+    def table_ref(self):
+        """
+        Returns the table ref of this BQ table
+        """
+        return self._table_ref
+
+    @table_ref.setter
+    def table_ref(self, table_ref):
+        """
+        Sets the table ref of this BQ table
+        """
+        self._table_ref = table_ref
+
+    @classmethod
+    def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
+        """
+        Creates a BigQueryOptions from a protobuf representation of a BigQuery option
+
+        Args:
+            bigquery_options_proto: A protobuf representation of a DataSource
+
+        Returns:
+            Returns a BigQueryOptions object based on the bigquery_options protobuf
+        """
+
+        bigquery_options = cls(
+            table_ref=bigquery_options_proto.table_ref,
+            query=bigquery_options_proto.query,
+        )
+
+        return bigquery_options
+
+    def to_proto(self) -> DataSourceProto.BigQueryOptions:
+        """
+        Converts a BigQueryOptions object to its protobuf representation.
+
+        Returns:
+            BigQueryOptionsProto protobuf
+        """
+
+        bigquery_options_proto = DataSourceProto.BigQueryOptions(
+            table_ref=self.table_ref, query=self.query,
+        )
+
+        return bigquery_options_proto
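`BigQuerySource` keeps its either-or contract: exactly one of `table_ref` or `query` should be set, and `get_table_query_string` renders whichever is present. A quick illustration (table and query names are made up):

```python
from feast import BigQuerySource

table_source = BigQuerySource(
    table_ref="my_project.my_dataset.driver_stats",
    event_timestamp_column="datetime",
)
query_source = BigQuerySource(
    query="SELECT driver_id, rating FROM my_project.my_dataset.driver_stats",
    event_timestamp_column="datetime",
)

# A table ref is rendered with backticks, a query is wrapped in parentheses.
assert table_source.get_table_query_string() == "`my_project.my_dataset.driver_stats`"
assert query_source.get_table_query_string().startswith("(SELECT")
```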
diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py
index 8ff896ba61..e6f95ee162 100644
--- a/sdk/python/feast/infra/offline_stores/file.py
+++ b/sdk/python/feast/infra/offline_stores/file.py
@@ -1,12 +1,15 @@
 from datetime import datetime
-from typing import Callable, List, Optional, Union
+from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union
 
 import pandas as pd
 import pyarrow
 import pytz
+from pyarrow.parquet import ParquetFile
 from pydantic.typing import Literal
 
-from feast.data_source import DataSource, FileSource
+from feast import type_map
+from feast.data_format import FileFormat
+from feast.data_source import DataSource
 from feast.errors import FeastJoinKeysDuringMaterialization
 from feast.feature_view import FeatureView
 from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
@@ -15,8 +18,10 @@
     _get_requested_feature_views_to_features_dict,
     _run_field_mapping,
 )
+from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.registry import Registry
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
+from feast.value_type import ValueType
 
 
 class FileOfflineStoreConfig(FeastConfigBaseModel):
@@ -265,3 +270,198 @@ def evaluate_offline_job():
             return last_values_df[columns_to_extract]
 
     return FileRetrievalJob(evaluation_function=evaluate_offline_job)
+
+
+class FileSource(DataSource):
+    def __init__(
+        self,
+        event_timestamp_column: Optional[str] = "",
+        file_url: Optional[str] = None,
+        path: Optional[str] = None,
+        file_format: FileFormat = None,
+        created_timestamp_column: Optional[str] = "",
+        field_mapping: Optional[Dict[str, str]] = None,
+        date_partition_column: Optional[str] = "",
+    ):
+        """Create a FileSource from a file containing feature data. Only Parquet format is supported.
+
+        Args:
+
+            path: File path to file containing feature data. Must contain an event_timestamp column, entity columns and
+                feature columns.
+            event_timestamp_column: Event timestamp column used for point in time joins of feature values.
+            created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
+            file_url: [Deprecated] Please see path
+            file_format (optional): Explicitly set the file format. Allows Feast to bypass inferring the file format.
+            field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
+                or view. Only used for feature columns, not entities or timestamp columns.
+
+        Examples:
+            >>> FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")
+        """
+        if path is None and file_url is None:
+            raise ValueError(
+                'No "path" argument provided. Please set "path" to the location of your file source.'
+            )
+        if file_url:
+            from warnings import warn
+
+            warn(
+                'Argument "file_url" is being deprecated. Please use the "path" argument.'
+            )
+        else:
+            file_url = path
+
+        self._file_options = FileOptions(file_format=file_format, file_url=file_url)
+
+        super().__init__(
+            event_timestamp_column,
+            created_timestamp_column,
+            field_mapping,
+            date_partition_column,
+        )
+
+    def __eq__(self, other):
+        if not isinstance(other, FileSource):
+            raise TypeError("Comparisons should only involve FileSource class objects.")
+
+        return (
+            self.file_options.file_url == other.file_options.file_url
+            and self.file_options.file_format == other.file_options.file_format
+            and self.event_timestamp_column == other.event_timestamp_column
+            and self.created_timestamp_column == other.created_timestamp_column
+            and self.field_mapping == other.field_mapping
+        )
+
+    @property
+    def file_options(self):
+        """
+        Returns the file options of this data source
+        """
+        return self._file_options
+
+    @file_options.setter
+    def file_options(self, file_options):
+        """
+        Sets the file options of this data source
+        """
+        self._file_options = file_options
+
+    @property
+    def path(self):
+        """
+        Returns the file path of this feature data source
+        """
+        return self._file_options.file_url
+
+    @staticmethod
+    def from_proto(data_source: DataSourceProto):
+        return FileSource(
+            field_mapping=dict(data_source.field_mapping),
+            file_format=FileFormat.from_proto(data_source.file_options.file_format),
+            path=data_source.file_options.file_url,
+            event_timestamp_column=data_source.event_timestamp_column,
+            created_timestamp_column=data_source.created_timestamp_column,
+            date_partition_column=data_source.date_partition_column,
+        )
+
+    def to_proto(self) -> DataSourceProto:
+        data_source_proto = DataSourceProto(
+            type=DataSourceProto.BATCH_FILE,
+            field_mapping=self.field_mapping,
+            file_options=self.file_options.to_proto(),
+        )
+
+        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.created_timestamp_column = self.created_timestamp_column
+        data_source_proto.date_partition_column = self.date_partition_column
+
+        return data_source_proto
+
+    def validate(self, config: RepoConfig):
+        # TODO: validate a FileSource
+        pass
+
+    @staticmethod
+    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
+        return type_map.pa_to_feast_value_type
+
+    def get_table_column_names_and_types(
+        self, config: RepoConfig
+    ) -> Iterable[Tuple[str, str]]:
+        schema = ParquetFile(self.path).schema_arrow
+        return zip(schema.names, map(str, schema.types))
+
+
+class FileOptions:
+    """
+    DataSource File options used to source features from a file
+    """
+
+    def __init__(
+        self, file_format: Optional[FileFormat], file_url: Optional[str],
+    ):
+        self._file_format = file_format
+        self._file_url = file_url
+
+    @property
+    def file_format(self):
+        """
+        Returns the file format of this file
+        """
+        return self._file_format
+
+    @file_format.setter
+    def file_format(self, file_format):
+        """
+        Sets the file format of this file
+        """
+        self._file_format = file_format
+
+    @property
+    def file_url(self):
+        """
+        Returns the file url of this file
+        """
+        return self._file_url
+
+    @file_url.setter
+    def file_url(self, file_url):
+        """
+        Sets the file url of this file
+        """
+        self._file_url = file_url
+
+    @classmethod
+    def from_proto(cls, file_options_proto: DataSourceProto.FileOptions):
+        """
+        Creates a FileOptions from a protobuf representation of a file option
+
+        Args:
+            file_options_proto: a protobuf representation of a DataSource
+
+        Returns:
+            Returns a FileOptions object based on the file_options protobuf
+        """
+        file_options = cls(
+            file_format=FileFormat.from_proto(file_options_proto.file_format),
+            file_url=file_options_proto.file_url,
+        )
+        return file_options
+
+    def to_proto(self) -> DataSourceProto.FileOptions:
+        """
+        Converts a FileOptions object to its protobuf representation.
+
+        Returns:
+            FileOptionsProto protobuf
+        """
+
+        file_options_proto = DataSourceProto.FileOptions(
+            file_format=(
+                None if self.file_format is None else self.file_format.to_proto()
+            ),
+            file_url=self.file_url,
+        )
+
+        return file_options_proto
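A quick check that the relocated `FileSource` still serializes through its proto form. This is a minimal sketch; the parquet path is illustrative:

```python
from feast import FileSource
from feast.data_format import ParquetFormat

source = FileSource(
    path="/data/my_features.parquet",
    event_timestamp_column="datetime",
    file_format=ParquetFormat(),
)

proto = source.to_proto()
print(proto.file_options.file_url)   # /data/my_features.parquet
print(proto.event_timestamp_column)  # datetime

# Rebuilding from the proto yields an equivalent source.
restored = FileSource.from_proto(proto)
assert restored.path == source.path
```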
diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py
index c97389ceaa..ae085c28b9 100644
--- a/sdk/python/feast/infra/offline_stores/redshift.py
+++ b/sdk/python/feast/infra/offline_stores/redshift.py
@@ -1,18 +1,22 @@
 import uuid
 from datetime import datetime
-from typing import List, Optional, Union
+from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union
 
 import pandas as pd
 import pyarrow as pa
 from pydantic import StrictStr
 from pydantic.typing import Literal
 
-from feast.data_source import DataSource, RedshiftSource
+from feast import type_map
+from feast.data_source import DataSource
+from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError
 from feast.feature_view import FeatureView
 from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
 from feast.infra.utils import aws_utils
+from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.registry import Registry
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
+from feast.value_type import ValueType
 
 
 class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
@@ -171,3 +175,215 @@ def to_redshift(self, table_name: str) -> None:
             self._config.offline_store.user,
             f'CREATE TABLE "{table_name}" AS ({self.query})',
         )
+
+
+class RedshiftSource(DataSource):
+    def __init__(
+        self,
+        event_timestamp_column: Optional[str] = "",
+        table: Optional[str] = None,
+        created_timestamp_column: Optional[str] = "",
+        field_mapping: Optional[Dict[str, str]] = None,
+        date_partition_column: Optional[str] = "",
+        query: Optional[str] = None,
+    ):
+        super().__init__(
+            event_timestamp_column,
+            created_timestamp_column,
+            field_mapping,
+            date_partition_column,
+        )
+
+        self._redshift_options = RedshiftOptions(table=table, query=query)
+
+    @staticmethod
+    def from_proto(data_source: DataSourceProto):
+        return RedshiftSource(
+            field_mapping=dict(data_source.field_mapping),
+            table=data_source.redshift_options.table,
+            event_timestamp_column=data_source.event_timestamp_column,
+            created_timestamp_column=data_source.created_timestamp_column,
+            date_partition_column=data_source.date_partition_column,
+            query=data_source.redshift_options.query,
+        )
+
+    def __eq__(self, other):
+        if not isinstance(other, RedshiftSource):
+            raise TypeError(
+                "Comparisons should only involve RedshiftSource class objects."
+            )
+
+        return (
+            self.redshift_options.table == other.redshift_options.table
+            and self.redshift_options.query == other.redshift_options.query
+            and self.event_timestamp_column == other.event_timestamp_column
+            and self.created_timestamp_column == other.created_timestamp_column
+            and self.field_mapping == other.field_mapping
+        )
+
+    @property
+    def table(self):
+        return self._redshift_options.table
+
+    @property
+    def query(self):
+        return self._redshift_options.query
+
+    @property
+    def redshift_options(self):
+        """
+        Returns the Redshift options of this data source
+        """
+        return self._redshift_options
+
+    @redshift_options.setter
+    def redshift_options(self, _redshift_options):
+        """
+        Sets the Redshift options of this data source
+        """
+        self._redshift_options = _redshift_options
+
+    def to_proto(self) -> DataSourceProto:
+        data_source_proto = DataSourceProto(
+            type=DataSourceProto.BATCH_REDSHIFT,
+            field_mapping=self.field_mapping,
+            redshift_options=self.redshift_options.to_proto(),
+        )
+
+        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.created_timestamp_column = self.created_timestamp_column
+        data_source_proto.date_partition_column = self.date_partition_column
+
+        return data_source_proto
+
+    def validate(self, config: RepoConfig):
+        # As long as the query gets successfully executed, or the table exists,
+        # the data source is validated. We don't need the results though.
+        # TODO: uncomment this
+        # self.get_table_column_names_and_types(config)
+        print("Validate", self.get_table_column_names_and_types(config))
+
+    def get_table_query_string(self) -> str:
+        """Returns a string that can directly be used to reference this table in SQL"""
+        if self.table:
+            return f'"{self.table}"'
+        else:
+            return f"({self.query})"
+
+    @staticmethod
+    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
+        return type_map.redshift_to_feast_value_type
+
+    def get_table_column_names_and_types(
+        self, config: RepoConfig
+    ) -> Iterable[Tuple[str, str]]:
+        from botocore.exceptions import ClientError
+
+        from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig
+        from feast.infra.utils import aws_utils
+
+        assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)
+
+        client = aws_utils.get_redshift_data_client(config.offline_store.region)
+
+        if self.table is not None:
+            try:
+                table = client.describe_table(
+                    ClusterIdentifier=config.offline_store.cluster_id,
+                    Database=config.offline_store.database,
+                    DbUser=config.offline_store.user,
+                    Table=self.table,
+                )
+            except ClientError as e:
+                if e.response["Error"]["Code"] == "ValidationException":
+                    raise RedshiftCredentialsError() from e
+                raise
+
+            # The API returns valid JSON with empty column list when the table doesn't exist
+            if len(table["ColumnList"]) == 0:
+                raise DataSourceNotFoundException(self.table)
+
+            columns = table["ColumnList"]
+        else:
+            statement_id = aws_utils.execute_redshift_statement(
+                client,
+                config.offline_store.cluster_id,
+                config.offline_store.database,
+                config.offline_store.user,
+                f"SELECT * FROM ({self.query}) LIMIT 1",
+            )
+            columns = aws_utils.get_redshift_statement_result(client, statement_id)[
+                "ColumnMetadata"
+            ]
+
+        return [(column["name"], column["typeName"].upper()) for column in columns]
+
+
+class RedshiftOptions:
+    """
+    DataSource Redshift options used to source features from Redshift query
+    """
+
+    def __init__(self, table: Optional[str], query: Optional[str]):
+        self._table = table
+        self._query = query
+
+    @property
+    def query(self):
+        """
+        Returns the Redshift SQL query referenced by this source
+        """
+        return self._query
+
+    @query.setter
+    def query(self, query):
+        """
+        Sets the Redshift SQL query referenced by this source
+        """
+        self._query = query
+
+    @property
+    def table(self):
+        """
+        Returns the table name of this Redshift table
+        """
+        return self._table
+
+    @table.setter
+    def table(self, table_name):
+        """
+        Sets the table ref of this Redshift table
+        """
+        self._table = table_name
+
+    @classmethod
+    def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions):
+        """
+        Creates a RedshiftOptions from a protobuf representation of a Redshift option
+
+        Args:
+            redshift_options_proto: A protobuf representation of a DataSource
+
+        Returns:
+            Returns a RedshiftOptions object based on the redshift_options protobuf
+        """
+
+        redshift_options = cls(
+            table=redshift_options_proto.table, query=redshift_options_proto.query,
+        )
+
+        return redshift_options
+
+    def to_proto(self) -> DataSourceProto.RedshiftOptions:
+        """
+        Converts a RedshiftOptions object to its protobuf representation.
+
+        Returns:
+            RedshiftOptionsProto protobuf
+        """
+
+        redshift_options_proto = DataSourceProto.RedshiftOptions(
+            table=self.table, query=self.query,
+        )
+
+        return redshift_options_proto
diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py
index 28601fbac9..d68972466a 100644
--- a/sdk/python/feast/registry.py
+++ b/sdk/python/feast/registry.py
@@ -177,7 +177,6 @@ def apply_feature_table(
         self.cached_registry_proto.feature_tables.append(feature_table_proto)
         if commit:
             self.commit()
-        return
 
     def apply_feature_view(
         self, feature_view: FeatureView, project: str, commit: bool = True
@@ -203,7 +202,6 @@ def apply_feature_view(
                 existing_feature_view_proto.spec.name == feature_view_proto.spec.name
                 and existing_feature_view_proto.spec.project == project
             ):
-                # do not update if feature view has not changed; updating will erase tracked materialization intervals
                 if FeatureView.from_proto(existing_feature_view_proto) == feature_view:
                     return
                 else:
@@ -213,7 +211,6 @@ def apply_feature_view(
         self.cached_registry_proto.feature_views.append(feature_view_proto)
         if commit:
             self.commit()
-        return
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index 5cf17bf729..6c51350b05 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -195,6 +195,11 @@ def __repr__(self) -> str:
     )
 
 
+def get_data_source_class_from_type(data_source_type: str):
+    module_name, config_class_name = data_source_type.rsplit(".", 1)
+    return get_class_from_type(module_name, config_class_name, "Source")
+
+
 def get_online_config_from_type(online_store_type: str):
     if online_store_type in ONLINE_STORE_CLASS_FOR_TYPE:
         online_store_type = ONLINE_STORE_CLASS_FOR_TYPE[online_store_type]
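`get_class_from_type` is not shown in this diff; a reasonable mental model of what `get_data_source_class_from_type` resolves to is plain importlib plus a suffix check. This is a sketch of an assumed equivalent, not the actual implementation:

```python
import importlib


def _load_source_class(data_source_type: str):
    # "feast.infra.offline_stores.file.FileSource" -> (module path, class name)
    module_name, class_name = data_source_type.rsplit(".", 1)
    if not class_name.endswith("Source"):
        raise ValueError(f"{class_name} does not look like a data source class")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


cls = _load_source_class("feast.infra.offline_stores.file.FileSource")
```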
diff --git a/sdk/python/feast/staging/entities.py b/sdk/python/feast/staging/entities.py
index dbb9095a58..17f0959fce 100644
--- a/sdk/python/feast/staging/entities.py
+++ b/sdk/python/feast/staging/entities.py
@@ -7,9 +7,9 @@
 
 import pandas as pd
 
+from feast import BigQuerySource, FileSource
 from feast.config import Config
 from feast.data_format import ParquetFormat
-from feast.data_source import BigQuerySource, FileSource
 from feast.staging.storage_client import get_staging_client
 
 try:
diff --git a/sdk/python/feast/templates/aws/example.py b/sdk/python/feast/templates/aws/example.py
index a66dbba120..f9f2b3b6eb 100644
--- a/sdk/python/feast/templates/aws/example.py
+++ b/sdk/python/feast/templates/aws/example.py
@@ -2,8 +2,7 @@
 
 from google.protobuf.duration_pb2 import Duration
 
-from feast import Entity, Feature, FeatureView, ValueType
-from feast.data_source import FileSource
+from feast import Entity, Feature, FeatureView, FileSource, ValueType
 
 # Read data from parquet files. Parquet is convenient for local development mode. For
 # production, you can use your favorite DWH, such as BigQuery. See Feast documentation
diff --git a/sdk/python/feast/templates/local/example.py b/sdk/python/feast/templates/local/example.py
index a66dbba120..f9f2b3b6eb 100644
--- a/sdk/python/feast/templates/local/example.py
+++ b/sdk/python/feast/templates/local/example.py
@@ -2,8 +2,7 @@
 
 from google.protobuf.duration_pb2 import Duration
 
-from feast import Entity, Feature, FeatureView, ValueType
-from feast.data_source import FileSource
+from feast import Entity, Feature, FeatureView, FileSource, ValueType
 
 # Read data from parquet files. Parquet is convenient for local development mode. For
 # production, you can use your favorite DWH, such as BigQuery. See Feast documentation
diff --git a/sdk/python/tests/example_feature_repo_2.py b/sdk/python/tests/example_feature_repo_2.py
index fc3634557d..420d71de0a 100644
--- a/sdk/python/tests/example_feature_repo_2.py
+++ b/sdk/python/tests/example_feature_repo_2.py
@@ -1,7 +1,6 @@
 from google.protobuf.duration_pb2 import Duration
 
-from feast import Entity, Feature, FeatureView, ValueType
-from feast.data_source import FileSource
+from feast import Entity, Feature, FeatureView, FileSource, ValueType
 
 driver_hourly_stats = FileSource(
     path="%PARQUET_PATH%",  # placeholder to be replaced by the test
diff --git a/sdk/python/tests/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_feature_repo_with_entity_join_key.py
index 572811dc9b..10be18ca2e 100644
--- a/sdk/python/tests/example_feature_repo_with_entity_join_key.py
+++ b/sdk/python/tests/example_feature_repo_with_entity_join_key.py
@@ -1,7 +1,6 @@
 from google.protobuf.duration_pb2 import Duration
 
-from feast import Entity, Feature, FeatureView, ValueType
-from feast.data_source import FileSource
+from feast import Entity, Feature, FeatureView, FileSource, ValueType
 
 driver_hourly_stats = FileSource(
     path="%PARQUET_PATH%",  # placeholder to be replaced by the test
diff --git a/sdk/python/tests/example_feature_repo_with_inference.py b/sdk/python/tests/example_feature_repo_with_inference.py
index a427f0cea4..b46519b468 100644
--- a/sdk/python/tests/example_feature_repo_with_inference.py
+++ b/sdk/python/tests/example_feature_repo_with_inference.py
@@ -1,7 +1,6 @@
 from google.protobuf.duration_pb2 import Duration
 
-from feast import Entity, FeatureView
-from feast.data_source import FileSource
+from feast import Entity, FeatureView, FileSource
 
 driver_hourly_stats = FileSource(
     path="%PARQUET_PATH%",  # placeholder to be replaced by the test
diff --git a/sdk/python/tests/online_write_benchmark.py b/sdk/python/tests/online_write_benchmark.py
index b2ba8c5f67..d8041e51cd 100644
--- a/sdk/python/tests/online_write_benchmark.py
+++ b/sdk/python/tests/online_write_benchmark.py
@@ -8,7 +8,7 @@
 import pyarrow as pa
 from tqdm import tqdm
 
-from feast.data_source import FileSource
+from feast import FileSource
 from feast.driver_test_data import create_driver_hourly_stats_df
 from feast.entity import Entity
 from feast.feature import Feature
diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py
index de92a8deb6..dbed2bac16 100644
--- a/sdk/python/tests/test_client.py
+++ b/sdk/python/tests/test_client.py
@@ -30,9 +30,10 @@
 from pyarrow import parquet as pq
 from pytest_lazyfixture import lazy_fixture
 
+from feast import FileSource
 from feast.client import Client
 from feast.data_format import ParquetFormat, ProtoFormat
-from feast.data_source import FileSource, KafkaSource
+from feast.data_source import KafkaSource
 from feast.entity import Entity
 from feast.feature import Feature
 from feast.feature_table import FeatureTable
diff --git a/sdk/python/tests/test_feature_store.py b/sdk/python/tests/test_feature_store.py
index f169c1336a..b842adcc77 100644
--- a/sdk/python/tests/test_feature_store.py
+++ b/sdk/python/tests/test_feature_store.py
@@ -18,8 +18,8 @@
 import pytest
 from pytest_lazyfixture import lazy_fixture
 
+from feast import FileSource
 from feast.data_format import ParquetFormat
-from feast.data_source import FileSource
 from feast.entity import Entity
 from feast.feature import Feature
 from feast.feature_store import FeatureStore
diff --git a/sdk/python/tests/test_feature_table.py b/sdk/python/tests/test_feature_table.py
index 5727604578..95e604133b 100644
--- a/sdk/python/tests/test_feature_table.py
+++ b/sdk/python/tests/test_feature_table.py
@@ -19,9 +19,10 @@
 import grpc
 import pytest
 
+from feast import FileSource
 from feast.client import Client
 from feast.data_format import ParquetFormat, ProtoFormat
-from feast.data_source import FileSource, KafkaSource
+from feast.data_source import KafkaSource
 from feast.feature import Feature
 from feast.feature_table import FeatureTable
 from feast.protos.feast.core import CoreService_pb2_grpc as Core
diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py
index 9c557bc77b..087b578365 100644
--- a/sdk/python/tests/test_historical_retrieval.py
+++ b/sdk/python/tests/test_historical_retrieval.py
@@ -14,8 +14,7 @@
 from pytz import utc
 
 import feast.driver_test_data as driver_data
-from feast import RepoConfig, errors, utils
-from feast.data_source import BigQuerySource, FileSource
+from feast import BigQuerySource, FileSource, RepoConfig, errors, utils
 from feast.entity import Entity
 from feast.errors import FeatureNameCollisionError
 from feast.feature import Feature
diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py
index 25c3fccf35..41741e0f96 100644
--- a/sdk/python/tests/test_offline_online_store_consistency.py
+++ b/sdk/python/tests/test_offline_online_store_consistency.py
@@ -13,8 +13,9 @@
 from google.cloud import bigquery
 from pytz import timezone, utc
 
+from feast import BigQuerySource, FileSource, RedshiftSource
 from feast.data_format import ParquetFormat
-from feast.data_source import BigQuerySource, DataSource, FileSource, RedshiftSource
+from feast.data_source import DataSource
 from feast.entity import Entity
 from feast.feature import Feature
 from feast.feature_store import FeatureStore
diff --git a/sdk/python/tests/test_registry.py b/sdk/python/tests/test_registry.py
index 5ed5579b3c..82b0f51038 100644
--- a/sdk/python/tests/test_registry.py
+++ b/sdk/python/tests/test_registry.py
@@ -18,8 +18,8 @@
 import pytest
 from pytest_lazyfixture import lazy_fixture
 
+from feast import FileSource
 from feast.data_format import ParquetFormat
-from feast.data_source import FileSource
 from feast.entity import Entity
 from feast.feature import Feature
 from feast.feature_view import FeatureView
diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py
index c848b8ea64..3c25a697b7 100644
--- a/sdk/python/tests/utils/data_source_utils.py
+++ b/sdk/python/tests/utils/data_source_utils.py
@@ -3,8 +3,8 @@
 
 from google.cloud import bigquery
 
+from feast import BigQuerySource, FileSource
 from feast.data_format import ParquetFormat
-from feast.data_source import BigQuerySource, FileSource
 
 @contextlib.contextmanager
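After this change the public import path for the concrete sources is the `feast` package root, as the template and test updates above show. A minimal repo definition modeled on those template files, assuming the `FeatureView` signature of this Feast version (`input=` rather than `batch_source=`; the parquet path is illustrative):

```python
from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, FileSource, ValueType

driver_hourly_stats = FileSource(
    path="/data/driver_stats.parquet",
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

driver = Entity(name="driver_id", value_type=ValueType.INT64)

driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=Duration(seconds=86400),
    features=[Feature(name="avg_daily_trips", dtype=ValueType.INT64)],
    input=driver_hourly_stats,
)
```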