Skip to content

Commit

Permalink
Add integration tests for AWS Lambda feature server (#2001)
Browse files Browse the repository at this point in the history
* Add integration tests for AWS Lambda feature server

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Fix feature server bugs & integration test

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Address comments & ignore incorrect linter errors

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Fix float comparison & lambda api call retries

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Increase retries and catch other ClientErrors

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Fix lambda name shortening (md5 hashing instead of base64 prefixing)

base64-ing project name and then taking the prefix was not working when
the prefix of the project name was identical between tests (only their
endings were different for some tests). This caused lambda names between
test cases to be the same, causing conflicts between updates. Now, we
instead use md5 hash which depends on the entire body of the input,
eliminating this issue.

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
  • Loading branch information
Tsotne Tabidze authored Nov 12, 2021
1 parent d607a20 commit 2435777
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 83 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pr_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY: feast-python-server
# Note: the image tags should be in sync with sdk/python/feast/infra/aws.py:_get_docker_image_version
run: |
docker build \
--file sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile \
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Maximum interval(secs) to wait between retries for retry function
MAX_WAIT_INTERVAL: str = "60"

AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server:aws"
AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server"
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY = "feast-python-server"

# feature_store.yaml environment variable name for remote feature server
Expand Down
10 changes: 8 additions & 2 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import feast
from feast import proto_json
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
from feast.type_map import feast_value_type_to_python_type


def get_app(store: "feast.FeatureStore"):
Expand Down Expand Up @@ -36,7 +37,10 @@ async def get_online_features(request: Request):
raise HTTPException(status_code=500, detail="Uneven number of columns")

entity_rows = [
{k: v.val[idx] for k, v in request_proto.entities.items()}
{
k: feast_value_type_to_python_type(v.val[idx])
for k, v in request_proto.entities.items()
}
for idx in range(num_entities)
]

Expand All @@ -45,7 +49,9 @@ async def get_online_features(request: Request):
).proto

# Convert the Protobuf object to JSON and return it
return MessageToDict(response_proto, preserving_proto_field_name=True)
return MessageToDict( # type: ignore
response_proto, preserving_proto_field_name=True, float_precision=18
)
except Exception as e:
# Print the original exception on the server side
logger.exception(e)
Expand Down
83 changes: 58 additions & 25 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import base64
import hashlib
import logging
import os
import subprocess
import uuid
from datetime import datetime
from pathlib import Path
Expand All @@ -10,6 +12,7 @@

from colorama import Fore, Style

from feast import flags_helper
from feast.constants import (
AWS_LAMBDA_FEATURE_SERVER_IMAGE,
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
Expand Down Expand Up @@ -88,18 +91,18 @@ def update_infra(
)

ecr_client = boto3.client("ecr")
docker_image_version = _get_docker_image_version()
repository_uri = self._create_or_get_repository_uri(ecr_client)
version = _get_version_for_aws()
# Only download & upload the docker image if it doesn't already exist in ECR
if not ecr_client.batch_get_image(
repositoryName=AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
imageIds=[{"imageTag": version}],
imageIds=[{"imageTag": docker_image_version}],
).get("images"):
image_uri = self._upload_docker_image(
ecr_client, repository_uri, version
ecr_client, repository_uri, docker_image_version
)
else:
image_uri = f"{repository_uri}:{version}"
image_uri = f"{repository_uri}:{docker_image_version}"

self._deploy_feature_server(project, image_uri)

Expand Down Expand Up @@ -154,11 +157,10 @@ def _deploy_feature_server(self, project: str, image_uri: str):
# feature views, feature services, and other definitions does not update lambda).
_logger.info(" Updating AWS Lambda...")

lambda_client.update_function_configuration(
FunctionName=resource_name,
Environment={
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
},
aws_utils.update_lambda_function_environment(
lambda_client,
resource_name,
{"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}},
)

api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
Expand Down Expand Up @@ -235,7 +237,7 @@ def get_feature_server_endpoint(self) -> Optional[str]:
return f"https://{api_id}.execute-api.{region}.amazonaws.com"

def _upload_docker_image(
self, ecr_client, repository_uri: str, version: str
self, ecr_client, repository_uri: str, docker_image_version: str
) -> str:
"""
Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR.
Expand All @@ -258,12 +260,11 @@ def _upload_docker_image(

raise DockerDaemonNotRunning()

dockerhub_image = f"{AWS_LAMBDA_FEATURE_SERVER_IMAGE}:{docker_image_version}"
_logger.info(
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}"
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{dockerhub_image}{Style.RESET_ALL}"
)
for line in docker_client.api.pull(
AWS_LAMBDA_FEATURE_SERVER_IMAGE, stream=True, decode=True
):
for line in docker_client.api.pull(dockerhub_image, stream=True, decode=True):
_logger.debug(f" {line}")

auth_token = ecr_client.get_authorization_token()["authorizationData"][0][
Expand All @@ -280,14 +281,14 @@ def _upload_docker_image(
)
_logger.debug(f" {login_status}")

image = docker_client.images.get(AWS_LAMBDA_FEATURE_SERVER_IMAGE)
image_remote_name = f"{repository_uri}:{version}"
image = docker_client.images.get(dockerhub_image)
image_remote_name = f"{repository_uri}:{docker_image_version}"
_logger.info(
f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}"
)
image.tag(image_remote_name)
for line in docker_client.api.push(
repository_uri, tag=version, stream=True, decode=True
repository_uri, tag=docker_image_version, stream=True, decode=True
):
_logger.debug(f" {line}")

Expand All @@ -310,21 +311,53 @@ def _create_or_get_repository_uri(self, ecr_client):

def _get_lambda_name(project: str):
lambda_prefix = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
lambda_suffix = f"{project}-{_get_version_for_aws()}"
lambda_suffix = f"{project}-{_get_docker_image_version()}"
# AWS Lambda name can't have the length greater than 64 bytes.
# This usually occurs during integration tests or when feast is
# installed in editable mode (pip install -e), where feast version is long
# This usually occurs during integration tests where feast version is long
if len(lambda_prefix) + len(lambda_suffix) >= 63:
lambda_suffix = base64.b64encode(lambda_suffix.encode()).decode()[:40]
lambda_suffix = hashlib.md5(lambda_suffix.encode()).hexdigest()
return f"{lambda_prefix}-{lambda_suffix}"


def _get_version_for_aws():
"""Returns Feast version with certain characters replaced.
def _get_docker_image_version() -> str:
"""Returns a version for the feature server Docker image.
For public Feast releases this equals to the Feast SDK version modified by replacing "." with "_".
For example, Feast SDK version "0.14.1" would correspond to Docker image version "0_14_1".
For integration tests this equals to the git commit hash of HEAD. This is necessary,
because integration tests need to use images built from the same commit hash.
During development (when Feast is installed in editable mode) this equals to the Feast SDK version
modified by removing the "dev..." suffix and replacing "." with "_". For example, Feast SDK version
"0.14.1.dev41+g1cbfa225.d20211103" would correspond to Docker image version "0_14_1". This way,
Feast SDK will use an already existing Docker image built during the previous public release.
This allows the version to be included in names for AWS resources.
"""
return get_version().replace(".", "_").replace("+", "_")
if flags_helper.is_test():
# Note: this should be in sync with /~https://github.com/feast-dev/feast/blob/6fbe01b6e9a444dc77ec3328a54376f4d9387664/.github/workflows/pr_integration_tests.yml#L41
return (
subprocess.check_output(
["git", "rev-parse", "HEAD"], cwd=Path(__file__).resolve().parent
)
.decode()
.strip()
)
else:
version = get_version()
if "dev" in version:
version = version[: version.find("dev") - 1].replace(".", "_")
_logger.warning(
"You are trying to use AWS Lambda feature server while Feast is in a development mode. "
f"Feast will use a docker image version {version} derived from Feast SDK "
f"version {get_version()}. If you want to update the Feast SDK version, make "
"sure to first fetch all new release tags from Github and then reinstall the library:\n"
"> git fetch --all --tags\n"
"> pip install -e sdk/python"
)
else:
version = version.replace(".", "_")
return version


class S3RegistryStore(RegistryStore):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ COPY protos protos
COPY README.md README.md

# Install Feast for AWS with Lambda dependencies
RUN pip3 install -e 'sdk/python[aws,redis]'
RUN pip3 install -e 'sdk/python[aws]'
RUN pip3 install -r sdk/python/feast/infra/feature_servers/aws_lambda/requirements.txt --target "${LAMBDA_TASK_ROOT}"

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
Expand Down
27 changes: 26 additions & 1 deletion sdk/python/feast/infra/utils/aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import tempfile
import uuid
from typing import Dict, Iterator, Optional, Tuple
from typing import Any, Dict, Iterator, Optional, Tuple

import pandas as pd
import pyarrow as pa
Expand Down Expand Up @@ -60,6 +60,7 @@ def get_bucket_and_key(s3_path: str) -> Tuple[str, str]:
wait=wait_exponential(multiplier=1, max=4),
retry=retry_if_exception_type(ConnectionClosedError),
stop=stop_after_attempt(5),
reraise=True,
)
def execute_redshift_statement_async(
redshift_data_client, cluster_id: str, database: str, user: str, query: str
Expand Down Expand Up @@ -96,6 +97,7 @@ class RedshiftStatementNotFinishedError(Exception):
wait=wait_exponential(multiplier=1, max=30),
retry=retry_if_exception_type(RedshiftStatementNotFinishedError),
stop=stop_after_delay(300), # 300 seconds
reraise=True,
)
def wait_for_redshift_statement(redshift_data_client, statement: dict) -> None:
"""Waits for the Redshift statement to finish. Raises RedshiftQueryError if the statement didn't succeed.
Expand Down Expand Up @@ -426,6 +428,29 @@ def delete_lambda_function(lambda_client, function_name: str) -> Dict:
return lambda_client.delete_function(FunctionName=function_name)


@retry(
wait=wait_exponential(multiplier=1, max=4),
retry=retry_if_exception_type(ClientError),
stop=stop_after_attempt(5),
reraise=True,
)
def update_lambda_function_environment(
lambda_client, function_name: str, environment: Dict[str, Any]
) -> None:
"""
Update AWS Lambda function environment. The function is retried multiple times in case another action is
currently being run on the lambda (e.g. it's being created or being updated in parallel).
Args:
lambda_client: AWS Lambda client.
function_name: Name of the AWS Lambda function.
environment: The desired lambda environment.
"""
lambda_client.update_function_configuration(
FunctionName=function_name, Environment=environment
)


def get_first_api_gateway(api_gateway_client, api_gateway_name: str) -> Optional[Dict]:
"""
Get the first API Gateway with the given name. Note, that API Gateways can have the same name.
Expand Down
20 changes: 10 additions & 10 deletions sdk/python/feast/online_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
online_response_proto: GetOnlineResponse proto object to construct from.
"""
self.proto = online_response_proto
# Delete DUMMY_ENTITY_ID from proto if it exists
for item in self.proto.field_values:
if DUMMY_ENTITY_ID in item.statuses:
del item.statuses[DUMMY_ENTITY_ID]
if DUMMY_ENTITY_ID in item.fields:
del item.fields[DUMMY_ENTITY_ID]

@property
def field_values(self):
Expand All @@ -57,13 +63,9 @@ def to_dict(self) -> Dict[str, Any]:
"""
Converts GetOnlineFeaturesResponse features into a dictionary form.
"""
fields = [
k
for row in self.field_values
for k, _ in row.statuses.items()
if k != DUMMY_ENTITY_ID
]
features_dict: Dict[str, List[Any]] = {k: list() for k in fields}
features_dict: Dict[str, List[Any]] = {
k: list() for row in self.field_values for k, _ in row.statuses.items()
}

for row in self.field_values:
for feature in features_dict.keys():
Expand All @@ -77,9 +79,7 @@ def to_df(self) -> pd.DataFrame:
Converts GetOnlineFeaturesResponse features into Panda dataframe form.
"""

return pd.DataFrame(self.to_dict()).drop(
DUMMY_ENTITY_ID, axis=1, errors="ignore"
)
return pd.DataFrame(self.to_dict())


def _infer_online_entity_rows(
Expand Down
20 changes: 11 additions & 9 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
"gcp_cloudrun": "feast.infra.feature_servers.gcp_cloudrun.config.GcpCloudRunFeatureServerConfig",
}

FEATURE_SERVER_TYPE_FOR_PROVIDER = {
"aws": "aws_lambda",
"gcp": "gcp_cloudrun",
}


class FeastBaseModel(BaseModel):
""" Feast Pydantic Configuration Class """
Expand Down Expand Up @@ -226,15 +231,12 @@ def _validate_feature_server_config(cls, values):
if "provider" not in values:
raise FeastProviderNotSetError()

# Make sure that the type is not set, since we will set it based on the provider.
if "type" in values["feature_server"]:
raise FeastFeatureServerTypeSetError(values["feature_server"]["type"])

# Set the default type. We only support AWS Lambda for now.
if values["provider"] == "aws":
values["feature_server"]["type"] = "aws_lambda"

feature_server_type = values["feature_server"]["type"]
feature_server_type = FEATURE_SERVER_TYPE_FOR_PROVIDER.get(values["provider"])
defined_type = values["feature_server"].get("type")
# Make sure that the type is either not set, or set correctly, since it's defined by the provider
if defined_type not in (None, feature_server_type):
raise FeastFeatureServerTypeSetError(defined_type)
values["feature_server"]["type"] = feature_server_type

# Validate the dict to ensure one of the union types match
try:
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any:
Returns:
Python native type representation/version of the given field_value_proto
"""
field_value_dict = MessageToDict(field_value_proto)
field_value_dict = MessageToDict(field_value_proto, float_precision=18) # type: ignore

# This can happen when proto_json.patch() has been called before this call, which is true for a feature server
if not isinstance(field_value_dict, dict):
return field_value_dict

for k, v in field_value_dict.items():
if "List" in k:
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"tqdm==4.*",
"fastapi>=0.68.0",
"uvicorn[standard]>=0.14.0",
"proto-plus<1.19.7",
]

GCP_REQUIRED = [
Expand Down Expand Up @@ -113,7 +114,7 @@
"firebase-admin==4.5.2",
"pre-commit",
"assertpy==1.1",
"pip-tools"
"pip-tools",
] + GCP_REQUIRED + REDIS_REQUIRED + AWS_REQUIRED

DEV_REQUIRED = ["mypy-protobuf==1.*", "grpcio-testing==1.*"] + CI_REQUIRED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class IntegrationTestRepoConfig:

full_feature_names: bool = True
infer_features: bool = False
python_feature_server: bool = False

def __repr__(self) -> str:
return "-".join(
Expand Down
Loading

0 comments on commit 2435777

Please sign in to comment.