Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 33 additions & 25 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,23 @@ jobs:
- name: Cargo clippy
run: cargo clippy --all-targets --all-features -- -D warnings

- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: "0.10.12"
cache-dependency-glob: uv.lock

- name: Install Python dev deps
run: pip install -e ".[dev]"
run: uv sync --extra dev

- name: Ruff check
run: ruff check py_src/ tests/
run: uv run ruff check py_src/ tests/

- name: Ruff format check
run: ruff format --check py_src/ tests/
run: uv run ruff format --check py_src/ tests/

- name: Mypy
run: mypy py_src/taskito/
run: uv run mypy py_src/taskito/ tests/python/ --no-incremental

rust-test:
runs-on: ubuntu-latest
Expand All @@ -70,6 +76,15 @@ jobs:
env:
LD_LIBRARY_PATH: ${{ env.pythonLocation }}/lib

- name: Check Rust (postgres features)
run: cargo check --workspace --features postgres

- name: Check Rust (redis features)
run: cargo check --workspace --features redis

- name: Check Rust (native-async features)
run: cargo check --workspace --features native-async

test:
needs: lint
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -102,32 +117,25 @@ jobs:
with:
save-if: ${{ matrix.os != 'ubuntu-latest' }}

- name: Create virtualenv (Unix)
if: runner.os != 'Windows'
run: |
python -m venv .venv
echo "${{ github.workspace }}/.venv/bin" >> $GITHUB_PATH
echo "VIRTUAL_ENV=${{ github.workspace }}/.venv" >> $GITHUB_ENV

- name: Create virtualenv (Windows)
if: runner.os == 'Windows'
run: |
python -m venv .venv
echo "${{ github.workspace }}\.venv\Scripts" >> $env:GITHUB_PATH
echo "VIRTUAL_ENV=${{ github.workspace }}\.venv" >> $env:GITHUB_ENV
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: "0.10.12"
cache-dependency-glob: uv.lock

- name: Install maturin
run: pip install maturin
- name: Install dependencies
run: uv sync --extra dev

- name: Build and install
run: |
pip install -e ".[dev]"
maturin develop --release
- name: Build native extension
uses: PyO3/maturin-action@v1
with:
command: develop
args: --release --features native-async

- name: Run Python tests
run: |
set +e
pytest tests/python/ -v --junitxml=test-results.xml
uv run python -m pytest tests/python/ -v --junitxml=test-results.xml
PYTEST_EXIT=$?
if [ $PYTEST_EXIT -eq 0 ]; then exit 0; fi
# SIGABRT (134) during interpreter shutdown is a known PyO3 issue;
Expand All @@ -139,7 +147,7 @@ jobs:
print(int(r.get('failures',0)) + int(r.get('errors',0)))
")
if [ "$FAILURES" = "0" ]; then
echo "::warning::Tests passed but process crashed during cleanup (known PyO3 issue on this Python version)"
echo "::warning::Tests passed but process crashed during cleanup (known PyO3 issue)"
exit 0
fi
fi
Expand Down
68 changes: 68 additions & 0 deletions docs/guide/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,51 @@ assert results[0].succeeded
| `wraps` | `Any` | Wrap a real object — returned as-is when accessed. |
| `track_calls` | `bool` | Increment `call_count` each access. |

#### `return_value` vs `wraps`

Use `return_value` when you want a simple stub:

```python
mock_cache = MockResource("cache", return_value={"key": "value"})
```

Use `wraps` when you need the real object but want call tracking:

```python
real_db = create_test_database()
spy_db = MockResource("db", wraps=real_db, track_calls=True)
```

#### Multiple resources

Pass multiple resources to `test_mode`:

```python
with queue.test_mode(resources={
"db": MockResource("db", return_value=mock_db),
"cache": MockResource("cache", return_value={}),
"mailer": MockResource("mailer", return_value=mock_smtp),
}) as results:
process_order.delay(order_id=123)
```

#### Testing with `inject`

Tasks that use `@queue.task(inject=["db"])` receive the mock resource automatically:

```python
@queue.task(inject=["db"])
def create_user(name, db=None):
db.execute("INSERT INTO users (name) VALUES (?)", (name,))

mock_db = MagicMock()
with queue.test_mode(resources={"db": mock_db}) as results:
create_user.delay("Alice")

assert results[0].succeeded
mock_db.execute.assert_called_once()
```

!!! note
When `resources=` is provided, proxy reconstruction is bypassed automatically. Proxy markers in arguments are passed through as-is so tests don't fail due to missing files or network connections.

Expand Down Expand Up @@ -344,3 +389,26 @@ def test_e2e():

!!! info "Middleware in test mode"
Per-task and queue-level `TaskMiddleware` hooks (`before`, `after`, `on_retry`) **do fire** in test mode, since they run in the Python wrapper around your task function. This lets you verify middleware behavior in tests without running a real worker.

## Running Tests Locally

```bash
# Rust tests
cargo test --workspace

# Rebuild the Python extension after Rust changes
uv run maturin develop

# Python tests
uv run python -m pytest tests/python/ -v

# Linting
uv run ruff check py_src/ tests/
uv run mypy py_src/taskito/ --no-incremental
```

To build with native async support:

```bash
uv run maturin develop --features native-async
```
6 changes: 4 additions & 2 deletions tests/python/conftest.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
"""Shared fixtures for taskito tests."""

import threading
from collections.abc import Generator
from pathlib import Path

import pytest

from taskito import Queue


@pytest.fixture
def queue(tmp_path):
def queue(tmp_path: Path) -> Queue:
"""Create a fresh queue with a temp database."""
db_path = str(tmp_path / "test.db")
return Queue(db_path=db_path, workers=2)


@pytest.fixture
def run_worker(queue):
def run_worker(queue: Queue) -> Generator[threading.Thread]:
"""Start a worker thread for the given queue. Stops automatically at teardown."""
thread = threading.Thread(target=queue.run_worker, daemon=True)
thread.start()
Expand Down
38 changes: 20 additions & 18 deletions tests/python/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,58 @@

import threading

from taskito import Queue

def test_task_registration(queue):

def test_task_registration(queue: Queue) -> None:
"""Tasks can be registered with the decorator."""

@queue.task()
def add(a, b):
def add(a: int, b: int) -> int:
return a + b

assert add.name.endswith("add")
assert add.name in queue._task_registry


def test_enqueue_returns_job_result(queue):
def test_enqueue_returns_job_result(queue: Queue) -> None:
"""Enqueueing a task returns a JobResult handle."""

@queue.task()
def noop():
def noop() -> None:
pass

job = noop.delay()
assert job.id is not None
assert len(job.id) > 0


def test_task_direct_call(queue):
def test_task_direct_call(queue: Queue) -> None:
"""Decorated tasks can still be called directly."""

@queue.task()
def multiply(a, b):
def multiply(a: int, b: int) -> int:
return a * b

assert multiply(3, 4) == 12


def test_apply_async_with_delay(queue):
def test_apply_async_with_delay(queue: Queue) -> None:
"""apply_async accepts a delay parameter."""

@queue.task()
def slow_task():
def slow_task() -> None:
pass

job = slow_task.apply_async(delay=60)
assert job.id is not None


def test_apply_async_with_overrides(queue):
def test_apply_async_with_overrides(queue: Queue) -> None:
"""apply_async can override default task settings."""

@queue.task(priority=1, queue="default")
def configurable_task(x):
def configurable_task(x: int) -> int:
return x

job = configurable_task.apply_async(
Expand All @@ -64,11 +66,11 @@ def configurable_task(x):
assert job.id is not None


def test_queue_stats(queue):
def test_queue_stats(queue: Queue) -> None:
"""stats() returns counts by status."""

@queue.task()
def sample_task():
def sample_task() -> None:
pass

sample_task.delay()
Expand All @@ -79,11 +81,11 @@ def sample_task():
assert stats["running"] == 0


def test_worker_executes_task(queue):
def test_worker_executes_task(queue: Queue) -> None:
"""Worker processes tasks and stores results."""

@queue.task()
def add(a, b):
def add(a: int, b: int) -> int:
return a + b

job = add.delay(2, 3)
Expand All @@ -100,11 +102,11 @@ def add(a, b):
assert result == 5


def test_worker_handles_kwargs(queue):
def test_worker_handles_kwargs(queue: Queue) -> None:
"""Worker correctly passes keyword arguments."""

@queue.task()
def greet(name, greeting="Hello"):
def greet(name: str, greeting: str = "Hello") -> str:
return f"{greeting}, {name}!"

job = greet.delay("World", greeting="Hi")
Expand All @@ -119,11 +121,11 @@ def greet(name, greeting="Hello"):
assert result == "Hi, World!"


def test_worker_none_result(queue):
def test_worker_none_result(queue: Queue) -> None:
"""Tasks returning None work correctly."""

@queue.task()
def void_task():
def void_task() -> None:
pass

job = void_task.delay()
Expand Down
14 changes: 8 additions & 6 deletions tests/python/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import threading

from taskito import Queue

def test_enqueue_many(queue):

def test_enqueue_many(queue: Queue) -> None:
"""enqueue_many enqueues all items in a single batch."""

@queue.task()
def double(x):
def double(x: int) -> int:
return x * 2

jobs = queue.enqueue_many(
Expand All @@ -20,11 +22,11 @@ def double(x):
assert stats["pending"] == 10


def test_task_map(queue):
def test_task_map(queue: Queue) -> None:
"""TaskWrapper.map() enqueues and returns results."""

@queue.task()
def add(a, b):
def add(a: int, b: int) -> int:
return a + b

jobs = add.map([(1, 2), (3, 4), (5, 6)])
Expand All @@ -37,11 +39,11 @@ def add(a, b):
assert sorted(results) == [3, 7, 11]


def test_batch_stats(queue):
def test_batch_stats(queue: Queue) -> None:
"""Batch enqueue of 50 items shows correct pending count."""

@queue.task()
def noop():
def noop() -> None:
pass

queue.enqueue_many(
Expand Down
Loading
Loading