-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdaemon.py
More file actions
96 lines (84 loc) · 2.92 KB
/
Copy pathdaemon.py
File metadata and controls
96 lines (84 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import threading
import traceback
from submodules.model.business_objects import general
from submodules.model import telemetry
def run_without_db_token(target, *args, **kwargs):
"""
DB session token isn't automatically created.
You can still do this with general.get_ctx_token but need to return it yourself with remove_and_refresh_session.
"""
fn_name = f"{target.__module__}.{target.__name__}"
def wrapper():
telemetry.TASKS_IN_PROGRESS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
try:
target(*args, **kwargs)
except Exception:
telemetry.TASKS_ERRORS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
print("=== Exception in thread ===", flush=True)
print(traceback.format_exc(), flush=True)
print("===========================", flush=True)
else:
telemetry.TASKS_PROCESSED.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
finally:
telemetry.TASKS_IN_PROGRESS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).dec()
threading.Thread(
target=wrapper,
daemon=True,
).start()
def run_with_db_token(target, *args, **kwargs):
"""
DB session token is automatically created & returned at the end.
Long running threads needs to occasionally daemon.reset_session_token_in_thread to ensure the session doesn't get a timeout.
"""
fn_name = f"{target.__module__}.{target.__name__}"
# this is a workaround to set the token in the actual thread context
def wrapper():
general.get_ctx_token()
telemetry.TASKS_IN_PROGRESS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
try:
target(*args, **kwargs)
except Exception:
telemetry.TASKS_ERRORS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
print("=== Exception in thread ===", flush=True)
print(traceback.format_exc(), flush=True)
print("===========================", flush=True)
else:
telemetry.TASKS_PROCESSED.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
finally:
general.remove_and_refresh_session()
telemetry.TASKS_IN_PROGRESS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).dec()
threading.Thread(
target=wrapper,
daemon=True,
).start()
def prepare_thread(target, *args, **kwargs) -> threading.Thread:
return threading.Thread(
target=target,
args=args,
kwargs=kwargs,
daemon=True,
)