From 73e949a01bb3d1d5b35ff9ec5fbc3b0a09592107 Mon Sep 17 00:00:00 2001 From: Fran Garcia Date: Mon, 19 Jul 2021 12:50:12 +0100 Subject: [PATCH] Stop leaking connections when pool is full When our connection pool is full (there are no available connections) we can run into a case where we leak perfectly useable connections. This essentially reduces the size of our connection pool every time it happens, as it can slowly leak away our unused connections until we don't have enough connections available to cope with our workload. This is particularly problematic when preparing connections for a pipeline, since we can potentially request (and leak) a large number of connections in one go. This change makes sure that before raising an exception if there are no available connections we return back all connections we've checked to the pool. In cases where we can't guarantee the connections we'd return to the pool are in a safe state we just add back `None` objects to the pool so at least the effective size of the connection pool remains unchanged. --- rediscluster/connection.py | 20 ++++++++++++++++---- rediscluster/pipeline.py | 10 +++++++++- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/rediscluster/connection.py b/rediscluster/connection.py index 70dadd0b..a8f4d551 100644 --- a/rediscluster/connection.py +++ b/rediscluster/connection.py @@ -277,7 +277,7 @@ def make_connection(self, node): return connection - def release(self, connection): + def release(self, connection, add_to_pool=True): """ Releases the connection back to the pool """ @@ -296,7 +296,15 @@ def release(self, connection): pass # TODO: Log.warning("Tried to release connection that did not exist any longer : {0}".format(connection)) - self._available_connections.setdefault(connection.node["name"], []).append(connection) + if add_to_pool: + self._available_connections.setdefault(connection.node["name"], []).append(connection) + else: + # If we don't add it back to the pool it shouldn't count towards the + # connection pool, or we'll artificially reduce the maximum size of the + # pool + self._created_connections_per_node.setdefault(node['name'], 0) + if self._created_connections_per_node[connection.node["name"]] > 0: + self._created_connections_per_node[connection.node["name"]] -= 1 def disconnect(self): """ @@ -538,7 +546,7 @@ def get_connection_by_node(self, node): return connection - def release(self, connection): + def release(self, connection, add_to_pool=True): """ Releases the connection back to the pool """ @@ -546,9 +554,13 @@ def release(self, connection): if connection.pid != self.pid: return + # In some cases we don't want to add back this connection to the pool but + # we still want to free its slot + conn_to_add = connection if add_to_pool else None + # Put the connection back into the pool. try: - self._get_pool(connection.node).put_nowait(connection) + self._get_pool(connection.node).put_nowait(conn_to_add) except Full: # perhaps the pool has been reset() after a fork? regardless, # we don't want this connection diff --git a/rediscluster/pipeline.py b/rediscluster/pipeline.py index 3eae7a39..e00e36ae 100644 --- a/rediscluster/pipeline.py +++ b/rediscluster/pipeline.py @@ -203,7 +203,15 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections= # we can build a list of commands for each node. node_name = node['name'] if node_name not in nodes: - nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node)) + try: + nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node)) + except: + # Sommething happened, maybe the pool is full, we need to release any connection + # we've taken or we'll leak them. Because we're not sure if the connections are + # in a good state we'll release their slot but not reuse them + for n in nodes.values(): + self.connection_pool.release(n.connection, add_to_pool=False) + raise nodes[node_name].append(c)