Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster partial join to room with complex auth graph #7

Merged
merged 5 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up partial joins to rooms with complex auth graphs.
79 changes: 30 additions & 49 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
from synapse.util.iterutils import batch_iter, partition, sorted_topologically
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr

Expand Down Expand Up @@ -1678,65 +1678,44 @@ async def _auth_and_persist_outliers(

# We need to persist an event's auth events before the event.
auth_graph = {
ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
ev.event_id: [e_id for e_id in ev.auth_event_ids() if e_id in event_map]
for ev in event_map.values()
}
for roots in sorted_topologically_batched(event_map.values(), auth_graph):
if not roots:
# if *none* of the remaining events are ready, that means
# we have a loop. This either means a bug in our logic, or that
# somebody has managed to create a loop (which requires finding a
# hash collision in room v2 and later).
logger.warning(
"Loop found in auth events while fetching missing state/auth "
"events: %s",
shortstr(event_map.keys()),
)
return

logger.info(
"Persisting %i of %i remaining outliers: %s",
len(roots),
len(event_map),
shortstr(e.event_id for e in roots),
)

await self._auth_and_persist_outliers_inner(room_id, roots)

async def _auth_and_persist_outliers_inner(
self, room_id: str, fetched_events: Collection[EventBase]
) -> None:
"""Helper for _auth_and_persist_outliers

Persists a batch of events where we have (theoretically) already persisted all
of their auth events.

Marks the events as outliers, auths them, persists them to the database, and,
where appropriate (e.g. an invite), wakes the notifier.
sorted_auth_event_ids = sorted_topologically(event_map.keys(), auth_graph)
sorted_auth_events = [event_map[e_id] for e_id in sorted_auth_event_ids]
logger.info(
"Persisting %i remaining outliers: %s",
len(sorted_auth_events),
shortstr(e.event_id for e in sorted_auth_events),
)

Params:
origin: where the events came from
room_id: the room that the events are meant to be in (though this has
not yet been checked)
fetched_events: the events to persist
"""
# get all the auth events for all the events in this batch. By now, they should
# have been persisted.
auth_events = {
aid for event in fetched_events for aid in event.auth_event_ids()
auth_event_ids = {
aid for event in sorted_auth_events for aid in event.auth_event_ids()
}
auth_map = {
ev.event_id: ev
for ev in sorted_auth_events
if ev.event_id in auth_event_ids
}
persisted_events = await self._store.get_events(
auth_events,
allow_rejected=True,
)

missing_events = auth_event_ids.difference(auth_map)
if missing_events:
persisted_events = await self._store.get_events(
missing_events,
allow_rejected=True,
redact_behaviour=EventRedactBehaviour.as_is,
)
auth_map.update(persisted_events)

events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []

async def prep(event: EventBase) -> None:
with nested_logging_context(suffix=event.event_id):
auth = []
for auth_event_id in event.auth_event_ids():
ae = persisted_events.get(auth_event_id)
ae = auth_map.get(auth_event_id)
if not ae:
# the fact we can't find the auth event doesn't mean it doesn't
# exist, which means it is premature to reject `event`. Instead we
Expand All @@ -1755,7 +1734,9 @@ async def prep(event: EventBase) -> None:
context = EventContext.for_outlier(self._storage_controllers)
try:
validate_event_for_room_version(event)
await check_state_independent_auth_rules(self._store, event)
await check_state_independent_auth_rules(
self._store, event, batched_auth_events=auth_map
)
check_state_dependent_auth_rules(event, auth)
except AuthError as e:
logger.warning("Rejecting %r because %s", event, e)
Expand All @@ -1772,7 +1753,7 @@ async def prep(event: EventBase) -> None:

events_and_contexts_to_persist.append((event, context))

for event in fetched_events:
for event in sorted_auth_events:
await prep(event)

await self.persist_events_and_notify(
Expand Down
Loading