Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix paginating /relations with a live token #14866

Merged
merged 5 commits into from
Jan 26, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 71 additions & 39 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,74 @@ def generate_pagination_where_clause(
return " AND ".join(where_clause)


def generate_pagination_bounds(
direction: str, from_token: RoomStreamToken, to_token: Optional[RoomStreamToken]
) -> Tuple[str, Tuple[Optional[int], int], Optional[Tuple[Optional[int], int]]]:
"""
Generate a start and end point for this page of events.

Args:
direction: Whether pagination is going forwards or backwards. One of "f" or "b".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try direction: Literal["f", "b"] to enforce this?

(I expect that would need propagating up the call stack. This may be more trouble than it's worth; ignore if so)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can make it an enum even, I'll take a look as a follow-up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM either way. But yeah, definitely one for a follow-up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #14927.

from_token: The token to start pagination at.
to_token: The token to end pagination at, or None to not limit the end point.

Returns:
A three tuple of:

ASC or DESC for sorting of the query.

The starting position as a tuple of ints representing
(topological position, stream position). The topological position
may be None for live tokens.

The end position in the same format as the starting position, or None
if no to_token was provided.
"""

# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
if direction == "b":
order = "DESC"
else:
order = "ASC"

# The bounds for the stream tokens are complicated by the fact
# that we need to handle the instance_map part of the tokens. We do this
# by fetching all events between the min stream token and the maximum
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
# then filtering the results.
if from_token.topological is not None:
from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple()
elif direction == "b":
from_bound = (
None,
from_token.get_max_stream_pos(),
)
else:
from_bound = (
None,
from_token.stream,
)

to_bound: Optional[Tuple[Optional[int], int]] = None
if to_token:
if to_token.topological is not None:
to_bound = to_token.as_historical_tuple()
elif direction == "b":
to_bound = (
None,
to_token.stream,
)
else:
to_bound = (
None,
to_token.get_max_stream_pos(),
)

return order, from_bound, to_bound


def _make_generic_sql_bound(
bound: str,
column_names: Tuple[str, str],
Expand Down Expand Up @@ -1272,47 +1340,11 @@ def _paginate_room_events_txn(
`to_token`), or `limit` is zero.
"""

# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == "b":
order = "DESC"
else:
order = "ASC"

# The bounds for the stream tokens are complicated by the fact
# that we need to handle the instance_map part of the tokens. We do this
# by fetching all events between the min stream token and the maximum
# stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
# then filtering the results.
if from_token.topological is not None:
from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple()
elif direction == "b":
from_bound = (
None,
from_token.get_max_stream_pos(),
)
else:
from_bound = (
None,
from_token.stream,
)

to_bound: Optional[Tuple[Optional[int], int]] = None
if to_token:
if to_token.topological is not None:
to_bound = to_token.as_historical_tuple()
elif direction == "b":
to_bound = (
None,
to_token.stream,
)
else:
to_bound = (
None,
to_token.get_max_stream_pos(),
)
order, from_bound, to_bound = generate_pagination_bounds(
direction, from_token, to_token
)

bounds = generate_pagination_where_clause(
direction=direction,
Expand Down