# HG changeset patch # User Jay Rolette # Date 1589658030 18000 # Sat May 16 14:40:30 2020 -0500 # Node ID 4d9031e09336b9d527c6a22ca9c071761ba20be4 # Parent 479d6b0abaa0e06a589758bcc611bc6b340b6d2b case 5281: backport earlier solution to redis-py connection pool shutting down connections that were in-use by other threads diff -r 479d6b0abaa0 -r 4d9031e09336 redis/connection.py --- a/redis/connection.py Fri May 15 13:34:51 2020 +0000 +++ b/redis/connection.py Sat May 16 14:40:30 2020 -0500 @@ -518,6 +518,10 @@ self._connect_callbacks = [] self._buffer_cutoff = 6000 + # Connection pool generation id + self.pool_generation = 0 + + def __repr__(self): repr_args = ','.join(['%s=%s' % (k, v) for k, v in self.repr_pieces()]) return '%s<%s>' % (self.__class__.__name__, repr_args) @@ -889,6 +893,10 @@ self._connect_callbacks = [] self._buffer_cutoff = 6000 + # Connection pool generation id + self.pool_generation = 0 + + def repr_pieces(self): pieces = [ ('path', self.path), @@ -1092,8 +1100,12 @@ # will notice the first thread already did the work and simply # release the lock. self._fork_lock = threading.Lock() + + self.generation = 0 + self.reset() + def __repr__(self): return "%s<%s>" % ( type(self).__name__, @@ -1177,6 +1189,12 @@ with self._lock: try: connection = self._available_connections.pop() + if connection.pool_generation != self.generation: + # generation counter mismatch: let this connection go and + # create a new one + connection.disconnect() + self._created_connections -= 1 + connection = self.make_connection() except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) @@ -1216,8 +1234,10 @@ "Create a new connection" if self._created_connections >= self.max_connections: raise ConnectionError("Too many connections") + new_connection = self.connection_class(**self.connection_kwargs) + new_connection.pool_generation = self.generation self._created_connections += 1 - return self.connection_class(**self.connection_kwargs) + return new_connection def release(self, connection): "Releases the connection back to the pool" @@ -1227,17 +1247,29 @@ return try: self._in_use_connections.remove(connection) - self._available_connections.append(connection) + # Verify generation id before putting connection back in the free list + if connection.pool_generation == self.generation: + self._available_connections.append(connection) + else: + # generation id mismatch, kill the connection + connection.disconnect() + self._created_connections -= 1 except KeyError: pass def disconnect(self): - "Disconnects all connections in the pool" + """ + Disconnects all connections in the pool + + The disconnect happens over time as connections cycle into and out + of the pool. This avoids the problem of ripping connections out + from under threads that are using them. + """ self._checkpid() with self._lock: - all_conns = chain(self._available_connections, - self._in_use_connections) - for connection in all_conns: + self.generation += 1 + # Safe to release connections that aren't in use + for connection in self._available_connections: connection.disconnect()