Get pdu limiting #79

Merged · 7 commits · Feb 18, 2015

1 change: 1 addition & 0 deletions synapse/app/homeserver.py
@@ -275,6 +275,7 @@ def setup():
     hs.get_pusherpool().start()
     hs.get_state_handler().start_caching()
     hs.get_datastore().start_profiling()
+    hs.get_replication_layer().start_get_pdu_cache()
 
     if config.daemonize:
         print config.pid_file
42 changes: 39 additions & 3 deletions synapse/federation/federation_client.py
@@ -19,7 +19,8 @@
 from .federation_base import FederationBase
 from .units import Edu
 
-from synapse.api.errors import CodeMessageException
+from synapse.api.errors import CodeMessageException, SynapseError
+from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
@@ -30,6 +31,20 @@


 class FederationClient(FederationBase):
+    def __init__(self):
+        self._get_pdu_cache = None
+
+    def start_get_pdu_cache(self):
+        self._get_pdu_cache = ExpiringCache(
+            cache_name="get_pdu_cache",
+            clock=self._clock,
+            max_len=1000,
+            expiry_ms=120*1000,
+            reset_expiry_on_get=False,
+        )
+
+        self._get_pdu_cache.start()
+
     @log_function
     def send_pdu(self, pdu, destinations):
         """Informs the replication layer about a new PDU generated within the
@@ -160,6 +175,11 @@ def get_pdu(self, destinations, event_id, outlier=False):

         # TODO: Rate limit the number of times we try and get the same event.
 
+        if self._get_pdu_cache:
+            e = self._get_pdu_cache.get(event_id)
+            if e:
+                defer.returnValue(e)
+
         pdu = None
         for destination in destinations:
             try:
@@ -181,15 +201,31 @@ def get_pdu(self, destinations, event_id, outlier=False):
                 pdu = yield self._check_sigs_and_hash(pdu)
 
                 break
-            except CodeMessageException:
-                raise
+            except SynapseError as e:
+                logger.info(
+                    "Failed to get PDU %s from %s because %s",
+                    event_id, destination, e,
+                )
+                continue
+            except CodeMessageException as e:
+                if 400 <= e.code < 500:
+                    raise
+
+                logger.info(
+                    "Failed to get PDU %s from %s because %s",
+                    event_id, destination, e,
+                )
+                continue
             except Exception as e:
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
                     event_id, destination, e,
                 )
                 continue
 
+        if self._get_pdu_cache is not None:
+            self._get_pdu_cache[event_id] = pdu
+
         defer.returnValue(pdu)

     @defer.inlineCallbacks
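
A note on the pattern above, not part of the diff: _get_pdu_cache stays None until homeserver.py's setup() calls start_get_pdu_cache(), so get_pdu guards every cache access, and it memoises whatever came back from the destinations — including None when every destination failed — which is what rate-limits repeated fetches of the same unobtainable event. A minimal sketch of that guard-and-memoise shape, with a hypothetical fetch callable standing in for the per-destination network loop:

def make_cached_getter(fetch, cache=None):
    # Sketch only: make_cached_getter and fetch are illustrative names,
    # not part of this PR. `cache` may be None, mirroring _get_pdu_cache
    # before start_get_pdu_cache() has run.
    def get(key):
        if cache is not None:
            hit = cache.get(key)
            if hit is not None:
                return hit          # cache hit: skip the network entirely
        value = fetch(key)          # may be None if all destinations fail
        if cache is not None:
            cache[key] = value      # None is cached too, so repeated
                                    # requests for a missing event are
                                    # throttled until the entry expires
        return value
    return get
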
46 changes: 9 additions & 37 deletions synapse/state.py
@@ -18,6 +18,7 @@

 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
+from synapse.util.expiringcache import ExpiringCache
 from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError
 from synapse.events.snapshot import EventContext
@@ -51,7 +52,6 @@ class _StateCacheEntry(object):
     def __init__(self, state, state_group, ts):
         self.state = state
         self.state_group = state_group
-        self.ts = ts


class StateHandler(object):
@@ -69,12 +69,15 @@ def __init__(self, hs):
     def start_caching(self):
         logger.debug("start_caching")
 
-        self._state_cache = {}
-
-        def f():
-            self._prune_cache()
+        self._state_cache = ExpiringCache(
+            cache_name="state_cache",
+            clock=self.clock,
+            max_len=SIZE_OF_CACHE,
+            expiry_ms=EVICTION_TIMEOUT_SECONDS*1000,
+            reset_expiry_on_get=True,
+        )
 
-        self.clock.looping_call(f, 5*1000)
+        self._state_cache.start()

     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
@@ -409,34 +412,3 @@ def key_func(e):
             return -int(e.depth), hashlib.sha1(e.event_id).hexdigest()
 
         return sorted(events, key=key_func)

-    def _prune_cache(self):
-        logger.debug(
-            "_prune_cache. before len: %d",
-            len(self._state_cache.keys())
-        )
-
-        now = self.clock.time_msec()
-
-        if len(self._state_cache.keys()) > SIZE_OF_CACHE:
-            sorted_entries = sorted(
-                self._state_cache.items(),
-                key=lambda k, v: v.ts,
-            )
-
-            for k, _ in sorted_entries[SIZE_OF_CACHE:]:
-                self._state_cache.pop(k)
-
-        keys_to_delete = set()
-
-        for key, cache_entry in self._state_cache.items():
-            if now - cache_entry.ts > EVICTION_TIMEOUT_SECONDS*1000:
-                keys_to_delete.add(key)
-
-        for k in keys_to_delete:
-            self._state_cache.pop(k)
-
-        logger.debug(
-            "_prune_cache. after len: %d",
-            len(self._state_cache.keys())
-        )
115 changes: 115 additions & 0 deletions synapse/util/expiringcache.py
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging


logger = logging.getLogger(__name__)


class ExpiringCache(object):
    def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
                 reset_expiry_on_get=False):
        """
        Args:
            cache_name (str): Name of this cache, used for logging.
            clock (Clock)
            max_len (int): Max size of dict. If the dict grows larger than this
                then the oldest items get automatically evicted. Default is 0,
                which indicates there is no max limit.
            expiry_ms (int): How long before an item is evicted from the cache
                in milliseconds. Default is 0, indicating items never get
                evicted based on time.
            reset_expiry_on_get (bool): If true, will reset the expiry time for
                an item on access. Defaults to False.
        """
        self._cache_name = cache_name

        self._clock = clock

        self._max_len = max_len
        self._expiry_ms = expiry_ms

        self._reset_expiry_on_get = reset_expiry_on_get

        self._cache = {}

    def start(self):
        if not self._expiry_ms:
            # Don't bother starting the loop if things never expire
            return

        def f():
            self._prune_cache()

        self._clock.looping_call(f, self._expiry_ms/2)

    def __setitem__(self, key, value):
        now = self._clock.time_msec()
        self._cache[key] = _CacheEntry(now, value)

        # Evict if there are now too many items: keep the newest
        # max_len entries and drop the oldest ones, as the docstring
        # promises.
        if self._max_len and len(self._cache.keys()) > self._max_len:
            sorted_entries = sorted(
                self._cache.items(),
                key=lambda item: item[1].time,
            )

            for k, _ in sorted_entries[:-self._max_len]:
                self._cache.pop(k)

    def __getitem__(self, key):
        entry = self._cache[key]

        if self._reset_expiry_on_get:
            entry.time = self._clock.time_msec()

        return entry.value

    def get(self, key, default=None):
        try:
            return self[key]
        except KeyError:
            return default

    def _prune_cache(self):
        if not self._expiry_ms:
            # zero expiry time means don't expire. This should never get called
            # since we have this check in start too.
            return
        begin_length = len(self._cache)

        now = self._clock.time_msec()

        keys_to_delete = set()

        for key, cache_entry in self._cache.items():
            if now - cache_entry.time > self._expiry_ms:
                keys_to_delete.add(key)

        for k in keys_to_delete:
            self._cache.pop(k)

        logger.debug(
            "[%s] _prune_cache before len: %d, after len: %d",
            self._cache_name, begin_length, len(self._cache.keys())
        )


class _CacheEntry(object):
    def __init__(self, time, value):
        self.time = time
        self.value = value
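
To make the eviction rules concrete: max_len eviction happens eagerly inside __setitem__, while time-based eviction only happens when the looping call fires _prune_cache. A short usage sketch, reusing the hypothetical ManualClock stand-in from the state.py note above:

from synapse.util.expiringcache import ExpiringCache

clock = ManualClock()   # the illustrative stub defined earlier, not a real API
cache = ExpiringCache("example", clock, max_len=2, expiry_ms=120 * 1000)
cache.start()           # schedules pruning; a no-op with the stubbed clock

cache["a"] = 1
clock.now_ms += 1
cache["b"] = 2
clock.now_ms += 1
cache["c"] = 3          # over max_len: the oldest entry ("a") is evicted

assert cache.get("a") is None       # evicted by size
assert cache.get("b") == 2

clock.now_ms += 120 * 1000 + 1
cache._prune_cache()                # what the looping call would run
assert cache.get("b") is None       # evicted by age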