diff --git a/src/httpcore2/httpcore2/_async/connection_pool.py b/src/httpcore2/httpcore2/_async/connection_pool.py index b0df818c..21e791a8 100644 --- a/src/httpcore2/httpcore2/_async/connection_pool.py +++ b/src/httpcore2/httpcore2/_async/connection_pool.py @@ -261,36 +261,50 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]: Any closing connections are returned, allowing the I/O for closing those connections to be handled separately. """ - closing_connections = [] + closing_connections: list[AsyncConnectionInterface] = [] + retained_connections: list[AsyncConnectionInterface] = [] - # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. - for connection in list(self._connections): + # First we handle cleaning up any connections that are closed + # or have expired their keep-alive, in a single pass. + for connection in self._connections: if connection.is_closed(): # log: "removing closed connection" - self._connections.remove(connection) + continue elif connection.has_expired(): # log: "closing expired connection" - self._connections.remove(connection) - closing_connections.append(connection) - elif ( - connection.is_idle() - and sum(connection.is_idle() for connection in self._connections) > self._max_keepalive_connections - ): - # log: "closing idle connection" - self._connections.remove(connection) closing_connections.append(connection) + else: + retained_connections.append(connection) + + # Then we close any surplus idle connections, to enforce the + # max_keepalive_connections setting. + idle_surplus = ( + sum(connection.is_idle() for connection in retained_connections) - self._max_keepalive_connections + ) + if idle_surplus > 0: + kept: list[AsyncConnectionInterface] = [] + for connection in retained_connections: + if idle_surplus > 0 and connection.is_idle(): + # log: "closing idle connection" + closing_connections.append(connection) + idle_surplus -= 1 + else: + kept.append(connection) + retained_connections = kept + + self._connections = retained_connections + + # Snapshot the set of reusable connections once, rather than rebuilding + # it per queued request — this is what brings the loop from O(N*M) to + # O(N+M) in the common case. + available_connections = [connection for connection in self._connections if connection.is_available()] + new_connection_budget = self._max_connections - len(self._connections) # Assign queued requests to connections. - queued_requests = [request for request in self._requests if request.is_queued()] - for pool_request in queued_requests: + for pool_request in self._requests: + if not pool_request.is_queued(): + continue origin = pool_request.request.url.origin - available_connections = [ - connection - for connection in self._connections - if connection.can_handle_request(origin) and connection.is_available() - ] - idle_connections = [connection for connection in self._connections if connection.is_idle()] # There are three cases for how we may be able to handle the request: # @@ -298,24 +312,30 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]: # 2. We can create a new connection to handle the request. # 3. We can close an idle connection and then create a new connection # to handle the request. - if available_connections: - # log: "reusing existing connection" - connection = available_connections[0] - pool_request.assign_to_connection(connection) - elif len(self._connections) < self._max_connections: - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) - elif idle_connections: - # log: "closing idle connection" - connection = idle_connections[0] - self._connections.remove(connection) - closing_connections.append(connection) - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) + for connection in available_connections: + if connection.can_handle_request(origin): + # log: "reusing existing connection" + pool_request.assign_to_connection(connection) + break + else: + if new_connection_budget > 0: + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + pool_request.assign_to_connection(connection) + new_connection_budget -= 1 + continue + for idx, connection in enumerate(available_connections): + if connection.is_idle(): + # log: "closing idle connection" + del available_connections[idx] + self._connections.remove(connection) + closing_connections.append(connection) + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + pool_request.assign_to_connection(connection) + break return closing_connections diff --git a/src/httpcore2/httpcore2/_sync/connection_pool.py b/src/httpcore2/httpcore2/_sync/connection_pool.py index 78003f4d..7c88f1cc 100644 --- a/src/httpcore2/httpcore2/_sync/connection_pool.py +++ b/src/httpcore2/httpcore2/_sync/connection_pool.py @@ -261,36 +261,50 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]: Any closing connections are returned, allowing the I/O for closing those connections to be handled separately. """ - closing_connections = [] + closing_connections: list[ConnectionInterface] = [] + retained_connections: list[ConnectionInterface] = [] - # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. - for connection in list(self._connections): + # First we handle cleaning up any connections that are closed + # or have expired their keep-alive, in a single pass. + for connection in self._connections: if connection.is_closed(): # log: "removing closed connection" - self._connections.remove(connection) + continue elif connection.has_expired(): # log: "closing expired connection" - self._connections.remove(connection) - closing_connections.append(connection) - elif ( - connection.is_idle() - and sum(connection.is_idle() for connection in self._connections) > self._max_keepalive_connections - ): - # log: "closing idle connection" - self._connections.remove(connection) closing_connections.append(connection) + else: + retained_connections.append(connection) + + # Then we close any surplus idle connections, to enforce the + # max_keepalive_connections setting. + idle_surplus = ( + sum(connection.is_idle() for connection in retained_connections) - self._max_keepalive_connections + ) + if idle_surplus > 0: + kept: list[ConnectionInterface] = [] + for connection in retained_connections: + if idle_surplus > 0 and connection.is_idle(): + # log: "closing idle connection" + closing_connections.append(connection) + idle_surplus -= 1 + else: + kept.append(connection) + retained_connections = kept + + self._connections = retained_connections + + # Snapshot the set of reusable connections once, rather than rebuilding + # it per queued request — this is what brings the loop from O(N*M) to + # O(N+M) in the common case. + available_connections = [connection for connection in self._connections if connection.is_available()] + new_connection_budget = self._max_connections - len(self._connections) # Assign queued requests to connections. - queued_requests = [request for request in self._requests if request.is_queued()] - for pool_request in queued_requests: + for pool_request in self._requests: + if not pool_request.is_queued(): + continue origin = pool_request.request.url.origin - available_connections = [ - connection - for connection in self._connections - if connection.can_handle_request(origin) and connection.is_available() - ] - idle_connections = [connection for connection in self._connections if connection.is_idle()] # There are three cases for how we may be able to handle the request: # @@ -298,24 +312,30 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]: # 2. We can create a new connection to handle the request. # 3. We can close an idle connection and then create a new connection # to handle the request. - if available_connections: - # log: "reusing existing connection" - connection = available_connections[0] - pool_request.assign_to_connection(connection) - elif len(self._connections) < self._max_connections: - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) - elif idle_connections: - # log: "closing idle connection" - connection = idle_connections[0] - self._connections.remove(connection) - closing_connections.append(connection) - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) + for connection in available_connections: + if connection.can_handle_request(origin): + # log: "reusing existing connection" + pool_request.assign_to_connection(connection) + break + else: + if new_connection_budget > 0: + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + pool_request.assign_to_connection(connection) + new_connection_budget -= 1 + continue + for idx, connection in enumerate(available_connections): + if connection.is_idle(): + # log: "closing idle connection" + del available_connections[idx] + self._connections.remove(connection) + closing_connections.append(connection) + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + pool_request.assign_to_connection(connection) + break return closing_connections