DC SDK - load pipeline from deepset cloud #2013

Merged
merged 28 commits into from Jan 28, 2022
Changes from 10 commits
Commits
28 commits
849a1fd
initial load_from_dc
ArzelaAscoIi Jan 17, 2022
516efd8
typo
ArzelaAscoIi Jan 17, 2022
b7aa7c8
adjusted api endpoint
ArzelaAscoIi Jan 17, 2022
463bbbc
removed kwargs
ArzelaAscoIi Jan 17, 2022
96c51ed
added _load_from_dict
ArzelaAscoIi Jan 18, 2022
db12138
refactor pipeline loading mechanism
tstadel Jan 18, 2022
4859398
renaming load_from_dc api
ArzelaAscoIi Jan 19, 2022
3911168
Merge branch 'master' into pipelines-from-to-dc
ArzelaAscoIi Jan 19, 2022
262472e
renaming
ArzelaAscoIi Jan 19, 2022
9f1ce41
fixed errors
ArzelaAscoIi Jan 19, 2022
ff42508
fix comments and environment variable overrides
tstadel Jan 20, 2022
7f35798
Add latest docstring and tutorial changes
github-actions[bot] Jan 20, 2022
bbd8dfd
fix outdated YAML examples
tstadel Jan 20, 2022
ff9b706
Add latest docstring and tutorial changes
github-actions[bot] Jan 20, 2022
0910bea
Introduce readonly DCDocumentStore (without labels support) (#1991)
tstadel Jan 25, 2022
0dd4039
introduce DeepsetCloudAdapter
tstadel Jan 26, 2022
5db9646
Merge branch 'master' into pipelines-from-to-dc
tstadel Jan 26, 2022
5ccd3e6
Add latest docstring and tutorial changes
github-actions[bot] Jan 26, 2022
3cb4d04
introduce DeepsetCloudClient
tstadel Jan 27, 2022
9dffc2e
Add latest docstring and tutorial changes
github-actions[bot] Jan 27, 2022
b1b83cc
use json api for pipeline_config
tstadel Jan 27, 2022
98bf639
Merge branch 'pipelines-from-to-dc' of github.com:deepset-ai/haystack…
tstadel Jan 27, 2022
865c6d5
indexing pipeline test added
tstadel Jan 27, 2022
6887197
pseudo change to force cache eviction
tstadel Jan 27, 2022
44d0ece
revert pseudo change to force cache eviction
tstadel Jan 28, 2022
92a9a22
remove conftest duplicates
tstadel Jan 28, 2022
a519c92
minor formatting and docstring fixes
tstadel Jan 28, 2022
0d0fadd
fix tests when MOCK_DC=False
tstadel Jan 28, 2022
198 changes: 146 additions & 52 deletions haystack/pipelines/base.py
@@ -1,8 +1,9 @@
from typing import Dict, List, Optional, Any, Union
from typing import Dict, List, Optional, Any

import copy
import inspect
import logging
import requests
import os
import traceback
import numpy as np
@@ -29,7 +30,6 @@

logger = logging.getLogger(__name__)


class RootNode(BaseComponent):
"""
RootNode feeds inputs together with corresponding params to a Pipeline.
@@ -92,70 +92,93 @@ def load_from_yaml(cls, path: Path, pipeline_name: Optional[str] = None, overwri
variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
`_` sign must be used to specify nested hierarchical properties.
"""
pipeline_config = cls._get_pipeline_config_from_yaml(path=path, pipeline_name=pipeline_name)
if pipeline_config["type"] == "Pipeline":
return Pipeline.load_from_yaml(
path=path, pipeline_name=pipeline_name, overwrite_with_env_variables=overwrite_with_env_variables

pipeline_config = cls._read_pipeline_config_from_yaml(path)
pipeline_definition = cls._get_pipeline_definition(pipeline_config=pipeline_config, pipeline_name=pipeline_name)
if pipeline_definition["type"] == "Pipeline":
return Pipeline._load_from_config(
pipeline_config=pipeline_config, pipeline_name=pipeline_name, overwrite_with_env_variables=overwrite_with_env_variables
)
elif pipeline_config["type"] == "RayPipeline":
return RayPipeline.load_from_yaml(
path=path, pipeline_name=pipeline_name, overwrite_with_env_variables=overwrite_with_env_variables
elif pipeline_definition["type"] == "RayPipeline":
return RayPipeline._load_from_config(
pipeline_config=pipeline_config, pipeline_name=pipeline_name, overwrite_with_env_variables=overwrite_with_env_variables
)
else:
raise KeyError(f"Pipeline Type '{pipeline_config['type']}' is not a valid. The available types are"
raise KeyError(f"Pipeline Type '{pipeline_definition['type']}' is not a valid. The available types are"
f"'Pipeline' and 'RayPipeline'.")

@classmethod
def _get_pipeline_config_from_yaml(cls, path: Path, pipeline_name: Optional[str] = None):
def load_from_dc(
cls,
pipeline_config_name: str,
pipeline_name: str = "query",
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
workspace_name: Optional[str] = "default",
):
api_endpoint = os.getenv("DEEPSET_CLOUD_API_ENDPOINT", api_endpoint)
if api_endpoint is None:
raise ValueError("Missing environment variable 'DEEPSET_CLOUD_API_ENDPOINT'. Cannot communicate with DC without specifying the endpoint.")

# overwrite api_key if environment variable is set
api_key = os.getenv("DEEPSET_CLOUD_API_KEY", api_key)
if api_key is None:
raise ValueError("Could not authenticate at deepset cloud: No 'api_key' or envorionment 'DEEPSET_CLOUD_API_KEY' variable defined.")

response = requests.get(
f"{api_endpoint}/workspaces/{workspace_name}/pipelines/{pipeline_config_name}/yaml",
headers={
'Authorization': f'Bearer {api_key}'
}
)
pipeline_config = yaml.safe_load(response.json())
pipeline = Pipeline._load_from_config(pipeline_config=pipeline_config, pipeline_name=pipeline_name)
return pipeline
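
For orientation, here is a minimal usage sketch of the `load_from_dc` classmethod added above. The import path is the module this diff touches; the endpoint, API key, and pipeline config name are illustrative placeholders, not values from this PR.

```python
import os
from haystack.pipelines.base import Pipeline

# load_from_dc reads these environment variables; when set they override the
# api_endpoint / api_key arguments, otherwise the arguments are used.
os.environ["DEEPSET_CLOUD_API_ENDPOINT"] = "https://api.cloud.deepset.ai/api/v1"  # illustrative
os.environ["DEEPSET_CLOUD_API_KEY"] = "<your-api-key>"

# Fetch the pipeline YAML from deepset Cloud and build the "query" pipeline from it.
pipeline = Pipeline.load_from_dc(
    pipeline_config_name="my_dc_pipeline",  # hypothetical config name in deepset Cloud
    pipeline_name="query",
    workspace_name="default",
)
```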

@classmethod
def _get_pipeline_definition(cls, pipeline_config: Dict, pipeline_name: Optional[str] = None):
"""
Get the definition of Pipeline from a given YAML. If the YAML contains more than one Pipeline,
Get the definition of Pipeline from a given pipeline config. If the config contains more than one Pipeline,
then the pipeline_name must be supplied.

:param path: Path of Pipeline YAML file.
:param pipeline_config: Dict Pipeline config parsed as a dictionary.
:param pipeline_name: name of the Pipeline.
"""
with open(path, "r", encoding='utf-8') as stream:
data = yaml.safe_load(stream)

if pipeline_name is None:
if len(data["pipelines"]) == 1:
pipeline_config = data["pipelines"][0]
if len(pipeline_config["pipelines"]) == 1:
pipeline_definition = pipeline_config["pipelines"][0]
else:
raise Exception("The YAML contains multiple pipelines. Please specify the pipeline name to load.")
else:
pipelines_in_yaml = list(filter(lambda p: p["name"] == pipeline_name, data["pipelines"]))
if not pipelines_in_yaml:
pipelines_in_definitions = list(filter(lambda p: p["name"] == pipeline_name, pipeline_config["pipelines"]))
if not pipelines_in_definitions:
raise KeyError(f"Cannot find any pipeline with name '{pipeline_name}' declared in the YAML file.")
pipeline_config = pipelines_in_yaml[0]
pipeline_definition = pipelines_in_definitions[0]

return pipeline_config
return pipeline_definition
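
To make the new split between "pipeline config" and "pipeline definition" concrete, here is a sketch of the dict shapes involved, using names from the sample YAML shown further down in this diff (trimmed for brevity):

```python
# The full pipeline config, i.e. the result of yaml.safe_load on a pipeline YAML.
pipeline_config = {
    "version": "0.8",
    "components": [
        {"name": "MyESRetriever", "type": "ElasticsearchRetriever",
         "params": {"document_store": "MyDocumentStore"}},
        {"name": "MyDocumentStore", "type": "ElasticsearchDocumentStore",
         "params": {"index": "haystack_test"}},
    ],
    "pipelines": [
        {"name": "my_query_pipeline",
         "nodes": [{"name": "MyESRetriever", "inputs": ["Query"]}]},
    ],
}

# _get_pipeline_definition selects one entry from pipeline_config["pipelines"], e.g.:
# {"name": "my_query_pipeline", "nodes": [{"name": "MyESRetriever", "inputs": ["Query"]}]}
```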

@classmethod
def _read_yaml(cls, path: Path, pipeline_name: Optional[str], overwrite_with_env_variables: bool):
def _get_component_definitions(cls, pipeline_config: Dict, overwrite_with_env_variables: bool):
"""
Parse the YAML and return the full YAML config, pipeline_config, and definitions of all components.
Return the definitions of all components from a given pipeline config.

:param path: path of the YAML file.
:param pipeline_config: Dict Pipeline config parsed as a dictionary.
:param pipeline_name: if the YAML contains multiple pipelines, the pipeline_name to load must be set.
:param overwrite_with_env_variables: Overwrite the YAML configuration with environment variables. For example,
to change index name param for an ElasticsearchDocumentStore, an env
variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
`_` sign must be used to specify nested hierarchical properties.
"""
with open(path, "r", encoding="utf-8") as stream:
data = yaml.safe_load(stream)

pipeline_config = cls._get_pipeline_config_from_yaml(path=path, pipeline_name=pipeline_name)

definitions = {} # definitions of each component from the YAML.
component_definitions = copy.deepcopy(data["components"])
for definition in component_definitions:
component_definitions = {} # definitions of each component from the YAML.
raw_component_definitions = copy.deepcopy(pipeline_config["components"])
for component_definition in raw_component_definitions:
if overwrite_with_env_variables:
cls._overwrite_with_env_variables(definition)
name = definition.pop("name")
definitions[name] = definition
cls._overwrite_with_env_variables(component_definition)
name = component_definition.pop("name")
component_definitions[name] = component_definition

return data, pipeline_config, definitions
return component_definitions
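
And the corresponding output of `_get_component_definitions` for that config is the same component entries keyed by name, with the `name` field popped (a sketch):

```python
component_definitions = {
    "MyESRetriever": {"type": "ElasticsearchRetriever",
                      "params": {"document_store": "MyDocumentStore"}},
    "MyDocumentStore": {"type": "ElasticsearchDocumentStore",
                        "params": {"index": "haystack_test"}},
}
```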

@classmethod
def _overwrite_with_env_variables(cls, definition: dict):
@@ -172,6 +195,11 @@ def _overwrite_with_env_variables(cls, definition: dict):
param_name = key.replace(env_prefix, "").lower()
definition["params"][param_name] = value

@classmethod
def _read_pipeline_config_from_yaml(cls, path: Path):
with open(path, "r", encoding="utf-8") as stream:
return yaml.safe_load(stream)
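
A short sketch of the environment-variable override described in the docstrings above, assuming a component named `MyDocStore` of type `ElasticsearchDocumentStore` in the YAML; the YAML path is hypothetical:

```python
import os
from pathlib import Path
from haystack.pipelines.base import Pipeline

# Per the docstring example, the variable name is the upper-cased component name
# plus "_PARAMS_" plus the parameter name; it overrides the "index" param of
# MyDocStore before the pipeline is built.
os.environ["MYDOCSTORE_PARAMS_INDEX"] = "documents-2021"

pipeline = Pipeline.load_from_yaml(
    Path("rest_api/pipeline/pipelines.yaml"),  # hypothetical path
    pipeline_name="my_query_pipeline",
    overwrite_with_env_variables=True,  # the default
)
```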


class Pipeline(BasePipeline):
"""
@@ -652,17 +680,25 @@ def load_from_yaml(cls, path: Path, pipeline_name: Optional[str] = None, overwri
variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
`_` sign must be used to specify nested hierarchical properties.
"""
data, pipeline_config, definitions = cls._read_yaml(
path=path, pipeline_name=pipeline_name, overwrite_with_env_variables=overwrite_with_env_variables
pipeline_config = cls._read_pipeline_config_from_yaml(path)
return Pipeline._load_from_config(pipeline_config=pipeline_config,
pipeline_name=pipeline_name,
overwrite_with_env_variables=overwrite_with_env_variables)

@classmethod
def _load_from_config(cls, pipeline_config: Dict, pipeline_name: Optional[str] = None, overwrite_with_env_variables: bool = True):
pipeline_definition = cls._get_pipeline_definition(pipeline_config=pipeline_config, pipeline_name=pipeline_name)
component_definitions = cls._get_component_definitions(
pipeline_config=pipeline_config, overwrite_with_env_variables=overwrite_with_env_variables
)

pipeline = cls()

components: dict = {} # instances of component objects.
for node_config in pipeline_config["nodes"]:
name = node_config["name"]
component = cls._load_or_get_component(name=name, definitions=definitions, components=components)
pipeline.add_node(component=component, name=node_config["name"], inputs=node_config.get("inputs", []))
for node in pipeline_definition["nodes"]:
name = node["name"]
component = cls._load_or_get_component(name=name, definitions=component_definitions, components=components)
pipeline.add_node(component=component, name=name, inputs=node.get("inputs", []))

return pipeline

@@ -900,15 +936,16 @@ def __init__(self, address: str = None, **kwargs):
super().__init__()

@classmethod
def load_from_yaml(
def _load_from_config(
cls,
path: Path, pipeline_name: Optional[str] = None,
pipeline_config: Dict,
pipeline_name: Optional[str] = None,
overwrite_with_env_variables: bool = True,
address: Optional[str] = None,
**kwargs,
):
"""
Load Pipeline from a YAML file defining the individual components and how they're tied together to form
Load Pipeline from a YAML parsed as a dictionary defining the individual components and how they're tied together to form
a Pipeline. A single YAML can declare multiple Pipelines, in which case an explicit `pipeline_name` must
be passed.

@@ -950,26 +987,27 @@ def load_from_yaml(
`_` sign must be used to specify nested hierarchical properties.
:param address: The IP address for the Ray cluster. If set to None, a local Ray instance is started.
"""
data, pipeline_config, definitions = cls._read_yaml(
path=path, pipeline_name=pipeline_name, overwrite_with_env_variables=overwrite_with_env_variables
pipeline_definition = cls._get_pipeline_definition(pipeline_config=pipeline_config, pipeline_name=pipeline_name)
component_definitions = cls._get_component_definitions(
pipeline_config=pipeline_config, overwrite_with_env_variables=overwrite_with_env_variables
)
pipeline = cls(address=address, **kwargs)

for node_config in pipeline_config["nodes"]:
for node_config in pipeline_definition["nodes"]:
if pipeline.root_node is None:
root_node = node_config["inputs"][0]
if root_node in ["Query", "File"]:
pipeline.root_node = root_node
handle = cls._create_ray_deployment(component_name=root_node, pipeline_config=data)
handle = cls._create_ray_deployment(component_name=root_node, pipeline_config=pipeline_config)
pipeline._add_ray_deployment_in_graph(handle=handle, name=root_node, outgoing_edges=1, inputs=[])
else:
raise KeyError(f"Root node '{root_node}' is invalid. Available options are 'Query' and 'File'.")

name = node_config["name"]
component_type = definitions[name]["type"]
component_type = component_definitions[name]["type"]
component_class = BaseComponent.get_subclass(component_type)
replicas = next(node for node in pipeline_config["nodes"] if node["name"] == name).get("replicas", 1)
handle = cls._create_ray_deployment(component_name=name, pipeline_config=data, replicas=replicas)
replicas = next(node for node in pipeline_definition["nodes"] if node["name"] == name).get("replicas", 1)
handle = cls._create_ray_deployment(component_name=name, pipeline_config=pipeline_config, replicas=replicas)
pipeline._add_ray_deployment_in_graph(
handle=handle,
name=name,
@@ -979,6 +1017,62 @@

return pipeline

@classmethod
def load_from_yaml(
cls,
path: Path, pipeline_name: Optional[str] = None,
overwrite_with_env_variables: bool = True,
address: Optional[str] = None,
**kwargs,
):
"""
Load Pipeline from a YAML file defining the individual components and how they're tied together to form
a Pipeline. A single YAML can declare multiple Pipelines, in which case an explicit `pipeline_name` must
be passed.

Here's a sample configuration:

```yaml
| version: '0.8'
|
| components: # define all the building-blocks for Pipeline
| - name: MyReader # custom-name for the component; helpful for visualization & debugging
| type: FARMReader # Haystack Class name for the component
| params:
| no_ans_boost: -10
| model_name_or_path: deepset/roberta-base-squad2
| - name: MyESRetriever
| type: ElasticsearchRetriever
| params:
| document_store: MyDocumentStore # params can reference other components defined in the YAML
| custom_query: null
| - name: MyDocumentStore
| type: ElasticsearchDocumentStore
| params:
| index: haystack_test
|
| pipelines: # multiple Pipelines can be defined using the components from above
| - name: my_query_pipeline # a simple extractive-qa Pipeline
| nodes:
| - name: MyESRetriever
| inputs: [Query]
| - name: MyReader
| inputs: [MyESRetriever]
```

:param path: path of the YAML file.
:param pipeline_name: if the YAML contains multiple pipelines, the pipeline_name to load must be set.
:param overwrite_with_env_variables: Overwrite the YAML configuration with environment variables. For example,
to change index name param for an ElasticsearchDocumentStore, an env
variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
`_` sign must be used to specify nested hierarchical properties.
:param address: The IP address for the Ray cluster. If set to None, a local Ray instance is started.
"""
pipeline_config = cls._read_pipeline_config_from_yaml(path)
return RayPipeline._load_from_config(pipeline_config=pipeline_config, pipeline_name=pipeline_name,
overwrite_with_env_variables=overwrite_with_env_variables, address=address, **kwargs)
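
A brief usage sketch of the `RayPipeline.load_from_yaml` wrapper shown above; the YAML path is hypothetical and the address handling follows the docstring:

```python
from pathlib import Path
from haystack.pipelines.base import RayPipeline

# With address=None a local Ray instance is started, as described in the docstring;
# otherwise pass the IP address of an existing Ray cluster.
pipeline = RayPipeline.load_from_yaml(
    path=Path("pipelines.yaml"),  # hypothetical path
    pipeline_name="my_query_pipeline",
    address=None,
)
```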


@classmethod
def _create_ray_deployment(cls, component_name: str, pipeline_config: dict, replicas: int = 1):
"""
8 changes: 5 additions & 3 deletions rest_api/controller/file_upload.py
@@ -17,14 +17,16 @@
router = APIRouter()

try:
_, pipeline_config, definitions = Pipeline._read_yaml(
path=Path(PIPELINE_YAML_PATH), pipeline_name=INDEXING_PIPELINE_NAME, overwrite_with_env_variables=True
pipeline_config = Pipeline._read_pipeline_config_from_yaml(Path(PIPELINE_YAML_PATH))
pipeline_definition = Pipeline._get_pipeline_definition(pipeline_config=pipeline_config, pipeline_name=INDEXING_PIPELINE_NAME)
definitions = Pipeline._get_component_definitions(
pipeline_config=pipeline_config, overwrite_with_env_variables=True
)
# Since each instance of FAISSDocumentStore creates an in-memory FAISS index, the Indexing & Query Pipelines would
# end up with different indices. The same applies for InMemoryDocumentStore. The check below prevents creation of
# Indexing Pipelines with FAISSDocumentStore or InMemoryDocumentStore.
is_faiss_or_inmemory_present = False
for node in pipeline_config["nodes"]:
for node in pipeline_definition["nodes"]:
if definitions[node["name"]]["type"] == "FAISSDocumentStore" or definitions[node["name"]]["type"] == "InMemoryDocumentStore":
is_faiss_or_inmemory_present = True
break