Fix dependency graph for indexing pipelines during codegen #2311
Conversation
LGTM! Very nice! 👌 Quite complex functionality though. Great to see you added several tests.
@ZanSara I just tagged you as a reviewer because you said you were interested in this PR. In that case, it makes sense that you give feedback before we merge. Otherwise, it's ready to be merged.
```python
# e.g. DensePassageRetriever depends on ElasticsearchDocumentStore.
# In indexing pipelines ElasticsearchDocumentStore depends on DensePassageRetriever's output.
# But this second dependency is looser, so we neglect it.
if not graph.has_edge(node_name, input):
```
Just a check whether I understand this correctly: as an example, in an indexing pipeline `ElasticsearchDocumentStore` could be the `node` and `DensePassageRetriever` could be the `input`. Then we check here whether there is an edge in the graph from `ElasticsearchDocumentStore` to `DensePassageRetriever`, and if that's not the case we add an edge with the opposite direction, correct?
Yes, correct.
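To make the direction convention concrete, here is a standalone sketch of that guard (not the actual codegen code; node names are illustrative, and it assumes the graph is a `networkx.DiGraph`, which the `graph.has_edge` call suggests):

```python
import networkx as nx

graph = nx.DiGraph()
# Init-time dependency: DensePassageRetriever takes the document store as a
# constructor argument, so the store must be constructed first.
graph.add_edge("ElasticsearchDocumentStore", "DensePassageRetriever")

node_name, input = "ElasticsearchDocumentStore", "DensePassageRetriever"
# The runtime dependency (the store consumes the retriever's output) points the
# other way. Adding it blindly would close a cycle, so it is only added if the
# reverse init-time edge is absent.
if not graph.has_edge(node_name, input):
    graph.add_edge(input, node_name)

print(list(nx.topological_sort(graph)))
# ['ElasticsearchDocumentStore', 'DensePassageRetriever'] -- no cycle, store first
```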
Just out of curiosity: why do we need to know the runtime dependencies between the nodes at this stage? I thought this function was used in the codegen code to create the nodes in a working order (so that nodes required as input are initialized first). The fact that later on in the pipeline a node will get output from another shouldn't matter... right?
Yes, right. I chose to include those runtime dependencies to make the order of the init calls more readable. With them, the order follows the pipeline execution order (the way a human would normally instantiate the components); without them, it would be arbitrary and less comprehensible. But that's the only reason.
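A small sketch of that effect, again assuming `networkx` and illustrative node names; without the runtime edges, several topological orders are valid and the sort just picks one:

```python
import networkx as nx

init_only = nx.DiGraph()
init_only.add_nodes_from(["FileConverter", "PreProcessor", "DocumentStore", "Retriever"])
init_only.add_edge("DocumentStore", "Retriever")  # the only init-time constraint
print(list(nx.topological_sort(init_only)))
# Any order that puts DocumentStore before Retriever is valid -- not
# necessarily the order in which the pipeline actually runs.

with_runtime = init_only.copy()
with_runtime.add_edge("FileConverter", "PreProcessor")  # runtime data flow
with_runtime.add_edge("PreProcessor", "Retriever")
print(list(nx.topological_sort(with_runtime)))
# Now the init calls largely follow the pipeline execution order.
```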
Cool one! I've managed to comment only on `BaseComponent` for now; I'll read the new `get_config()` in a few hours. No worries, only my typical nitpicks 😄
haystack/nodes/base.py
```python
    self._component_config["name"] = value

@property
def sub_components(self) -> List[BaseComponent]:
```
The name might be slightly confusing... This is the list of nodes this component depends on, right? We might call it `required_components` or something that highlights the dependency relationship.
I'd simply call it `dependencies`. What do you think about it?
I'd rather go with `utilized_components`. At least that makes the most sense to me ;-)
```python
    return [param for param in self._component_config["params"].values() if isinstance(param, BaseComponent)]

@property
def type(self) -> str:
```
Technically this property is redundant; `type(node)` would give you the same value, wouldn't it? I could not easily get `type` out of the `_component_config` dict because it required refactoring `get_config` deeply, but now that we're at it, it might be worth trying.
No, this `type` returns a string: `type(node).__name__`, basically.
I also thought about moving `name` and `type` out of `_component_config`. But on the other hand, with them we have the complete config in one place. I like that idea, but I'm open to discussing this.
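In other words, a minimal standalone sketch of the distinction (the real property presumably reads the stored string from `_component_config`):

```python
class Node:
    @property
    def type(self) -> str:
        # A string such as "ElasticsearchDocumentStore" -- not the class object
        return type(self).__name__

class ElasticsearchDocumentStore(Node):
    pass

node = ElasticsearchDocumentStore()
print(node.type)       # 'ElasticsearchDocumentStore' (a str)
print(type(node))      # <class '__main__.ElasticsearchDocumentStore'> (a type)
print(node.type == type(node).__name__)  # True
```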
```python
def get_params(self, return_defaults: bool = False) -> Dict[str, Any]:
    component_signature = self._get_signature()
    params: Dict[str, Any] = {}
    for key, value in self._component_config["params"].items():
        if value != component_signature[key].default or return_defaults:
            params[key] = value
    if return_defaults:
        for key, param in component_signature.items():
            if key not in params:
                params[key] = param.default
    return params
```
Seems strangely convoluted. I think the following would work the same:
```python
def get_params(self, return_defaults: bool = False) -> Dict[str, Any]:
    explicit_params = deepcopy(self._component_config["params"])
    if not return_defaults:
        return explicit_params
    default_params = {key: param.default for key, param in self._get_signature().items()}
    return {**default_params, **explicit_params}
```

(untested)
This version would not account for default values within `explicit_params`. I think we want to suppress them too if `return_defaults=False`. But I can definitely get rid of the second loop.
I see what you mean, but on the other hand I would not necessarily expect this method to filter out default values if they were explicitly given. Keeping them would also make sure that if I load a YAML and save it back, it comes out exactly as I loaded it, no differences, regardless of whether I gave the default parameters or not.
Not a big deal though, feel free to keep the current behavior 👍
I left it as it is for now, since even with all the default params gathered in one variable we don't gain much in readability or compactness.
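For reference, a standalone sketch of the disputed behavior (a mock, not Haystack code; `Demo` and `filter_given_params` are made up for illustration): explicitly given values that equal their defaults are suppressed unless `return_defaults=True`.

```python
import inspect
from typing import Any, Dict

def filter_given_params(
    given: Dict[str, Any],
    signature: Dict[str, inspect.Parameter],
    return_defaults: bool = False,
) -> Dict[str, Any]:
    # Current behavior: drop explicitly given values that equal their default
    params = {k: v for k, v in given.items() if return_defaults or v != signature[k].default}
    if return_defaults:
        for key, param in signature.items():
            params.setdefault(key, param.default)
    return params

class Demo:
    def __init__(self, top_k: int = 10):
        self.top_k = top_k

sig = {n: p for n, p in inspect.signature(Demo.__init__).parameters.items() if n != "self"}
print(filter_given_params({"top_k": 10}, sig))  # {} -- the explicit default is dropped
print(filter_given_params({"top_k": 5}, sig))   # {'top_k': 5}
```

The variant suggested above would instead return `{'top_k': 10}` in the first case, which is what would make YAML round-trips come out identical.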
```diff
@@ -207,3 +235,13 @@ def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]:
        output["params"] = params
        return output, stream

    @classmethod
    def _get_signature(cls) -> Dict[str, inspect.Parameter]:
```
Just FYI: there's another very similar method in `_json_schema.py` (`haystack/nodes/_json_schema.py`, line 90 in b8a3c93):

```python
def get_typed_signature(call: Callable[..., Any]) -> inspect.Signature:
```
It's interesting that you seem to have taken a completely different approach to the problem... I don't think we need any action here though
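For comparison, a hypothetical standalone take on the `_get_signature` idea (not the actual code): collect the `__init__` parameters via `inspect`, keyed by name so callers can look up defaults.

```python
import inspect
from typing import Dict

def get_init_signature(cls: type) -> Dict[str, inspect.Parameter]:
    # Map each __init__ parameter (excluding self and *args/**kwargs catch-alls)
    # to its inspect.Parameter, so defaults can be looked up by name.
    return {
        name: param
        for name, param in inspect.signature(cls.__init__).parameters.items()
        if param.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        and name != "self"
    }

class Example:
    def __init__(self, top_k: int = 10, index: str = "document"):
        ...

print({k: p.default for k, p in get_init_signature(Example).items()})
# {'top_k': 10, 'index': 'document'}
```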
Very happy about this one. I found only one small potential bug, but overall good job! 🙂
```python
if component_signature[param_key].default != param_value or return_defaults is True:
    components[node]["params"][param_key] = param_value

component: BaseComponent = node_attributes["component"]
if node_name != component.name:
```
Just curious really, what could lead to this situation? Manual edit of `_component_config`?
Yep, Pipeline's `add_node` sets all names appropriately.
haystack/pipelines/base.py
```python
        component_names.update(sub_component_names)
        return component_names

    def _set_sub_component_names(self, component: BaseComponent, component_names: Optional[Set[str]] = None):
```
Just thinking out loud here. I see that this method recursively names all the "sub" components. But we're already going down recursively with `_add_component_to_definitions`. So how about naming any unnamed component in `_add_component_to_definitions` as soon as we encounter it? That would simply mean replacing `PipelineError(f"Component with config '{component._component_config}' does not have a name.")` with a function that names the component.
That would also remove the need for `_get_all_component_names` (which in turn is recursive, so yet another layer of recursion removed).
Unfortunately, during `_add_component_to_definitions()` we do not yet have all the component names assigned within that pipeline. That's a requirement for generating names without ending up with duplicates. So `_get_all_component_names` would be required either way.
I also thought about assigning (virtual) names during `get_config()`, and thus in `_add_component_to_definitions()`. However, it seemed cleaner to me to have all names already assigned when the component is added to the pipeline, because that is what already happens for non-sub-components.
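A minimal sketch of the naming constraint being described (hypothetical helper, not the PR's code): to generate a collision-free name, every name already taken in the pipeline must be known first.

```python
from typing import Set

def make_unique_name(component_type: str, taken_names: Set[str]) -> str:
    # Derive a name from the class name; suffix a counter until it is unique
    # among the names already used in the pipeline.
    name, counter = component_type, 1
    while name in taken_names:
        counter += 1
        name = f"{component_type}_{counter}"
    return name

taken = {"DensePassageRetriever"}  # gathered by something like _get_all_component_names
print(make_unique_name("DensePassageRetriever", taken))  # 'DensePassageRetriever_2'
```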
```python
intermediate = ParentComponent(dependent=child)
parent = ParentComponent(dependent=intermediate)
p_ensemble = Pipeline()
p_ensemble.add_node(component=parent, name="Parent", inputs=["Query"])
```
Ok now, what happens if I call this node `ParentComponent`? Or if I call two nodes with the same name explicitly? I didn't see a check for this case anywhere.
Good that you spotted this. There was actually a bug here: the name of the node being added wasn't known to the graph when `_get_all_component_names()` ran. Fixed that.
This fixes a `NetworkXUnfeasible` exception (`Graph contains a cycle or graph changed during iteration`) raised during the `topological_sort` of the component construction code lines in `generate_code()` whenever the retriever is needed during indexing.

A sample YAML with an indexing pipeline reproduces it, because (see the sketch below):
- the Retriever depends on the DocumentStore during initialization, and
- the DocumentStore depends on the Retriever's output during pipeline execution.
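A standalone illustration of the failure mode (assuming the codegen graph is a `networkx.DiGraph`, as the `topological_sort` mention suggests; node names are illustrative): keeping both dependency directions closes a cycle, and the sort then raises.

```python
import networkx as nx

graph = nx.DiGraph()
graph.add_edge("DocumentStore", "Retriever")  # init-time dependency
graph.add_edge("Retriever", "DocumentStore")  # runtime output dependency

try:
    list(nx.topological_sort(graph))
except nx.NetworkXUnfeasible as err:
    print(err)  # Graph contains a cycle or graph changed during iteration
```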
While fixing that, some other bugs in `Pipeline.get_config()` surfaced when `return_defaults` was true (due to the new paradigm of `_component_config`, which stores only the explicitly given params). These are also fixed in this PR.

I also took the opportunity to refactor `Pipeline.get_config()` so that dependent components beyond one level of dependency are possible (i.e. `Pipeline.get_config()` is fully recursive now).

Proposed changes:
- fix the dependency graph for indexing pipelines during codegen
- fix `Pipeline.get_config()`, including the `return_defaults=True` option, and make it fully recursive