feat: exponential backoff with exp decreasing batch size for opensearch and elasticsearch client #3194

Merged · 24 commits · Sep 13, 2022
70 changes: 65 additions & 5 deletions haystack/document_stores/elasticsearch.py
@@ -130,6 +130,66 @@ def __init__(
        self.duplicate_documents = duplicate_documents
        self.refresh_type = refresh_type

    def _split_document_list(
        self, documents: Union[List[dict], List[Document]], number_of_lists: int
    ) -> Generator[Union[List[dict], List[Document]], None, None]:
        chunk_size = max(int((len(documents) + 1) / number_of_lists), 1)
        for i in range(0, len(documents), chunk_size):
            yield documents[i : i + chunk_size]

    def _bulk(
        self,
        documents: Union[List[dict], List[Document]],
        headers: Optional[Dict[str, str]] = None,
        request_timeout: int = 300,
        refresh: str = "wait_for",
        _timeout: int = 1,
        _remaining_tries: int = 10,
    ) -> None:
        """
        Bulk index documents into Elasticsearch using a custom retry implementation that uses
        exponential backoff and exponential batch size reduction to avoid overloading the cluster.

        OpenSearch/Elasticsearch returns '429 Too Many Requests' when write requests can't be
        processed, either because too many requests are queued up or because a single request is
        too large and exceeds the memory of the nodes. Since the error code is the same in both
        cases, we wait and reduce the batch size at the same time.

        :param documents: List of documents to index
        :param headers: Optional headers to pass to the bulk request
        :param request_timeout: Timeout for the bulk request
        :param refresh: Refresh policy for the bulk request
        :param _timeout: Timeout for the exponential backoff
        :param _remaining_tries: Number of remaining retries
        """
        if _remaining_tries == 0:
            raise DocumentStoreError("Bulk request failed because of too many retries.")

        try:
            bulk(self.client, documents, request_timeout=request_timeout, refresh=refresh, headers=headers)
        except Exception as e:
            if hasattr(e, "status_code") and e.status_code == 429:  # type: ignore
                logger.warning(
                    f"Failed to insert a batch of '{len(documents)}' documents because of a 'Too Many Requests' response. "
                    f"Splitting the documents into two chunks of equal size and retrying in {_timeout} seconds."
                )
                if len(documents) == 1:
                    logger.warning(
                        "Failed to index a single document. Your indexing queue on the Elasticsearch cluster is probably full. "
                        "Try resizing your cluster or reducing the number of parallel processes that are writing to the cluster."
                    )

                time.sleep(_timeout)
                for split_docs in self._split_document_list(documents, 2):
                    self._bulk(
                        documents=split_docs,
                        headers=headers,
                        request_timeout=request_timeout,
                        refresh=refresh,
                        _timeout=_timeout * 2,
                        _remaining_tries=_remaining_tries - 1,
                    )
                return
            raise e

    def _create_document_index(self, index_name: str, headers: Optional[Dict[str, str]] = None):
        """
        Create a new index for storing documents. If an index with that name already exists, it ensures that
@@ -442,11 +502,11 @@ def write_documents(

# Pass batch_size number of documents to bulk
if len(documents_to_index) % batch_size == 0:
bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
documents_to_index = []

if documents_to_index:
bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)

def write_labels(
self,
@@ -500,11 +560,11 @@ def write_labels(

# Pass batch_size number of labels to bulk
if len(labels_to_index) % batch_size == 0:
bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
labels_to_index = []

if labels_to_index:
bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)

def update_document_meta(
self, id: str, meta: Dict[str, str], index: str = None, headers: Optional[Dict[str, str]] = None
@@ -1434,7 +1494,7 @@ def update_embeddings(
}
doc_updates.append(update)

bulk(self.client, doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(documents=doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers)
progress_bar.update(batch_size)

def delete_all_documents(
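In short, the retry strategy added above tries the whole batch first; on a 429 response it sleeps, doubles the sleep, splits the batch into two halves, and recurses until either the insert succeeds or the retry budget runs out. Below is a minimal, self-contained sketch of the same pattern applied to an arbitrary send callable; send_with_backoff and TooManyRequests are illustrative names, not part of the Haystack API, and this sketch is not the code that was merged.

import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class TooManyRequests(Exception):
    """Stand-in for a 429-style error from the backend (illustrative only)."""


def send_with_backoff(
    send: Callable[[List[T]], None],
    batch: List[T],
    wait: float = 1.0,
    remaining_tries: int = 10,
) -> None:
    """Send `batch`; on a 429-style error, sleep, halve the batch, and recurse with a doubled wait."""
    if remaining_tries == 0:
        raise RuntimeError("Bulk request failed because of too many retries.")
    try:
        send(batch)
    except TooManyRequests:
        time.sleep(wait)
        half = max((len(batch) + 1) // 2, 1)
        for chunk in (batch[:half], batch[half:]):
            if chunk:  # a single-document batch simply gets retried with a longer wait
                send_with_backoff(send, chunk, wait * 2, remaining_tries - 1)


# Usage sketch: wrap the real client call, for example
# send_with_backoff(lambda docs: bulk(client, docs), documents)

Splitting depth-first means a chunk that keeps failing aborts the whole write instead of silently dropping documents, which is exactly what the first test below checks.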
46 changes: 45 additions & 1 deletion test/document_stores/test_opensearch.py
@@ -1,10 +1,13 @@
import logging

from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
import numpy as np

import opensearchpy
import elasticsearch

from haystack.document_stores.opensearch import (
OpenSearch,
OpenSearchDocumentStore,
@@ -807,6 +810,47 @@ def test_clone_embedding_field_update_mapping(self, mocked_document_store, index
},
}

    @pytest.mark.unit
    def test_bulk_write_retries_for_always_failing_insert_is_canceled(self, mocked_document_store, monkeypatch, caplog):
        docs_to_write = [
            {"meta": {"name": f"name_{i}"}, "content": f"text_{i}", "embedding": np.random.rand(768).astype(np.float32)}
            for i in range(1000)
        ]

        with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
            mocked_bulk.side_effect = elasticsearch.TransportError(429, "Too many requests")

            with pytest.raises(DocumentStoreError, match="Bulk request failed because of too many retries."):
                mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)

            assert mocked_bulk.call_count == 3  # the depth-first retry fails and cancels the whole bulk request

            assert "Too Many Requests" in caplog.text
            assert "Splitting the documents into two chunks of equal size" in caplog.text

    @pytest.mark.unit
    def test_bulk_write_retries_with_backoff_with_smaller_batch_size_on_too_many_requests(
        self, mocked_document_store, monkeypatch, caplog
    ):
        docs_to_write = [
            {"meta": {"name": f"name_{i}"}, "content": f"text_{i}", "embedding": np.random.rand(768).astype(np.float32)}
            for i in range(1000)
        ]

        with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
            # Make the bulk insert split the documents and retry:
            # 1000 (failed) => 500 (failed) + 500 (ok); the failing 500 => 250 (ok) + 250 (ok),
            # resulting in 5 calls in total.
            mocked_bulk.side_effect = [
                elasticsearch.TransportError(429, "Too many requests"),
                elasticsearch.TransportError(429, "Too many requests"),
                None,
                None,
                None,
            ]
            mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)
            assert mocked_bulk.call_count == 5


class TestOpenDistroElasticsearchDocumentStore:
@pytest.mark.unit
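A back-of-the-envelope note on the numbers used here (not part of the PR): with _remaining_tries=3, a batch that always answers 429 is retried depth-first on 1000, 500, and then 250 documents before DocumentStoreError is raised, which is why call_count == 3 in the first test. With the production defaults _timeout=1 and _remaining_tries=10, the sleep doubles on every level, so one always-failing retry path waits 1 + 2 + ... + 512 seconds before giving up:

# Worst-case sleep along a single always-failing retry path, assuming the defaults
# from the diff above (_timeout=1, _remaining_tries=10); the call that reaches
# _remaining_tries == 0 raises without sleeping, so ten sleeps happen in total.
worst_case_wait = sum(2**level for level in range(10))
print(worst_case_wait)  # 1023 seconds; total wall-clock time can be longer if sibling chunks also back off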