Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ pluggy==1.6.0
# pytest-cov
plyvel==1.5.1
# via -r requirements.in
portalocker==2.10.1
# via -r requirements.in
pre-commit==4.6.0
# via -r requirements-dev.in
prompt-toolkit==3.0.52
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ yt-dlp>=2026.2.21,<2027

# --- slack_event_handler (GitHub PR comments) ---
PyGithub>=2.0,<3
portalocker>=2.8,<3
2 changes: 2 additions & 0 deletions requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ pinecone-plugin-interface==0.0.7
# via pinecone
plyvel==1.5.1
# via -r requirements.in
portalocker==2.10.1
# via -r requirements.in
prompt-toolkit==3.0.52
# via click-repl
propcache==0.5.2
Expand Down
234 changes: 190 additions & 44 deletions slack_event_handler/tests/test_job_queue.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""Tests for slack_event_handler.utils.job_queue."""

import threading
from contextlib import contextmanager
from unittest.mock import MagicMock, patch

import pytest

from slack_event_handler.utils import job_queue
from slack_event_handler.utils.rate_limiter import record_posted
from slack_event_handler.utils.state import load_state


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -91,10 +95,9 @@ def test_process_job_posts_and_replies(settings):
job_queue.KEY_IS_DM: False,
}

with patch("slack_event_handler.utils.job_queue.wait_for_slot"):
with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"):
with patch("slack_event_handler.utils.job_queue.post_pr_comment"):
with patch("slack_event_handler.utils.job_queue.record_posted"):
job_queue._process_job(job)
job_queue._process_job(job)

mock_app.client.chat_postMessage.assert_called_once()
mock_app.client.reactions_add.assert_called_once()
Expand All @@ -121,10 +124,9 @@ def test_process_job_reactions_already_reacted_swallows(settings):
job_queue.KEY_IS_DM: False,
}

with patch("slack_event_handler.utils.job_queue.wait_for_slot"):
with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"):
with patch("slack_event_handler.utils.job_queue.post_pr_comment"):
with patch("slack_event_handler.utils.job_queue.record_posted"):
job_queue._process_job(job)
job_queue._process_job(job)


@pytest.mark.django_db
Expand All @@ -146,11 +148,34 @@ def test_process_job_reactions_other_error_raises(settings):
job_queue.KEY_IS_DM: False,
}

with patch("slack_event_handler.utils.job_queue.wait_for_slot"):
with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"):
with patch("slack_event_handler.utils.job_queue.post_pr_comment"):
with patch("slack_event_handler.utils.job_queue.record_posted"):
with pytest.raises(RuntimeError, match="boom"):
job_queue._process_job(job)
with pytest.raises(RuntimeError, match="boom"):
job_queue._process_job(job)


@pytest.mark.django_db
def test_process_job_clears_busy_after_slot_reserved(settings):
settings.SLACK_PR_BOT_GITHUB_TOKEN = "tok"
job_queue.set_slack_app(MagicMock(), "T1")
job = {
job_queue.KEY_JOB_ID: "jid",
job_queue.KEY_TEAM_ID: "T1",
job_queue.KEY_OWNER: "o",
job_queue.KEY_REPO: "r",
job_queue.KEY_PULL_NUMBER: 1,
job_queue.KEY_CHANNEL: "C1",
job_queue.KEY_MESSAGE_TS: "9.9",
job_queue.KEY_USER_ID: "U1",
job_queue.KEY_IS_DM: False,
}

with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"):
with patch("slack_event_handler.utils.job_queue.post_pr_comment"):
job_queue._process_job(job)

with job_queue._worker_busy_lock:
assert not job_queue._worker_busy_by_team.get("T1", False)


@pytest.mark.django_db
Expand Down Expand Up @@ -203,10 +228,9 @@ def test_process_job_dm_uses_chat_post_message_without_thread_ts(settings):
job_queue.KEY_IS_DM: True,
}

with patch("slack_event_handler.utils.job_queue.wait_for_slot"):
with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"):
with patch("slack_event_handler.utils.job_queue.post_pr_comment"):
with patch("slack_event_handler.utils.job_queue.record_posted"):
job_queue._process_job(job)
job_queue._process_job(job)

kwargs = mock_app.client.chat_postMessage.call_args.kwargs
assert "thread_ts" not in kwargs
Expand All @@ -230,10 +254,9 @@ def test_process_job_skips_reaction_when_team_id_none(settings):
job_queue.KEY_IS_DM: False,
}

with patch("slack_event_handler.utils.job_queue.wait_for_slot"):
with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"):
with patch("slack_event_handler.utils.job_queue.post_pr_comment"):
with patch("slack_event_handler.utils.job_queue.record_posted"):
job_queue._process_job(job)
job_queue._process_job(job)

mock_app.client.reactions_add.assert_not_called()

Expand All @@ -257,11 +280,10 @@ def test_process_job_logs_when_rate_limited(settings):
}

with patch("slack_event_handler.utils.job_queue.compute_delay", return_value=5.0):
with patch("slack_event_handler.utils.job_queue.wait_for_slot"):
with patch("slack_event_handler.utils.job_queue.wait_and_reserve_slot"):
with patch("slack_event_handler.utils.job_queue.post_pr_comment"):
with patch("slack_event_handler.utils.job_queue.record_posted"):
with patch("slack_event_handler.utils.job_queue.logger") as log:
job_queue._process_job(job)
with patch("slack_event_handler.utils.job_queue.logger") as log:
job_queue._process_job(job)
assert log.debug.called


Expand All @@ -285,25 +307,35 @@ def test_worker_processes_job_then_exits_on_sleep(settings):
loads = [
{"queue": [job], "postedAt": []},
{"queue": [], "postedAt": []},
{"queue": [], "postedAt": []},
]

def fake_load(team_id=None):
return loads.pop(0)
@contextmanager
def fake_modify(team_id=None):
state = loads.pop(0) if loads else {"queue": [], "postedAt": []}
yield state

def sleep_side_effect(_sec):
raise RuntimeError("stop_worker_loop")

with patch.object(job_queue, "load_state", side_effect=fake_load):
with patch.object(job_queue, "save_state"):
with patch.object(job_queue, "wait_for_slot"):
load_peeks = [
{"queue": [job], "postedAt": []},
{"queue": [], "postedAt": []},
]

def load_side_effect(team_id=None):
if load_peeks:
return load_peeks.pop(0)
return {"queue": [], "postedAt": []}

with patch.object(job_queue, "modify_state", fake_modify):
with patch.object(job_queue, "load_state", side_effect=load_side_effect):
with patch.object(job_queue, "wait_and_reserve_slot"):
with patch.object(job_queue, "post_pr_comment"):
with patch.object(job_queue, "record_posted"):
with patch.object(
job_queue.time, "sleep", side_effect=sleep_side_effect
):
with pytest.raises(RuntimeError, match="stop_worker_loop"):
job_queue._worker("T1")
with patch.object(
job_queue.time, "sleep", side_effect=sleep_side_effect
):
with pytest.raises(RuntimeError, match="stop_worker_loop"):
job_queue._worker("T1")


@pytest.mark.django_db
Expand All @@ -327,30 +359,144 @@ def test_worker_process_job_failure_sends_error_reply(settings):
loads = [
{"queue": [job], "postedAt": []},
{"queue": [], "postedAt": []},
{"queue": [], "postedAt": []},
]

def fake_load(team_id=None):
return loads.pop(0)
@contextmanager
def fake_modify(team_id=None):
state = loads.pop(0) if loads else {"queue": [], "postedAt": []}
yield state

def sleep_side_effect(_sec):
raise RuntimeError("stop_worker_loop")

with patch.object(job_queue, "load_state", side_effect=fake_load):
with patch.object(job_queue, "save_state"):
load_peeks = [
{"queue": [job], "postedAt": []},
{"queue": [], "postedAt": []},
]

def load_side_effect(team_id=None):
if load_peeks:
return load_peeks.pop(0)
return {"queue": [], "postedAt": []}

with patch.object(job_queue, "modify_state", fake_modify):
with patch.object(job_queue, "load_state", side_effect=load_side_effect):
with patch.object(
job_queue, "post_pr_comment", side_effect=RuntimeError("gh")
):
with patch.object(job_queue, "wait_for_slot"):
with patch.object(job_queue, "record_posted"):
with patch.object(
job_queue.time, "sleep", side_effect=sleep_side_effect
):
with pytest.raises(RuntimeError, match="stop_worker_loop"):
job_queue._worker("T1")
with patch.object(job_queue, "wait_and_reserve_slot"):
with patch.object(
job_queue.time, "sleep", side_effect=sleep_side_effect
):
with pytest.raises(RuntimeError, match="stop_worker_loop"):
job_queue._worker("T1")

texts = [
(ca.kwargs.get("text") or "")
for ca in mock_app.client.chat_postMessage.call_args_list
]
assert any("Could not post" in t for t in texts)


@pytest.mark.django_db
def test_concurrent_enqueue_preserves_all_jobs(settings, tmp_path):
settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 5
path = tmp_path / "state_T9.json"
n = 30
barrier = threading.Barrier(n)
job_ids: list[str] = []
errors: list[BaseException] = []
lock = threading.Lock()

def worker():
try:
barrier.wait(timeout=10)
job = job_queue.enqueue_job(
owner="o",
repo="r",
pull_number=1,
channel="C1",
message_ts="1.0",
user_id="U1",
is_dm=False,
team_id="T9",
)
with lock:
job_ids.append(job[job_queue.KEY_JOB_ID])
except BaseException as e:
with lock:
errors.append(e)

with patch(
"slack_event_handler.utils.state._get_state_file_path",
return_value=str(path),
):
threads = [threading.Thread(target=worker) for _ in range(n)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=30)
Comment thread
snowfox1003 marked this conversation as resolved.
assert all(not t.is_alive() for t in threads)

assert not errors
loaded = load_state("T9")
assert len(loaded["queue"]) == n
assert len(set(job_ids)) == n


@pytest.mark.django_db
def test_concurrent_enqueue_and_record_posted(settings, tmp_path):
settings.SLACK_PR_BOT_COMMENTS_MAX_PER_WINDOW = 10
path = tmp_path / "state_T9.json"
n_enqueue = 15
n_posted = 15
n_total = n_enqueue + n_posted
barrier = threading.Barrier(n_total)
errors: list[BaseException] = []
lock = threading.Lock()

def enqueue_worker():
try:
barrier.wait(timeout=10)
job_queue.enqueue_job(
owner="o",
repo="r",
pull_number=1,
channel="C1",
message_ts="1.0",
user_id="U1",
team_id="T9",
)
except BaseException as e:
with lock:
errors.append(e)

def posted_worker():
try:
barrier.wait(timeout=10)
record_posted("T9")
except BaseException as e:
with lock:
errors.append(e)

with patch(
"slack_event_handler.utils.state._get_state_file_path",
return_value=str(path),
):
with patch(
"slack_event_handler.utils.rate_limiter.time.time", return_value=42.0
):
threads = [
threading.Thread(target=enqueue_worker) for _ in range(n_enqueue)
]
threads += [threading.Thread(target=posted_worker) for _ in range(n_posted)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=30)
assert all(not t.is_alive() for t in threads)

assert not errors
loaded = load_state("T9")
assert len(loaded["queue"]) == n_enqueue
assert len(loaded["postedAt"]) == n_posted
Loading
Loading