Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix bug where a new writer advances their token too quickly
Browse files Browse the repository at this point in the history
When starting a new writer (e.g. for persisting events), the
`MultiWriterIdGenerator` doesn't have a minimum token for it as there
are no rows matching that new writer in the DB.

This results in the first stream ID it acquired being announced as
persisted *before* it actually finishes persisting, if another writer
gets and persists a subsequent stream ID. This is due to the logic of
setting the minimum persisted position to the minimum known position
across all writers, and the new writer starts off not being considered.
  • Loading branch information
erikjohnston committed Oct 12, 2023
1 parent cc865ff commit 15fcb24
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
8 changes: 8 additions & 0 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,14 @@ def sort_by_stream_id_key_func(row: Tuple[str, int]) -> int:
if instance == self._instance_name:
self._current_positions[instance] = stream_id

if self._writers:
# If we have explicit writers then make sure that each instance has
# a position.
for writer in self._writers:
self._current_positions.setdefault(
writer, self._persisted_upto_position
)

cur.close()

def _load_next_id_txn(self, txn: Cursor) -> int:
Expand Down
59 changes: 55 additions & 4 deletions tests/storage/test_id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ def test_empty(self) -> None:

id_gen = self._create_id_generator()

# The table is empty so we expect an empty map for positions
self.assertEqual(id_gen.get_positions(), {})
# The table is empty so we expect the map for positions to have a dummy
# minimum value.
self.assertEqual(id_gen.get_positions(), {"master": 1})

def test_single_instance(self) -> None:
"""Test that reads and writes from a single process are handled
Expand Down Expand Up @@ -398,6 +399,56 @@ async def _get_next_async2() -> None:
second_id_gen.advance("first", 8)
self.assertEqual(second_id_gen.get_positions(), {"first": 8, "second": 9})

def test_multi_instance_empty_row(self) -> None:
"""Test that reads and writes from multiple processes are handled
correctly, when one of the writers starts without any rows.
"""
# Insert some rows for two out of three of the ID gens. The "third"
# writer deliberately gets no rows.
self._insert_rows("first", 3)
self._insert_rows("second", 4)

# Create one ID generator per instance; all three are configured with the
# same explicit list of writers.
first_id_gen = self._create_id_generator(
"first", writers=["first", "second", "third"]
)
second_id_gen = self._create_id_generator(
"second", writers=["first", "second", "third"]
)
third_id_gen = self._create_id_generator(
"third", writers=["first", "second", "third"]
)

# The assertions below expect "third", which has no rows, to be reported
# with a position of 7 rather than being absent from the positions map.
self.assertEqual(
first_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7}
)
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7)
self.assertEqual(first_id_gen.get_current_token_for_writer("third"), 7)

# The second generator is expected to report the same view as the first.
self.assertEqual(
second_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7}
)
self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7)
self.assertEqual(second_id_gen.get_current_token_for_writer("third"), 7)

# Try allocating a new stream ID from the third ID gen and check that we
# only see its position advance after we leave the context manager.

async def _get_next_async() -> None:
async with third_id_gen.get_next() as stream_id:
self.assertEqual(stream_id, 8)

# While stream ID 8 is still in flight, "third" must stay at 7 and the
# persisted-upto position must not have moved past it.
self.assertEqual(
third_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7}
)
self.assertEqual(third_id_gen.get_persisted_upto_position(), 7)

self.get_success(_get_next_async())

# Once the allocation has completed, "third" should have advanced to 8.
self.assertEqual(
third_id_gen.get_positions(), {"first": 3, "second": 7, "third": 8}
)

def test_get_next_txn(self) -> None:
"""Test that the `get_next_txn` function works correctly."""

Expand Down Expand Up @@ -712,8 +763,8 @@ async def _get_next_async() -> None:

self.get_success(_get_next_async())

self.assertEqual(id_gen_1.get_positions(), {"first": -1})
self.assertEqual(id_gen_2.get_positions(), {"first": -1})
self.assertEqual(id_gen_1.get_positions(), {"first": -1, "second": -1})
self.assertEqual(id_gen_2.get_positions(), {"first": -1, "second": -1})
self.assertEqual(id_gen_1.get_persisted_upto_position(), -1)
self.assertEqual(id_gen_2.get_persisted_upto_position(), -1)

Expand Down

0 comments on commit 15fcb24

Please sign in to comment.