Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make LruCache register its own metrics #8561

Merged
merged 4 commits into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8561.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move metric registration code down into `LruCache`.
4 changes: 1 addition & 3 deletions synapse/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
from synapse.events import EventBase
from synapse.logging import opentracing as opentracing
from synapse.types import StateMap, UserID
from synapse.util.caches import register_cache
from synapse.util.caches.lrucache import LruCache
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -70,8 +69,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()

self.token_cache = LruCache(10000)
register_cache("cache", "token_cache", self.token_cache)
self.token_cache = LruCache(10000, "token_cache")

self._auth_blocking = AuthBlocking(self.hs)

Expand Down
4 changes: 1 addition & 3 deletions synapse/push/push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from synapse.events import EventBase
from synapse.types import UserID
from synapse.util.caches import register_cache
from synapse.util.caches.lrucache import LruCache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -186,8 +185,7 @@ def _get_value(self, dotted_key: str) -> Optional[str]:


# Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches
regex_cache = LruCache(50000)
register_cache("cache", "regex_push_cache", regex_cache)
regex_cache = LruCache(50000, "regex_push_cache")


def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
Expand Down
13 changes: 8 additions & 5 deletions synapse/util/caches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import logging
from sys import intern
from typing import Callable, Dict, Optional
from typing import Callable, Dict, Optional, Sized

import attr
from prometheus_client.core import Gauge
Expand Down Expand Up @@ -92,20 +92,23 @@ def collect(self):
def register_cache(
cache_type: str,
cache_name: str,
cache,
cache: Sized,
collect_callback: Optional[Callable] = None,
resizable: bool = True,
resize_callback: Optional[Callable] = None,
) -> CacheMetric:
"""Register a cache object for metric collection and resizing.

Args:
cache_type
cache_type: a string indicating the "type" of the cache. This is used
only for deduplication so isn't too important provided it's constant.
cache_name: name of the cache
cache: cache itself
cache: cache itself, which must implement __len__(), and may optionally implement
a max_size property
collect_callback: If given, a function which is called during metric
collection to update additional metrics.
resizable: Whether this cache supports being resized.
resizable: Whether this cache supports being resized, in which case either
resize_callback must be provided, or the cache must support set_max_size().
resize_callback: A function which can be called to resize the cache.

Returns:
Expand Down
43 changes: 13 additions & 30 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from twisted.internet import defer

from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches import register_cache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry

Expand Down Expand Up @@ -54,10 +53,7 @@ class DeferredCache(Generic[KT, VT]):

__slots__ = (
"cache",
"name",
"keylen",
"thread",
"metrics",
"_pending_deferred_cache",
)

Expand Down Expand Up @@ -89,37 +85,27 @@ def __init__(
cache_type()
) # type: MutableMapping[KT, CacheEntry]

def metrics_cb():
cache_pending_metric.labels(name).set(len(self._pending_deferred_cache))

# cache is used for completed results and maps to the result itself, rather than
# a Deferred.
self.cache = LruCache(
max_size=max_entries,
keylen=keylen,
cache_name=name,
cache_type=cache_type,
size_callback=(lambda d: len(d)) if iterable else None,
evicted_callback=self._on_evicted,
metrics_collection_callback=metrics_cb,
apply_cache_factor_from_config=apply_cache_factor_from_config,
)

self.name = name
self.keylen = keylen
self.thread = None # type: Optional[threading.Thread]
self.metrics = register_cache(
"cache",
name,
self.cache,
collect_callback=self._metrics_collection_callback,
)

@property
def max_entries(self):
return self.cache.max_size

def _on_evicted(self, evicted_count):
self.metrics.inc_evictions(evicted_count)

def _metrics_collection_callback(self):
cache_pending_metric.labels(self.name).set(len(self._pending_deferred_cache))

def check_thread(self):
expected_thread = self.thread
if expected_thread is None:
Expand Down Expand Up @@ -154,21 +140,18 @@ def get(
if val is not _Sentinel.sentinel:
val.callbacks.update(callbacks)
if update_metrics:
self.metrics.inc_hits()
m = self.cache.metrics
assert m # we always have a name, so should always have metrics
m.inc_hits()
return val.deferred

val = self.cache.get(key, _Sentinel.sentinel, callbacks=callbacks)
if val is not _Sentinel.sentinel:
self.metrics.inc_hits()
return val

if update_metrics:
self.metrics.inc_misses()

if default is _Sentinel.sentinel:
val = self.cache.get(
key, default, callbacks=callbacks, update_metrics=update_metrics
)
if val is _Sentinel.sentinel:
raise KeyError()
else:
return default
return val

def set(
self,
Expand Down
9 changes: 1 addition & 8 deletions synapse/util/caches/dictionary_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

from synapse.util.caches.lrucache import LruCache

from . import register_cache

logger = logging.getLogger(__name__)


Expand All @@ -46,18 +44,16 @@ class DictionaryCache:
"""

def __init__(self, name, max_entries=1000):
self.cache = LruCache(max_size=max_entries, size_callback=len)
self.cache = LruCache(max_size=max_entries, cache_name=name, size_callback=len)

self.name = name
self.sequence = 0
self.thread = None
# caches_by_name[name] = self.cache

class Sentinel:
__slots__ = []

self.sentinel = Sentinel()
self.metrics = register_cache("dictionary", name, self.cache)

def check_thread(self):
expected_thread = self.thread
Expand All @@ -82,8 +78,6 @@ def get(self, key, dict_keys=None):
"""
entry = self.cache.get(key, self.sentinel)
if entry is not self.sentinel:
self.metrics.inc_hits()

if dict_keys is None:
return DictionaryEntry(
entry.full, entry.known_absent, dict(entry.value)
Expand All @@ -95,7 +89,6 @@ def get(self, key, dict_keys=None):
{k: entry.value[k] for k in dict_keys if k in entry.value},
)

self.metrics.inc_misses()
return DictionaryEntry(False, set(), {})

def invalidate(self, key):
Expand Down
46 changes: 35 additions & 11 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Callable, Optional, Type, Union

from synapse.config import cache as cache_config
from synapse.util.caches import CacheMetric, register_cache
from synapse.util.caches.treecache import TreeCache


Expand All @@ -43,27 +44,29 @@ def __init__(self, prev_node, next_node, key, value, callbacks=set()):

class LruCache:
"""
Least-recently-used cache.
Least-recently-used cache, supporting prometheus metrics and invalidation callbacks.

Supports del_multi only if cache_type=TreeCache
If cache_type=TreeCache, all keys must be tuples.

Can also set callbacks on objects when getting/setting which are fired
when that key gets invalidated/evicted.
"""

def __init__(
self,
max_size: int,
cache_name: Optional[str] = None,
keylen: int = 1,
cache_type: Type[Union[dict, TreeCache]] = dict,
size_callback: Optional[Callable] = None,
evicted_callback: Optional[Callable] = None,
metrics_collection_callback: Optional[Callable[[], None]] = None,
apply_cache_factor_from_config: bool = True,
):
"""
Args:
max_size: The maximum amount of entries the cache can hold

cache_name: The name of this cache, for the prometheus metrics. If unset,
no metrics will be reported on this cache.

keylen: The length of the tuple used as the cache key. Ignored unless
cache_type is `TreeCache`.

Expand All @@ -73,9 +76,13 @@ def __init__(

size_callback (func(V) -> int | None):

evicted_callback (func(int)|None):
if not None, called on eviction with the size of the evicted
entry
metrics_collection_callback:
metrics collection callback. This is called early in the metrics
collection process, before any of the metrics registered with the
prometheus Registry are collected, so can be used to update any dynamic
metrics.

Ignored if cache_name is None.

apply_cache_factor_from_config (bool): If true, `max_size` will be
multiplied by a cache factor derived from the homeserver config
Expand All @@ -94,6 +101,19 @@ def __init__(
else:
self.max_size = int(max_size)

if cache_name is not None:
metrics = register_cache(
"lru_cache",
cache_name,
self,
collect_callback=metrics_collection_callback,
) # type: Optional[CacheMetric]
else:
metrics = None

# this is exposed for access from outside this class
self.metrics = metrics

list_root = _Node(None, None, None, None)
list_root.next_node = list_root
list_root.prev_node = list_root
Expand All @@ -105,8 +125,8 @@ def evict():
todelete = list_root.prev_node
evicted_len = delete_node(todelete)
cache.pop(todelete.key, None)
if evicted_callback:
evicted_callback(evicted_len)
if metrics:
metrics.inc_evictions(evicted_len)

def synchronized(f):
@wraps(f)
Expand Down Expand Up @@ -169,13 +189,17 @@ def delete_node(node):
return deleted_len

@synchronized
def cache_get(key, default=None, callbacks=[]):
def cache_get(key, default=None, callbacks=[], update_metrics=True):
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
node.callbacks.update(callbacks)
if update_metrics and metrics:
metrics.inc_hits()
return node.value
else:
if update_metrics and metrics:
metrics.inc_misses()
return default

@synchronized
Expand Down
4 changes: 2 additions & 2 deletions tests/util/test_lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_pop(self):
self.assertEquals(cache.pop("key"), None)

def test_del_multi(self):
cache = LruCache(4, 2, cache_type=TreeCache)
cache = LruCache(4, keylen=2, cache_type=TreeCache)
cache[("animal", "cat")] = "mew"
cache[("animal", "dog")] = "woof"
cache[("vehicles", "car")] = "vroom"
Expand Down Expand Up @@ -160,7 +160,7 @@ def test_del_multi(self):
m2 = Mock()
m3 = Mock()
m4 = Mock()
cache = LruCache(4, 2, cache_type=TreeCache)
cache = LruCache(4, keylen=2, cache_type=TreeCache)

cache.set(("a", "1"), "value", callbacks=[m1])
cache.set(("a", "2"), "value", callbacks=[m2])
Expand Down