Skip to content

Commit aba09c6

Browse files
committed
refactor(scrapy): simplify httpcache close_spider cleanup and document cache boundary logs
1 parent 4a5a77e commit aba09c6

1 file changed

Lines changed: 41 additions & 35 deletions

File tree

src/apify/scrapy/extensions/_httpcache.py

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -85,44 +85,18 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None:
8585
if self._async_thread is None:
8686
raise ValueError('Async thread not initialized')
8787

88+
if current_time is None:
89+
current_time = int(time())
90+
8891
logger.info(f'Cleaning up cache items (max {self._expiration_max_items})')
89-
# `close` always runs in the `finally`, so neither a cleanup failure below nor an early return can leak
90-
# the event-loop thread.
92+
# Best-effort: a cleanup failure is logged and swallowed (the sweep only reclaims storage, so failing it
93+
# must not turn a normal spider close into an error), and `close` always runs in the `finally`, so
94+
# neither the failure nor an early return can leak the event-loop thread.
9195
try:
9296
if self._expiration_secs > 0:
93-
if current_time is None:
94-
current_time = int(time())
95-
96-
async def expire_kvs() -> None:
97-
if self._kvs is None:
98-
raise ValueError('Key value store not initialized')
99-
# Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
100-
# so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
101-
# an expired entry as a cache miss.
102-
processed = 0
103-
async for item in self._kvs.iterate_keys():
104-
if processed >= self._expiration_max_items:
105-
break
106-
processed += 1
107-
value = await self._kvs.get_value(item.key)
108-
try:
109-
gzip_time = read_gzip_time(value)
110-
except Exception as exc:
111-
logger.warning(f'Malformed cache item {item.key}: {exc}')
112-
await self._kvs.delete_value(item.key)
113-
else:
114-
if self._expiration_secs < current_time - gzip_time:
115-
logger.debug(f'Expired cache item {item.key}')
116-
await self._kvs.delete_value(item.key)
117-
else:
118-
logger.debug(f'Valid cache item {item.key}')
119-
120-
# Best-effort: log and swallow a cleanup failure rather than raise. The sweep only reclaims
121-
# storage, so failing it must not turn a normal spider close into an error.
122-
try:
123-
self._async_thread.run_coro(expire_kvs())
124-
except Exception:
125-
logger.exception('Failed to clean up expired cache items.')
97+
self._async_thread.run_coro(self._expire_kvs(current_time))
98+
except Exception:
99+
logger.exception('Failed to clean up expired cache items.')
126100
finally:
127101
logger.debug('Closing cache storage')
128102
try:
@@ -147,6 +121,9 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
147121
try:
148122
value = self._async_thread.run_coro(self._kvs.get_value(key))
149123
except Exception:
124+
# Not redundant: `run_coro` re-raises but no longer logs, and Scrapy logs this downloader-middleware
125+
# Deferred failure only generically ('Error downloading <request>'), or not at all if a request
126+
# errback swallows it. So record the cache-specific context here before re-raising.
150127
logger.exception('Failed to retrieve a response from the cache.')
151128
raise
152129

@@ -198,9 +175,38 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non
198175
try:
199176
self._async_thread.run_coro(self._kvs.set_value(key, value))
200177
except Exception:
178+
# Boundary log, see `retrieve_response`: `run_coro` re-raises without logging and Scrapy logs this
179+
# Deferred failure only generically (or not at all behind an errback), so capture the context here.
201180
logger.exception('Failed to store a response in the cache.')
202181
raise
203182

183+
async def _expire_kvs(self, current_time: int) -> None:
184+
"""Sweep the cache key-value store, deleting expired or unreadable entries.
185+
186+
Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, so stale
187+
entries may linger. This only reclaims storage; `retrieve_response` already treats an expired entry as
188+
a cache miss.
189+
"""
190+
if self._kvs is None:
191+
raise ValueError('Key value store not initialized')
192+
processed = 0
193+
async for item in self._kvs.iterate_keys():
194+
if processed >= self._expiration_max_items:
195+
break
196+
processed += 1
197+
value = await self._kvs.get_value(item.key)
198+
try:
199+
gzip_time = read_gzip_time(value)
200+
except Exception as exc:
201+
logger.warning(f'Malformed cache item {item.key}: {exc}')
202+
await self._kvs.delete_value(item.key)
203+
else:
204+
if self._expiration_secs < current_time - gzip_time:
205+
logger.debug(f'Expired cache item {item.key}')
206+
await self._kvs.delete_value(item.key)
207+
else:
208+
logger.debug(f'Valid cache item {item.key}')
209+
204210

205211
def to_gzip(data: dict, mtime: int | None = None) -> bytes:
206212
"""Dump a dictionary to a gzip-compressed JSON byte stream.

0 commit comments

Comments
 (0)