Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions test/spell_check.words
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
apache
argparse
asyncio
capsys
colcon
contextlib
coroutine
Expand All @@ -16,9 +18,11 @@ pathlib
plugin
pydocstyle
pytest
readouterr
returncode
scspell
setuptools
sigint
thomas
traceback
unittest
9 changes: 9 additions & 0 deletions test/test_event_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright 2026 Open Source Robotics Foundation, Inc.
# Licensed under the Apache License, Version 2.0

from colcon_parallel_executor.event.executor import ParallelStatus


def test_parallel_status():
status = ParallelStatus(['job1', 'job2'])
assert status.processing == ('job1', 'job2')
35 changes: 35 additions & 0 deletions test/test_event_handler_parallel_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2026 Open Source Robotics Foundation, Inc.
# Licensed under the Apache License, Version 2.0

from unittest.mock import patch

from colcon_parallel_executor.event.executor import ParallelStatus
from colcon_parallel_executor.event_handler.parallel_status \
import ParallelStatusEventHandler
import pytest


@pytest.mark.parametrize(('isatty'), (
(True,),
(False,),
))
def test_parallel_status_event_handler(capsys, isatty):
with patch('sys.stdout.isatty', return_value=isatty):
handler = ParallelStatusEventHandler()
assert handler.enabled == isatty

event = (ParallelStatus(['job2', 'job1']), )
handler(event)

if not isatty:
return

captured = capsys.readouterr()
assert 'Processing' in captured.out
assert 'job1' in captured.out
assert 'job2' in captured.out

# Test with non-matching event
handler(('string event',))
captured = capsys.readouterr()
assert captured.out == ''
191 changes: 189 additions & 2 deletions test/test_executor_parallel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright 2016-2018 Dirk Thomas
# Licensed under the Apache License, Version 2.0

import argparse
import asyncio
from collections import OrderedDict
import os
Expand All @@ -9,10 +10,13 @@
from threading import Thread
import time
from types import SimpleNamespace
from unittest.mock import Mock
from unittest.mock import patch

from colcon_core.executor import Job
from colcon_core.executor import OnError
from colcon_core.subprocess import SIGINT_RESULT
from colcon_parallel_executor.executor.parallel import counting_number
from colcon_parallel_executor.executor.parallel \
import ParallelExecutorExtension
import pytest
Expand All @@ -22,9 +26,9 @@

class Job1(Job):

def __init__(self):
def __init__(self, identifier='job1'):
super().__init__(
identifier='job1', dependencies=set(), task=None,
identifier=identifier, dependencies=set(), task=None,
task_context=None)

async def __call__(self, *args, **kwargs):
Expand Down Expand Up @@ -213,3 +217,186 @@ def delayed_sigint():

assert rc == signal.SIGINT
ran_jobs.clear()


def test_counting_number():
assert counting_number('0') == 0
assert counting_number(1) == 1
with pytest.raises(ValueError):
counting_number('-1')


def test_add_arguments():
parser = argparse.ArgumentParser()
extension = ParallelExecutorExtension()
extension.add_arguments(parser=parser)
args = parser.parse_args(['--parallel-workers', '5'])
assert args.parallel_workers == 5


def test_parallel_run_until_complete_exception():
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=2)
jobs = OrderedDict()
jobs['one'] = Job1()

def mock_run(future):
future.get_coro().close()
raise RuntimeError('mock error')

with patch(
'asyncio.base_events.BaseEventLoop.run_until_complete',
side_effect=mock_run
):
rc = extension.execute(args, jobs)
assert rc == 1


def test_parallel_timeout():
extension = ParallelExecutorExtension()
extension.put_event_into_queue = Mock()

args = SimpleNamespace(parallel_workers=2)
jobs = OrderedDict()
jobs['eight'] = Job8() # sleep 3

original_wait = asyncio.wait

def mock_wait(*args, **kwargs):
kwargs['timeout'] = 0.01
return original_wait(*args, **kwargs)

with patch(
'colcon_parallel_executor.executor.parallel.asyncio.wait',
new=mock_wait
):
rc = extension.execute(args, jobs)

assert rc == 0
assert extension.put_event_into_queue.called
ran_jobs.clear()


class Job9(Job):

def __init__(self):
super().__init__(
identifier='job9', dependencies=set(), task=None,
task_context=None)

async def __call__(self, *args, **kwargs):
raise KeyboardInterrupt()


def test_parallel_job_keyboard_interrupt():
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=2)
jobs = OrderedDict()
jobs['nine'] = Job9()
rc = extension.execute(args, jobs)
assert rc == signal.SIGINT


class Job10(Job):

def __init__(self):
super().__init__(
identifier='job10', dependencies=set(), task=None,
task_context=None)

async def __call__(self, *args, **kwargs):
return SIGINT_RESULT


def test_parallel_job_sigint_result():
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=2)
jobs = OrderedDict()
jobs['ten'] = Job10()
rc = extension.execute(args, jobs)
assert rc == signal.SIGINT


def test_parallel_future_keyboard_interrupt():
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=2)
jobs = OrderedDict()
jobs['eleven'] = Job1()

original_wait = asyncio.wait

async def mock_wait(*args, **kwargs):
done, pending = await original_wait(*args, **kwargs)
for f in done:
f.exception = Mock(return_value=KeyboardInterrupt())
return done, pending

with patch(
'colcon_parallel_executor.executor.parallel.asyncio.wait',
new=mock_wait
):
rc = extension.execute(args, jobs)

assert rc == signal.SIGINT


def test_parallel_future_cancelled():
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=2)
jobs = OrderedDict()
jobs['twelve'] = Job1()

original_wait = asyncio.wait

async def mock_wait(*args, **kwargs):
done, pending = await original_wait(*args, **kwargs)
for f in done:
f.cancelled = Mock(return_value=True)
return done, pending

with patch(
'colcon_parallel_executor.executor.parallel.asyncio.wait',
new=mock_wait
):
rc = extension.execute(args, jobs)

assert rc == signal.SIGINT


def test_priority():
extension = ParallelExecutorExtension()
assert extension.PRIORITY > 100


class NonCoroutineJob(Job):

def __init__(self):
super().__init__(
identifier='non_coro', dependencies=set(), task=None,
task_context=None)

def __call__(self, *args, **kwargs):
pass


def test_job_not_coroutine():
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=2)
jobs = OrderedDict()
jobs['one'] = NonCoroutineJob()

rc = extension.execute(args, jobs)
assert rc == 1


def test_parallel_workers_zero():
extension = ParallelExecutorExtension()
args = SimpleNamespace(parallel_workers=0)
jobs = OrderedDict()
jobs['one'] = Job1('job1')
jobs['two'] = Job1('job2')

rc = extension.execute(args, jobs)
assert rc == 0
assert set(ran_jobs) == {'job1', 'job2'}
ran_jobs.clear()
Loading