Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove concept of a non-limited stream. #7011

Merged
merged 13 commits into from
Mar 20, 2020
1 change: 1 addition & 0 deletions changelog.d/7011.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove concept of a non-limited stream.
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ async def is_visible(self, observed_user, observer_user):

return False

async def get_all_presence_updates(self, last_id, current_id):
async def get_all_presence_updates(self, last_id, current_id, limit):
"""
Gets a list of presence update rows from between the given stream ids.
Each row has:
Expand All @@ -762,7 +762,7 @@ async def get_all_presence_updates(self, last_id, current_id):
"""
# TODO(markjh): replicate the unpersisted changes.
# This could use the in-memory stores for recent changes.
rows = await self.store.get_all_presence_updates(last_id, current_id)
rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
return rows

def notify_new_event(self):
Expand Down
11 changes: 9 additions & 2 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import logging
from collections import namedtuple
from typing import List

from twisted.internet import defer

Expand Down Expand Up @@ -257,7 +258,13 @@ def _push_update_local(self, member, typing):
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)

async def get_all_typing_updates(self, last_id, current_id):
async def get_all_typing_updates(
self, last_id: int, current_id: int, limit: int
) -> List[dict]:
"""Get up to `limit` typing updates between the given tokens, earliest
updates first.
"""

if last_id == current_id:
return []

Expand All @@ -275,7 +282,7 @@ async def get_all_typing_updates(self, last_id, current_id):
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.sort()
return rows
return rows[:limit]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eyebrow raised: we're just dropping updates beyond the limit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current situation is that the replication sees that it is limited and panics. The next PR in the series changes it so that, if the stream is limited, it gets the token from the last item in the returned list.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(ftr: the relevant PR is #7024)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conclusions from discussion elsewhere:

  • the fact that we ignore updates between limit and current_id is not unique to this function: all of the update_functions do the same thing (it just looks a bit different here because the typing stream is in-memory)
  • if we hit the limit, then we will throw an exception and disconnect the stream:
    if len(updates) >= MAX_EVENTS_BEHIND:
  • slicing the result here is mostly done for consistency with the other update_functions

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

further discussion:

  • slicing the result here is mostly done for consistency with the other update_functions

... but also because we're going to rely on the update_functions honouring their limit in a future PR, so we should bring this one in line now even though it's not technically necessary yet.


def get_current_token(self):
return self._latest_room_serial
Expand Down
9 changes: 2 additions & 7 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,6 @@ async def _run_notifier_loop(self):
self.pending_updates = False

with Measure(self.clock, "repl.stream.get_updates"):
# First we tell the streams that they should update their
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming we didn't do this for the good of our health, but rather to combat race conditions between the streams. What has changed so that it is safe to get rid of it now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I've been trying to get my head around what this is protecting and I think I've understood a little bit here. Couple of potential reasons for having this:

  1. Helps preserve a bit of ordering across different streams, e.g. if there's a typing notification and then a message the synchrotrons are likely to get told about the typing notification and then the message.
  2. Some streams share the same ID generator/token (e.g. the two tag streams). We want to make sure that we send down the updates for both streams up to the same token so that any stream change caches get correctly updated before anything queries for changes up to the new current token.

Now the slight problem here is that while we do ensure we update the tokens of all streams in lockstep, we then send down updates batched by stream (rather than trying to interleave the streams so that earlier updates go first). This means that the race conditions above can still happen anyway, AFAICT.

The fact that item two above is still racy sounds like a bug that needs to be fixed either way, probably easiest by merging the relevant streams so that there is a one-to-one mapping between streams and ID generators (I think it's only the global vs room account data streams, and the devices and user signature streams).

For item one, I'm not convinced that updating the current token in lock step actually helps that much. Ideally, in each loop there will only be one or two updates to send, and so the lock step does nothing; conversely, if there are a lot of updates to send down in the loop, workers will see updates out of order between streams anyway, as we send down updates by stream.

I'm therefore minded to keep the removal of this code simply because I think it makes things a bit clearer and easier to reason about.

Thoughts welcome though, as I feel like I might still be missing something.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, it sounds pretty plausible...

# current tokens.
for stream in self.streams:
stream.advance_current_token()

all_streams = self.streams

if self._replication_torture_level is not None:
Expand All @@ -180,7 +175,7 @@ async def _run_notifier_loop(self):
random.shuffle(all_streams)

for stream in all_streams:
if stream.last_token == stream.upto_token:
if stream.last_token == stream.current_token():
continue

if self._replication_torture_level:
Expand All @@ -192,7 +187,7 @@ async def _run_notifier_loop(self):
"Getting stream: %s: %s -> %s",
stream.NAME,
stream.last_token,
stream.upto_token,
stream.current_token(),
)
try:
updates, current_token = await stream.get_updates()
Expand Down
65 changes: 25 additions & 40 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import itertools
import logging
from collections import namedtuple
from typing import Any, List, Optional
from typing import Any, List, Optional, Tuple

import attr

from synapse.types import JsonDict

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -119,13 +121,12 @@ class Stream(object):
"""Base class for the streams.

Provides a `get_updates()` function that returns new updates since the last
time it was called up until the point `advance_current_token` was called.
time it was called.
"""

NAME = None # type: str # The name of the stream
# The type of the row. Used by the default impl of parse_row.
ROW_TYPE = None # type: Any
_LIMITED = True # Whether the update function takes a limit

@classmethod
def parse_row(cls, row):
Expand All @@ -146,26 +147,15 @@ def __init__(self, hs):
# The token from which we last asked for updates
self.last_token = self.current_token()

# The token that we will get updates up to
self.upto_token = self.current_token()

def advance_current_token(self):
"""Updates `upto_token` to "now", which updates up until which point
get_updates[_since] will fetch rows till.
"""
self.upto_token = self.current_token()

def discard_updates_and_advance(self):
"""Called when the stream should advance but the updates would be discarded,
e.g. when there are no currently connected workers.
"""
self.upto_token = self.current_token()
self.last_token = self.upto_token
self.last_token = self.current_token()

async def get_updates(self):
"""Gets all updates since the last time this function was called (or
since the stream was constructed if it hadn't been called before),
until the `upto_token`
since the stream was constructed if it hadn't been called before).

Returns:
Deferred[Tuple[List[Tuple[int, Any]], int]:
Expand All @@ -178,44 +168,44 @@ async def get_updates(self):

return updates, current_token

async def get_updates_since(self, from_token):
async def get_updates_since(
self, from_token: int
) -> Tuple[List[Tuple[int, JsonDict]], int]:
"""Like get_updates except allows specifying from when we should
stream updates

Returns:
Deferred[Tuple[List[Tuple[int, Any]], int]:
Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
list of ``(token, row)`` entries. ``row`` will be json-serialised and
sent over the replication steam.
Resolves to a pair `(updates, new_last_token)`, where `updates` is
a list of `(token, row)` entries and `new_last_token` is the new
position in stream.
"""

if from_token in ("NOW", "now"):
return [], self.upto_token
return [], self.current_token()

current_token = self.upto_token
current_token = self.current_token()

from_token = int(from_token)

if from_token == current_token:
return [], current_token

logger.info("get_updates_since: %s", self.__class__)
if self._LIMITED:
rows = await self.update_function(
from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
)
rows = await self.update_function(
from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
)

# never turn more than MAX_EVENTS_BEHIND + 1 into updates.
rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
else:
rows = await self.update_function(from_token, current_token)
# never turn more than MAX_EVENTS_BEHIND + 1 into updates.
rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)

updates = [(row[0], row[1:]) for row in rows]

# check we didn't get more rows than the limit.
# doing it like this allows the update_function to be a generator.
if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND:
if len(updates) >= MAX_EVENTS_BEHIND:
raise Exception("stream %s has fallen behind" % (self.NAME))

# Due to the assertion above we know we're up to date, so we know that
# our new stream position is `current_token`.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return updates, current_token

def current_token(self):
Expand All @@ -227,9 +217,8 @@ def current_token(self):
"""
raise NotImplementedError()

def update_function(self, from_token, current_token, limit=None):
"""Get updates between from_token and to_token. If Stream._LIMITED is
True then limit is provided, otherwise it's not.
def update_function(self, from_token, current_token, limit):
"""Get updates between from_token and to_token.

Returns:
Deferred(list(tuple)): the first entry in the tuple is the token for
Expand Down Expand Up @@ -257,7 +246,6 @@ def __init__(self, hs):

class PresenceStream(Stream):
NAME = "presence"
_LIMITED = False
ROW_TYPE = PresenceStreamRow

def __init__(self, hs):
Expand All @@ -272,7 +260,6 @@ def __init__(self, hs):

class TypingStream(Stream):
NAME = "typing"
_LIMITED = False
ROW_TYPE = TypingStreamRow

def __init__(self, hs):
Expand Down Expand Up @@ -372,7 +359,6 @@ class DeviceListsStream(Stream):
"""

NAME = "device_lists"
_LIMITED = False
ROW_TYPE = DeviceListsStreamRow

def __init__(self, hs):
Expand Down Expand Up @@ -462,7 +448,6 @@ class UserSignatureStream(Stream):
"""

NAME = "user_signature"
_LIMITED = False
ROW_TYPE = UserSignatureStreamRow

def __init__(self, hs):
Expand Down
10 changes: 8 additions & 2 deletions synapse/storage/data_stores/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ def get_users_whose_signatures_changed(self, user_id, from_key):
return set()

async def get_all_device_list_changes_for_remotes(
self, from_key: int, to_key: int
self, from_key: int, to_key: int, limit: int,
) -> List[Tuple[int, str]]:
"""Return a list of `(stream_id, entity)` which is the combined list of
changes to devices and which destinations need to be poked. Entity is
Expand All @@ -592,10 +592,16 @@ async def get_all_device_list_changes_for_remotes(
SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
) AS e
WHERE ? < stream_id AND stream_id <= ?
LIMIT ?
"""

return await self.db.execute(
"get_all_device_list_changes_for_remotes", None, sql, from_key, to_key
"get_all_device_list_changes_for_remotes",
None,
sql,
from_key,
to_key,
limit,
)

@cached(max_entries=10000)
Expand Down
14 changes: 10 additions & 4 deletions synapse/storage/data_stores/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def get_e2e_cross_signing_keys_bulk(

return result

def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit):
"""Return a list of changes from the user signature stream to notify remotes.
Note that the user signature stream represents when a user signs their
device with their user-signing key, which is not published to other
Expand All @@ -552,13 +552,19 @@ def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
Deferred[list[(int,str)]] a list of `(stream_id, user_id)`
"""
sql = """
SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id
SELECT stream_id, from_user_id AS user_id
FROM user_signature_stream
WHERE ? < stream_id AND stream_id <= ?
GROUP BY user_id
ORDER BY stream_id ASC
LIMIT ?
"""
return self.db.execute(
"get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key
"get_all_user_signature_changes_for_remotes",
None,
sql,
from_key,
to_key,
limit,
)


Expand Down
23 changes: 13 additions & 10 deletions synapse/storage/data_stores/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states):
"status_msg": state.status_msg,
"currently_active": state.currently_active,
}
for state in presence_states
for stream_id, state in zip(stream_orderings, presence_states)
],
)

Expand All @@ -73,19 +73,22 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states):
)
txn.execute(sql + clause, [stream_id] + list(args))

def get_all_presence_updates(self, last_id, current_id):
def get_all_presence_updates(self, last_id, current_id, limit):
if last_id == current_id:
return defer.succeed([])

def get_all_presence_updates_txn(txn):
sql = (
"SELECT stream_id, user_id, state, last_active_ts,"
" last_federation_update_ts, last_user_sync_ts, status_msg,"
" currently_active"
" FROM presence_stream"
" WHERE ? < stream_id AND stream_id <= ?"
)
txn.execute(sql, (last_id, current_id))
sql = """
SELECT stream_id, user_id, state, last_active_ts,
last_federation_update_ts, last_user_sync_ts,
status_msg,
currently_active
FROM presence_stream
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
return txn.fetchall()

return self.db.runInteraction(
Expand Down