From fd46a421301db03b6fc56f2fb72ba024ae5333ca Mon Sep 17 00:00:00 2001
From: tstadel <60758086+tstadel@users.noreply.github.com>
Date: Thu, 10 Mar 2022 09:49:28 +0100
Subject: [PATCH] Allow to deploy and undeploy Pipelines on Deepset Cloud
(#2285)
* add deploy_on_deepset_cloud and undeploy_on_deepset_cloud
* increase polling interval to 5 seconds
* Update Documentation & Code Style
* improve logging
* move transitioning logic to PipelineClient
* use enum for Pipeline states
* improve docstrings
* Update Documentation & Code Style
* tests added
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
docs/_src/api/api/pipelines.md | 58 +++++
haystack/pipelines/base.py | 60 +++++
haystack/utils/deepsetcloud.py | 217 +++++++++++++++++-
test/test_pipeline.py | 405 +++++++++++++++++++++++++++++++++
4 files changed, 737 insertions(+), 3 deletions(-)
diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md
index 0c1c0eab2b..4675889aeb 100644
--- a/docs/_src/api/api/pipelines.md
+++ b/docs/_src/api/api/pipelines.md
@@ -281,6 +281,64 @@ If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
- `overwrite`: Whether to overwrite the config if it already exists. Otherwise an error is being raised.
+
+
+#### deploy\_on\_deepset\_cloud
+
+```python
+@classmethod
+def deploy_on_deepset_cloud(cls, pipeline_config_name: str, workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, timeout: int = 60)
+```
+
+Deploys the pipelines of a pipeline config on Deepset Cloud.
+
+Blocks until pipelines are successfully deployed, deployment failed or timeout exceeds.
+If pipelines are already deployed no action will be taken and an info will be logged.
+If timeout exceeds a TimeoutError will be raised.
+If deployment fails a DeepsetCloudError will be raised.
+
+Pipeline config must be present on Deepset Cloud. See save_to_deepset_cloud() for more information.
+
+**Arguments**:
+
+- `pipeline_config_name`: name of the config file inside the Deepset Cloud workspace.
+- `workspace`: workspace in Deepset Cloud
+- `api_key`: Secret value of the API key.
+If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
+- `api_endpoint`: The URL of the Deepset Cloud API.
+If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
+- `timeout`: The time in seconds to wait until deployment completes.
+If the timeout is exceeded an error will be raised.
+
+
+
+#### undeploy\_on\_deepset\_cloud
+
+```python
+@classmethod
+def undeploy_on_deepset_cloud(cls, pipeline_config_name: str, workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, timeout: int = 60)
+```
+
+Undeploys the pipelines of a pipeline config on Deepset Cloud.
+
+Blocks until pipelines are successfully undeployed, undeployment failed or timeout exceeds.
+If pipelines are already undeployed no action will be taken and an info will be logged.
+If timeout exceeds a TimeoutError will be raised.
+If undeployment fails a DeepsetCloudError will be raised.
+
+Pipeline config must be present on Deepset Cloud. See save_to_deepset_cloud() for more information.
+
+**Arguments**:
+
+- `pipeline_config_name`: name of the config file inside the Deepset Cloud workspace.
+- `workspace`: workspace in Deepset Cloud
+- `api_key`: Secret value of the API key.
+If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
+- `api_endpoint`: The URL of the Deepset Cloud API.
+If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
+- `timeout`: The time in seconds to wait until undeployment completes.
+If the timeout is exceeded an error will be raised.
+
## Pipeline
diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py
index 92d3072fab..af1799cb3c 100644
--- a/haystack/pipelines/base.py
+++ b/haystack/pipelines/base.py
@@ -404,6 +404,66 @@ def save_to_deepset_cloud(
client.save_pipeline_config(config=config, pipeline_config_name=pipeline_config_name)
logger.info(f"Pipeline config '{pipeline_config_name}' successfully created.")
+ @classmethod
+ def deploy_on_deepset_cloud(
+ cls,
+ pipeline_config_name: str,
+ workspace: str = "default",
+ api_key: Optional[str] = None,
+ api_endpoint: Optional[str] = None,
+ timeout: int = 60,
+ ):
+ """
+ Deploys the pipelines of a pipeline config on Deepset Cloud.
+ Blocks until pipelines are successfully deployed, deployment failed or timeout exceeds.
+ If pipelines are already deployed no action will be taken and an info will be logged.
+ If timeout exceeds a TimeoutError will be raised.
+ If deployment fails a DeepsetCloudError will be raised.
+
+ Pipeline config must be present on Deepset Cloud. See save_to_deepset_cloud() for more information.
+
+ :param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
+ :param workspace: workspace in Deepset Cloud
+ :param api_key: Secret value of the API key.
+ If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
+ :param api_endpoint: The URL of the Deepset Cloud API.
+ If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
+ :param timeout: The time in seconds to wait until deployment completes.
+ If the timeout is exceeded an error will be raised.
+ """
+ client = DeepsetCloud.get_pipeline_client(api_key=api_key, api_endpoint=api_endpoint, workspace=workspace)
+ client.deploy(pipeline_config_name=pipeline_config_name, timeout=timeout)
+
+ @classmethod
+ def undeploy_on_deepset_cloud(
+ cls,
+ pipeline_config_name: str,
+ workspace: str = "default",
+ api_key: Optional[str] = None,
+ api_endpoint: Optional[str] = None,
+ timeout: int = 60,
+ ):
+ """
+ Undeploys the pipelines of a pipeline config on Deepset Cloud.
+ Blocks until pipelines are successfully undeployed, undeployment failed or timeout exceeds.
+ If pipelines are already undeployed no action will be taken and an info will be logged.
+ If timeout exceeds a TimeoutError will be raised.
+        If undeployment fails a DeepsetCloudError will be raised.
+
+ Pipeline config must be present on Deepset Cloud. See save_to_deepset_cloud() for more information.
+
+ :param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
+ :param workspace: workspace in Deepset Cloud
+ :param api_key: Secret value of the API key.
+ If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
+ :param api_endpoint: The URL of the Deepset Cloud API.
+ If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
+ :param timeout: The time in seconds to wait until undeployment completes.
+ If the timeout is exceeded an error will be raised.
+ """
+ client = DeepsetCloud.get_pipeline_client(api_key=api_key, api_endpoint=api_endpoint, workspace=workspace)
+ client.undeploy(pipeline_config_name=pipeline_config_name, timeout=timeout)
+
class Pipeline(BasePipeline):
"""
diff --git a/haystack/utils/deepsetcloud.py b/haystack/utils/deepsetcloud.py
index 4be170fa57..b3361caf83 100644
--- a/haystack/utils/deepsetcloud.py
+++ b/haystack/utils/deepsetcloud.py
@@ -1,6 +1,9 @@
+from __future__ import annotations
+from enum import Enum
import logging
import os
-from typing import Any, Dict, Generator, List, Optional, Union
+import time
+from typing import Any, Dict, Generator, List, Optional, Tuple, Union
try:
from typing import Literal
@@ -12,6 +15,41 @@
DEFAULT_API_ENDPOINT = f"DC_API_PLACEHOLDER/v1" # TODO
+
+class PipelineStatus(Enum):
+ UNDEPLOYED: str = "UNDEPLOYED"
+ DEPLOYED_UNHEALTHY: str = "DEPLOYED_UNHEALTHY"
+ DEPLOYED: str = "DEPLOYED"
+ DEPLOYMENT_IN_PROGRESS: str = "DEPLOYMENT_IN_PROGRESS"
+ UNDEPLOYMENT_IN_PROGRESS: str = "UNDEPLOYMENT_IN_PROGRESS"
+ DEPLOYMENT_SCHEDULED: str = "DEPLOYMENT_SCHEDULED"
+ UNDEPLOYMENT_SCHEDULED: str = "UNDEPLOYMENT_SCHEDULED"
+    UNKNOWN: str = "UNKNOWN"
+
+    @classmethod
+    def from_str(cls, status_string: str) -> PipelineStatus:
+        return cls.__dict__.get(status_string, PipelineStatus.UNKNOWN)
+
+
+SATISFIED_STATES_KEY = "satisfied_states"
+VALID_INITIAL_STATES_KEY = "valid_initial_states"
+VALID_TRANSITIONING_STATES_KEY = "valid_transitioning_states"
+PIPELINE_STATE_TRANSITION_INFOS: Dict[PipelineStatus, Dict[str, List[PipelineStatus]]] = {
+ PipelineStatus.UNDEPLOYED: {
+ SATISFIED_STATES_KEY: [PipelineStatus.UNDEPLOYED],
+ VALID_INITIAL_STATES_KEY: [PipelineStatus.DEPLOYED, PipelineStatus.DEPLOYED_UNHEALTHY],
+ VALID_TRANSITIONING_STATES_KEY: [
+ PipelineStatus.UNDEPLOYMENT_SCHEDULED,
+ PipelineStatus.UNDEPLOYMENT_IN_PROGRESS,
+ ],
+ },
+ PipelineStatus.DEPLOYED: {
+ SATISFIED_STATES_KEY: [PipelineStatus.DEPLOYED, PipelineStatus.DEPLOYED_UNHEALTHY],
+ VALID_INITIAL_STATES_KEY: [PipelineStatus.UNDEPLOYED],
+ VALID_TRANSITIONING_STATES_KEY: [PipelineStatus.DEPLOYMENT_SCHEDULED, PipelineStatus.DEPLOYMENT_IN_PROGRESS],
+ },
+}
+
logger = logging.getLogger(__name__)
@@ -393,7 +431,11 @@ def list_pipeline_configs(self, workspace: Optional[str] = None, headers: dict =
return generator
def save_pipeline_config(
- self, config: dict, pipeline_config_name: str, workspace: Optional[str] = None, headers: dict = None
+ self,
+ config: dict,
+ pipeline_config_name: Optional[str] = None,
+ workspace: Optional[str] = None,
+ headers: dict = None,
):
config["name"] = pipeline_config_name
workspace_url = self._build_workspace_url(workspace=workspace)
@@ -403,7 +445,11 @@ def save_pipeline_config(
logger.warning(f"Unexpected response from saving pipeline config: {response}")
def update_pipeline_config(
- self, config: dict, pipeline_config_name: str, workspace: Optional[str] = None, headers: dict = None
+ self,
+ config: dict,
+ pipeline_config_name: Optional[str] = None,
+ workspace: Optional[str] = None,
+ headers: dict = None,
):
config["name"] = pipeline_config_name
pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name)
@@ -412,6 +458,171 @@ def update_pipeline_config(
if "name" not in response or response["name"] != pipeline_config_name:
logger.warning(f"Unexpected response from updating pipeline config: {response}")
+ def deploy(
+ self, pipeline_config_name: Optional[str] = None, workspace: str = None, headers: dict = None, timeout: int = 60
+ ):
+ """
+ Deploys the pipelines of a pipeline config on Deepset Cloud.
+ Blocks until pipelines are successfully deployed, deployment failed or timeout exceeds.
+ If pipelines are already deployed no action will be taken and an info will be logged.
+ If timeout exceeds a TimeoutError will be raised.
+ If deployment fails a DeepsetCloudError will be raised.
+
+ :param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
+ :param workspace: workspace in Deepset Cloud
+ :param headers: Headers to pass to API call
+ :param timeout: The time in seconds to wait until deployment completes.
+ If the timeout is exceeded an error will be raised.
+ """
+ status, changed = self._transition_pipeline_state(
+ target_state=PipelineStatus.DEPLOYED,
+ timeout=timeout,
+ pipeline_config_name=pipeline_config_name,
+ workspace=workspace,
+ headers=headers,
+ )
+
+ if status == PipelineStatus.DEPLOYED:
+ if changed:
+ logger.info(f"Pipeline config '{pipeline_config_name}' successfully deployed.")
+ else:
+ logger.info(f"Pipeline config '{pipeline_config_name}' is already deployed.")
+ elif status == PipelineStatus.DEPLOYED_UNHEALTHY:
+ logger.warning(
+ f"Deployment of pipeline config '{pipeline_config_name}' succeeded. But '{pipeline_config_name}' is unhealthy."
+ )
+ elif status in [PipelineStatus.UNDEPLOYMENT_IN_PROGRESS, PipelineStatus.UNDEPLOYMENT_SCHEDULED]:
+ raise DeepsetCloudError(
+ f"Deployment of pipline config '{pipeline_config_name}' aborted. Undeployment was requested."
+ )
+ elif status == PipelineStatus.UNDEPLOYED:
+ raise DeepsetCloudError(f"Deployment of pipeline config '{pipeline_config_name}' failed.")
+ else:
+ raise DeepsetCloudError(
+ f"Deployment of pipeline config '{pipeline_config_name} ended in unexpected status: {status.value}"
+ )
+
+ def undeploy(
+ self, pipeline_config_name: Optional[str] = None, workspace: str = None, headers: dict = None, timeout: int = 60
+ ):
+ """
+ Undeploys the pipelines of a pipeline config on Deepset Cloud.
+ Blocks until pipelines are successfully undeployed, undeployment failed or timeout exceeds.
+ If pipelines are already undeployed no action will be taken and an info will be logged.
+ If timeout exceeds a TimeoutError will be raised.
+        If undeployment fails a DeepsetCloudError will be raised.
+
+ :param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
+ :param workspace: workspace in Deepset Cloud
+ :param headers: Headers to pass to API call
+ :param timeout: The time in seconds to wait until undeployment completes.
+ If the timeout is exceeded an error will be raised.
+ """
+ status, changed = self._transition_pipeline_state(
+ target_state=PipelineStatus.UNDEPLOYED,
+ timeout=timeout,
+ pipeline_config_name=pipeline_config_name,
+ workspace=workspace,
+ headers=headers,
+ )
+
+ if status == PipelineStatus.UNDEPLOYED:
+ if changed:
+ logger.info(f"Pipeline config '{pipeline_config_name}' successfully undeployed.")
+ else:
+ logger.info(f"Pipeline config '{pipeline_config_name}' is already undeployed.")
+ elif status in [PipelineStatus.DEPLOYMENT_IN_PROGRESS, PipelineStatus.DEPLOYMENT_SCHEDULED]:
+ raise DeepsetCloudError(
+ f"Undeployment of pipline config '{pipeline_config_name}' aborted. Deployment was requested."
+ )
+ elif status in [PipelineStatus.DEPLOYED, PipelineStatus.DEPLOYED_UNHEALTHY]:
+ raise DeepsetCloudError(f"Undeployment of pipeline config '{pipeline_config_name}' failed.")
+ else:
+ raise DeepsetCloudError(
+ f"Undeployment of pipeline config '{pipeline_config_name} ended in unexpected status: {status.value}"
+ )
+
+ def _transition_pipeline_state(
+ self,
+ target_state: Literal[PipelineStatus.DEPLOYED, PipelineStatus.UNDEPLOYED],
+ timeout: int = 60,
+ pipeline_config_name: Optional[str] = None,
+ workspace: str = None,
+ headers: dict = None,
+ ) -> Tuple[PipelineStatus, bool]:
+ """
+ Transitions the pipeline config state to desired target_state on Deepset Cloud.
+
+ :param target_state: the target state of the Pipeline config.
+ :param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
+ :param workspace: workspace in Deepset Cloud
+ :param headers: Headers to pass to API call
+ :param timeout: The time in seconds to wait until undeployment completes.
+ If the timeout is exceeded an error will be raised.
+ """
+ pipeline_info = self.get_pipeline_config_info(
+ pipeline_config_name=pipeline_config_name, workspace=workspace, headers=headers
+ )
+ if pipeline_info is None:
+ raise DeepsetCloudError(f"Pipeline config '{pipeline_config_name}' does not exist.")
+
+ transition_info = PIPELINE_STATE_TRANSITION_INFOS[target_state]
+ satisfied_states = transition_info[SATISFIED_STATES_KEY]
+ valid_transitioning_states = transition_info[VALID_TRANSITIONING_STATES_KEY]
+ valid_initial_states = transition_info[VALID_INITIAL_STATES_KEY]
+
+ status = PipelineStatus.from_str(pipeline_info["status"])
+ if status in satisfied_states:
+ return status, False
+
+ if status not in valid_initial_states:
+ raise DeepsetCloudError(
+ f"Pipeline config '{pipeline_config_name}' is in invalid state '{status.value}' to be transitioned to '{target_state.value}'."
+ )
+
+ if target_state == PipelineStatus.DEPLOYED:
+ res = self._deploy(pipeline_config_name=pipeline_config_name, workspace=workspace, headers=headers)
+ status = PipelineStatus.from_str(res["status"])
+ elif target_state == PipelineStatus.UNDEPLOYED:
+ res = self._undeploy(pipeline_config_name=pipeline_config_name, workspace=workspace, headers=headers)
+ status = PipelineStatus.from_str(res["status"])
+ else:
+ raise NotImplementedError(f"Transitioning to state '{target_state.value}' is not implemented.")
+
+ start_time = time.time()
+ while status in valid_transitioning_states:
+ if time.time() - start_time > timeout:
+ raise TimeoutError(
+ f"Transitioning of '{pipeline_config_name}' to state '{target_state.value}' timed out."
+ )
+ pipeline_info = self.get_pipeline_config_info(
+ pipeline_config_name=pipeline_config_name, workspace=workspace, headers=headers
+ )
+ if pipeline_info is None:
+ raise DeepsetCloudError(f"Pipeline config '{pipeline_config_name}' does not exist anymore.")
+ status = PipelineStatus.from_str(pipeline_info["status"])
+ if status in valid_transitioning_states:
+ logger.info(f"Current status of '{pipeline_config_name}' is: '{status}'")
+ time.sleep(5)
+
+ return status, True
+
+ def _deploy(
+ self, pipeline_config_name: Optional[str] = None, workspace: Optional[str] = None, headers: dict = None
+ ) -> dict:
+ pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name)
+ deploy_url = f"{pipeline_url}/deploy"
+ response = self.client.post(url=deploy_url, headers=headers).json()
+ return response
+
+ def _undeploy(
+ self, pipeline_config_name: Optional[str] = None, workspace: Optional[str] = None, headers: dict = None
+ ) -> dict:
+ pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name)
+ undeploy_url = f"{pipeline_url}/undeploy"
+ response = self.client.post(url=undeploy_url, headers=headers).json()
+ return response
+
def _build_pipeline_url(self, workspace: Optional[str] = None, pipeline_config_name: Optional[str] = None):
if pipeline_config_name is None:
pipeline_config_name = self.pipeline_config_name
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index 6f938db906..ed04654767 100644
--- a/test/test_pipeline.py
+++ b/test/test_pipeline.py
@@ -25,6 +25,7 @@
from haystack.nodes import DensePassageRetriever, EmbeddingRetriever, RouteDocuments, PreProcessor, TextConverter
from conftest import MOCK_DC, DC_API_ENDPOINT, DC_API_KEY, DC_TEST_INDEX, SAMPLES_PATH, deepset_cloud_fixture
+from haystack.utils.deepsetcloud import DeepsetCloudError
class ParentComponent(BaseComponent):
@@ -807,6 +808,410 @@ def dc_document_store_matcher(request: PreparedRequest) -> Tuple[bool, str]:
)
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_deploy_on_deepset_cloud_non_existing_pipeline():
+ if MOCK_DC:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"errors": ["Pipeline with the name test_pipeline_config_copy does not exists."]},
+ status=404,
+ )
+
+ with pytest.raises(DeepsetCloudError, match="Pipeline config 'test_new_non_existing_pipeline' does not exist."):
+ Pipeline.deploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_undeploy_on_deepset_cloud_non_existing_pipeline():
+ if MOCK_DC:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"errors": ["Pipeline with the name test_pipeline_config_copy does not exists."]},
+ status=404,
+ )
+
+ with pytest.raises(DeepsetCloudError, match="Pipeline config 'test_new_non_existing_pipeline' does not exist."):
+ Pipeline.undeploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_deploy_on_deepset_cloud():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/deploy",
+ json={"status": "DEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+ # status will be first undeployed, after deploy() it's in progress twice and the third time deployed
+ status_flow = ["UNDEPLOYED", "DEPLOYMENT_IN_PROGRESS", "DEPLOYMENT_IN_PROGRESS", "DEPLOYED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+
+ Pipeline.deploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_undeploy_on_deepset_cloud():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/undeploy",
+ json={"status": "UNDEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+        # status will be first deployed, after undeploy() it's in progress twice and the third time undeployed
+ status_flow = ["DEPLOYED", "UNDEPLOYMENT_IN_PROGRESS", "UNDEPLOYMENT_IN_PROGRESS", "UNDEPLOYED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+
+ Pipeline.undeploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_deploy_on_deepset_cloud_state_already_satisfied():
+ if MOCK_DC:
+        # pipeline is already deployed, so deploy() takes no action
+ status_flow = ["DEPLOYED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+
+ Pipeline.deploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_undeploy_on_deepset_cloud_state_already_satisfied():
+ if MOCK_DC:
+        # pipeline is already undeployed, so undeploy() takes no action
+ status_flow = ["UNDEPLOYED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+
+ Pipeline.undeploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_deploy_on_deepset_cloud_failed():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/deploy",
+ json={"status": "DEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+ # status will be first undeployed, after deploy() it's in progress and the third time undeployed
+ status_flow = ["UNDEPLOYED", "DEPLOYMENT_IN_PROGRESS", "UNDEPLOYED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+
+ with pytest.raises(
+ DeepsetCloudError, match="Deployment of pipeline config 'test_new_non_existing_pipeline' failed."
+ ):
+ Pipeline.deploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_undeploy_on_deepset_cloud_failed():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/undeploy",
+ json={"status": "UNDEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+        # status will be first deployed, after undeploy() it's in progress and the third time deployed again (undeployment failed)
+ status_flow = ["DEPLOYED", "UNDEPLOYMENT_IN_PROGRESS", "DEPLOYED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+
+ with pytest.raises(
+ DeepsetCloudError, match="Undeployment of pipeline config 'test_new_non_existing_pipeline' failed."
+ ):
+ Pipeline.undeploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_deploy_on_deepset_cloud_invalid_initial_state():
+ if MOCK_DC:
+ status_flow = ["UNDEPLOYMENT_SCHEDULED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+
+ with pytest.raises(
+ DeepsetCloudError,
+ match="Pipeline config 'test_new_non_existing_pipeline' is in invalid state 'UNDEPLOYMENT_SCHEDULED' to be transitioned to 'DEPLOYED'.",
+ ):
+ Pipeline.deploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_undeploy_on_deepset_cloud_invalid_initial_state():
+ if MOCK_DC:
+ status_flow = ["DEPLOYMENT_SCHEDULED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+
+ with pytest.raises(
+ DeepsetCloudError,
+ match="Pipeline config 'test_new_non_existing_pipeline' is in invalid state 'DEPLOYMENT_SCHEDULED' to be transitioned to 'UNDEPLOYED'.",
+ ):
+ Pipeline.undeploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_deploy_on_deepset_cloud_invalid_state_in_progress():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/deploy",
+ json={"status": "DEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+        # after deploy() was scheduled an undeployment is requested, aborting the deployment
+ status_flow = ["UNDEPLOYED", "UNDEPLOYMENT_IN_PROGRESS"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+ with pytest.raises(
+ DeepsetCloudError,
+ match="Deployment of pipline config 'test_new_non_existing_pipeline' aborted. Undeployment was requested.",
+ ):
+ Pipeline.deploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_undeploy_on_deepset_cloud_invalid_state_in_progress():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/undeploy",
+ json={"status": "UNDEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+        # after undeploy() was scheduled a deployment is requested, aborting the undeployment
+ status_flow = ["DEPLOYED", "DEPLOYMENT_IN_PROGRESS"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+ with pytest.raises(
+ DeepsetCloudError,
+ match="Undeployment of pipline config 'test_new_non_existing_pipeline' aborted. Deployment was requested.",
+ ):
+ Pipeline.undeploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_deploy_on_deepset_cloud_unknown_state_in_progress():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/deploy",
+ json={"status": "DEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+        # after deploy() the pipeline reports an unknown status
+ status_flow = ["UNDEPLOYED", "ASKDHFASJDF"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+ with pytest.raises(
+ DeepsetCloudError,
+ match="Deployment of pipeline config 'test_new_non_existing_pipeline ended in unexpected status: UNKNOWN",
+ ):
+ Pipeline.deploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_undeploy_on_deepset_cloud_unknown_state_in_progress():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/undeploy",
+ json={"status": "UNDEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+        # after undeploy() the pipeline reports an unknown status
+ status_flow = ["DEPLOYED", "ASKDHFASJDF"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+ with pytest.raises(
+ DeepsetCloudError,
+ match="Undeployment of pipeline config 'test_new_non_existing_pipeline ended in unexpected status: UNKNOWN",
+ ):
+ Pipeline.undeploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline", api_endpoint=DC_API_ENDPOINT, api_key=DC_API_KEY
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_deploy_on_deepset_cloud_timeout():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/deploy",
+ json={"status": "DEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+        # transitioning to DEPLOYED takes longer than the timeout of 5 seconds, so a TimeoutError is raised
+ status_flow = ["UNDEPLOYED", "DEPLOYMENT_IN_PROGRESS", "DEPLOYMENT_IN_PROGRESS", "DEPLOYED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+ with pytest.raises(
+ TimeoutError, match="Transitioning of 'test_new_non_existing_pipeline' to state 'DEPLOYED' timed out."
+ ):
+ Pipeline.deploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline",
+ api_endpoint=DC_API_ENDPOINT,
+ api_key=DC_API_KEY,
+ timeout=5,
+ )
+
+
+@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
+@responses.activate
+def test_undeploy_on_deepset_cloud_timeout():
+ if MOCK_DC:
+ responses.add(
+ method=responses.POST,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline/undeploy",
+ json={"status": "UNDEPLOYMENT_SCHEDULED"},
+ status=200,
+ )
+
+        # transitioning to UNDEPLOYED takes longer than the timeout of 5 seconds, so a TimeoutError is raised
+ status_flow = ["DEPLOYED", "UNDEPLOYMENT_IN_PROGRESS", "UNDEPLOYMENT_IN_PROGRESS", "UNDEPLOYED"]
+ for status in status_flow:
+ responses.add(
+ method=responses.GET,
+ url=f"{DC_API_ENDPOINT}/workspaces/default/pipelines/test_new_non_existing_pipeline",
+ json={"status": status},
+ status=200,
+ )
+ with pytest.raises(
+ TimeoutError, match="Transitioning of 'test_new_non_existing_pipeline' to state 'UNDEPLOYED' timed out."
+ ):
+ Pipeline.undeploy_on_deepset_cloud(
+ pipeline_config_name="test_new_non_existing_pipeline",
+ api_endpoint=DC_API_ENDPOINT,
+ api_key=DC_API_KEY,
+ timeout=5,
+ )
+
+
# @pytest.mark.slow
# @pytest.mark.elasticsearch
# @pytest.mark.parametrize(