Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 59 additions & 39 deletions src/httpcore2/httpcore2/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,61 +261,81 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
Any closing connections are returned, allowing the I/O for closing
those connections to be handled separately.
"""
closing_connections = []
closing_connections: list[AsyncConnectionInterface] = []
retained_connections: list[AsyncConnectionInterface] = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
# First we handle cleaning up any connections that are closed
# or have expired their keep-alive, in a single pass.
for connection in self._connections:
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
continue
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and sum(connection.is_idle() for connection in self._connections) > self._max_keepalive_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)
else:
retained_connections.append(connection)

# Then we close any surplus idle connections, to enforce the
# max_keepalive_connections setting.
idle_surplus = (
sum(connection.is_idle() for connection in retained_connections) - self._max_keepalive_connections
)
if idle_surplus > 0:
kept: list[AsyncConnectionInterface] = []
for connection in retained_connections:
if idle_surplus > 0 and connection.is_idle():
# log: "closing idle connection"
closing_connections.append(connection)
idle_surplus -= 1
else:
kept.append(connection)
retained_connections = kept

self._connections = retained_connections

# Snapshot the set of reusable connections once, rather than rebuilding
# it per queued request — this is what brings the loop from O(N*M) to
# O(N+M) in the common case.
available_connections = [connection for connection in self._connections if connection.is_available()]
new_connection_budget = self._max_connections - len(self._connections)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in self._requests:
if not pool_request.is_queued():
continue
origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [connection for connection in self._connections if connection.is_idle()]

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
for connection in available_connections:
if connection.can_handle_request(origin):
# log: "reusing existing connection"
pool_request.assign_to_connection(connection)
break
else:
if new_connection_budget > 0:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
new_connection_budget -= 1
continue
for idx, connection in enumerate(available_connections):
if connection.is_idle():
# log: "closing idle connection"
del available_connections[idx]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
break

return closing_connections

Expand Down
98 changes: 59 additions & 39 deletions src/httpcore2/httpcore2/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,61 +261,81 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]:
Any closing connections are returned, allowing the I/O for closing
those connections to be handled separately.
"""
closing_connections = []
closing_connections: list[ConnectionInterface] = []
retained_connections: list[ConnectionInterface] = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
# First we handle cleaning up any connections that are closed
# or have expired their keep-alive, in a single pass.
for connection in self._connections:
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
continue
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and sum(connection.is_idle() for connection in self._connections) > self._max_keepalive_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)
else:
retained_connections.append(connection)

# Then we close any surplus idle connections, to enforce the
# max_keepalive_connections setting.
idle_surplus = (
sum(connection.is_idle() for connection in retained_connections) - self._max_keepalive_connections
)
if idle_surplus > 0:
kept: list[ConnectionInterface] = []
for connection in retained_connections:
if idle_surplus > 0 and connection.is_idle():
# log: "closing idle connection"
closing_connections.append(connection)
idle_surplus -= 1
else:
kept.append(connection)
retained_connections = kept

self._connections = retained_connections

# Snapshot the set of reusable connections once, rather than rebuilding
# it per queued request — this is what brings the loop from O(N*M) to
# O(N+M) in the common case.
available_connections = [connection for connection in self._connections if connection.is_available()]
new_connection_budget = self._max_connections - len(self._connections)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in self._requests:
if not pool_request.is_queued():
continue
origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [connection for connection in self._connections if connection.is_idle()]

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
for connection in available_connections:
if connection.can_handle_request(origin):
# log: "reusing existing connection"
pool_request.assign_to_connection(connection)
break
else:
if new_connection_budget > 0:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
new_connection_budget -= 1
continue
for idx, connection in enumerate(available_connections):
if connection.is_idle():
# log: "closing idle connection"
del available_connections[idx]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
break

return closing_connections

Expand Down
Loading