Skip to content

Commit

Permalink
Use thread-local for connecting to redis to avoid race conditions on …
Browse files Browse the repository at this point in the history
…master failover

redis/redis-py#732

APP-2467 APP-3994
  • Loading branch information
lalinsky committed Sep 14, 2017
1 parent 1ba7ac4 commit b704961
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 254 deletions.
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ Accessing redis-py's Sentinel instance
Change log
----------

v2.0.0
~~~~~~

* Connections are now thread-local to avoid race conditions after Redis master failover
* Removed support for `REDIS_{HOST, PORT, DB}` config variables

v1.0.0
~~~~~~

Expand Down
265 changes: 115 additions & 150 deletions flask_redis_sentinel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2015, 2016 Exponea s r.o. <info@exponea.com>
# Copyright 2015, 2016, 2017 Exponea s r.o. <info@exponea.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,110 +12,89 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple
import six
import inspect
try:
import urllib.parse as urlparse
except ImportError:
import urlparse
from flask import current_app
import warnings
import redis
import redis.sentinel # requires redis-py 2.9.0+
import redis.sentinel
import redis_sentinel_url
import sys
from werkzeug.local import LocalProxy
from flask import current_app
from werkzeug.local import Local, LocalProxy
from werkzeug.utils import import_string


if sys.version_info[0] == 2: # pragma: no cover
# Python 2.x
_string_types = basestring

def iteritems(d):
return d.iteritems()
else: # pragma: no cover
# Python 3.x
_string_types = str

def iteritems(d):
return d.items()

_EXTENSION_KEY = 'redissentinel'


class _ExtensionData(object):
def __init__(self, client_class, sentinel=None, default_connection=None):
class RedisSentinelInstance(object):

def __init__(self, url, client_class, client_options, sentinel_class, sentinel_options):
self.url = url
self.client_class = client_class
self.sentinel = sentinel
self.default_connection = default_connection
self.master_connections = {}
self.slave_connections = {}
self.client_options = client_options
self.sentinel_class = sentinel_class
self.sentinel_options = sentinel_options
self.local = Local()
self._connect()
if self.local.connection[0] is None:
# if there is no sentinel, we don't need to use thread-local storage
self.connection = self.local.connection
self.local = self

def _connect(self):
try:
return self.local.connection
except AttributeError:
conn = redis_sentinel_url.connect(
self.url,
sentinel_class=self.sentinel_class, sentinel_options=self.sentinel_options,
client_class=self.client_class, client_options=self.client_options)
self.local.connection = conn
return conn

@property
def sentinel(self):
return self._connect()[0]

@property
def default_connection(self):
return self._connect()[1]

def master_for(self, service_name, **kwargs):
if self.sentinel is None:
raise ValueError('Cannot get master {} using non-sentinel configuration'.format(service_name))
if service_name not in self.master_connections:
self.master_connections[service_name] = self.sentinel.master_for(service_name, redis_class=self.client_class,
**kwargs)
return self.master_connections[service_name]

def slave_for(self, service_name, **kwargs):
if self.sentinel is None:
raise ValueError('Cannot get slave {} using non-sentinel configuration'.format(service_name))
if service_name not in self.slave_connections:
self.slave_connections[service_name] = self.sentinel.slave_for(service_name, redis_class=self.client_class,
**kwargs)
return self.slave_connections[service_name]


class _ExtensionProxy(LocalProxy):
__slots__ = ('__sentinel',)

def __init__(self, sentinel, local, name=None):
object.__setattr__(self, '_ExtensionProxy__sentinel', sentinel)
super(_ExtensionProxy, self).__init__(local, name=name)

def _get_current_object(self):
app = current_app._get_current_object()
if _EXTENSION_KEY not in app.extensions or self.__sentinel.config_prefix not in app.extensions[_EXTENSION_KEY]:
raise ValueError('RedisSentinel extension with config prefix {} was not initialized for application {}'.
format(self.__sentinel.config_prefix, app.import_name))
ext_data = app.extensions[_EXTENSION_KEY][self.__sentinel.config_prefix]

local = object.__getattribute__(self, '_LocalProxy__local')

return local(ext_data)


class _PrefixedDict(object):
def __init__(self, config, prefix):
self.config = config
self.prefix = prefix

def _key(self, key):
return '{}_{}'.format(self.prefix, key)

def __getitem__(self, item):
return self.config[self._key(item)]
try:
return self.local.master_connections[service_name]
except AttributeError:
self.local.master_connections = {}
except KeyError:
pass

def __setitem__(self, item, value):
self.config[self._key(item)] = value
sentinel = self.sentinel
if sentinel is None:
msg = 'Cannot get master {} using non-sentinel configuration'
raise RuntimeError(msg.format(service_name))

def __delitem__(self, item):
del self.config[self._key(item)]
conn = sentinel.master_for(service_name, redis_class=self.client_class, **kwargs)
self.local.master_connections[service_name] = conn
return conn

def __contains__(self, item):
return self._key(item) in self.config
def slave_for(self, service_name, **kwargs):
try:
return self.local.slave_connections[service_name]
except AttributeError:
self.local.slave_connections = {}
except KeyError:
pass

def get(self, item, default=None):
return self.config.get(self._key(item), default)
sentinel = self.sentinel
if sentinel is None:
msg = 'Cannot get slave {} using non-sentinel configuration'
raise RuntimeError(msg.format(service_name))

def pop(self, item, *args, **kwargs):
return self.config.pop(self._key(item), *args, **kwargs)
conn = sentinel.slave_for(service_name, redis_class=self.client_class, **kwargs)
self.local.slave_connections[service_name] = conn
return conn


class SentinelExtension(object):
class RedisSentinel(object):
"""Flask extension that supports connections to master using Redis Sentinel.
Supported URL types:
Expand All @@ -124,92 +103,78 @@ class SentinelExtension(object):
rediss://
unix://
"""
def __init__(self, app=None, config_prefix=None, client_class=None, sentinel_class=None):
self.config_prefix = None

def __init__(self, app=None, config_prefix='REDIS', client_class=None, sentinel_class=None):
self.config_prefix = config_prefix
self.client_class = client_class
self.sentinel_class = sentinel_class
if app is not None:
self.init_app(app, config_prefix=config_prefix)
self.default_connection = _ExtensionProxy(self, lambda ext_data: ext_data.default_connection)
self.sentinel = _ExtensionProxy(self, lambda ext_data: ext_data.sentinel)
self.init_app(app)
self.sentinel = LocalProxy(lambda: self.get_instance().sentinel)
self.default_connection = LocalProxy(lambda: self.get_instance().default_connection)

def init_app(self, app, config_prefix=None, client_class=None, sentinel_class=None):
if _EXTENSION_KEY not in app.extensions:
app.extensions[_EXTENSION_KEY] = {}

extensions = app.extensions[_EXTENSION_KEY]
config_prefix = config_prefix or self.config_prefix
app.config.setdefault(config_prefix + '_' + 'URL', 'redis://localhost/0')

if config_prefix is None:
config_prefix = 'REDIS'
config = self._strip_dict_prefix(app.config, config_prefix + '_')

extensions = app.extensions.setdefault(_EXTENSION_KEY, {})
if config_prefix in extensions:
raise ValueError('Config prefix {} already registered'.format(config_prefix))

self.config_prefix = config_prefix
msg = 'Redis sentinel extension with config prefix {} is already registered'
raise RuntimeError(msg.format(config_prefix))

config = _PrefixedDict(app.config, config_prefix)
url = config.get('URL')
client_class = self._resolve_class(
config, 'CLASS', 'client_class', client_class, redis.StrictRedis)
sentinel_class = self._resolve_class(
config, 'SENTINEL_CLASS', 'sentinel_class', sentinel_class, redis.sentinel.Sentinel)

client_class = self._resolve_class(config, 'CLASS', client_class, 'client_class',
default=redis.StrictRedis)
sentinel_class = self._resolve_class(config, 'SENTINEL_CLASS', sentinel_class, 'sentinel_class',
default=redis.sentinel.Sentinel)
url = config.pop('URL')
client_options = self._config_from_variables(config, client_class)
sentinel_options = self._config_from_variables(
self._strip_dict_prefix(config, 'SENTINEL_'), client_class)

data = _ExtensionData(client_class)
extensions[config_prefix] = RedisSentinelInstance(
url, client_class, client_options, sentinel_class, sentinel_options)

if url:
connection_options = self._config_from_variables(config, client_class)
sentinel_options = self._config_from_variables(_PrefixedDict(config, 'SENTINEL'), client_class)

connection_options.pop('host', None)
connection_options.pop('port', None)
connection_options.pop('db', None)

result = redis_sentinel_url.connect(url, sentinel_class=sentinel_class,
sentinel_options=sentinel_options,
client_class=client_class,
client_options=connection_options)
data.sentinel, data.default_connection = result
else:
# Stay compatible with Flask-And-Redis for a while
warnings.warn('Setting redis connection via separate variables is deprecated. Please use REDIS_URL.',
DeprecationWarning)
kwargs = self._config_from_variables(config, client_class)
data.default_connection = client_class(**kwargs)

extensions[config_prefix] = data
self.config_prefix = config_prefix

def _resolve_class(self, config, config_key, the_class, attr, default):
if the_class is not None:
pass
elif getattr(self, attr) is not None:
def _resolve_class(self, config, config_key, attr, the_class, default_class):
if the_class is None:
the_class = getattr(self, attr)
else:
the_class = config.get(config_key, default)
if isinstance(the_class, _string_types):
the_class = import_string(the_class)
if the_class is None:
the_class = config.get(config_key, default_class)
if isinstance(the_class, six.string_types):
the_class = import_string(the_class)
config.pop(config_key, None)
return the_class

@staticmethod
def _config_from_variables(config, client_class):
host = config.get('HOST')
if host and (host.startswith('file://') or host.startswith('/')):
del config['HOST']
config['UNIX_SOCKET_PATH'] = host
def _strip_dict_prefix(orig, prefix):
return {k[len(prefix):]: v for (k, v) in orig.items() if k.startswith(prefix)}

args = inspect.getargspec(client_class.__init__).args
@staticmethod
def _config_from_variables(config, the_class):
args = inspect.getargspec(the_class.__init__).args
args.remove('self')
args.remove('host')
args.remove('port')
args.remove('db')

def get_config(suffix):
value = config[suffix]
if suffix == 'PORT':
return int(value)
return value
return {arg: config[arg.upper()] for arg in args if arg.upper() in config}

return {arg: get_config(arg.upper()) for arg in args if arg.upper() in config}
def get_instance(self):
app = current_app._get_current_object()
if _EXTENSION_KEY not in app.extensions or self.config_prefix not in app.extensions[_EXTENSION_KEY]:
msg = 'Redis sentinel extension with config prefix {} was not initialized for application {}'
raise RuntimeError(msg.format(self.config_prefix, app.import_name))
return app.extensions[_EXTENSION_KEY][self.config_prefix]

def master_for(self, service_name, **kwargs):
return _ExtensionProxy(self, lambda ext_data: ext_data.master_for(service_name, **kwargs))
return LocalProxy(lambda: self.get_instance().master_for(service_name, **kwargs))

def slave_for(self, service_name, **kwargs):
return _ExtensionProxy(self, lambda ext_data: ext_data.slave_for(service_name, **kwargs))
return LocalProxy(lambda: self.get_instance().slave_for(service_name, **kwargs))


SentinelExtension = RedisSentinel # for backwards-compatibility
8 changes: 5 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
setup(
name='Flask-Redis-Sentinel',
py_modules=['flask_redis_sentinel'],
version='1.0.0',
install_requires=['Flask>=0.10.1', 'redis>=2.10.3', 'redis_sentinel_url>=1.0.0,<2.0.0'],
version='2.0.0',
install_requires=['Flask>=0.10.1', 'redis>=2.10.3', 'redis_sentinel_url>=1.0.0,<2.0.0', 'six'],
description='Redis-Sentinel integration for Flask',
url='/~https://github.com/exponea/flask-redis-sentinel',
author='Martin Sucha',
author_email='martin.sucha@infinario.com',
author_email='martin.sucha@exponea.com',
license='Apache 2.0',
classifiers=[
'Programming Language :: Python',
Expand All @@ -19,6 +19,8 @@
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Operating System :: OS Independent',
'License :: OSI Approved :: Apache Software License',
'Development Status :: 5 - Production/Stable',
Expand Down
Loading

0 comments on commit b704961

Please sign in to comment.