From c3c6c0e6222cc1bc8ae35a66389dc428d0ddbc92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Feb 2020 11:15:11 +0000 Subject: [PATCH 1/7] Add 'device_lists_outbound_pokes' as extra table. This makes sure we check all the relevant tables to get the current max stream ID. Currently not doing so isn't problematic as the max stream ID in `device_lists_outbound_pokes` is the same as in `device_lists_stream`, however that will change. --- synapse/replication/slave/storage/devices.py | 8 +++++++- synapse/storage/data_stores/main/__init__.py | 5 ++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 1c77687eea9b..bf46cc4f8a78 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -29,7 +29,13 @@ def __init__(self, database: Database, db_conn, hs): self.hs = hs self._device_list_id_gen = SlavedIdTracker( - db_conn, "device_lists_stream", "stream_id" + db_conn, + "device_lists_stream", + "stream_id", + extra_tables=[ + ("user_signature_stream", "stream_id"), + ("device_lists_outbound_pokes", "stream_id"), + ], ) device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index acca079f2320..649e835303bd 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -144,7 +144,10 @@ def __init__(self, database: Database, db_conn, hs): db_conn, "device_lists_stream", "stream_id", - extra_tables=[("user_signature_stream", "stream_id")], + extra_tables=[ + ("user_signature_stream", "stream_id"), + ("device_lists_outbound_pokes", "stream_id"), + ], ) self._cross_signing_id_gen = StreamIdGenerator( db_conn, "e2e_cross_signing_keys", "stream_id" From f5caa1864e3d3c24c691b3a3bff723f77def129e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Feb 2020 11:21:25 +0000 Subject: [PATCH 2/7] Change device lists stream to have one row per id. This will make it possible to process the streams more incrementally, avoiding having to process large chunks at once. --- synapse/storage/data_stores/main/devices.py | 59 ++++++++++++++------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index d55733a4cd9f..3299607910bb 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -1017,29 +1017,41 @@ def add_device_change_to_streams(self, user_id, device_ids, hosts): """Persist that a user's devices have been updated, and which hosts (if any) should be poked. """ - with self._device_list_id_gen.get_next() as stream_id: + if not device_ids: + return + + with self._device_list_id_gen.get_next_mult(len(device_ids)) as stream_ids: yield self.db.runInteraction( - "add_device_change_to_streams", - self._add_device_change_txn, + "add_device_change_to_stream", + self._add_device_change_to_stream_txn, + user_id, + device_ids, + stream_ids, + ) + + if not hosts: + return stream_ids[-1] + + context = get_active_span_text_map() + with self._device_list_id_gen.get_next_mult( + len(hosts) * len(device_ids) + ) as stream_ids: + yield self.db.runInteraction( + "add_device_outbound_poke_to_stream", + self._add_device_outbound_poke_to_stream_txn, user_id, device_ids, hosts, - stream_id, + stream_ids, + context, ) - return stream_id - def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): - now = self._clock.time_msec() + return stream_ids[-1] + def _add_device_change_to_stream_txn(self, txn, user_id, device_ids, stream_ids): txn.call_after( - self._device_list_stream_cache.entity_has_changed, user_id, stream_id + self._device_list_stream_cache.entity_has_changed, user_id, stream_ids[-1], ) - for host in hosts: - txn.call_after( - self._device_list_federation_stream_cache.entity_has_changed, - host, - stream_id, - ) # Delete older entries in the table, as we really only care about # when the latest change happened. @@ -1048,7 +1060,7 @@ def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): DELETE FROM device_lists_stream WHERE user_id = ? AND device_id = ? AND stream_id < ? """, - [(user_id, device_id, stream_id) for device_id in device_ids], + [(user_id, device_id, stream_ids[0]) for device_id in device_ids], ) self.db.simple_insert_many_txn( @@ -1056,11 +1068,22 @@ def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): table="device_lists_stream", values=[ {"stream_id": stream_id, "user_id": user_id, "device_id": device_id} - for device_id in device_ids + for stream_id, device_id in zip(stream_ids, device_ids) ], ) - context = get_active_span_text_map() + def _add_device_outbound_poke_to_stream_txn( + self, txn, user_id, device_ids, hosts, stream_ids, context, + ): + for host in hosts: + txn.call_after( + self._device_list_federation_stream_cache.entity_has_changed, + host, + stream_ids[-1], + ) + + now = self._clock.time_msec() + next_stream_id = iter(stream_ids) self.db.simple_insert_many_txn( txn, @@ -1068,7 +1091,7 @@ def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): values=[ { "destination": destination, - "stream_id": stream_id, + "stream_id": next(next_stream_id), "user_id": user_id, "device_id": device_id, "sent": False, From 9ce4e344a808e15a36a2d9ea03b77ebfc6ac7fe2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Feb 2020 11:24:05 +0000 Subject: [PATCH 3/7] Change device list replication to match new semantics. Instead of sending down batches of user ID/host tuples, send down a row per entity (user ID or host). --- synapse/app/generic_worker.py | 2 +- synapse/replication/slave/storage/devices.py | 25 ++++++++++---------- synapse/replication/tcp/streams/_base.py | 13 ++++++---- synapse/storage/data_stores/main/devices.py | 15 +++++++----- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index b2c764bfe85b..561a6f4b22f6 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -774,7 +774,7 @@ def process_replication_rows(self, stream_name, token, rows): # ... as well as device updates and messages elif stream_name == DeviceListsStream.NAME: - hosts = {row.destination for row in rows} + hosts = {row.entity for row in rows if not row.entity.startswith("@")} for host in hosts: self.federation_sender.send_device_messages(host) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index bf46cc4f8a78..01a4f85884b3 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -61,23 +61,24 @@ def stream_positions(self): def process_replication_rows(self, stream_name, token, rows): if stream_name == DeviceListsStream.NAME: self._device_list_id_gen.advance(token) - for row in rows: - self._invalidate_caches_for_devices(token, row.user_id, row.destination) + self._invalidate_caches_for_devices(token, rows) elif stream_name == UserSignatureStream.NAME: + self._device_list_id_gen.advance(token) for row in rows: self._user_signature_stream_cache.entity_has_changed(row.user_id, token) return super(SlavedDeviceStore, self).process_replication_rows( stream_name, token, rows ) - def _invalidate_caches_for_devices(self, token, user_id, destination): - self._device_list_stream_cache.entity_has_changed(user_id, token) - - if destination: - self._device_list_federation_stream_cache.entity_has_changed( - destination, token - ) + def _invalidate_caches_for_devices(self, token, rows): + for row in rows: + if row.entity.startswith("@"): + self._device_list_stream_cache.entity_has_changed(row.entity, token) + self.get_cached_devices_for_user.invalidate((row.entity,)) + self._get_cached_user_device.invalidate_many((row.entity,)) + self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,)) - self.get_cached_devices_for_user.invalidate((user_id,)) - self._get_cached_user_device.invalidate_many((user_id,)) - self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) + else: + self._device_list_federation_stream_cache.entity_has_changed( + row.entity, token + ) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 208e8a667b18..7a8b6e9df1d6 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -94,9 +94,13 @@ class CachesStreamRow: "network_id", # str, optional ), ) -DeviceListsStreamRow = namedtuple( - "DeviceListsStreamRow", ("user_id", "destination") # str # str -) + + +@attr.s +class DeviceListsStreamRow: + entity = attr.ib(type=str) + + ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str TagAccountDataStreamRow = namedtuple( "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict @@ -363,7 +367,8 @@ def __init__(self, hs): class DeviceListsStream(Stream): - """Someone added/changed/removed a device + """Either a user has updated their devices or a remote server needs to be + told about a device update. """ NAME = "device_lists" diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 3299607910bb..768afe7a6cda 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -612,15 +612,18 @@ def get_all_device_list_changes_for_remotes(self, from_key, to_key): combined list of changes to devices, and which destinations need to be poked. `destination` may be None if no destinations need to be poked. """ - # We do a group by here as there can be a large number of duplicate - # entries, since we throw away device IDs. + + # This query Does The Right Thing where it'll correctly apply the + # bounds to the inner queries. sql = """ - SELECT MAX(stream_id) AS stream_id, user_id, destination - FROM device_lists_stream - LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) + SELECT stream_id, entity FROM ( + SELECT stream_id, user_id AS entity FROM device_lists_stream + UNION ALL + SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes + ) AS e WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id, destination """ + return self.db.execute( "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key ) From 59ad93d2a415cd07ab6f6afd490d0a5ceeec93a0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Feb 2020 11:27:37 +0000 Subject: [PATCH 4/7] Newsfile --- changelog.d/7010.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7010.misc diff --git a/changelog.d/7010.misc b/changelog.d/7010.misc new file mode 100644 index 000000000000..4ba1f6cdf8fc --- /dev/null +++ b/changelog.d/7010.misc @@ -0,0 +1 @@ +Change device list streams to have one row per ID. From f70f44abc73689a66d0a05dc703ca38241092174 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Feb 2020 11:45:35 +0000 Subject: [PATCH 5/7] Remove handling of multiple rows per ID --- synapse/storage/data_stores/main/devices.py | 35 +--------------- tests/storage/test_devices.py | 45 --------------------- 2 files changed, 1 insertion(+), 79 deletions(-) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 768afe7a6cda..06e1d9f0333c 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -112,23 +112,13 @@ def get_device_updates_by_remote(self, destination, from_stream_id, limit): if not has_changed: return now_stream_id, [] - # We retrieve n+1 devices from the list of outbound pokes where n is - # our outbound device update limit. We then check if the very last - # device has the same stream_id as the second-to-last device. If so, - # then we ignore all devices with that stream_id and only send the - # devices with a lower stream_id. - # - # If when culling the list we end up with no devices afterwards, we - # consider the device update to be too large, and simply skip the - # stream_id; the rationale being that such a large device list update - # is likely an error. updates = yield self.db.runInteraction( "get_device_updates_by_remote", self._get_device_updates_by_remote_txn, destination, from_stream_id, now_stream_id, - limit + 1, + limit, ) # Return an empty list if there are no updates @@ -166,14 +156,6 @@ def get_device_updates_by_remote(self, destination, from_stream_id, limit): "device_id": verify_key.version, } - # if we have exceeded the limit, we need to exclude any results with the - # same stream_id as the last row. - if len(updates) > limit: - stream_id_cutoff = updates[-1][2] - now_stream_id = stream_id_cutoff - 1 - else: - stream_id_cutoff = None - # Perform the equivalent of a GROUP BY # # Iterate through the updates list and copy non-duplicate @@ -192,10 +174,6 @@ def get_device_updates_by_remote(self, destination, from_stream_id, limit): query_map = {} cross_signing_keys_by_user = {} for user_id, device_id, update_stream_id, update_context in updates: - if stream_id_cutoff is not None and update_stream_id >= stream_id_cutoff: - # Stop processing updates - break - if ( user_id in master_key_by_user and device_id == master_key_by_user[user_id]["device_id"] @@ -218,17 +196,6 @@ def get_device_updates_by_remote(self, destination, from_stream_id, limit): if update_stream_id > previous_update_stream_id: query_map[key] = (update_stream_id, update_context) - # If we didn't find any updates with a stream_id lower than the cutoff, it - # means that there are more than limit updates all of which have the same - # steam_id. - - # That should only happen if a client is spamming the server with new - # devices, in which case E2E isn't going to work well anyway. We'll just - # skip that stream_id and return an empty list, and continue with the next - # stream_id next time. - if not query_map and not cross_signing_keys_by_user: - return stream_id_cutoff, [] - results = yield self._get_device_update_edus_by_remote( destination, from_stream_id, query_map ) diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index 6f8d990959e8..c2539b353ace 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -88,51 +88,6 @@ def test_get_device_updates_by_remote(self): # Check original device_ids are contained within these updates self._check_devices_in_updates(device_ids, device_updates) - @defer.inlineCallbacks - def test_get_device_updates_by_remote_limited(self): - # Test breaking the update limit in 1, 101, and 1 device_id segments - - # first add one device - device_ids1 = ["device_id0"] - yield self.store.add_device_change_to_streams( - "user_id", device_ids1, ["someotherhost"] - ) - - # then add 101 - device_ids2 = ["device_id" + str(i + 1) for i in range(101)] - yield self.store.add_device_change_to_streams( - "user_id", device_ids2, ["someotherhost"] - ) - - # then one more - device_ids3 = ["newdevice"] - yield self.store.add_device_change_to_streams( - "user_id", device_ids3, ["someotherhost"] - ) - - # - # now read them back. - # - - # first we should get a single update - now_stream_id, device_updates = yield self.store.get_device_updates_by_remote( - "someotherhost", -1, limit=100 - ) - self._check_devices_in_updates(device_ids1, device_updates) - - # Then we should get an empty list back as the 101 devices broke the limit - now_stream_id, device_updates = yield self.store.get_device_updates_by_remote( - "someotherhost", now_stream_id, limit=100 - ) - self.assertEqual(len(device_updates), 0) - - # The 101 devices should've been cleared, so we should now just get one device - # update - now_stream_id, device_updates = yield self.store.get_device_updates_by_remote( - "someotherhost", now_stream_id, limit=100 - ) - self._check_devices_in_updates(device_ids3, device_updates) - def _check_devices_in_updates(self, expected_device_ids, device_updates): """Check that an specific device ids exist in a list of device update EDUs""" self.assertEqual(len(device_updates), len(expected_device_ids)) From e53744c737527ebb2af94b677b359743473b0434 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2020 12:52:28 +0000 Subject: [PATCH 6/7] Fix worker handling --- synapse/app/generic_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 561a6f4b22f6..d5968524197d 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -676,8 +676,9 @@ async def process_and_notify(self, stream_name, token, rows): elif stream_name == "device_lists": all_room_ids = set() for row in rows: - room_ids = await self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) + if row.entity.startswith("@"): + room_ids = await self.store.get_rooms_for_user(row.entity) + all_room_ids.update(room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) elif stream_name == "presence": await self.presence_handler.process_replication_rows(token, rows) From 6e6476ef07c2d72fbea85603f2eb2a61a6866732 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Mar 2020 10:13:55 +0000 Subject: [PATCH 7/7] Comments from review --- synapse/app/generic_worker.py | 3 +++ synapse/replication/slave/storage/devices.py | 3 +++ synapse/storage/data_stores/main/devices.py | 27 ++++++++++++++------ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d5968524197d..cdc078cf1106 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -775,6 +775,9 @@ def process_replication_rows(self, stream_name, token, rows): # ... as well as device updates and messages elif stream_name == DeviceListsStream.NAME: + # The entities are either user IDs (starting with '@') whose devices + # have changed, or remote servers that we need to tell about + # changes. hosts = {row.entity for row in rows if not row.entity.startswith("@")} for host in hosts: self.federation_sender.send_device_messages(host) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 01a4f85884b3..23b1650e41af 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -72,6 +72,9 @@ def process_replication_rows(self, stream_name, token, rows): def _invalidate_caches_for_devices(self, token, rows): for row in rows: + # The entities are either user IDs (starting with '@') whose devices + # have changed, or remote servers that we need to tell about + # changes. if row.entity.startswith("@"): self._device_list_stream_cache.entity_has_changed(row.entity, token) self.get_cached_devices_for_user.invalidate((row.entity,)) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 06e1d9f0333c..4c19c02bbca2 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List, Tuple from six import iteritems @@ -31,7 +32,7 @@ ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause -from synapse.storage.database import Database +from synapse.storage.database import Database, LoggingTransaction from synapse.types import Collection, get_verify_key_from_cross_signing_key from synapse.util.caches.descriptors import ( Cache, @@ -574,10 +575,12 @@ def get_users_whose_signatures_changed(self, user_id, from_key): else: return set() - def get_all_device_list_changes_for_remotes(self, from_key, to_key): - """Return a list of `(stream_id, user_id, destination)` which is the - combined list of changes to devices, and which destinations need to be - poked. `destination` may be None if no destinations need to be poked. + async def get_all_device_list_changes_for_remotes( + self, from_key: int, to_key: int + ) -> List[Tuple[int, str]]: + """Return a list of `(stream_id, entity)` which is the combined list of + changes to devices and which destinations need to be poked. Entity is + either a user ID (starting with '@') or a remote destination. """ # This query Does The Right Thing where it'll correctly apply the @@ -591,7 +594,7 @@ def get_all_device_list_changes_for_remotes(self, from_key, to_key): WHERE ? < stream_id AND stream_id <= ? """ - return self.db.execute( + return await self.db.execute( "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key ) @@ -1018,11 +1021,19 @@ def add_device_change_to_streams(self, user_id, device_ids, hosts): return stream_ids[-1] - def _add_device_change_to_stream_txn(self, txn, user_id, device_ids, stream_ids): + def _add_device_change_to_stream_txn( + self, + txn: LoggingTransaction, + user_id: str, + device_ids: Collection[str], + stream_ids: List[str], + ): txn.call_after( self._device_list_stream_cache.entity_has_changed, user_id, stream_ids[-1], ) + min_stream_id = stream_ids[0] + # Delete older entries in the table, as we really only care about # when the latest change happened. txn.executemany( @@ -1030,7 +1041,7 @@ def _add_device_change_to_stream_txn(self, txn, user_id, device_ids, stream_ids) DELETE FROM device_lists_stream WHERE user_id = ? AND device_id = ? AND stream_id < ? """, - [(user_id, device_id, stream_ids[0]) for device_id in device_ids], + [(user_id, device_id, min_stream_id) for device_id in device_ids], ) self.db.simple_insert_many_txn(