This repository was archived by the owner on Apr 22, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·97 lines (74 loc) · 2.58 KB
/
main.py
File metadata and controls
executable file
·97 lines (74 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python
import importlib
import logging
import pkgutil
import signal
from multiprocessing import Process
from threading import Event, Thread
import services as services_package
from control_matrix.queue import QueueSubscriber, QueuePublisher
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("ControlMatrix")
class Subscription(Thread):
def __init__(self, name, sub_port, handler):
super().__init__()
self.stop_flag = Event()
self.name = name
self.handler = handler
self.socket = QueueSubscriber("", self.handle_message, sub_port, self.stop_flag)
def handle_message(self, message):
log.info(f" <-- {self.name : <10} | {message}")
self.handler(message)
def run(self):
self.socket.start()
class ControlMatrix:
def __init__(self):
signal.signal(signal.SIGINT, self.stop)
signal.signal(signal.SIGTERM, self.stop)
self.services_pkg = services_package
self.services = {}
self.threads = []
self.processes = []
self.queue = QueuePublisher()
def load_services(self):
for _, name, is_pkg in pkgutil.iter_modules(
self.services_pkg.__path__, self.services_pkg.__name__ + "."
):
module = importlib.import_module(f"{name}.main")
try:
service_cls = getattr(module, "Service")
self.services[name] = service_cls
log.info(f"Loading {service_cls.name}")
except AttributeError:
pass
for name, service in self.services.items():
print(name, service)
def run(self):
self.threads = [
Subscription(name, service.pub_port, self.forward_handler)
for name, service in self.services.items()
if service.is_enabled and service.pub_port
]
self.processes = [
Process(target=service, name=name)
for name, service in self.services.items()
if service.is_enabled
]
for thread in self.threads:
thread.start()
for proc in self.processes:
proc.start()
for proc in self.processes:
proc.join()
for thread in self.threads:
thread.join()
def stop(self, *args):
for thread in self.threads:
thread.stop_flag.set()
def forward_handler(self, message):
log.info(f" --> master | {message}")
self.queue.send_raw(message)
if __name__ == "__main__":
ctrl = ControlMatrix()
ctrl.load_services()
ctrl.run()