From 1f8b0ca981147f9cbf3975793bff9682d7588c67 Mon Sep 17 00:00:00 2001 From: snowfox1003 Date: Tue, 2 Jun 2026 12:45:20 -0400 Subject: [PATCH 1/5] chore: update requirements to include portalocker and refactor state management for thread safety --- requirements.in | 1 + requirements.lock | 2 + scripts/generate_service_docs.py | 11 +- slack_event_handler/tests/test_job_queue.py | 136 ++++++++++++++++++-- slack_event_handler/tests/test_state.py | 44 +++++++ slack_event_handler/utils/job_queue.py | 22 ++-- slack_event_handler/utils/rate_limiter.py | 9 +- slack_event_handler/utils/state.py | 60 ++++++++- 8 files changed, 256 insertions(+), 29 deletions(-) diff --git a/requirements.in b/requirements.in index 4c8c26ea..dcd4cf50 100644 --- a/requirements.in +++ b/requirements.in @@ -48,3 +48,4 @@ yt-dlp>=2026.2.21,<2027 # --- slack_event_handler (GitHub PR comments) --- PyGithub>=2.0,<3 +portalocker>=2.8,<3 diff --git a/requirements.lock b/requirements.lock index 488b9a67..3dcafe32 100644 --- a/requirements.lock +++ b/requirements.lock @@ -106,6 +106,8 @@ pinecone-plugin-interface==0.0.7 # via pinecone plyvel==1.5.1 # via -r requirements.in +portalocker==2.10.1 + # via -r requirements.in prompt-toolkit==3.0.52 # via click-repl propcache==0.5.2 diff --git a/scripts/generate_service_docs.py b/scripts/generate_service_docs.py index f4b030fa..ad8b4902 100644 --- a/scripts/generate_service_docs.py +++ b/scripts/generate_service_docs.py @@ -162,7 +162,9 @@ def _extract_public_functions(source: str) -> list[ServiceFuncRow]: def _render_service_table( - rows: Iterable[ServiceFuncRow], *, section_title: str = "## Public API (generated)" + rows: Iterable[ServiceFuncRow], + *, + section_title: str = "## Public API (generated)", ) -> str: lines = [ section_title, @@ -201,7 +203,9 @@ class ProtocolRow: properties: tuple[ProtocolProperty, ...] -def _extract_protocols(source: str) -> tuple[list[ProtocolRow], list[ServiceFuncRow]]: +def _extract_protocols( + source: str, +) -> tuple[list[ProtocolRow], list[ServiceFuncRow]]: tree = ast.parse(source) protocols: list[ProtocolRow] = [] helpers: list[ServiceFuncRow] = [] @@ -324,6 +328,9 @@ def _discover_apps_with_services() -> list[tuple[str, Path]]: continue svc = child / "services.py" if svc.is_file(): + content = _read_text(svc) + if "This service should be skipped for docs generation" in content: + continue found.append((child.name, svc)) return found diff --git a/slack_event_handler/tests/test_job_queue.py b/slack_event_handler/tests/test_job_queue.py index bc739742..7f975987 100644 --- a/slack_event_handler/tests/test_job_queue.py +++ b/slack_event_handler/tests/test_job_queue.py @@ -1,10 +1,14 @@ """Tests for slack_event_handler.utils.job_queue.""" +import threading +from contextlib import contextmanager from unittest.mock import MagicMock, patch import pytest from slack_event_handler.utils import job_queue +from slack_event_handler.utils.rate_limiter import record_posted +from slack_event_handler.utils.state import load_state @pytest.fixture(autouse=True) @@ -285,17 +289,22 @@ def test_worker_processes_job_then_exits_on_sleep(settings): loads = [ {"queue": [job], "postedAt": []}, {"queue": [], "postedAt": []}, - {"queue": [], "postedAt": []}, ] - def fake_load(team_id=None): - return loads.pop(0) + @contextmanager + def fake_modify(team_id=None): + state = loads.pop(0) if loads else {"queue": [], "postedAt": []} + yield state def sleep_side_effect(_sec): raise RuntimeError("stop_worker_loop") - with patch.object(job_queue, "load_state", side_effect=fake_load): - with patch.object(job_queue, "save_state"): + with patch.object(job_queue, "modify_state", fake_modify): + with patch.object( + job_queue, + "load_state", + return_value={"queue": [], "postedAt": []}, + ): with patch.object(job_queue, "wait_for_slot"): with patch.object(job_queue, "post_pr_comment"): with patch.object(job_queue, "record_posted"): @@ -327,17 +336,22 @@ def test_worker_process_job_failure_sends_error_reply(settings): loads = [ {"queue": [job], "postedAt": []}, {"queue": [], "postedAt": []}, - {"queue": [], "postedAt": []}, ] - def fake_load(team_id=None): - return loads.pop(0) + @contextmanager + def fake_modify(team_id=None): + state = loads.pop(0) if loads else {"queue": [], "postedAt": []} + yield state def sleep_side_effect(_sec): raise RuntimeError("stop_worker_loop") - with patch.object(job_queue, "load_state", side_effect=fake_load): - with patch.object(job_queue, "save_state"): + with patch.object(job_queue, "modify_state", fake_modify): + with patch.object( + job_queue, + "load_state", + return_value={"queue": [], "postedAt": []}, + ): with patch.object( job_queue, "post_pr_comment", side_effect=RuntimeError("gh") ): @@ -354,3 +368,105 @@ def sleep_side_effect(_sec): for ca in mock_app.client.chat_postMessage.call_args_list ] assert any("Could not post" in t for t in texts) + + +@pytest.mark.django_db +def test_concurrent_enqueue_preserves_all_jobs(settings, tmp_path): + settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 5 + path = tmp_path / "state_T9.json" + n = 30 + barrier = threading.Barrier(n) + job_ids: list[str] = [] + errors: list[BaseException] = [] + lock = threading.Lock() + + def worker(): + try: + barrier.wait(timeout=10) + job = job_queue.enqueue_job( + owner="o", + repo="r", + pull_number=1, + channel="C1", + message_ts="1.0", + user_id="U1", + is_dm=False, + team_id="T9", + ) + with lock: + job_ids.append(job[job_queue.KEY_JOB_ID]) + except BaseException as e: + with lock: + errors.append(e) + + with patch( + "slack_event_handler.utils.state._get_state_file_path", + return_value=str(path), + ): + threads = [threading.Thread(target=worker) for _ in range(n)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=30) + + assert not errors + loaded = load_state("T9") + assert len(loaded["queue"]) == n + assert len(set(job_ids)) == n + + +@pytest.mark.django_db +def test_concurrent_enqueue_and_record_posted(settings, tmp_path): + settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 10 + path = tmp_path / "state_T9.json" + n_enqueue = 15 + n_posted = 15 + n_total = n_enqueue + n_posted + barrier = threading.Barrier(n_total) + errors: list[BaseException] = [] + lock = threading.Lock() + + def enqueue_worker(): + try: + barrier.wait(timeout=10) + job_queue.enqueue_job( + owner="o", + repo="r", + pull_number=1, + channel="C1", + message_ts="1.0", + user_id="U1", + team_id="T9", + ) + except BaseException as e: + with lock: + errors.append(e) + + def posted_worker(): + try: + barrier.wait(timeout=10) + record_posted("T9") + except BaseException as e: + with lock: + errors.append(e) + + with patch( + "slack_event_handler.utils.state._get_state_file_path", + return_value=str(path), + ): + with patch( + "slack_event_handler.utils.rate_limiter.time.time", return_value=42.0 + ): + threads = [ + threading.Thread(target=enqueue_worker) for _ in range(n_enqueue) + ] + threads += [threading.Thread(target=posted_worker) for _ in range(n_posted)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=30) + + assert not errors + loaded = load_state("T9") + assert len(loaded["queue"]) == n_enqueue + assert len(loaded["postedAt"]) == n_posted diff --git a/slack_event_handler/tests/test_state.py b/slack_event_handler/tests/test_state.py index a0c2136e..4831b976 100644 --- a/slack_event_handler/tests/test_state.py +++ b/slack_event_handler/tests/test_state.py @@ -1,5 +1,6 @@ """Tests for slack_event_handler.utils.state.""" +import threading from unittest.mock import patch import pytest @@ -71,6 +72,49 @@ def test_sanitize_team_id_empty_returns_default(): assert state_mod._sanitize_team_id_for_path("") == "default" +def test_get_lock_file_path_appends_lock_suffix(data_dir): + state_path = str(data_dir / "state.json") + with patch.object(state_mod, "_get_state_file_path", return_value=state_path): + assert state_mod._get_lock_file_path(None) == f"{state_path}.lock" + + +def test_get_lock_file_path_team_id(data_dir): + state_path = str(data_dir / "state_T9.json") + with patch.object(state_mod, "_get_state_file_path", return_value=state_path): + assert state_mod._get_lock_file_path("T9") == f"{state_path}.lock" + + +def test_state_file_lock_blocks_until_released(data_dir): + state_path = str(data_dir / "state.json") + lock_path = f"{state_path}.lock" + holder_ready = threading.Event() + holder_release = threading.Event() + second_acquired = threading.Event() + + def hold_lock(): + with patch.object(state_mod, "_get_lock_file_path", return_value=lock_path): + with state_mod.state_file_lock(None): + holder_ready.set() + holder_release.wait(timeout=5) + + def try_lock(): + holder_ready.wait(timeout=5) + with patch.object(state_mod, "_get_lock_file_path", return_value=lock_path): + with state_mod.state_file_lock(None): + second_acquired.set() + + holder = threading.Thread(target=hold_lock) + waiter = threading.Thread(target=try_lock) + holder.start() + waiter.start() + holder_ready.wait(timeout=5) + assert not second_acquired.is_set() + holder_release.set() + waiter.join(timeout=5) + holder.join(timeout=5) + assert second_acquired.is_set() + + def test_load_state_corrupt_json_quarantine_oserror_fallback(data_dir, monkeypatch): bad = data_dir / "state.json" bad.write_text("{not json", encoding="utf-8") diff --git a/slack_event_handler/utils/job_queue.py b/slack_event_handler/utils/job_queue.py index 481e6036..e69b75d6 100644 --- a/slack_event_handler/utils/job_queue.py +++ b/slack_event_handler/utils/job_queue.py @@ -22,7 +22,7 @@ wait_for_slot, ) from slack_event_handler.utils.github_pr_client import post_pr_comment -from slack_event_handler.utils.state import load_state, save_state +from slack_event_handler.utils.state import load_state, modify_state logger = logging.getLogger(__name__) @@ -61,7 +61,6 @@ def enqueue_job( team_id: Optional[str] = None, ) -> dict: """Adds a new job to the persistent FIFO queue for this team and returns it.""" - state = load_state(team_id) job = { KEY_JOB_ID: str(uuid.uuid4()), KEY_TEAM_ID: team_id, @@ -74,8 +73,8 @@ def enqueue_job( KEY_IS_DM: is_dm, KEY_ENQUEUED_AT: time.time(), } - state["queue"].append(job) - save_state(state, team_id) + with modify_state(team_id) as state: + state["queue"].append(job) return job @@ -194,16 +193,17 @@ def _worker(team_id: Optional[str]) -> None: """Long-running FIFO worker daemon thread for one team.""" logger.debug("PR job queue worker started for team %s", team_id or "default") while True: - state = load_state(team_id) - - if not state["queue"]: + with modify_state(team_id) as state: + if not state["queue"]: + job = None + else: + job, *remaining = state["queue"] + state["queue"] = remaining + + if job is None: time.sleep(1) continue - job, *remaining = state["queue"] - state["queue"] = remaining - save_state(state, team_id) - with _worker_busy_lock: _worker_busy_by_team[team_id] = True label = _job_label(job) diff --git a/slack_event_handler/utils/rate_limiter.py b/slack_event_handler/utils/rate_limiter.py index f1eddaf4..973ae74a 100644 --- a/slack_event_handler/utils/rate_limiter.py +++ b/slack_event_handler/utils/rate_limiter.py @@ -10,7 +10,7 @@ from django.conf import settings -from slack_event_handler.utils.state import load_state, save_state +from slack_event_handler.utils.state import load_state, modify_state SLOT_BUFFER_SEC = 0.05 @@ -64,7 +64,6 @@ def wait_for_slot(team_id: Optional[str] = None) -> None: def record_posted(team_id: Optional[str] = None) -> None: """Records a successful post timestamp and prunes expired entries for this team.""" - state = load_state(team_id) - recent = recent_timestamps_at(state["postedAt"], time.time()) - state["postedAt"] = recent + [time.time()] - save_state(state, team_id) + with modify_state(team_id) as state: + recent = recent_timestamps_at(state["postedAt"], time.time()) + state["postedAt"] = recent + [time.time()] diff --git a/slack_event_handler/utils/state.py b/slack_event_handler/utils/state.py index 754f5990..f32bdd81 100644 --- a/slack_event_handler/utils/state.py +++ b/slack_event_handler/utils/state.py @@ -12,14 +12,37 @@ import os import re import tempfile +import threading import time +from contextlib import contextmanager from copy import deepcopy -from typing import Any, Optional +from typing import Any, Generator, Optional + +try: + import fcntl +except ImportError: # pragma: no cover - Windows + fcntl = None # type: ignore[assignment] + +import portalocker logger = logging.getLogger(__name__) _DEFAULT_STATE: dict[str, Any] = {"postedAt": [], "queue": []} +_team_thread_locks: dict[str, threading.Lock] = {} +_team_thread_locks_guard = threading.Lock() + + +def _thread_lock_for(team_id: Optional[str]) -> threading.Lock: + """In-process mutex paired with the file lock (required for reliable Windows locking).""" + key = team_id if team_id is not None else "" + with _team_thread_locks_guard: + lock = _team_thread_locks.get(key) + if lock is None: + lock = threading.Lock() + _team_thread_locks[key] = lock + return lock + def _sanitize_team_id_for_path(team_id: str) -> str: """Safe filename segment from Slack team_id (e.g. T01234ABCD -> T01234ABCD).""" @@ -39,6 +62,41 @@ def _get_state_file_path(team_id: Optional[str] = None) -> str: return str(data_dir / "state.json") +def _get_lock_file_path(team_id: Optional[str] = None) -> str: + """Resolve the advisory lock file path (sibling of the state JSON file).""" + return f"{_get_state_file_path(team_id)}.lock" + + +@contextmanager +def state_file_lock(team_id: Optional[str] = None) -> Generator[None, None, None]: + """Exclusive advisory lock for per-team state read-modify-write critical sections.""" + with _thread_lock_for(team_id): + lock_path = _get_lock_file_path(team_id) + _ensure_dir(lock_path) + if fcntl is not None: + fd = os.open(lock_path, os.O_CREAT | os.O_RDWR) + try: + fcntl.flock(fd, fcntl.LOCK_EX) + yield + finally: + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + else: + with portalocker.Lock(lock_path, timeout=-1): + yield + + +@contextmanager +def modify_state( + team_id: Optional[str] = None, +) -> Generator[dict[str, Any], None, None]: + """Load state under lock, yield for mutation, then save before releasing the lock.""" + with state_file_lock(team_id): + state = load_state(team_id) + yield state + save_state(state, team_id) + + def _ensure_dir(path: str) -> None: os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True) From 4df511ab5a9491c8720aed42a1b89c824c621797 Mon Sep 17 00:00:00 2001 From: snowfox1003 Date: Tue, 2 Jun 2026 12:50:20 -0400 Subject: [PATCH 2/5] refactor: streamline function definitions and remove commented-out code in service documentation script --- scripts/generate_service_docs.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/scripts/generate_service_docs.py b/scripts/generate_service_docs.py index ad8b4902..f4b030fa 100644 --- a/scripts/generate_service_docs.py +++ b/scripts/generate_service_docs.py @@ -162,9 +162,7 @@ def _extract_public_functions(source: str) -> list[ServiceFuncRow]: def _render_service_table( - rows: Iterable[ServiceFuncRow], - *, - section_title: str = "## Public API (generated)", + rows: Iterable[ServiceFuncRow], *, section_title: str = "## Public API (generated)" ) -> str: lines = [ section_title, @@ -203,9 +201,7 @@ class ProtocolRow: properties: tuple[ProtocolProperty, ...] -def _extract_protocols( - source: str, -) -> tuple[list[ProtocolRow], list[ServiceFuncRow]]: +def _extract_protocols(source: str) -> tuple[list[ProtocolRow], list[ServiceFuncRow]]: tree = ast.parse(source) protocols: list[ProtocolRow] = [] helpers: list[ServiceFuncRow] = [] @@ -328,9 +324,6 @@ def _discover_apps_with_services() -> list[tuple[str, Path]]: continue svc = child / "services.py" if svc.is_file(): - content = _read_text(svc) - if "This service should be skipped for docs generation" in content: - continue found.append((child.name, svc)) return found From 4fda8023de201bf6515702396de6ff33ec515599 Mon Sep 17 00:00:00 2001 From: snowfox1003 Date: Tue, 2 Jun 2026 13:09:01 -0400 Subject: [PATCH 3/5] chore: update PyJWT version to 2.13.0 and refactor job queue to use wait_and_reserve_slot --- requirements-dev.lock | 3 +- requirements.in | 2 + requirements.lock | 3 +- slack_event_handler/tests/test_job_queue.py | 94 ++++++++++--------- .../tests/test_rate_limiter.py | 71 ++++++++++++++ slack_event_handler/tests/test_state.py | 9 ++ slack_event_handler/utils/job_queue.py | 10 +- slack_event_handler/utils/rate_limiter.py | 27 +++++- slack_event_handler/utils/state.py | 2 +- 9 files changed, 168 insertions(+), 53 deletions(-) diff --git a/requirements-dev.lock b/requirements-dev.lock index 2c59ebfa..1d507bf9 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -198,8 +198,9 @@ pygments==2.20.0 # via # pytest # rich -pyjwt==2.12.1 +pyjwt==2.13.0 # via + # -r requirements.in # pygithub # redis pynacl==1.6.2 diff --git a/requirements.in b/requirements.in index dcd4cf50..9ea8efd2 100644 --- a/requirements.in +++ b/requirements.in @@ -13,6 +13,8 @@ requests>=2.31,<3 urllib3>=2.0,<3 # CVE-2026-45409: ensure patched idna (transitive via requests, etc.). idna>=3.15,<4 +# PYSEC-2026-175/177/178/179: fixed in PyJWT 2.13.0 (transitive via pygithub, redis). +PyJWT>=2.13,<3 discord.py>=2.3.0,<3 python-dateutil>=2.8.0,<3 celery[redis]>=5.3,<6 diff --git a/requirements.lock b/requirements.lock index 3dcafe32..4c266ac8 100644 --- a/requirements.lock +++ b/requirements.lock @@ -139,8 +139,9 @@ pydantic-core==2.46.4 # via pydantic pygithub==2.9.1 # via -r requirements.in -pyjwt==2.12.1 +pyjwt==2.13.0 # via + # -r requirements.in # pygithub # redis pynacl==1.6.2 diff --git a/slack_event_handler/tests/test_job_queue.py b/slack_event_handler/tests/test_job_queue.py index 7f975987..d346f1cb 100644 --- a/slack_event_handler/tests/test_job_queue.py +++ b/slack_event_handler/tests/test_job_queue.py @@ -95,10 +95,9 @@ def test_process_job_posts_and_replies(settings): job_queue.KEY_IS_DM: False, } - with patch("slack_event_handler.utils.job_queue.wait_for_slot"): + with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"): with patch("slack_event_handler.utils.job_queue.post_pr_comment"): - with patch("slack_event_handler.utils.job_queue.record_posted"): - job_queue._process_job(job) + job_queue._process_job(job) mock_app.client.chat_postMessage.assert_called_once() mock_app.client.reactions_add.assert_called_once() @@ -125,10 +124,9 @@ def test_process_job_reactions_already_reacted_swallows(settings): job_queue.KEY_IS_DM: False, } - with patch("slack_event_handler.utils.job_queue.wait_for_slot"): + with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"): with patch("slack_event_handler.utils.job_queue.post_pr_comment"): - with patch("slack_event_handler.utils.job_queue.record_posted"): - job_queue._process_job(job) + job_queue._process_job(job) @pytest.mark.django_db @@ -150,11 +148,10 @@ def test_process_job_reactions_other_error_raises(settings): job_queue.KEY_IS_DM: False, } - with patch("slack_event_handler.utils.job_queue.wait_for_slot"): + with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"): with patch("slack_event_handler.utils.job_queue.post_pr_comment"): - with patch("slack_event_handler.utils.job_queue.record_posted"): - with pytest.raises(RuntimeError, match="boom"): - job_queue._process_job(job) + with pytest.raises(RuntimeError, match="boom"): + job_queue._process_job(job) @pytest.mark.django_db @@ -207,10 +204,9 @@ def test_process_job_dm_uses_chat_post_message_without_thread_ts(settings): job_queue.KEY_IS_DM: True, } - with patch("slack_event_handler.utils.job_queue.wait_for_slot"): + with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"): with patch("slack_event_handler.utils.job_queue.post_pr_comment"): - with patch("slack_event_handler.utils.job_queue.record_posted"): - job_queue._process_job(job) + job_queue._process_job(job) kwargs = mock_app.client.chat_postMessage.call_args.kwargs assert "thread_ts" not in kwargs @@ -234,10 +230,9 @@ def test_process_job_skips_reaction_when_team_id_none(settings): job_queue.KEY_IS_DM: False, } - with patch("slack_event_handler.utils.job_queue.wait_for_slot"): + with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"): with patch("slack_event_handler.utils.job_queue.post_pr_comment"): - with patch("slack_event_handler.utils.job_queue.record_posted"): - job_queue._process_job(job) + job_queue._process_job(job) mock_app.client.reactions_add.assert_not_called() @@ -261,11 +256,10 @@ def test_process_job_logs_when_rate_limited(settings): } with patch("slack_event_handler.utils.job_queue.compute_delay", return_value=5.0): - with patch("slack_event_handler.utils.job_queue.wait_for_slot"): + with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"): with patch("slack_event_handler.utils.job_queue.post_pr_comment"): - with patch("slack_event_handler.utils.job_queue.record_posted"): - with patch("slack_event_handler.utils.job_queue.logger") as log: - job_queue._process_job(job) + with patch("slack_event_handler.utils.job_queue.logger") as log: + job_queue._process_job(job) assert log.debug.called @@ -299,20 +293,25 @@ def fake_modify(team_id=None): def sleep_side_effect(_sec): raise RuntimeError("stop_worker_loop") + load_peeks = [ + {"queue": [job], "postedAt": []}, + {"queue": [], "postedAt": []}, + ] + + def load_side_effect(team_id=None): + if load_peeks: + return load_peeks.pop(0) + return {"queue": [], "postedAt": []} + with patch.object(job_queue, "modify_state", fake_modify): - with patch.object( - job_queue, - "load_state", - return_value={"queue": [], "postedAt": []}, - ): - with patch.object(job_queue, "wait_for_slot"): + with patch.object(job_queue, "load_state", side_effect=load_side_effect): + with patch.object(job_queue, "wait_and_reserve_slot"): with patch.object(job_queue, "post_pr_comment"): - with patch.object(job_queue, "record_posted"): - with patch.object( - job_queue.time, "sleep", side_effect=sleep_side_effect - ): - with pytest.raises(RuntimeError, match="stop_worker_loop"): - job_queue._worker("T1") + with patch.object( + job_queue.time, "sleep", side_effect=sleep_side_effect + ): + with pytest.raises(RuntimeError, match="stop_worker_loop"): + job_queue._worker("T1") @pytest.mark.django_db @@ -346,22 +345,27 @@ def fake_modify(team_id=None): def sleep_side_effect(_sec): raise RuntimeError("stop_worker_loop") + load_peeks = [ + {"queue": [job], "postedAt": []}, + {"queue": [], "postedAt": []}, + ] + + def load_side_effect(team_id=None): + if load_peeks: + return load_peeks.pop(0) + return {"queue": [], "postedAt": []} + with patch.object(job_queue, "modify_state", fake_modify): - with patch.object( - job_queue, - "load_state", - return_value={"queue": [], "postedAt": []}, - ): + with patch.object(job_queue, "load_state", side_effect=load_side_effect): with patch.object( job_queue, "post_pr_comment", side_effect=RuntimeError("gh") ): - with patch.object(job_queue, "wait_for_slot"): - with patch.object(job_queue, "record_posted"): - with patch.object( - job_queue.time, "sleep", side_effect=sleep_side_effect - ): - with pytest.raises(RuntimeError, match="stop_worker_loop"): - job_queue._worker("T1") + with patch.object(job_queue, "wait_and_reserve_slot"): + with patch.object( + job_queue.time, "sleep", side_effect=sleep_side_effect + ): + with pytest.raises(RuntimeError, match="stop_worker_loop"): + job_queue._worker("T1") texts = [ (ca.kwargs.get("text") or "") @@ -408,6 +412,7 @@ def worker(): t.start() for t in threads: t.join(timeout=30) + assert all(not t.is_alive() for t in threads) assert not errors loaded = load_state("T9") @@ -465,6 +470,7 @@ def posted_worker(): t.start() for t in threads: t.join(timeout=30) + assert all(not t.is_alive() for t in threads) assert not errors loaded = load_state("T9") diff --git a/slack_event_handler/tests/test_rate_limiter.py b/slack_event_handler/tests/test_rate_limiter.py index 3f3b5a3f..f4d18cfc 100644 --- a/slack_event_handler/tests/test_rate_limiter.py +++ b/slack_event_handler/tests/test_rate_limiter.py @@ -8,6 +8,8 @@ compute_delay_at, recent_timestamps_at, record_posted, + try_reserve_slot, + wait_and_reserve_slot, wait_for_slot, ) @@ -34,6 +36,75 @@ def test_compute_delay_at_positive_when_at_cap(settings): assert delay > 0 +@pytest.mark.django_db +def test_try_reserve_slot_false_at_cap(settings, tmp_path): + settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 2 + settings.SLACK_PR_BOT_COMMENTS_WINDOW_SECONDS = 100 + path = tmp_path / "state.json" + import json + + now = 1000.0 + path.write_text(json.dumps({"postedAt": [990.0, 995.0], "queue": []})) + + with patch( + "slack_event_handler.utils.state._get_state_file_path", + return_value=str(path), + ): + with patch( + "slack_event_handler.utils.rate_limiter.time.time", return_value=now + ): + assert try_reserve_slot(None) is False + + +@pytest.mark.django_db +def test_try_reserve_slot_true_and_persists(settings, tmp_path): + settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 5 + path = tmp_path / "state.json" + + with patch( + "slack_event_handler.utils.state._get_state_file_path", + return_value=str(path), + ): + with patch( + "slack_event_handler.utils.rate_limiter.time.time", return_value=42.0 + ): + assert try_reserve_slot(None) is True + from slack_event_handler.utils.state import load_state + + loaded = load_state(None) + assert 42.0 in loaded["postedAt"] + + +@pytest.mark.django_db +def test_wait_and_reserve_slot_retries_until_reserved(settings, monkeypatch): + settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 5 + attempts = [] + + def fake_try_reserve(team_id=None): + attempts.append(1) + return len(attempts) > 1 + + sleeps = [] + + def fake_sleep(d): + sleeps.append(d) + + monkeypatch.setattr( + "slack_event_handler.utils.rate_limiter.try_reserve_slot", fake_try_reserve + ) + monkeypatch.setattr("slack_event_handler.utils.rate_limiter.time.sleep", fake_sleep) + monkeypatch.setattr( + "slack_event_handler.utils.rate_limiter.compute_delay", lambda _posted: 1.0 + ) + + with patch("slack_event_handler.utils.rate_limiter.load_state") as ls: + ls.return_value = {"postedAt": [], "queue": []} + wait_and_reserve_slot(None) + + assert sleeps == [1.0] + assert len(attempts) == 2 + + @pytest.mark.django_db def test_wait_for_slot_breaks_when_delay_zero(settings, monkeypatch): settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 5 diff --git a/slack_event_handler/tests/test_state.py b/slack_event_handler/tests/test_state.py index 4831b976..327a7655 100644 --- a/slack_event_handler/tests/test_state.py +++ b/slack_event_handler/tests/test_state.py @@ -84,6 +84,15 @@ def test_get_lock_file_path_team_id(data_dir): assert state_mod._get_lock_file_path("T9") == f"{state_path}.lock" +def test_thread_lock_for_same_lock_file_path(tmp_path): + root = tmp_path / "slack_event_handler" + root.mkdir(parents=True) + with patch("slack_event_handler.workspace.get_workspace_root", return_value=root): + lock_a = state_mod._thread_lock_for("T/1") + lock_b = state_mod._thread_lock_for("T?1") + assert lock_a is lock_b + + def test_state_file_lock_blocks_until_released(data_dir): state_path = str(data_dir / "state.json") lock_path = f"{state_path}.lock" diff --git a/slack_event_handler/utils/job_queue.py b/slack_event_handler/utils/job_queue.py index e69b75d6..be3ee72b 100644 --- a/slack_event_handler/utils/job_queue.py +++ b/slack_event_handler/utils/job_queue.py @@ -18,8 +18,7 @@ compute_delay, compute_delay_at, recent_timestamps_at, - record_posted, - wait_for_slot, + wait_and_reserve_slot, ) from slack_event_handler.utils.github_pr_client import post_pr_comment from slack_event_handler.utils.state import load_state, modify_state @@ -160,11 +159,10 @@ def _process_job(job: dict) -> None: if delay > 0: logger.debug("%s – rate limited, waiting %ds", label, int(delay + 0.999)) - wait_for_slot(team_id) + wait_and_reserve_slot(team_id) logger.debug("%s – posting GitHub comment", label) post_pr_comment(owner, repo, pull_number) - record_posted(team_id) with _worker_busy_lock: _worker_busy_by_team[team_id] = False _send_reply( @@ -193,6 +191,10 @@ def _worker(team_id: Optional[str]) -> None: """Long-running FIFO worker daemon thread for one team.""" logger.debug("PR job queue worker started for team %s", team_id or "default") while True: + if not load_state(team_id)["queue"]: + time.sleep(1) + continue + with modify_state(team_id) as state: if not state["queue"]: job = None diff --git a/slack_event_handler/utils/rate_limiter.py b/slack_event_handler/utils/rate_limiter.py index 973ae74a..c39c79ac 100644 --- a/slack_event_handler/utils/rate_limiter.py +++ b/slack_event_handler/utils/rate_limiter.py @@ -52,8 +52,31 @@ def compute_delay(posted_at: list[float]) -> float: return compute_delay_at(posted_at, time.time()) +def try_reserve_slot(team_id: Optional[str] = None) -> bool: + """ + Atomically check availability and reserve a slot timestamp for this team. + + Returns True if a slot was reserved, False if the rolling window is full. + """ + now = time.time() + with modify_state(team_id) as state: + recent = recent_timestamps_at(state["postedAt"], now) + if len(recent) >= _max_per_window(): + return False + state["postedAt"] = recent + [now] + return True + + +def wait_and_reserve_slot(team_id: Optional[str] = None) -> None: + """Blocks until a rate-limit slot is atomically reserved for this team.""" + while not try_reserve_slot(team_id): + delay = compute_delay(load_state(team_id)["postedAt"]) + if delay > 0: + time.sleep(delay) + + def wait_for_slot(team_id: Optional[str] = None) -> None: - """Blocks synchronously until a rate-limit slot is available for this team.""" + """Blocks until a slot appears available (does not reserve). Prefer wait_and_reserve_slot.""" while True: state = load_state(team_id) delay = compute_delay(state["postedAt"]) @@ -63,7 +86,7 @@ def wait_for_slot(team_id: Optional[str] = None) -> None: def record_posted(team_id: Optional[str] = None) -> None: - """Records a successful post timestamp and prunes expired entries for this team.""" + """Appends a post timestamp without checking the cap (legacy / test helper).""" with modify_state(team_id) as state: recent = recent_timestamps_at(state["postedAt"], time.time()) state["postedAt"] = recent + [time.time()] diff --git a/slack_event_handler/utils/state.py b/slack_event_handler/utils/state.py index f32bdd81..12c7addb 100644 --- a/slack_event_handler/utils/state.py +++ b/slack_event_handler/utils/state.py @@ -35,7 +35,7 @@ def _thread_lock_for(team_id: Optional[str]) -> threading.Lock: """In-process mutex paired with the file lock (required for reliable Windows locking).""" - key = team_id if team_id is not None else "" + key = _get_lock_file_path(team_id) with _team_thread_locks_guard: lock = _team_thread_locks.get(key) if lock is None: From 48900a1863e07cc0bbde5177b291486ce3359524 Mon Sep 17 00:00:00 2001 From: snowfox1003 Date: Tue, 2 Jun 2026 13:23:12 -0400 Subject: [PATCH 4/5] chore: add portalocker dependency to requirements-dev.lock for improved file locking --- requirements-dev.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements-dev.lock b/requirements-dev.lock index 1d507bf9..0050d24c 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -159,6 +159,8 @@ pluggy==1.6.0 # pytest-cov plyvel==1.5.1 # via -r requirements.in +portalocker==2.10.1 + # via -r requirements.in pre-commit==4.6.0 # via -r requirements-dev.in prompt-toolkit==3.0.52 From b11dff575c87e112c1833a8e6709d551efd56b5a Mon Sep 17 00:00:00 2001 From: snowfox1003 Date: Tue, 2 Jun 2026 13:36:32 -0400 Subject: [PATCH 5/5] test: add unit test to ensure job processing clears busy state after slot reservation --- slack_event_handler/tests/test_job_queue.py | 24 +++++++++++++++++++++ slack_event_handler/utils/job_queue.py | 5 +++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/slack_event_handler/tests/test_job_queue.py b/slack_event_handler/tests/test_job_queue.py index d346f1cb..f891ca3f 100644 --- a/slack_event_handler/tests/test_job_queue.py +++ b/slack_event_handler/tests/test_job_queue.py @@ -154,6 +154,30 @@ def test_process_job_reactions_other_error_raises(settings): job_queue._process_job(job) +@pytest.mark.django_db +def test_process_job_clears_busy_after_slot_reserved(settings): + settings.SLACK_PR_BOT_GITHUB_TOKEN = "tok" + job_queue.set_slack_app(MagicMock(), "T1") + job = { + job_queue.KEY_JOB_ID: "jid", + job_queue.KEY_TEAM_ID: "T1", + job_queue.KEY_OWNER: "o", + job_queue.KEY_REPO: "r", + job_queue.KEY_PULL_NUMBER: 1, + job_queue.KEY_CHANNEL: "C1", + job_queue.KEY_MESSAGE_TS: "9.9", + job_queue.KEY_USER_ID: "U1", + job_queue.KEY_IS_DM: False, + } + + with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"): + with patch("slack_event_handler.utils.job_queue.post_pr_comment"): + job_queue._process_job(job) + + with job_queue._worker_busy_lock: + assert not job_queue._worker_busy_by_team.get("T1", False) + + @pytest.mark.django_db def test_estimated_delay_sec_nonzero_with_jobs_ahead(settings, tmp_path): settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 2 diff --git a/slack_event_handler/utils/job_queue.py b/slack_event_handler/utils/job_queue.py index be3ee72b..01197d83 100644 --- a/slack_event_handler/utils/job_queue.py +++ b/slack_event_handler/utils/job_queue.py @@ -39,6 +39,7 @@ # Per-team Slack app and worker busy flag for multi-workspace support _slack_app_by_team: dict[str, object] = {} _slack_app_by_team_lock = threading.Lock() +# True while a dequeued job is waiting for a rate-limit slot (not yet in postedAt). _worker_busy_by_team: dict[str, bool] = {} _worker_busy_lock = threading.Lock() @@ -160,11 +161,11 @@ def _process_job(job: dict) -> None: logger.debug("%s – rate limited, waiting %ds", label, int(delay + 0.999)) wait_and_reserve_slot(team_id) + with _worker_busy_lock: + _worker_busy_by_team[team_id] = False logger.debug("%s – posting GitHub comment", label) post_pr_comment(owner, repo, pull_number) - with _worker_busy_lock: - _worker_busy_by_team[team_id] = False _send_reply( team_id, channel,