Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Comments on the /sync tentacles (#11494)
Browse files Browse the repository at this point in the history
This mainly consists of docstrings and inline comments. There are one or two type annotations and variable renames thrown in while I was here.

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
  • Loading branch information
David Robertson and clokep authored Dec 2, 2021
1 parent f91624a commit d26808d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 43 deletions.
1 change: 1 addition & 0 deletions changelog.d/11494.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add comments to various parts of the `/sync` handler.
156 changes: 117 additions & 39 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,19 @@ async def _wait_for_sync_for_user(
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> SyncResult:
"""The start of the machinery that produces a /sync response.
See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
This method does high-level bookkeeping:
- tracking the kind of sync in the logging context
- deleting any to_device messages whose delivery has been acknowledged.
- deciding if we should dispatch an instant or delayed response
- marking the sync as being lazily loaded, if appropriate
Computing the body of the response begins in the next method,
`current_sync_for_user`.
"""
if since_token is None:
sync_type = "initial_sync"
elif full_state:
Expand Down Expand Up @@ -363,7 +376,7 @@ async def _wait_for_sync_for_user(
sync_config, since_token, full_state=full_state
)
else:

# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
Expand Down Expand Up @@ -402,7 +415,12 @@ async def current_sync_for_user(
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for client needed to match what the server has now."""
"""Generates the response body of a sync result, represented as a SyncResult.
This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
indoctrination.
"""
with start_active_span("current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
Expand Down Expand Up @@ -560,7 +578,7 @@ async def _load_filtered_recents(
# that have happened since `since_key` up to `end_key`, so we
# can just use `get_room_events_stream_for_room`.
# Otherwise, we want to return the last N events in the room
# in toplogical ordering.
# in topological ordering.
if since_key:
events, end_key = await self.store.get_room_events_stream_for_room(
room_id,
Expand Down Expand Up @@ -1042,7 +1060,18 @@ async def generate_sync_result(
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
"""Generates a sync result."""
"""Generates the response body of a sync result.
This is represented by a `SyncResult` struct, which is built from small pieces
using a `SyncResultBuilder`. See also
https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
the `sync_result_builder` is passed as a mutable ("inout") parameter to various
helper functions. These retrieve and process the data which forms the sync body,
often writing to the `sync_result_builder` to store their output.
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
instance to signify that the sync calculation is complete.
"""
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
Expand Down Expand Up @@ -1344,22 +1373,30 @@ async def _generate_sync_entry_for_to_device(
async def _generate_sync_entry_for_account_data(
self, sync_result_builder: "SyncResultBuilder"
) -> Dict[str, Dict[str, JsonDict]]:
"""Generates the account data portion of the sync response. Populates
`sync_result_builder` with the result.
"""Generates the account data portion of the sync response.
Account data (called "Client Config" in the spec) can be set either globally
or for a specific room. Account data consists of a list of events which
accumulate state, much like a room.
This function retrieves global and per-room account data. The former is written
to the given `sync_result_builder`. The latter is returned directly, to be
later written to the `sync_result_builder` on a room-by-room basis.
Args:
sync_result_builder
Returns:
A dictionary containing the per room account data.
A dictionary whose keys (room ids) map to the per room account data for that
room.
"""
sync_config = sync_result_builder.sync_config
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token

if since_token and not sync_result_builder.full_state:
(
account_data,
global_account_data,
account_data_by_room,
) = await self.store.get_updated_account_data_for_user(
user_id, since_token.account_data_key
Expand All @@ -1370,23 +1407,23 @@ async def _generate_sync_entry_for_account_data(
)

if push_rules_changed:
account_data["m.push_rules"] = await self.push_rules_for_user(
global_account_data["m.push_rules"] = await self.push_rules_for_user(
sync_config.user
)
else:
(
account_data,
global_account_data,
account_data_by_room,
) = await self.store.get_account_data_for_user(sync_config.user.to_string())

account_data["m.push_rules"] = await self.push_rules_for_user(
global_account_data["m.push_rules"] = await self.push_rules_for_user(
sync_config.user
)

account_data_for_user = await sync_config.filter_collection.filter_account_data(
[
{"type": account_data_type, "content": content}
for account_data_type, content in account_data.items()
for account_data_type, content in global_account_data.items()
]
)

Expand Down Expand Up @@ -1460,15 +1497,22 @@ async def _generate_sync_entry_for_rooms(
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
In the response that reaches the client, rooms are divided into four categories:
`invite`, `join`, `knock`, `leave`. These aren't the same as the four sets of
room ids returned by this function.
Args:
sync_result_builder
account_data_by_room: Dictionary of per room account data
Returns:
Returns a 4-tuple of
`(newly_joined_rooms, newly_joined_or_invited_users,
newly_left_rooms, newly_left_users)`
Returns a 4-tuple whose entries are:
- newly_joined_rooms
- newly_joined_or_invited_or_knocked_users
- newly_left_rooms
- newly_left_users
"""
# Start by fetching all ephemeral events in rooms we've joined (if required).
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
sync_result_builder.since_token is None
Expand Down Expand Up @@ -1590,19 +1634,22 @@ async def _have_rooms_changed(
) -> bool:
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
Does not modify the `sync_result_builder`.
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token

assert since_token

# Get a list of membership change events that have happened.
rooms_changed = await self.store.get_membership_changes_for_user(
# Get a list of membership change events that have happened to the user
# requesting the sync.
membership_changes = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)

if rooms_changed:
if membership_changes:
return True

stream_id = since_token.room_key.stream
Expand All @@ -1614,29 +1661,62 @@ async def _have_rooms_changed(
async def _get_rooms_changed(
self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
) -> _RoomChanges:
"""Gets the the changes that have happened since the last sync."""
"""Determine the changes in rooms to report to the user.
Ideally, we want to report all events whose stream ordering `s` lies in the
range `since_token < s <= now_token`, where the two tokens are read from the
sync_result_builder.
If there are too many events in that range to report, things get complicated.
In this situation we return a truncated list of the most recent events, and
indicate in the response that there is a "gap" of omitted events. Additionally:
- we include a "state_delta", to describe the changes in state over the gap,
- we include all membership events applying to the user making the request,
even those in the gap.
See the spec for the rationale:
https://spec.matrix.org/v1.1/client-server-api/#syncing
The sync_result_builder is not modified by this function.
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config

assert since_token

# Get a list of membership change events that have happened.
rooms_changed = await self.store.get_membership_changes_for_user(
# The spec
# https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
# notes that membership events need special consideration:
#
# > When a sync is limited, the server MUST return membership events for events
# > in the gap (between since and the start of the returned timeline), regardless
# > as to whether or not they are redundant.
#
# We fetch such events here, but we only seem to use them for categorising rooms
# as newly joined, newly left, invited or knocked.
# TODO: we've already called this function and ran this query in
# _have_rooms_changed. We could keep the results in memory to avoid a
# second query, at the cost of more complicated source code.
membership_change_events = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)

mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
for event in rooms_changed:
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)

newly_joined_rooms = []
newly_left_rooms = []
room_entries = []
invited = []
knocked = []
newly_joined_rooms: List[str] = []
newly_left_rooms: List[str] = []
room_entries: List[RoomSyncResultBuilder] = []
invited: List[InvitedSyncResult] = []
knocked: List[KnockedSyncResult] = []
for room_id, events in mem_change_events_by_room_id.items():
# The body of this loop will add this room to at least one of the five lists
# above. Things get messy if you've e.g. joined, left, joined then left the
# room all in the same sync period.
logger.debug(
"Membership changes in %s: [%s]",
room_id,
Expand Down Expand Up @@ -1781,7 +1861,9 @@ async def _get_rooms_changed(

timeline_limit = sync_config.filter_collection.timeline_limit()

# Get all events for rooms we're currently joined to.
# Get all events since the `from_key` in rooms we're currently joined to.
# If there are too many, we get the most recent events only. This leaves
# a "gap" in the timeline, as described by the spec for /sync.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
Expand Down Expand Up @@ -1842,6 +1924,10 @@ async def _get_all_rooms(
) -> _RoomChanges:
"""Returns entries for all rooms for the user.
Like `_get_rooms_changed`, but assumes the `since_token` is `None`.
This function does not modify the sync_result_builder.
Args:
sync_result_builder
ignored_users: Set of users ignored by user.
Expand All @@ -1853,16 +1939,9 @@ async def _get_all_rooms(
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config

membership_list = (
Membership.INVITE,
Membership.KNOCK,
Membership.JOIN,
Membership.LEAVE,
Membership.BAN,
)

room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id, membership_list=membership_list
user_id=user_id,
membership_list=Membership.LIST,
)

room_entries = []
Expand Down Expand Up @@ -2212,8 +2291,7 @@ def _calculate_state(
# to only include membership events for the senders in the timeline.
# In practice, we can do this by removing them from the p_ids list,
# which is the list of relevant state we know we have already sent to the client.
# see /~https://github.com/matrix-org/synapse/pull/2970
# /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
# see /~https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809

if lazy_load_members:
p_ids.difference_update(
Expand Down
15 changes: 11 additions & 4 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ async def get_room_events_stream_for_room(
oldest `limit` events.
Returns:
The list of events (in ascending order) and the token from the start
The list of events (in ascending stream order) and the token from the start
of the chunk of events returned.
"""
if from_key == to_key:
Expand All @@ -510,7 +510,7 @@ async def get_room_events_stream_for_room(
if not has_changed:
return [], from_key

def f(txn):
def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_from_id = from_key.stream
Expand Down Expand Up @@ -565,6 +565,13 @@ def f(txn):
async def get_membership_changes_for_user(
self, user_id: str, from_key: RoomStreamToken, to_key: RoomStreamToken
) -> List[EventBase]:
"""Fetch membership events for a given user.
All such events whose stream ordering `s` lies in the range
`from_key < s <= to_key` are returned. Events are ordered by ascending stream
order.
"""
# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
return []

Expand All @@ -575,7 +582,7 @@ async def get_membership_changes_for_user(
if not has_changed:
return []

def f(txn):
def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_from_id = from_key.stream
Expand Down Expand Up @@ -634,7 +641,7 @@ async def get_recent_events_for_room(
Returns:
A list of events and a token pointing to the start of the returned
events. The events returned are in ascending order.
events. The events returned are in ascending topological order.
"""

rows, token = await self.get_recent_event_ids_for_room(
Expand Down

0 comments on commit d26808d

Please sign in to comment.