-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
135 lines (113 loc) · 3.36 KB
/
Copy pathworker.py
File metadata and controls
135 lines (113 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import random
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional

import psycopg
from psycopg.rows import class_row
@dataclass
class JobConfig:
    """Tuning knobs for the polling worker.

    The functions in this module read these as class attributes
    (``JobConfig.batch_size`` and so on) rather than from an instance, so the
    defaults declared here are the effective settings.
    """

    # Seconds to sleep between two polling batches.
    poll_interval: int = 2
    # Total attempts (the first run included) before a job is marked failed.
    max_attempts: int = 3
    # Seconds a failed job waits before it becomes eligible to run again.
    retry_interval: int = 5
    # Upper bound on how many pending jobs are claimed per poll.
    batch_size: int = 5
@dataclass
class Job:
    """One row of the ``jobs`` table, built by ``class_row(Job)`` in ``main``.

    ``get_job_fields`` turns these annotation names into the SELECT column
    list, so every field name must be an actual column of ``jobs``.
    """

    job_id: int
    job_data: str
    job_status: str  # raw status text, one of the JobStatus values
    attempts: int  # attempts made so far; incremented after every run
    # Nothing in this module writes last_error on success, so it is presumably
    # NULL (None) for jobs that have never failed.
    # NOTE(review): confirm against the jobs table schema.
    last_error: Optional[str]
    run_at: datetime  # earliest time the job may be claimed by a poll
    created_at: datetime
    updated_at: datetime
class JobStatus(Enum):
    """Values written to and matched against the ``jobs.job_status`` column."""

    pending = "pending"  # eligible to run once run_at has passed (new or retrying)
    success = "success"  # task completed without raising
    failed = "failed"    # task failed on its final allowed attempt
# Single module-level connection shared by main().
# NOTE(review): the connection is opened at import time, so even importing
# this module needs a reachable database; consider opening it inside main().
conn = psycopg.connect(conninfo="dbname=job_db user=postgres")
def task(job):
    """Simulate work for *job*, succeeding about one time in three.

    On success, sleep for one second and then print a confirmation line. On
    failure, raise a plain ``Exception("random failure")`` for the caller to
    record.
    """
    lucky = random.choice([True, False, False])
    if not lucky:
        raise Exception("random failure")
    time.sleep(1)
    print(f"job {job.job_id} success")
def run_job(cur, job: Job):
    """Run *job* once and record the outcome through *cur*.

    If the task raises, the failure is handed to ``handle_job_error``, which
    reschedules the job or marks it failed. Otherwise the job is marked
    ``success`` and its attempt counter is incremented. All statements go
    into the caller's open transaction; committing is the caller's job.

    Database errors raised by the success UPDATE are deliberately NOT caught
    here. After a failed statement PostgreSQL aborts the transaction, so
    recording a "task failure" would itself fail and hide the real error.

    Args:
        cur: open cursor on the worker's connection.
        job: the claimed job row.
    """
    print(f"running job {job.job_id} with data: {job.job_data}")
    try:
        task(job)
    except Exception as e:
        # Any task failure is recorded on the job instead of propagating.
        handle_job_error(cur, e, job)
    else:
        cur.execute(
            """update jobs set
            job_status=%s,
            attempts=%s,
            updated_at=%s
            where job_id=%s""",
            # Pass the plain string: psycopg releases before 3.1 cannot adapt
            # Enum members, and .value makes the stored text explicit.
            [JobStatus.success.value, job.attempts + 1, datetime.now(), job.job_id],
        )
def handle_job_error(cur, e, job):
    """Record a failed attempt of *job*, then either reschedule it or give up.

    The attempt counter is incremented and ``str(e)`` is stored in
    ``last_error``. What happens next depends on the new attempt count:

    - Below ``JobConfig.max_attempts``: the job goes back to ``pending``, with
      ``run_at`` pushed ``JobConfig.retry_interval`` seconds into the future.
    - Otherwise: the job is marked ``failed`` and ``run_at`` is left as is.

    Args:
        cur: open cursor; the UPDATE joins the caller's transaction.
        e: the exception raised by the task.
        job: the job whose attempt failed.
    """
    print(f"job {job.job_id} failed: {e}")
    attempts = job.attempts + 1
    error_text = str(e)
    # Take one timestamp so that run_at - updated_at equals the retry interval.
    # NOTE(review): this is client-local naive time, while get_pending_jobs
    # compares run_at with the database's now(). If the client time zone and
    # the DB session time zone differ, retries fire early or late. Confirm.
    now = datetime.now()

    if attempts < JobConfig.max_attempts:
        cur.execute(
            """update jobs set
            job_status=%s,
            last_error=%s,
            attempts=%s,
            run_at=%s,
            updated_at=%s
            where job_id=%s""",
            [
                JobStatus.pending.value,
                error_text,
                attempts,
                now + timedelta(seconds=JobConfig.retry_interval),
                now,
                job.job_id,
            ],
        )
    else:
        cur.execute(
            """update jobs set
            job_status=%s,
            last_error=%s,
            attempts=%s,
            updated_at=%s
            where job_id=%s""",
            [
                JobStatus.failed.value,
                error_text,
                attempts,
                now,
                job.job_id,
            ],
        )
def get_job_fields():
    """Return the ``Job`` field names joined by commas (no spaces).

    ``get_pending_jobs`` splices this into its SELECT list, so the fetched
    columns always mirror the dataclass definition.
    """
    return ",".join(Job.__annotations__)
def get_pending_jobs(cur) -> List[Job]:
    """Claim up to ``JobConfig.batch_size`` due jobs that are still pending.

    Selected rows have status ``pending`` and a ``run_at`` at or before the
    database's ``now()``, and are ordered by ``job_id``. ``FOR UPDATE SKIP
    LOCKED`` skips rows that another transaction has already locked instead
    of waiting for them. The claimed rows stay locked until the caller's
    transaction commits or rolls back.

    Args:
        cur: cursor whose row factory shapes the result (``main`` uses
            ``class_row(Job)``, which yields ``Job`` instances).

    Returns:
        The list of fetched rows, possibly empty.
    """
    columns = get_job_fields()
    # The column names come from the Job dataclass, never from user input, so
    # interpolating them is safe; all values remain bound parameters.
    query = f"""select {columns} from jobs
        where job_status=%s and run_at <= now()
        order by job_id limit %s
        for update skip locked"""
    # Pass the plain string: psycopg releases before 3.1 cannot adapt Enum
    # members, and .value makes the compared text explicit.
    params = [JobStatus.pending.value, JobConfig.batch_size]
    return cur.execute(query, params).fetchall()
def main():
    """Poll for due jobs forever, handling each batch in one transaction.

    Each iteration does the following, then sleeps
    ``JobConfig.poll_interval`` seconds:

    1. Claim a batch with ``get_pending_jobs``.
    2. Run every job in it.
    3. Commit.

    On any error, the message is printed and the transaction is rolled back.
    Without the rollback, PostgreSQL would reject every later statement on
    this connection ("current transaction is aborted") and the worker would
    never recover.
    """
    while True:
        try:
            # The cursor is closed when this block exits, whether or not it
            # raises.
            with conn.cursor(row_factory=class_row(Job)) as cur:
                pending_jobs = get_pending_jobs(cur)
                print(f"pending jobs found: {len(pending_jobs)}")
                for job in pending_jobs:
                    run_job(cur, job)
            conn.commit()
        except Exception as e:
            print(f"error while running jobs: {e}")
            # Rolling back discards every status update in the batch and
            # releases its row locks. The jobs are pending again and will be
            # re-claimed later, including any that already ran successfully
            # in this batch.
            try:
                conn.rollback()
            except psycopg.Error as rollback_error:
                print(f"rollback failed: {rollback_error}")
        time.sleep(JobConfig.poll_interval)
if __name__ == "__main__":
    # Start the polling loop only when run as a script, not when imported.
    main()