Skip to content

Commit

Permalink
feat: CLI interface for validation of logged features (#2718)
Browse files Browse the repository at this point in the history
* store validation reference in registry

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* CLI test

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* clean function before pickle

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* ignore "too complex" lint rule

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* lazy import & correct feature status in logs

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* pygments dependency

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* ttl for regular feature views

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* some apidocs

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* address comments

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored May 20, 2022
1 parent 8ec0790 commit c8b11b3
Show file tree
Hide file tree
Showing 30 changed files with 505 additions and 45 deletions.
12 changes: 8 additions & 4 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,12 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
go func() {
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
<-s.grpcStopCh
fmt.Println("Stopping the gRPC server...")
log.Println("Stopping the gRPC server...")
grpcServer.GracefulStop()
if loggingService != nil {
loggingService.Stop()
}
fmt.Println("gRPC server terminated")
log.Println("gRPC server terminated")
}()

err = grpcServer.Serve(lis)
Expand Down Expand Up @@ -314,11 +314,15 @@ func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int,
go func() {
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
<-s.httpStopCh
fmt.Println("Stopping the HTTP server...")
log.Println("Stopping the HTTP server...")
err := ser.Stop()
if err != nil {
fmt.Printf("Error when stopping the HTTP server: %v\n", err)
log.Printf("Error when stopping the HTTP server: %v\n", err)
}
if loggingService != nil {
loggingService.Stop()
}
log.Println("HTTP server terminated")
}()

return ser.Serve(host, port)
Expand Down
1 change: 0 additions & 1 deletion protos/feast/core/FeatureService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ message FeatureServiceMeta {

message LoggingConfig {
float sample_rate = 1;
google.protobuf.Duration partition_interval = 2;

oneof destination {
FileDestination file_destination = 3;
Expand Down
4 changes: 3 additions & 1 deletion protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import "feast/core/OnDemandFeatureView.proto";
import "feast/core/RequestFeatureView.proto";
import "feast/core/DataSource.proto";
import "feast/core/SavedDataset.proto";
import "feast/core/ValidationProfile.proto";
import "google/protobuf/timestamp.proto";

// Next id: 13
// Next id: 14
message Registry {
repeated Entity entities = 1;
repeated FeatureTable feature_tables = 2;
Expand All @@ -42,6 +43,7 @@ message Registry {
repeated RequestFeatureView request_feature_views = 9;
repeated FeatureService feature_services = 7;
repeated SavedDataset saved_datasets = 11;
repeated ValidationReference validation_references = 13;
Infra infra = 10;

string registry_schema_version = 3; // to support migrations; incremented when schema is changed
Expand Down
21 changes: 18 additions & 3 deletions protos/feast/core/ValidationProfile.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,24 @@ message GEValidationProfile {
}

message ValidationReference {
SavedDataset dataset = 1;

// Unique name of validation reference within the project
string name = 1;
// Name of saved dataset used as reference dataset
string reference_dataset_name = 2;
// Name of Feast project that this object source belongs to
string project = 3;
// Description of the validation reference
string description = 4;
// User defined metadata
map<string,string> tags = 5;

// validation profiler
oneof profiler {
GEValidationProfiler ge_profiler = 2;
GEValidationProfiler ge_profiler = 6;
}

// (optional) cached validation profile (to avoid constant recalculation)
oneof cached_profile {
GEValidationProfile ge_profile = 7;
}
}
59 changes: 58 additions & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import warnings
from datetime import datetime
Expand All @@ -23,6 +23,7 @@
import yaml
from colorama import Fore, Style
from dateutil import parser
from pygments import formatters, highlight, lexers

from feast import flags, flags_helper, utils
from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT
Expand Down Expand Up @@ -758,5 +759,61 @@ def disable_alpha_features(ctx: click.Context):
store.config.write_to_path(Path(repo_path))


@cli.command("validate")
@click.option(
"--feature-service", "-f", help="Specify a feature service name",
)
@click.option(
"--reference", "-r", help="Specify a validation reference name",
)
@click.option(
"--no-profile-cache", is_flag=True, help="Do not store cached profile in registry",
)
@click.argument("start_ts")
@click.argument("end_ts")
@click.pass_context
def validate(
ctx: click.Context,
feature_service: str,
reference: str,
start_ts: str,
end_ts: str,
no_profile_cache,
):
"""
Perform validation of logged features (produced by a given feature service) against provided reference.
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))

feature_service = store.get_feature_service(name=feature_service)
reference = store.get_validation_reference(reference)

result = store.validate_logged_features(
source=feature_service,
reference=reference,
start=datetime.fromisoformat(start_ts),
end=datetime.fromisoformat(end_ts),
throw_exception=False,
cache_profile=not no_profile_cache,
)

if not result:
print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}")
return

errors = [e.to_dict() for e in result.report.errors]
formatted_json = json.dumps(errors, indent=4)
colorful_json = highlight(
formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter()
)
print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}")
print(colorful_json)
exit(1)


if __name__ == "__main__":
cli()
10 changes: 7 additions & 3 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
ValidationReference as ValidationReferenceProto,
)
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
from feast.repo_contents import RepoContents

Expand Down Expand Up @@ -103,6 +106,7 @@ def tag_objects_for_keep_delete_update_add(
FeatureServiceProto,
OnDemandFeatureViewProto,
RequestFeatureViewProto,
ValidationReferenceProto,
)


Expand All @@ -120,9 +124,9 @@ def diff_registry_objects(

current_spec: FeastObjectSpecProto
new_spec: FeastObjectSpecProto
if isinstance(current_proto, DataSourceProto) or isinstance(
new_proto, DataSourceProto
):
if isinstance(
current_proto, (DataSourceProto, ValidationReferenceProto)
) or isinstance(new_proto, (DataSourceProto, ValidationReferenceProto)):
assert type(current_proto) == type(new_proto)
current_spec = cast(DataSourceProto, current_proto)
new_spec = cast(DataSourceProto, new_proto)
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/dqm/profilers/ge_profiler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from types import FunctionType
from typing import Any, Callable, Dict, List

import dill
Expand Down Expand Up @@ -140,9 +141,12 @@ def analyze_dataset(self, df: pd.DataFrame) -> Profile:
return GEProfile(expectation_suite=self.user_defined_profiler(dataset))

def to_proto(self):
# keep only the code and drop context for now
# ToDo (pyalex): include some context, but not all (dill tries to pull too much)
udp = FunctionType(self.user_defined_profiler.__code__, {})
return GEValidationProfilerProto(
profiler=GEValidationProfilerProto.UserDefinedProfiler(
body=dill.dumps(self.user_defined_profiler, recurse=True)
body=dill.dumps(udp, recurse=False)
)
)

Expand Down
13 changes: 13 additions & 0 deletions sdk/python/feast/dqm/profilers/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class ValidationError:

missing_count: Optional[int]
missing_percent: Optional[float]
observed_value: Optional[float]

def __init__(
self,
Expand All @@ -77,12 +78,24 @@ def __init__(
check_config: Optional[Any] = None,
missing_count: Optional[int] = None,
missing_percent: Optional[float] = None,
observed_value: Optional[float] = None,
):
self.check_name = check_name
self.column_name = column_name
self.check_config = check_config
self.missing_count = missing_count
self.missing_percent = missing_percent
self.observed_value = observed_value

def __repr__(self):
return f"<ValidationError {self.check_name}:{self.column_name}>"

def to_dict(self):
return dict(
check_name=self.check_name,
column_name=self.column_name,
check_config=self.check_config,
missing_count=self.missing_count,
missing_percent=self.missing_percent,
observed_value=self.observed_value,
)
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ def __init__(self, name: str, project: str):
super().__init__(f"Saved dataset {name} does not exist in project {project}")


class ValidationReferenceNotFound(FeastObjectNotFoundException):
def __init__(self, name: str, project: str):
super().__init__(
f"Validation reference {name} does not exist in project {project}"
)


class FeastProviderLoginError(Exception):
"""Error class that indicates a user has not authenticated with their provider."""

Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/feast_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from .protos.feast.core.FeatureView_pb2 import FeatureViewSpec
from .protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec
from .protos.feast.core.RequestFeatureView_pb2 import RequestFeatureViewSpec
from .protos.feast.core.ValidationProfile_pb2 import (
ValidationReference as ValidationReferenceProto,
)
from .request_feature_view import RequestFeatureView
from .saved_dataset import ValidationReference

# Convenience type representing all Feast objects
FeastObject = Union[
Expand All @@ -21,6 +25,7 @@
Entity,
FeatureService,
DataSource,
ValidationReference,
]

FeastObjectSpecProto = Union[
Expand All @@ -30,4 +35,5 @@
EntitySpecV2,
FeatureServiceSpec,
DataSourceProto,
ValidationReferenceProto,
]
Loading

0 comments on commit c8b11b3

Please sign in to comment.