-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_travis.py
More file actions
32 lines (28 loc) · 1.12 KB
/
run_travis.py
File metadata and controls
32 lines (28 loc) · 1.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import subprocess
import threading
import yaml
from typing import Any, Dict, Iterable, Union
returncodes = {}
def execute_script(script_or_scripts: Union[str, Iterable[str]]):
if isinstance(script_or_scripts, str):
process = subprocess.run(script_or_scripts.split())
return process.returncode
elif isinstance(script_or_scripts, Iterable):
for script in script_or_scripts:
returncode = execute_script(script)
if returncode != 0:
return returncode
return 0
else:
raise ValueError(f"`script_or_scripts` must be a string or iterable of strings, not '{type(script_or_scripts)}'")
def execute_job(job: Dict[str, Any]):
returncodes[job["name"]] = execute_script(job["script"])
if __name__ == "__main__":
config = yaml.safe_load(open(".travis.yml"))
threads = [
threading.Thread(target=execute_job, args=(job, ))
for job in config["jobs"]["include"]
]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
[print(job["name"], returncodes[job["name"]]) for job in config["jobs"]["include"]]