-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
154 lines (134 loc) · 5.11 KB
/
main.py
File metadata and controls
154 lines (134 loc) · 5.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import os
import json
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi import status
from contextlib import asynccontextmanager
from db import Database
db = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global db
print("Starting up...")
db = Database()
with db.get_connection() as conn:
with conn.cursor() as cur:
# cur.execute("""SELECT EXISTS (
# SELECT 1
# FROM pg_catalog.pg_tables
# WHERE schemaname = 'public'
# AND tablename = 'your_table_name'
# );"""
# )
# is_table_exists, = cur.fetchone()
# if is_table_exists:
# pass
cur.execute("BEGIN;")
cur.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";')
cur.execute(
"""
CREATE TABLE IF NOT EXISTS message_broker (
id SERIAL PRIMARY KEY,
message JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
picked_at TIMESTAMP,
deleted_at TIMESTAMP,
heartbeat TIMESTAMP,
receipt_handle UUID DEFAULT uuid_generate_v4()
);
"""
)
cur.execute(
"CREATE INDEX IF NOT EXISTS idx_message_broker_heartbeat ON message_broker (heartbeat);"
)
cur.execute("COMMIT;")
app.state.my_resource = "Some resource"
yield # The application is running at this point
# Shutdown logic here (e.g., close database connections)
print("Shutting down...")
db.close_all_connections()
# Example: Clean up the resource
del app.state.my_resource
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def get_all_messages():
data = []
with db.get_connection() as conn:
with conn.cursor() as cur:
# cur.execute("SELECT * from students ")
cur.execute("""SELECT * FROM message_broker;""")
all_rows = cur.fetchall()
for row in all_rows:
data.append(
{
"id": row[0],
"message": row[1],
"created_at": row[2].strftime("%Y-%m-%d %H:%M:%S"),
"picked_at": row[3],
"deleted_at": row[4],
"heartbeat": row[5],
"receipt_handle": row[6],
}
)
# Access the resource initialized during startup
return JSONResponse(content={"data": data}, status_code=status.HTTP_200_OK)
@app.post("/message")
async def create_message(message: dict):
with db.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO message_broker (message) VALUES (%s) RETURNING id;",
(json.dumps(message),),
)
id = cur.fetchone()[0]
cur.execute("SELECT * FROM message_broker WHERE id = %s;", (id,))
receipt_id = cur.fetchone()[-1]
return JSONResponse(
content={
"message": "Message created successfully",
"reciept_handle": receipt_id,
},
status_code=status.HTTP_201_CREATED,
)
@app.delete("/message/{receipt_handle}")
async def delete_message(receipt_handle: str):
with db.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE message_broker SET deleted_at = CURRENT_TIMESTAMP WHERE receipt_handle = %s RETURNING id;",
(receipt_handle,),
)
id = cur.fetchone()[0]
return JSONResponse(
content={"message": "Message deleted successfully", "id": id},
status_code=status.HTTP_200_OK,
)
@app.get("/message")
async def get_message():
with db.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT * FROM message_broker WHERE deleted_at IS NULL AND picked_at IS NULL ORDER BY created_at ASC LIMIT 5 FOR UPDATE SKIP LOCKED;"
)
rows = cur.fetchall()
for row in rows:
print("inside")
if row:
cur.execute(
"UPDATE message_broker SET picked_at = CURRENT_TIMESTAMP WHERE id = %s RETURNING receipt_handle;",
(row[0],),
)
receipt_handle = cur.fetchone()[0]
return JSONResponse(
content={"message": row[1], "receipt_handle": receipt_handle},
status_code=status.HTTP_200_OK,
)
else:
return JSONResponse(
content={"message": "No message available"},
status_code=status.HTTP_404_NOT_FOUND,
)
if "__main__" == __name__:
import uvicorn
print(f"Starting Server at http://{os.getenv('HOST')}:{os.getenv('PORT')}")
uvicorn.run("main:app", port=8080, reload=True, workers=4)