Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 60 additions & 42 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import socket
import stat
import sys
import threading
import time
from functools import lru_cache
from typing import Any, Callable, Dict, List, Optional, Sequence, Set
Expand Down Expand Up @@ -315,6 +316,9 @@ def _api_client() -> httpx.Client:
# 3. Helpers
# --------------------------------------------------------------------------- #
_cache: Dict[str, Dict] = {}
# Use RLock (reentrant lock) to allow nested acquisitions by the same thread
# This prevents deadlocks when _fetch_if_valid calls fetch_folder_data which calls _gh_get
_cache_lock = threading.RLock()


@lru_cache(maxsize=128)
Expand Down Expand Up @@ -523,49 +527,60 @@ def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY):


def _gh_get(url: str) -> Dict:
    """Fetch *url* with the shared GitHub client and return the parsed JSON.

    Responses are memoized in the module-level ``_cache`` (guarded by
    ``_cache_lock``). The network fetch happens *outside* the lock so slow
    I/O does not serialize concurrent callers; duplicate fetches for the
    same URL are possible but harmless (double-checked locking below keeps
    a single cached object).

    Raises:
        httpx.HTTPStatusError: propagated from ``raise_for_status()``.
        ValueError: if the response exceeds ``MAX_RESPONSE_SIZE`` or is not
            valid JSON.
    """
    # Fast path: serve from cache without any network work.
    with _cache_lock:
        if url in _cache:
            return _cache[url]

    # Cache miss — fetch while NOT holding the lock.
    # Explicitly let HTTPError propagate (no need to catch just to re-raise).
    with _gh.stream("GET", url) as r:
        r.raise_for_status()

        # 1. Reject early based on the declared Content-Length, if present.
        #    Parse and compare in separate steps so a malformed header
        #    (int() ValueError) is cleanly distinguished from the size
        #    violation — no substring-matching on exception messages.
        cl = r.headers.get("Content-Length")
        if cl:
            try:
                declared = int(cl)
            except ValueError:
                log.warning(
                    f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. "
                    "Falling back to streaming size check."
                )
            else:
                if declared > MAX_RESPONSE_SIZE:
                    raise ValueError(
                        f"Response too large from {sanitize_for_log(url)} "
                        f"({declared / (1024 * 1024):.2f} MB)"
                    )

        # 2. Stream the body and enforce the cap on actual bytes received
        #    (Content-Length may be absent or lie).
        chunks = []
        current_size = 0
        for chunk in r.iter_bytes():
            current_size += len(chunk)
            if current_size > MAX_RESPONSE_SIZE:
                raise ValueError(
                    f"Response too large from {sanitize_for_log(url)} "
                    f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)"
                )
            chunks.append(chunk)

    try:
        data = json.loads(b"".join(chunks))
    except json.JSONDecodeError as e:
        raise ValueError(
            f"Invalid JSON response from {sanitize_for_log(url)}"
        ) from e

    # Double-checked locking: another thread may have cached this URL while
    # we were fetching. Keep the existing entry and return _cache[url] (not
    # our local `data`) so every caller shares a single cached object.
    with _cache_lock:
        if url not in _cache:
            _cache[url] = data
        return _cache[url]


def check_api_access(client: httpx.Client, profile_id: str) -> bool:
Expand Down Expand Up @@ -693,7 +708,8 @@ def fetch_folder_data(url: str) -> Dict[str, Any]:

def warm_up_cache(urls: Sequence[str]) -> None:
urls = list(set(urls))
urls_to_process = [u for u in urls if u not in _cache]
with _cache_lock:
urls_to_process = [u for u in urls if u not in _cache]
if not urls_to_process:
return

Expand Down Expand Up @@ -1038,10 +1054,12 @@ def sync_profile(
# OPTIMIZATION: Move validation inside the thread pool to parallelize DNS lookups.
# Previously, sequential validation blocked the main thread.
def _fetch_if_valid(url: str):
# Optimization: If we already have the content in cache, skip validation
# because the content was validated at the time of fetch (warm_up_cache).
if url in _cache:
return fetch_folder_data(url)
# Optimization: If we already have the content in cache, return it directly.
# The content was validated at the time of fetch (warm_up_cache).
# Read directly from cache to avoid calling fetch_folder_data while holding lock.
with _cache_lock:
if url in _cache:
return _cache[url]

if validate_folder_url(url):
return fetch_folder_data(url)
Expand Down
Loading