diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 19b8728db9b9..d45102dfc85e 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -46,6 +46,9 @@ BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 +PURGE_HISTORY_LOCK_NAME = "purge_history_lock" + + @attr.s(slots=True, auto_attribs=True) class PurgeStatus: """Object tracking the status of a purge request @@ -142,6 +145,7 @@ def __init__(self, hs: "HomeServer"): self._server_name = hs.hostname self._room_shutdown_handler = hs.get_room_shutdown_handler() self._relations_handler = hs.get_relations_handler() + self._worker_locks = hs.get_worker_locks_handler() self.pagination_lock = ReadWriteLock() # IDs of rooms in which there currently an active purge *or delete* operation. @@ -356,7 +360,9 @@ async def _purge_history( """ self._purges_in_progress_by_room.add(room_id) try: - async with self.pagination_lock.write(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=True + ): await self._storage_controllers.purge_events.purge_history( room_id, token, delete_local_events ) @@ -412,7 +418,9 @@ async def purge_room(self, room_id: str, force: bool = False) -> None: room_id: room to be purged force: set true to skip checking for joined users. """ - async with self.pagination_lock.write(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=True + ): # first check that we have no users in this room if not force: joined = await self.store.is_host_joined(room_id, self._server_name) @@ -471,7 +479,9 @@ async def get_messages( room_token = from_token.room_key - async with self.pagination_lock.read(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=False + ): (membership, member_event_id) = (None, None) if not use_admin_priviledge: ( @@ -747,7 +757,9 @@ async def _shutdown_and_purge_room( self._purges_in_progress_by_room.add(room_id) try: - async with self.pagination_lock.write(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=True + ): self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN self._delete_by_id[ delete_id