Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make SlavedIdTracker.advance have same interface as MultiWriterIDGenerator #8171

Merged
merged 2 commits into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8171.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make `SlavedIdTracker.advance` have the same interface as `MultiWriterIDGenerator`.
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/_slaved_id_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ def __init__(self, db_conn, table, column, extra_tables=[], step=1):
self.step = step
self._current = _load_current_id(db_conn, table, column, step)
for table, column in extra_tables:
self.advance(_load_current_id(db_conn, table, column))
self.advance(None, _load_current_id(db_conn, table, column))

def advance(self, new_id):
def advance(self, instance_name, new_id):
self._current = (max if self.step > 0 else min)(self._current, new_id)

def get_current_token(self):
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ def get_max_account_data_stream_id(self):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == TagAccountDataStream.NAME:
self._account_data_id_gen.advance(token)
self._account_data_id_gen.advance(instance_name, token)
for row in rows:
self.get_tags_for_user.invalidate((row.user_id,))
self._account_data_stream_cache.entity_has_changed(row.user_id, token)
elif stream_name == AccountDataStream.NAME:
self._account_data_id_gen.advance(token)
self._account_data_id_gen.advance(instance_name, token)
for row in rows:
if not row.room_id:
self.get_global_account_data_by_type_for_user.invalidate(
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == ToDeviceStream.NAME:
self._device_inbox_id_gen.advance(token)
self._device_inbox_id_gen.advance(instance_name, token)
for row in rows:
if row.entity.startswith("@"):
self._device_inbox_stream_cache.entity_has_changed(
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == DeviceListsStream.NAME:
self._device_list_id_gen.advance(token)
self._device_list_id_gen.advance(instance_name, token)
self._invalidate_caches_for_devices(token, rows)
elif stream_name == UserSignatureStream.NAME:
self._device_list_id_gen.advance(token)
self._device_list_id_gen.advance(instance_name, token)
for row in rows:
self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def get_group_stream_token(self):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == GroupServerStream.NAME:
self._group_updates_id_gen.advance(token)
self._group_updates_id_gen.advance(instance_name, token)
for row in rows:
self._group_updates_stream_cache.entity_has_changed(row.user_id, token)

Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def get_current_presence_token(self):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == PresenceStream.NAME:
self._presence_id_gen.advance(token)
self._presence_id_gen.advance(instance_name, token)
for row in rows:
self.presence_stream_cache.entity_has_changed(row.user_id, token)
self._get_presence_for_user.invalidate((row.user_id,))
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def process_replication_rows(self, stream_name, instance_name, token, rows):
assert isinstance(self._push_rules_stream_id_gen, SlavedIdTracker)

if stream_name == PushRulesStream.NAME:
self._push_rules_stream_id_gen.advance(token)
self._push_rules_stream_id_gen.advance(instance_name, token)
for row in rows:
self.get_push_rules_for_user.invalidate((row.user_id,))
self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/pushers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ def get_pushers_stream_token(self):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == PushersStream.NAME:
self._pushers_id_gen.advance(token)
self._pushers_id_gen.advance(instance_name, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == ReceiptsStream.NAME:
self._receipts_id_gen.advance(token)
self._receipts_id_gen.advance(instance_name, token)
for row in rows:
self.invalidate_caches_for_receipt(
row.room_id, row.receipt_type, row.user_id
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ def get_current_public_room_stream_id(self):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == PublicRoomsStream.NAME:
self._public_room_id_gen.advance(token)
self._public_room_id_gen.advance(instance_name, token)

return super().process_replication_rows(stream_name, instance_name, token, rows)
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ def __init__(self, database: DatabasePool, db_conn, hs):

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == EventsStream.NAME:
self._stream_id_gen.advance(token)
self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(-token)
self._backfill_id_gen.advance(instance_name, -token)

super().process_replication_rows(stream_name, instance_name, token, rows)

Expand Down