diff --git a/README.rst b/README.rst index c906aea..0dd2be8 100644 --- a/README.rst +++ b/README.rst @@ -87,6 +87,12 @@ Accessing redis-py's Sentinel instance Change log ---------- +v2.0.0 +~~~~~~ + +* Connections are now thread-local to avoid race conditions after Redis master failover +* Removed support for `REDIS_{HOST, PORT, DB}` config variables + v1.0.0 ~~~~~~ diff --git a/flask_redis_sentinel.py b/flask_redis_sentinel.py index 79e758e..b232c06 100644 --- a/flask_redis_sentinel.py +++ b/flask_redis_sentinel.py @@ -1,4 +1,4 @@ -# Copyright 2015, 2016 Exponea s r.o. +# Copyright 2015, 2016, 2017 Exponea s r.o. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,110 +12,89 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import namedtuple +import six import inspect -try: - import urllib.parse as urlparse -except ImportError: - import urlparse -from flask import current_app -import warnings import redis -import redis.sentinel # requires redis-py 2.9.0+ +import redis.sentinel import redis_sentinel_url -import sys -from werkzeug.local import LocalProxy +from flask import current_app +from werkzeug.local import Local, LocalProxy from werkzeug.utils import import_string -if sys.version_info[0] == 2: # pragma: no cover - # Python 2.x - _string_types = basestring - - def iteritems(d): - return d.iteritems() -else: # pragma: no cover - # Python 3.x - _string_types = str - - def iteritems(d): - return d.items() - _EXTENSION_KEY = 'redissentinel' -class _ExtensionData(object): - def __init__(self, client_class, sentinel=None, default_connection=None): +class RedisSentinelInstance(object): + + def __init__(self, url, client_class, client_options, sentinel_class, sentinel_options): + self.url = url self.client_class = client_class - self.sentinel = sentinel - self.default_connection = default_connection - self.master_connections = {} - self.slave_connections = {} + self.client_options = client_options + self.sentinel_class = sentinel_class + self.sentinel_options = sentinel_options + self.local = Local() + self._connect() + if self.local.connection[0] is None: + # if there is no sentinel, we don't need to use thread-local storage + self.connection = self.local.connection + self.local = self + + def _connect(self): + try: + return self.local.connection + except AttributeError: + conn = redis_sentinel_url.connect( + self.url, + sentinel_class=self.sentinel_class, sentinel_options=self.sentinel_options, + client_class=self.client_class, client_options=self.client_options) + self.local.connection = conn + return conn + + @property + def sentinel(self): + return self._connect()[0] + + @property + def default_connection(self): + return self._connect()[1] def master_for(self, service_name, **kwargs): - if self.sentinel is None: - raise ValueError('Cannot get master {} using non-sentinel configuration'.format(service_name)) - if service_name not in self.master_connections: - self.master_connections[service_name] = self.sentinel.master_for(service_name, redis_class=self.client_class, - **kwargs) - return self.master_connections[service_name] - - def slave_for(self, service_name, **kwargs): - if self.sentinel is None: - raise ValueError('Cannot get slave {} using non-sentinel configuration'.format(service_name)) - if service_name not in self.slave_connections: - self.slave_connections[service_name] = self.sentinel.slave_for(service_name, redis_class=self.client_class, - **kwargs) - return self.slave_connections[service_name] - - -class _ExtensionProxy(LocalProxy): - __slots__ = ('__sentinel',) - - def __init__(self, sentinel, local, name=None): - object.__setattr__(self, '_ExtensionProxy__sentinel', sentinel) - super(_ExtensionProxy, self).__init__(local, name=name) - - def _get_current_object(self): - app = current_app._get_current_object() - if _EXTENSION_KEY not in app.extensions or self.__sentinel.config_prefix not in app.extensions[_EXTENSION_KEY]: - raise ValueError('RedisSentinel extension with config prefix {} was not initialized for application {}'. - format(self.__sentinel.config_prefix, app.import_name)) - ext_data = app.extensions[_EXTENSION_KEY][self.__sentinel.config_prefix] - - local = object.__getattribute__(self, '_LocalProxy__local') - - return local(ext_data) - - -class _PrefixedDict(object): - def __init__(self, config, prefix): - self.config = config - self.prefix = prefix - - def _key(self, key): - return '{}_{}'.format(self.prefix, key) - - def __getitem__(self, item): - return self.config[self._key(item)] + try: + return self.local.master_connections[service_name] + except AttributeError: + self.local.master_connections = {} + except KeyError: + pass - def __setitem__(self, item, value): - self.config[self._key(item)] = value + sentinel = self.sentinel + if sentinel is None: + msg = 'Cannot get master {} using non-sentinel configuration' + raise RuntimeError(msg.format(service_name)) - def __delitem__(self, item): - del self.config[self._key(item)] + conn = sentinel.master_for(service_name, redis_class=self.client_class, **kwargs) + self.local.master_connections[service_name] = conn + return conn - def __contains__(self, item): - return self._key(item) in self.config + def slave_for(self, service_name, **kwargs): + try: + return self.local.slave_connections[service_name] + except AttributeError: + self.local.slave_connections = {} + except KeyError: + pass - def get(self, item, default=None): - return self.config.get(self._key(item), default) + sentinel = self.sentinel + if sentinel is None: + msg = 'Cannot get slave {} using non-sentinel configuration' + raise RuntimeError(msg.format(service_name)) - def pop(self, item, *args, **kwargs): - return self.config.pop(self._key(item), *args, **kwargs) + conn = sentinel.slave_for(service_name, redis_class=self.client_class, **kwargs) + self.local.slave_connections[service_name] = conn + return conn -class SentinelExtension(object): +class RedisSentinel(object): """Flask extension that supports connections to master using Redis Sentinel. Supported URL types: @@ -124,92 +103,78 @@ class SentinelExtension(object): rediss:// unix:// """ - def __init__(self, app=None, config_prefix=None, client_class=None, sentinel_class=None): - self.config_prefix = None + + def __init__(self, app=None, config_prefix='REDIS', client_class=None, sentinel_class=None): + self.config_prefix = config_prefix self.client_class = client_class self.sentinel_class = sentinel_class if app is not None: - self.init_app(app, config_prefix=config_prefix) - self.default_connection = _ExtensionProxy(self, lambda ext_data: ext_data.default_connection) - self.sentinel = _ExtensionProxy(self, lambda ext_data: ext_data.sentinel) + self.init_app(app) + self.sentinel = LocalProxy(lambda: self.get_instance().sentinel) + self.default_connection = LocalProxy(lambda: self.get_instance().default_connection) def init_app(self, app, config_prefix=None, client_class=None, sentinel_class=None): - if _EXTENSION_KEY not in app.extensions: - app.extensions[_EXTENSION_KEY] = {} - - extensions = app.extensions[_EXTENSION_KEY] + config_prefix = config_prefix or self.config_prefix + app.config.setdefault(config_prefix + '_' + 'URL', 'redis://localhost/0') - if config_prefix is None: - config_prefix = 'REDIS' + config = self._strip_dict_prefix(app.config, config_prefix + '_') + extensions = app.extensions.setdefault(_EXTENSION_KEY, {}) if config_prefix in extensions: - raise ValueError('Config prefix {} already registered'.format(config_prefix)) - - self.config_prefix = config_prefix + msg = 'Redis sentinel extension with config prefix {} is already registered' + raise RuntimeError(msg.format(config_prefix)) - config = _PrefixedDict(app.config, config_prefix) - url = config.get('URL') + client_class = self._resolve_class( + config, 'CLASS', 'client_class', client_class, redis.StrictRedis) + sentinel_class = self._resolve_class( + config, 'SENTINEL_CLASS', 'sentinel_class', sentinel_class, redis.sentinel.Sentinel) - client_class = self._resolve_class(config, 'CLASS', client_class, 'client_class', - default=redis.StrictRedis) - sentinel_class = self._resolve_class(config, 'SENTINEL_CLASS', sentinel_class, 'sentinel_class', - default=redis.sentinel.Sentinel) + url = config.pop('URL') + client_options = self._config_from_variables(config, client_class) + sentinel_options = self._config_from_variables( + self._strip_dict_prefix(config, 'SENTINEL_'), client_class) - data = _ExtensionData(client_class) + extensions[config_prefix] = RedisSentinelInstance( + url, client_class, client_options, sentinel_class, sentinel_options) - if url: - connection_options = self._config_from_variables(config, client_class) - sentinel_options = self._config_from_variables(_PrefixedDict(config, 'SENTINEL'), client_class) - - connection_options.pop('host', None) - connection_options.pop('port', None) - connection_options.pop('db', None) - - result = redis_sentinel_url.connect(url, sentinel_class=sentinel_class, - sentinel_options=sentinel_options, - client_class=client_class, - client_options=connection_options) - data.sentinel, data.default_connection = result - else: - # Stay compatible with Flask-And-Redis for a while - warnings.warn('Setting redis connection via separate variables is deprecated. Please use REDIS_URL.', - DeprecationWarning) - kwargs = self._config_from_variables(config, client_class) - data.default_connection = client_class(**kwargs) - - extensions[config_prefix] = data + self.config_prefix = config_prefix - def _resolve_class(self, config, config_key, the_class, attr, default): - if the_class is not None: - pass - elif getattr(self, attr) is not None: + def _resolve_class(self, config, config_key, attr, the_class, default_class): + if the_class is None: the_class = getattr(self, attr) - else: - the_class = config.get(config_key, default) - if isinstance(the_class, _string_types): - the_class = import_string(the_class) + if the_class is None: + the_class = config.get(config_key, default_class) + if isinstance(the_class, six.string_types): + the_class = import_string(the_class) + config.pop(config_key, None) return the_class @staticmethod - def _config_from_variables(config, client_class): - host = config.get('HOST') - if host and (host.startswith('file://') or host.startswith('/')): - del config['HOST'] - config['UNIX_SOCKET_PATH'] = host + def _strip_dict_prefix(orig, prefix): + return {k[len(prefix):]: v for (k, v) in orig.items() if k.startswith(prefix)} - args = inspect.getargspec(client_class.__init__).args + @staticmethod + def _config_from_variables(config, the_class): + args = inspect.getargspec(the_class.__init__).args args.remove('self') + args.remove('host') + args.remove('port') + args.remove('db') - def get_config(suffix): - value = config[suffix] - if suffix == 'PORT': - return int(value) - return value + return {arg: config[arg.upper()] for arg in args if arg.upper() in config} - return {arg: get_config(arg.upper()) for arg in args if arg.upper() in config} + def get_instance(self): + app = current_app._get_current_object() + if _EXTENSION_KEY not in app.extensions or self.config_prefix not in app.extensions[_EXTENSION_KEY]: + msg = 'Redis sentinel extension with config prefix {} was not initialized for application {}' + raise RuntimeError(msg.format(self.config_prefix, app.import_name)) + return app.extensions[_EXTENSION_KEY][self.config_prefix] def master_for(self, service_name, **kwargs): - return _ExtensionProxy(self, lambda ext_data: ext_data.master_for(service_name, **kwargs)) + return LocalProxy(lambda: self.get_instance().master_for(service_name, **kwargs)) def slave_for(self, service_name, **kwargs): - return _ExtensionProxy(self, lambda ext_data: ext_data.slave_for(service_name, **kwargs)) \ No newline at end of file + return LocalProxy(lambda: self.get_instance().slave_for(service_name, **kwargs)) + + +SentinelExtension = RedisSentinel # for backwards-compatibility diff --git a/setup.py b/setup.py index eabae9e..31266f2 100644 --- a/setup.py +++ b/setup.py @@ -5,12 +5,12 @@ setup( name='Flask-Redis-Sentinel', py_modules=['flask_redis_sentinel'], - version='1.0.0', - install_requires=['Flask>=0.10.1', 'redis>=2.10.3', 'redis_sentinel_url>=1.0.0,<2.0.0'], + version='2.0.0', + install_requires=['Flask>=0.10.1', 'redis>=2.10.3', 'redis_sentinel_url>=1.0.0,<2.0.0', 'six'], description='Redis-Sentinel integration for Flask', url='/~https://github.com/exponea/flask-redis-sentinel', author='Martin Sucha', - author_email='martin.sucha@infinario.com', + author_email='martin.sucha@exponea.com', license='Apache 2.0', classifiers=[ 'Programming Language :: Python', @@ -19,6 +19,8 @@ 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', 'Operating System :: OS Independent', 'License :: OSI Approved :: Apache Software License', 'Development Status :: 5 - Production/Stable', diff --git a/test_flask_redis_sentinel.py b/test_flask_redis_sentinel.py index 9231fc6..ae06965 100644 --- a/test_flask_redis_sentinel.py +++ b/test_flask_redis_sentinel.py @@ -1,4 +1,4 @@ -# Copyright 2015, 2016 Exponea s r.o. +# Copyright 2015, 2016, 2017 Exponea s r.o. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,85 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading from unittest import TestCase from flask import Flask from mock import MagicMock import redis -from flask_redis_sentinel import SentinelExtension, _PrefixedDict - - -class TestCompatibilityWithFlaskAndRedis(TestCase): - def test_empty_settings(self): - self.assertEqual(SentinelExtension._config_from_variables({}, redis.StrictRedis), {}) - - def test_port(self): - self.assertEqual(SentinelExtension._config_from_variables({'PORT': 7379}, redis.StrictRedis)['port'], 7379) - - def test_host(self): - self.assertEqual(SentinelExtension._config_from_variables({'HOST': 'otherhost'}, redis.StrictRedis), - {'host': 'otherhost'}) - - def test_host_path(self): - self.assertEqual(SentinelExtension._config_from_variables({'HOST': '/path/to/socket'}, redis.StrictRedis), - {'unix_socket_path': '/path/to/socket'}) - - def test_host_file_url(self): - self.assertEqual(SentinelExtension._config_from_variables({'HOST': 'file:///path/to/socket'}, redis.StrictRedis), - {'unix_socket_path': 'file:///path/to/socket'}) - - def test_db(self): - self.assertEqual(SentinelExtension._config_from_variables({'DB': 2}, redis.StrictRedis), - {'db': 2}) - - -class TestPrefixedDict(TestCase): - def setUp(self): - self.config = { - 'REDIS_HOST': 'localhost', - 'REDIS_PORT': 6379, - 'OTHER_KEY': 'aA', - 'ALTERNATE_HOST': 'alternate.local', - 'ALTERNATE_PORT': 6380, - } - self.prefixed = _PrefixedDict(self.config, 'REDIS') - - def test_get(self): - self.assertEqual(self.prefixed.get('HOST'), 'localhost') - self.assertEqual(self.prefixed.get('PORT'), 6379) - self.assertEqual(self.prefixed.get('DB'), None) - - def test_getitem(self): - self.assertEqual(self.prefixed['HOST'], 'localhost') - self.assertEqual(self.prefixed['PORT'], 6379) - with self.assertRaises(KeyError): - self.prefixed['DB'] - - def test_set(self): - self.prefixed['DB'] = 2 - self.prefixed['PORT'] = 7000 - self.assertEquals(self.config['REDIS_DB'], 2) - self.assertEquals(self.config['REDIS_PORT'], 7000) - - def test_del(self): - del self.prefixed['PORT'] - with self.assertRaises(KeyError): - del self.prefixed['DB'] - - def test_contains(self): - self.assertTrue('PORT' in self.prefixed) - self.assertFalse('DB' in self.prefixed) - - def test_pop(self): - self.assertEquals(self.prefixed.pop('PORT'), 6379) - self.assertNotIn('REDIS_PORT', self.config) - with self.assertRaises(KeyError): - self.prefixed.pop('DB') - - def test_pop_default(self): - self.assertEquals(self.prefixed.pop('PORT', None), 6379) - self.assertNotIn('REDIS_PORT', self.config) - self.assertIsNone(self.prefixed.pop('DB', None)) - self.assertNotIn('REDIS_DB', self.config) +from flask_redis_sentinel import SentinelExtension class FakeRedis(MagicMock): @@ -185,9 +112,7 @@ def test_default_connection(self): conn = sentinel.default_connection with self.app.app_context(): inst = conn._get_current_object() - self.assertEqual(inst.kwargs['host'], 'localhost') - self.assertEqual(inst.kwargs['port'], 6379) - self.assertEqual(inst.kwargs['db'], 0) + self.assertEqual(inst.kwargs['url'], 'redis://localhost/0') def test_default_connection_with_config_class(self): sentinel = SentinelExtension() @@ -196,9 +121,7 @@ def test_default_connection_with_config_class(self): conn = sentinel.default_connection with self.app.app_context(): inst = conn._get_current_object() - self.assertEqual(inst.kwargs['host'], 'localhost') - self.assertEqual(inst.kwargs['port'], 6379) - self.assertEqual(inst.kwargs['db'], 0) + self.assertEqual(inst.kwargs['url'], 'redis://localhost/0') def test_default_connection_with_init_class(self): sentinel = SentinelExtension() @@ -206,9 +129,7 @@ def test_default_connection_with_init_class(self): conn = sentinel.default_connection with self.app.app_context(): inst = conn._get_current_object() - self.assertEqual(inst.kwargs['host'], 'localhost') - self.assertEqual(inst.kwargs['port'], 6379) - self.assertEqual(inst.kwargs['db'], 0) + self.assertEqual(inst.kwargs['url'], 'redis://localhost/0') def test_default_connection_with_config_class_string(self): sentinel = SentinelExtension() @@ -217,9 +138,7 @@ def test_default_connection_with_config_class_string(self): conn = sentinel.default_connection with self.app.app_context(): inst = conn._get_current_object() - self.assertEqual(inst.kwargs['host'], 'localhost') - self.assertEqual(inst.kwargs['port'], 6379) - self.assertEqual(inst.kwargs['db'], 0) + self.assertEqual(inst.kwargs['url'], 'redis://localhost/0') def test_default_connection_redis_url(self): sentinel = SentinelExtension(client_class=FakeRedis) @@ -238,17 +157,13 @@ def test_default_connection_redis_url(self): def test_default_connection_redis_vars(self): sentinel = SentinelExtension(client_class=FakeRedis) - self.app.config['REDIS_HOST'] = 'hostname' - self.app.config['REDIS_PORT'] = 7001 - self.app.config['REDIS_DB'] = 3 + self.app.config['REDIS_URL'] = 'redis://hostname:7001/3' self.app.config['REDIS_DECODE_RESPONSES'] = True sentinel.init_app(self.app) conn = sentinel.default_connection with self.app.app_context(): inst = conn._get_current_object() - self.assertEqual(inst.kwargs['host'], 'hostname') - self.assertEqual(inst.kwargs['port'], 7001) - self.assertEqual(inst.kwargs['db'], 3) + self.assertEqual(inst.kwargs['url'], 'redis://hostname:7001/3') self.assertEqual(inst.kwargs['decode_responses'], True) def test_sentinel_kwargs_from_config(self): @@ -260,7 +175,6 @@ def test_sentinel_kwargs_from_config(self): self.assertIsNotNone(sentinel.sentinel) self.assertEquals(sentinel.sentinel.sentinel_kwargs, {'socket_connect_timeout': 0.3}) - def test_default_connection_sentinel_url_master(self): sentinel = SentinelExtension(client_class=FakeRedis, sentinel_class=FakeSentinel) self.app.config['REDIS_URL'] = 'redis+sentinel://hostname:7001/mymaster/3' @@ -276,7 +190,6 @@ def test_default_connection_sentinel_url_master(self): self.assertEqual(inst.kwargs['connection_kwargs']['db'], 3) self.assertEqual(inst.kwargs['connection_kwargs']['decode_responses'], True) - def test_default_connection_sentinel_url_slave(self): sentinel = SentinelExtension(client_class=FakeRedis, sentinel_class=FakeSentinel) self.app.config['REDIS_URL'] = 'redis+sentinel://hostname:7001/myslave/3?client_type=slave' @@ -336,7 +249,9 @@ def test_duplicate_prefix_registration(self): sentinel2 = SentinelExtension() sentinel.init_app(self.app) - with self.assertRaisesRegexp(ValueError, 'Config prefix REDIS already registered'): + + msg = 'Redis sentinel extension with config prefix REDIS is already registered' + with self.assertRaisesRegexp(RuntimeError, msg): sentinel2.init_app(self.app) def test_multiple_prefix_registration(self): @@ -363,6 +278,44 @@ def test_init_app_with_prefix_in_constructor(self): inst = conn._get_current_object() self.assertEqual(inst.kwargs['url'], 'redis://hostname2:7003/5') + def _check_threads(self, sentinel): + connections = {} + with self.app.app_context(): + connections['from_main_thread'] = sentinel.default_connection._get_current_object() + + def in_another_thread(): + with self.app.app_context(): + connections['from_another_thread'] = sentinel.default_connection._get_current_object() + + thread = threading.Thread(target=in_another_thread) + thread.start() + thread.join() + + with self.app.app_context(): + connections['from_main_thread_later'] = sentinel.default_connection._get_current_object() + + return connections + + def test_sentinel_threads(self): + sentinel = SentinelExtension(client_class=FakeRedis, sentinel_class=FakeSentinel) + self.app.config['REDIS_URL'] = 'redis+sentinel://hostname:7001/myslave/0' + sentinel.init_app(self.app) + + connections = self._check_threads(sentinel) + self.assertIsNot(connections['from_another_thread'], connections['from_main_thread']) + self.assertIsNot(connections['from_another_thread'], connections['from_main_thread_later']) + self.assertIs(connections['from_main_thread'], connections['from_main_thread_later']) + + def test_redis_threads(self): + sentinel = SentinelExtension(client_class=FakeRedis, sentinel_class=FakeSentinel) + self.app.config['REDIS_URL'] = 'redis://hostname:7001/0' + sentinel.init_app(self.app) + + connections = self._check_threads(sentinel) + self.assertIs(connections['from_another_thread'], connections['from_main_thread']) + self.assertIs(connections['from_another_thread'], connections['from_main_thread_later']) + self.assertIs(connections['from_main_thread'], connections['from_main_thread_later']) + def test_mixed_apps(self): sentinel1 = SentinelExtension(app=self.app, client_class=FakeRedis) conn1 = sentinel1.default_connection @@ -374,11 +327,13 @@ def test_mixed_apps(self): self.app3 = Flask('test3') with self.app2.app_context(): - with self.assertRaisesRegexp(ValueError, 'RedisSentinel extension with config prefix REDIS was not initialized for application test2'): + msg = 'Redis sentinel extension with config prefix REDIS was not initialized for application test2' + with self.assertRaisesRegexp(RuntimeError, msg): conn1._get_current_object() with self.app3.app_context(): - with self.assertRaisesRegexp(ValueError, 'RedisSentinel extension with config prefix REDIS was not initialized for application test3'): + msg = 'Redis sentinel extension with config prefix REDIS was not initialized for application test3' + with self.assertRaisesRegexp(RuntimeError, msg): conn1._get_current_object() def test_named_master(self): @@ -402,7 +357,7 @@ def test_named_master_no_sentinel(self): conn = sentinel.master_for('othermaster', db=6) with self.app.app_context(): self.assertIsNone(sentinel.sentinel._get_current_object()) - with self.assertRaisesRegexp(ValueError, 'Cannot get master othermaster using non-sentinel configuration'): + with self.assertRaisesRegexp(RuntimeError, 'Cannot get master othermaster using non-sentinel configuration'): inst = conn._get_current_object() def test_named_slave(self): @@ -426,5 +381,5 @@ def test_named_slave_no_sentinel(self): conn = sentinel.slave_for('otherslave', db=6) with self.app.app_context(): self.assertIsNone(sentinel.sentinel._get_current_object()) - with self.assertRaisesRegexp(ValueError, 'Cannot get slave otherslave using non-sentinel configuration'): + with self.assertRaisesRegexp(RuntimeError, 'Cannot get slave otherslave using non-sentinel configuration'): inst = conn._get_current_object()