diff --git a/.jules/bolt.md b/.jules/bolt.md
index c717282b..2ca902f3 100644
--- a/.jules/bolt.md
+++ b/.jules/bolt.md
@@ -19,3 +19,7 @@
 ## 2024-05-24 - Avoid Copying Large Sets for Membership Checks
 **Learning:** Copying a large set (e.g. 100k items) to create a snapshot for read-only membership checks is expensive O(N) and unnecessary. Python's set membership testing is thread-safe.
 **Action:** When filtering data against a shared large set, iterate and check membership directly instead of snapshotting, unless strict transactional consistency across the entire iteration is required.
+
+## 2024-05-24 - Parallelizing Batched API Calls
+**Learning:** For bulk API operations (like pushing rules) that are split into batches, processing batches in parallel (with limited workers) can drastically reduce total time compared to serial execution. However, strict rate limit compliance is crucial; using a small, fixed number of workers (e.g., 4) balances speed and safety.
+**Action:** Refactor serial batch loops to use `ThreadPoolExecutor` when the API supports concurrent requests, ensuring thread safety for any shared progress tracking or state updates.
diff --git a/main.py b/main.py
index e6aabc57..8d125f24 100644
--- a/main.py
+++ b/main.py
@@ -166,6 +166,7 @@ def _clean_env_kv(value: Optional[str], key: str) -> Optional[str]:
 RETRY_DELAY = 1
 FOLDER_CREATION_DELAY = 5  # <--- CHANGED: Increased from 2 to 5 for patience
 MAX_RESPONSE_SIZE = 10 * 1024 * 1024  # 10MB limit
+MAX_PUSH_WORKERS = 4
 
 # --------------------------------------------------------------------------- #
 # 2. Clients
@@ -492,32 +493,44 @@ def push_rules(
     successful_batches = 0
     total_batches = len(range(0, len(filtered_hostnames), BATCH_SIZE))
 
-    for i, start in enumerate(range(0, len(filtered_hostnames), BATCH_SIZE), 1):
-        batch = filtered_hostnames[start : start + BATCH_SIZE]
+    def _push_batch(batch_idx: int, batch_items: List[str]) -> bool:
         data = {
             "do": str(do),
             "status": str(status),
             "group": str(folder_id),
         }
-        for j, hostname in enumerate(batch):
+        for j, hostname in enumerate(batch_items):
             data[f"hostnames[{j}]"] = hostname
         try:
             _api_post_form(client, f"{API_BASE}/{profile_id}/rules", data=data)
             log.info(
                 "Folder %s – batch %d: added %d rules",
-                sanitize_for_log(folder_name), i, len(batch)
+                sanitize_for_log(folder_name), batch_idx, len(batch_items)
             )
-            successful_batches += 1
             if existing_rules_lock:
                 with existing_rules_lock:
-                    existing_rules.update(batch)
+                    existing_rules.update(batch_items)
             else:
-                existing_rules.update(batch)
+                existing_rules.update(batch_items)
+            return True
         except httpx.HTTPError as e:
-            log.error(f"Failed to push batch {i} for folder {sanitize_for_log(folder_name)}: {sanitize_for_log(e)}")
+            log.error(f"Failed to push batch {batch_idx} for folder {sanitize_for_log(folder_name)}: {sanitize_for_log(e)}")
             if hasattr(e, 'response') and e.response is not None:
                 log.debug(f"Response content: {e.response.text}")
+            return False
+
+    # Optimization: Use parallel processing for batches
+    # Using MAX_PUSH_WORKERS workers to balance speed and rate limits
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PUSH_WORKERS) as executor:
+        futures = []
+        for i, start in enumerate(range(0, len(filtered_hostnames), BATCH_SIZE), 1):
+            batch = filtered_hostnames[start : start + BATCH_SIZE]
+            futures.append(executor.submit(_push_batch, i, batch))
+
+        for future in concurrent.futures.as_completed(futures):
+            if future.result():
+                successful_batches += 1
 
     if successful_batches == total_batches:
         log.info("Folder %s – finished (%d new rules added)", sanitize_for_log(folder_name), len(filtered_hostnames))
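
The first `bolt.md` entry (context in the hunk above) argues against snapshotting a large shared set just to do read-only lookups. A minimal sketch of the two approaches, with hypothetical names (`shared_hostnames`, `filter_direct`); in CPython a single `in` check on a built-in set is atomic under the GIL, which is what makes the direct version safe for read-only filtering:

```python
# Minimal sketch, assuming CPython: `shared_hostnames` is a large set
# that other threads may also read, and `candidates` is what we filter.
import threading

shared_hostnames: set = {f"host{i}.example.com" for i in range(100_000)}
lock = threading.Lock()

def filter_with_snapshot(candidates: list) -> list:
    # Wasteful: an O(N) copy of ~100k entries just to do read-only lookups.
    with lock:
        snapshot = set(shared_hostnames)
    return [c for c in candidates if c not in snapshot]

def filter_direct(candidates: list) -> list:
    # Preferred: membership tests on the live set; each `in` check is
    # atomic under the GIL, so no private copy is needed.
    return [c for c in candidates if c not in shared_hostnames]
```

The snapshot version only earns its O(N) copy when the entire pass must observe one consistent state, which is exactly the carve-out the entry makes.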
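
The `main.py` hunk is the concrete application of the second entry. Below is a self-contained sketch of the same pattern under stated assumptions: `post_batch` is a hypothetical stand-in for the diff's `_api_post_form` call, and `push_all`, the batch size, and the item counts are illustrative. (Note the hunk adds no `import concurrent.futures`, so `main.py` presumably imports it already.)

```python
# A runnable sketch of bounded parallel batch pushing, not the project's code.
import concurrent.futures
import threading
import time
from typing import List, Set

BATCH_SIZE = 500
MAX_PUSH_WORKERS = 4  # small fixed pool: parallel speed-up without blowing rate limits

def post_batch(items: List[str]) -> None:
    time.sleep(0.01)  # placeholder for one POST carrying the whole batch

def push_all(hostnames: List[str], pushed: Set[str], lock: threading.Lock) -> int:
    def push_one_batch(idx: int, items: List[str]) -> bool:
        try:
            post_batch(items)
            with lock:          # workers share `pushed`, so guard the update
                pushed.update(items)
            return True
        except Exception:
            return False        # one failed batch never kills its siblings

    batches = [hostnames[s : s + BATCH_SIZE]
               for s in range(0, len(hostnames), BATCH_SIZE)]
    successes = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PUSH_WORKERS) as pool:
        futures = [pool.submit(push_one_batch, i, b) for i, b in enumerate(batches, 1)]
        for fut in concurrent.futures.as_completed(futures):
            if fut.result():
                successes += 1
    return successes

if __name__ == "__main__":
    done: Set[str] = set()
    n = push_all([f"h{i}.example.com" for i in range(2_000)], done, threading.Lock())
    print(f"{n} of 4 batches pushed, {len(done)} hostnames recorded")
```

`as_completed` lets the success counter advance as results arrive rather than in submission order, and concurrency is capped by `MAX_PUSH_WORKERS` rather than by batch count, which mirrors the entry's rate-limit hedge.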