This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
Increase perf of handling presence when joining large rooms. #9916
Merged
Changes from 1 commit
Commits (9):
996c0ce Use get_current_users_in_room from store and not StateHandler (erikjohnston)
0c8cd62 Newsfile (erikjohnston)
6640fb4 Use correct name (erikjohnston)
0ed608c Increase perf of handling presence when joining large rooms. (erikjohnston)
68b6106 Newsfile (erikjohnston)
48cf260 Process state deltas in presence by room (erikjohnston)
4caa84b Use lists instead of sets where appropriate (erikjohnston)
43b8f20 Merge remote-tracking branch 'origin/develop' into erikj/efficient_pr… (erikjohnston)
9b109fd Fiddle with changelogs (erikjohnston)
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1196,8 +1196,11 @@ async def _handle_state_delta(self, deltas: List[JsonDict]) -> None: | |
"""Process current state deltas to find new joins that need to be | ||
handled. | ||
""" | ||
# A map of destination to a set of user state that they should receive | ||
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]] | ||
|
||
# Sets of newly joined users. Note that if the local server is | ||
# joining a remote room for the first time we'll see both the joining | ||
# user and all remote users as newly joined. | ||
newly_joined_users = set() | ||
|
||
for delta in deltas: | ||
typ = delta["type"] | ||
|
@@ -1231,72 +1234,55 @@ async def _handle_state_delta(self, deltas: List[JsonDict]) -> None: | |
# Ignore changes to join events. | ||
continue | ||
|
||
# Retrieve any user presence state updates that need to be sent as a result, | ||
# and the destinations that need to receive it | ||
destinations, user_presence_states = await self._on_user_joined_room( | ||
room_id, state_key | ||
) | ||
|
||
# Insert the destinations and respective updates into our destinations dict | ||
for destination in destinations: | ||
presence_destinations.setdefault(destination, set()).update( | ||
user_presence_states | ||
) | ||
|
||
# Send out user presence updates for each destination | ||
for destination, user_state_set in presence_destinations.items(): | ||
self._federation_queue.send_presence_to_destinations( | ||
destinations=[destination], states=user_state_set | ||
) | ||
|
||
async def _on_user_joined_room( | ||
self, room_id: str, user_id: str | ||
) -> Tuple[List[str], List[UserPresenceState]]: | ||
"""Called when we detect a user joining the room via the current state | ||
delta stream. Returns the destinations that need to be updated and the | ||
presence updates to send to them. | ||
|
||
Args: | ||
room_id: The ID of the room that the user has joined. | ||
user_id: The ID of the user that has joined the room. | ||
|
||
Returns: | ||
A tuple of destinations and presence updates to send to them. | ||
""" | ||
if self.is_mine_id(user_id): | ||
# If this is a local user then we need to send their presence | ||
# out to hosts in the room (who don't already have it) | ||
|
||
# TODO: We should be able to filter the hosts down to those that | ||
# haven't previously seen the user | ||
|
||
remote_hosts = await self.state.get_current_hosts_in_room(room_id) | ||
|
||
# Filter out ourselves. | ||
filtered_remote_hosts = [ | ||
host for host in remote_hosts if host != self.server_name | ||
] | ||
|
||
state = await self.current_state_for_user(user_id) | ||
return filtered_remote_hosts, [state] | ||
else: | ||
# A remote user has joined the room, so we need to: | ||
# 1. Check if this is a new server in the room | ||
# 2. If so send any presence they don't already have for | ||
# local users in the room. | ||
|
||
# TODO: We should be able to filter the users down to those that | ||
# the server hasn't previously seen | ||
newly_joined_users.add(state_key) | ||
|
||
# TODO: Check that this is actually a new server joining the | ||
# room. | ||
|
||
remote_host = get_domain_from_id(user_id) | ||
if not newly_joined_users: | ||
# If nobody has joined then there's nothing to do. | ||
return | ||
|
||
users = await self.store.get_users_in_room(room_id) | ||
user_ids = list(filter(self.is_mine_id, users)) | ||
# We want to send: | ||
# 1. presence states of all local users in the room to newly joined | ||
# remote servers | ||
# 2. presence states of newly joined users to all remote servers in | ||
# the room. | ||
# | ||
# TODO: Only send presence states to remote hosts that don't already | ||
# have them (because they already share rooms). | ||
|
||
# Get all the users who were already in the room, by fetching the | ||
# current users in the room and removing the newly joined users. | ||
users = await self.store.get_users_in_room(room_id) | ||
prev_users = set(users) - newly_joined_users | ||
|
||
# Construct sets for all the local users and remote hosts that were | ||
# already in the room | ||
prev_local_users = set() | ||
|
||
prev_remote_hosts = set() | ||
for user_id in prev_users: | ||
if self.is_mine_id(user_id): | ||
prev_local_users.add(user_id) | ||
else: | ||
prev_remote_hosts.add(get_domain_from_id(user_id)) | ||
Review comment: I wonder if a general "partition a user list by homeserver" function would be generally useful. Just a thought, no need to do anything about it.
Reply: Yeah, that would probably make a lot of sense.
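A minimal sketch of what such a "partition a user list by homeserver" helper might look like. The name `partition_users_by_host` is hypothetical and not part of this PR, and `get_domain_from_id` below is a simplified stand-in for the Synapse helper of the same name, assumed here to return everything after the first `:` in a user ID.

```python
from typing import Dict, Iterable, Set


def get_domain_from_id(user_id: str) -> str:
    """Stand-in for the Synapse helper: return the server part of "@user:server"."""
    idx = user_id.find(":")
    if idx == -1:
        raise ValueError(f"Invalid ID: {user_id!r}")
    return user_id[idx + 1:]


def partition_users_by_host(user_ids: Iterable[str]) -> Dict[str, Set[str]]:
    """Hypothetical helper: group user IDs by the homeserver they belong to."""
    by_host: Dict[str, Set[str]] = {}
    for user_id in user_ids:
        by_host.setdefault(get_domain_from_id(user_id), set()).add(user_id)
    return by_host
```

With a helper like this, the loop over `prev_users` could pop the local server's entry to get the local users and treat the remaining keys as the remote hosts, assuming `is_mine_id` is equivalent to comparing the domain against the local server name.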
||
|
||
# Similarly, construct sets for all the local users and remote hosts | ||
# that were *not* already in the room. Care needs to be taken with the | ||
# calculating the remote hosts, as a host may have already been in the | ||
# room even if there is a newly joined user from that host. | ||
newly_joined_local_users = set() | ||
newly_joined_remote_hosts = set() | ||
for user_id in newly_joined_users: | ||
if self.is_mine_id(user_id): | ||
newly_joined_local_users.add(user_id) | ||
else: | ||
host = get_domain_from_id(user_id) | ||
if host not in prev_remote_hosts: | ||
newly_joined_remote_hosts.add(host) | ||
|
||
states_d = await self.current_state_for_users(user_ids) | ||
# Send presence states of all local users in the room to newly joined | ||
# remote servers. (We actually only send states for local users already | ||
# in the room, as we'll send states for newly joined local users below.) | ||
if prev_local_users and newly_joined_remote_hosts: | ||
local_states = await self.current_state_for_users(prev_local_users) | ||
|
||
# Filter out old presence, i.e. offline presence states where | ||
# the user hasn't been active for a week. We can change this | ||
|
@@ -1306,13 +1292,27 @@ async def _on_user_joined_room( | |
now = self.clock.time_msec() | ||
states = [ | ||
state | ||
for state in states_d.values() | ||
for state in local_states.values() | ||
if state.state != PresenceState.OFFLINE | ||
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 | ||
or state.status_msg is not None | ||
] | ||
|
||
return [remote_host], states | ||
self._federation_queue.send_presence_to_destinations( | ||
destinations=newly_joined_remote_hosts, | ||
states=states, | ||
) | ||
|
||
# Send presence states of newly joined users to all remote servers in | ||
# the room | ||
if newly_joined_local_users and ( | ||
prev_remote_hosts or newly_joined_remote_hosts | ||
): | ||
local_states = await self.current_state_for_users(newly_joined_local_users) | ||
self._federation_queue.send_presence_to_destinations( | ||
destinations=prev_remote_hosts | newly_joined_remote_hosts, | ||
states=list(local_states.values()), | ||
) | ||
|
||
|
||
def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool: | ||
|
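For readers following the new logic in this hunk, the set arithmetic can be sketched as a pure function, separate from the store and federation queue. This is an illustration under stated assumptions, not the code in the PR: `compute_presence_routing`, `PresenceRouting`, and `_host` are hypothetical names, and local users are identified by comparing the domain to `server_name` rather than by calling `is_mine_id`.

```python
from typing import Iterable, NamedTuple, Set


class PresenceRouting(NamedTuple):
    prev_local_users: Set[str]           # local users already in the room
    newly_joined_remote_hosts: Set[str]  # hosts with no users in the room before
    newly_joined_local_users: Set[str]   # local users who have just joined
    all_remote_hosts: Set[str]           # previous plus newly joined remote hosts


def _host(user_id: str) -> str:
    # Assumption: user IDs look like "@localpart:server".
    return user_id.split(":", 1)[1]


def compute_presence_routing(
    current_users: Iterable[str],
    newly_joined_users: Iterable[str],
    server_name: str,
) -> PresenceRouting:
    newly_joined = set(newly_joined_users)
    # Users who were in the room before these joins.
    prev_users = set(current_users) - newly_joined

    prev_local_users = {u for u in prev_users if _host(u) == server_name}
    prev_remote_hosts = {_host(u) for u in prev_users} - {server_name}

    newly_joined_local_users = {u for u in newly_joined if _host(u) == server_name}
    # A host only counts as newly joined if it had no users in the room before.
    newly_joined_remote_hosts = (
        {_host(u) for u in newly_joined} - {server_name} - prev_remote_hosts
    )

    return PresenceRouting(
        prev_local_users,
        newly_joined_remote_hosts,
        newly_joined_local_users,
        prev_remote_hosts | newly_joined_remote_hosts,
    )
```

The two sends in the hunk then correspond to: states of `prev_local_users` go to `newly_joined_remote_hosts`, and states of `newly_joined_local_users` go to `all_remote_hosts`.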
Review comment: Can we rely on all deltas being for the same room id? And can we rely on there being at least one delta (so that `room_id` is set)?
Reply: Oh, that is a good point. I don't think we can rely on there only being one room per call (though that is the common path).
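One way to address both concerns is to group the newly joined users by room before doing any per-room work, which is roughly what the later commit "Process state deltas in presence by room" suggests. The sketch below is an assumption about the shape of that fix, not the merged code: `group_new_joins_by_room` is a hypothetical name, each delta is assumed to carry `room_id`, `type`, and `state_key` keys, and the check that a delta is actually a new join (elided from the hunk above) is omitted.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, Set


def group_new_joins_by_room(deltas: Iterable[Dict[str, Any]]) -> Dict[str, Set[str]]:
    """Hypothetical helper: map each room ID to the users with membership deltas in it."""
    joins_by_room: Dict[str, Set[str]] = defaultdict(set)
    for delta in deltas:
        # Only membership events are relevant to presence on join.
        if delta["type"] != "m.room.member":
            continue
        # Assumption: the delta has already been confirmed to be a new join.
        joins_by_room[delta["room_id"]].add(delta["state_key"])
    return dict(joins_by_room)
```

Iterating over the returned mapping handles each room separately, and an empty batch of deltas yields an empty mapping, so no `room_id` is ever read without being set.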