feat: exponential backoff with exp decreasing batch size for opensearch client (#3194)

* Validate custom_mapping properly as an object

* Remove related test

* black

* feat: exponential backoff with exp dec batch size

* added docstring and split doc list

* fix

* fix mypy

* fix

* catch generic exception

* added test

* mypy ignore

* fixed no attribute

* added test

* added tests

* revert strange merge conflicts

* revert merge conflict again

* Update haystack/document_stores/elasticsearch.py

Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>

* done

* adjust test

* remove not required caplog

* fixed comments

Co-authored-by: ZanSara <sarazanzo94@gmail.com>
Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
3 people authored and brandenchan committed Sep 21, 2022
1 parent 65e7e13 commit 7dfc5ea
Showing 2 changed files with 112 additions and 6 deletions.
73 changes: 68 additions & 5 deletions haystack/document_stores/elasticsearch.py
@@ -130,6 +130,69 @@ def __init__(
self.duplicate_documents = duplicate_documents
self.refresh_type = refresh_type

def _split_document_list(
self, documents: Union[List[dict], List[Document]], number_of_lists: int
) -> Generator[Union[List[dict], List[Document]], None, None]:
chunk_size = max((len(documents) + 1) // number_of_lists, 1)
for i in range(0, len(documents), chunk_size):
yield documents[i : i + chunk_size]

def _bulk(
self,
documents: Union[List[dict], List[Document]],
headers: Optional[Dict[str, str]] = None,
request_timeout: int = 300,
refresh: str = "wait_for",
_timeout: int = 1,
_remaining_tries: int = 10,
) -> None:
"""
Bulk index documents into Elasticsearch using a custom retry implementation that uses
exponential backoff and exponential batch size reduction to avoid overloading the cluster.
OpenSearch/Elasticsearch returns '429 Too Many Requests' when write requests can't be processed because
too many requests are queued or a single request is too large and exceeds the memory of the nodes.
Since the error code is the same in both cases, we wait and reduce the batch size simultaneously.
:param documents: List of documents to index
:param headers: Optional headers to pass to the bulk request
:param request_timeout: Timeout for the bulk request
:param refresh: Refresh policy for the bulk request
:param _timeout: Number of seconds to wait before the next retry (doubled on every retry)
:param _remaining_tries: Number of remaining retries
"""

try:
bulk(self.client, documents, request_timeout=request_timeout, refresh=refresh, headers=headers)
except Exception as e:
if hasattr(e, "status_code") and e.status_code == 429: # type: ignore
logger.warning(
f"Failed to insert a batch of '{len(documents)}' documents because of a 'Too Many Requeset' response. Splitting the number of documents into two chunks with the same size and retrying in {_timeout} seconds."
)
if len(documents) == 1:
logger.warning(
"Failed to index a single document. Your indexing queue on the cluster is probably full. Try resizing your cluster or reducing the number of parallel processes that are writing to the cluster."
)

time.sleep(_timeout)

_remaining_tries -= 1
if _remaining_tries == 0:
raise DocumentStoreError("Last try of bulk indexing documents failed.")

for split_docs in self._split_document_list(documents, 2):
self._bulk(
documents=split_docs,
headers=headers,
request_timeout=request_timeout,
refresh=refresh,
_timeout=_timeout * 2,
_remaining_tries=_remaining_tries,
)
return
raise e

def _create_document_index(self, index_name: str, headers: Optional[Dict[str, str]] = None):
"""
Create a new index for storing documents. If an index with that name already exists, it ensures that
@@ -442,11 +505,11 @@ def write_documents(

# Pass batch_size number of documents to bulk
if len(documents_to_index) % batch_size == 0:
bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
documents_to_index = []

if documents_to_index:
bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(documents_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)

def write_labels(
self,
@@ -500,11 +563,11 @@ def write_labels(

# Pass batch_size number of labels to bulk
if len(labels_to_index) % batch_size == 0:
bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
labels_to_index = []

if labels_to_index:
bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)

def update_document_meta(
self, id: str, meta: Dict[str, str], index: str = None, headers: Optional[Dict[str, str]] = None
@@ -1434,7 +1497,7 @@ def update_embeddings(
}
doc_updates.append(update)

bulk(self.client, doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers)
self._bulk(documents=doc_updates, request_timeout=300, refresh=self.refresh_type, headers=headers)
progress_bar.update(batch_size)

def delete_all_documents(
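For reference, here is a minimal, self-contained sketch of the retry strategy the new _bulk method implements: on a 429 the wait time doubles and the batch is split in half, until either the insert succeeds or the retry budget is used up. The names bulk_with_backoff, send_batch, and TooManyRequests are hypothetical stand-ins for illustration only; they are not part of Haystack or the opensearch-py client.

import time
from typing import Callable, List


class TooManyRequests(Exception):
    """Stand-in for a '429 Too Many Requests' response from the cluster."""


def bulk_with_backoff(
    docs: List[dict],
    send_batch: Callable[[List[dict]], None],
    wait: float = 1.0,
    tries_left: int = 10,
) -> None:
    # Try the whole batch first; on a 429, wait, halve the batch, and recurse.
    try:
        send_batch(docs)
    except TooManyRequests:
        tries_left -= 1
        if tries_left == 0:
            raise
        time.sleep(wait)
        chunk = max((len(docs) + 1) // 2, 1)
        for start in range(0, len(docs), chunk):
            bulk_with_backoff(docs[start : start + chunk], send_batch, wait * 2, tries_left)

With these defaults, a 1,000-document batch that keeps hitting 429 is retried depth-first as 500, 250, 125, ... documents with waits of 1 s, 2 s, 4 s, ... before the error is finally re-raised.
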
45 changes: 44 additions & 1 deletion test/document_stores/test_opensearch.py
@@ -1,10 +1,12 @@
import logging

from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
import numpy as np

import opensearchpy

from haystack.document_stores.opensearch import (
OpenSearch,
OpenSearchDocumentStore,
@@ -807,6 +809,47 @@ def test_clone_embedding_field_update_mapping(self, mocked_document_store, index
},
}

@pytest.mark.unit
def test_bulk_write_retries_for_always_failing_insert_is_canceled(self, mocked_document_store, monkeypatch, caplog):
docs_to_write = [
{"meta": {"name": f"name_{i}"}, "content": f"text_{i}", "embedding": np.random.rand(768).astype(np.float32)}
for i in range(1000)
]

with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
mocked_bulk.side_effect = opensearchpy.TransportError(429, "Too many requests")

with pytest.raises(DocumentStoreError, match="Last try of bulk indexing documents failed."):
mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)

assert mocked_bulk.call_count == 3  # the depth-first retries keep failing, so the whole bulk request is canceled

assert "Too Many Requeset" in caplog.text
assert " Splitting the number of documents into two chunks with the same size" in caplog.text

@pytest.mark.unit
def test_bulk_write_retries_with_backoff_with_smaller_batch_size_on_too_many_requests(
self, mocked_document_store, monkeypatch
):
docs_to_write = [
{"meta": {"name": f"name_{i}"}, "content": f"text_{i}", "embedding": np.random.rand(768).astype(np.float32)}
for i in range(1000)
]

with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
# make the bulk insert fail for the full batch and for its first half, so the documents get split and retried:
# 1k (failed) => 500 (failed) + 500 (successful); the failed 500 => 250 (successful) + 250 (successful),
# resulting in 5 bulk calls in total
mocked_bulk.side_effect = [
opensearchpy.TransportError(429, "Too many requests"),
opensearchpy.TransportError(429, "Too many requests"),
None,
None,
None,
]
mocked_document_store._bulk(documents=docs_to_write, _timeout=0, _remaining_tries=3)
assert mocked_bulk.call_count == 5


class TestOpenDistroElasticsearchDocumentStore:
@pytest.mark.unit
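The expected call counts in the two tests above follow directly from the splitting rule in _split_document_list, where the chunk size for two lists is max((len(documents) + 1) // 2, 1). A small, purely illustrative trace (no Elasticsearch involved; the helper halves below is hypothetical):

def halves(n: int) -> list:
    # Mirrors _split_document_list with number_of_lists=2: compute the chunk size, then the chunk lengths.
    chunk = max((n + 1) // 2, 1)
    return [min(chunk, n - start) for start in range(0, n, chunk)]


print(halves(1000))  # [500, 500] -> a failed 1k batch is retried as two 500-document batches
print(halves(500))   # [250, 250] -> a failed 500 batch is retried as two 250-document batches
print(halves(1))     # [1]        -> a single document is simply retried until the budget runs out

So in the second test the call sequence is 1,000 (429), 500 (429), 250 (ok), 250 (ok), 500 (ok), which gives the asserted total of five bulk calls.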
