-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
460 lines (406 loc) · 18.3 KB
/
database.py
File metadata and controls
460 lines (406 loc) · 18.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
"""
Database Module for CleanAcc Bot
Repository: https://github.com/MeherMankar/CleanAcc
Developer: @MeherMankar
"""
import asyncio
import aiosqlite
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional, Dict, Any, List
import logging
from datetime import datetime
import config
# Module-level logger named after this module; every function and class
# below reports through it.
logger = logging.getLogger(__name__)
class DatabasePool:
    """Fixed-size pool of aiosqlite connections kept in an ``asyncio.Queue``.

    Connections are opened on the first call to :meth:`initialize` (which
    :meth:`get_connection` triggers automatically) and are handed out one at
    a time through the :meth:`get_connection` context manager.
    """

    def __init__(self, database_path: str, pool_size: int = 10):
        """Record the pool settings; no connection is opened here.

        Args:
            database_path: Path handed to ``aiosqlite.connect``.
            pool_size: Number of connections the pool opens and holds.
        """
        self.database_path = database_path
        self.pool_size = pool_size
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=pool_size)
        self._initialized = False
        # Serialises initialize()/close() so that two coroutines racing on
        # first use cannot both fill the bounded queue and block forever.
        self._init_lock = asyncio.Lock()

    async def initialize(self):
        """Open ``pool_size`` connections and place them in the pool.

        Safe to call repeatedly and concurrently: only the first caller
        opens connections, later callers return once the pool is ready.

        Raises:
            Exception: Whatever ``aiosqlite.connect`` raises. Connections
                opened before the failure are closed again, so nothing leaks.
        """
        async with self._init_lock:
            if self._initialized:
                return

            opened = []
            try:
                for _ in range(self.pool_size):
                    conn = await aiosqlite.connect(self.database_path)
                    opened.append(conn)
                    conn.row_factory = aiosqlite.Row
            except Exception:
                # Do not leave half a pool of open handles behind.
                for conn in opened:
                    await conn.close()
                raise

            for conn in opened:
                self._pool.put_nowait(conn)
            self._initialized = True
        logger.info(f"Database pool initialized with {self.pool_size} connections")

    @asynccontextmanager
    async def get_connection(self) -> AsyncGenerator[aiosqlite.Connection, None]:
        """Borrow a connection from the pool for the duration of a ``with`` block.

        Initializes the pool on first use and waits while every connection
        is checked out. If the block raises (or the task is cancelled), any
        uncommitted work on the connection is rolled back before it is
        reused, so a failed caller cannot leak a half-finished transaction
        into the next borrower.

        Yields:
            An open ``aiosqlite.Connection`` whose ``row_factory`` is
            ``aiosqlite.Row``.
        """
        if not self._initialized:
            await self.initialize()
        conn = await self._pool.get()
        try:
            yield conn
        except (Exception, asyncio.CancelledError):
            try:
                await conn.rollback()
            except Exception as rollback_error:
                logger.error(f"Error rolling back pooled connection: {rollback_error}")
            raise
        finally:
            if self._initialized and not self._pool.full():
                self._pool.put_nowait(conn)
            else:
                # The pool was closed (or rebuilt) while this connection was
                # checked out; close it instead of queueing a stale handle.
                await conn.close()

    async def close(self):
        """Close every idle connection and mark the pool as uninitialized.

        Connections that are still checked out are closed when their
        ``get_connection`` block exits. A later ``get_connection`` call
        re-initializes the pool instead of waiting on an empty queue.
        """
        async with self._init_lock:
            self._initialized = False
            while not self._pool.empty():
                conn = self._pool.get_nowait()
                await conn.close()
        logger.info("Database pool closed")
# Shared, module-wide pool used by init_db() and the Database helpers;
# its location and size come from the project's config module.
db_pool = DatabasePool(
    database_path=config.DATABASE_PATH,
    pool_size=config.DATABASE_POOL_SIZE,
)
async def init_db():
    """Create the bot's tables if they do not exist yet.

    Runs one ``CREATE TABLE IF NOT EXISTS`` statement each for ``users``,
    ``sessions``, ``cleanup_history``, ``proxy_settings`` and
    ``user_activity`` on a single pooled connection, commits, and logs
    completion.
    """
    schema_statements = (
        # Users table with additional fields
        """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                telegram_id INTEGER UNIQUE NOT NULL,
                username TEXT,
                first_name TEXT,
                is_banned BOOLEAN DEFAULT FALSE,
                rate_limit_count INTEGER DEFAULT 0,
                rate_limit_reset TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """,
        # Sessions table with validation fields
        """
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                phone_number TEXT NOT NULL,
                session_file TEXT NOT NULL,
                device_name TEXT,
                device_os TEXT,
                app_version TEXT,
                proxy_host TEXT,
                proxy_port INTEGER,
                proxy_user TEXT,
                proxy_pass TEXT,
                is_valid BOOLEAN DEFAULT TRUE,
                last_validated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        """,
        # Cleanup history table
        """
            CREATE TABLE IF NOT EXISTS cleanup_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id INTEGER NOT NULL,
                user_id INTEGER NOT NULL,
                cleanup_type TEXT NOT NULL,
                items_processed INTEGER DEFAULT 0,
                items_deleted INTEGER DEFAULT 0,
                status TEXT DEFAULT 'pending',
                error_message TEXT,
                started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP,
                FOREIGN KEY (session_id) REFERENCES sessions (id),
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        """,
        # Proxy settings table
        """
            CREATE TABLE IF NOT EXISTS proxy_settings (
                id INTEGER PRIMARY KEY,
                proxy_url TEXT NOT NULL,
                is_active BOOLEAN DEFAULT TRUE,
                last_tested TIMESTAMP,
                success_rate REAL DEFAULT 0.0,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """,
        # User activity log
        """
            CREATE TABLE IF NOT EXISTS user_activity (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                action TEXT NOT NULL,
                details TEXT,
                ip_address TEXT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        """,
    )

    async with db_pool.get_connection() as conn:
        # Tables are created in dependency order: users first, then the
        # tables whose foreign keys point at it.
        for ddl in schema_statements:
            await conn.execute(ddl)
        await conn.commit()
        logger.info("Database schema initialized")
class Database:
    """Static helpers for the bot's database operations.

    Every method borrows a connection from the module-level ``db_pool``,
    catches any exception, logs it through ``logger`` and returns a neutral
    value (``False``, ``None``, ``[]`` or ``{}``) instead of raising.

    Activity logging is always performed after the caller's own connection
    has been committed and returned to the pool. ``log_user_activity``
    borrows a connection itself, so calling it while still holding one can
    exhaust a small pool and would write while the first connection still
    holds uncommitted changes.
    """

    @staticmethod
    def _next_rate_limit_reset(now: datetime, window_seconds: int) -> datetime:
        """Return the moment the rate-limit window that starts at ``now`` ends.

        ``now`` is floored to the whole minute before ``window_seconds`` is
        added.
        """
        # Imported locally because the module only imports ``datetime``.
        from datetime import timedelta

        return now.replace(second=0, microsecond=0) + timedelta(seconds=window_seconds)

    @staticmethod
    async def add_user(telegram_id: int, username: Optional[str] = None,
                       first_name: Optional[str] = None) -> bool:
        """Insert a user row if none exists for ``telegram_id``.

        A ``user_registered`` activity entry is written only when a row was
        actually inserted, not when the ``INSERT OR IGNORE`` was a no-op.

        Returns:
            ``True`` when the statement ran (whether or not a row was
            inserted), ``False`` if an error was logged.
        """
        try:
            async with db_pool.get_connection() as db:
                cursor = await db.execute(
                    """INSERT OR IGNORE INTO users
                       (telegram_id, username, first_name, last_activity)
                       VALUES (?, ?, ?, ?)""",
                    (telegram_id, username, first_name, datetime.now())
                )
                await db.commit()
                newly_created = cursor.rowcount > 0
            if newly_created:
                await Database.log_user_activity(telegram_id, "user_registered")
            return True
        except Exception as e:
            logger.error(f"Error adding user {telegram_id}: {e}")
            return False

    @staticmethod
    async def get_user(telegram_id: int) -> Optional[Dict[str, Any]]:
        """Fetch a user row and refresh its ``last_activity`` timestamp.

        Returns:
            The row as a dict, or ``None`` when the user does not exist or
            an error was logged.
        """
        try:
            async with db_pool.get_connection() as db:
                # Touch last_activity first so the returned row reflects it.
                await db.execute(
                    "UPDATE users SET last_activity = ? WHERE telegram_id = ?",
                    (datetime.now(), telegram_id)
                )
                cursor = await db.execute(
                    "SELECT * FROM users WHERE telegram_id = ?",
                    (telegram_id,)
                )
                row = await cursor.fetchone()
                await db.commit()
                return dict(row) if row else None
        except Exception as e:
            logger.error(f"Error getting user {telegram_id}: {e}")
            return None

    @staticmethod
    async def check_rate_limit(telegram_id: int) -> bool:
        """Count one request against the user's rate limit.

        Returns:
            ``True`` when the request is allowed: unknown user, expired
            window (the counter restarts at 1), or counter still below
            ``config.RATE_LIMIT_REQUESTS``. ``False`` when the limit is
            reached. On a database error the request is allowed and the
            error is logged.
        """
        try:
            async with db_pool.get_connection() as db:
                cursor = await db.execute(
                    """SELECT rate_limit_count, rate_limit_reset
                       FROM users WHERE telegram_id = ?""",
                    (telegram_id,)
                )
                row = await cursor.fetchone()
                if not row:
                    return True

                # NOTE(review): fresh rows get rate_limit_reset from SQLite's
                # CURRENT_TIMESTAMP (UTC) while `now` is naive local time -
                # confirm which clock is intended.
                now = datetime.now()
                reset_time = datetime.fromisoformat(row['rate_limit_reset'])

                # Window expired: start a new one with this request counted.
                if now > reset_time:
                    await db.execute(
                        """UPDATE users SET rate_limit_count = 1,
                           rate_limit_reset = ? WHERE telegram_id = ?""",
                        (Database._next_rate_limit_reset(now, config.RATE_LIMIT_WINDOW),
                         telegram_id)
                    )
                    await db.commit()
                    return True

                # Window still open and already at the cap.
                if row['rate_limit_count'] >= config.RATE_LIMIT_REQUESTS:
                    return False

                # Window still open with room left: count this request.
                await db.execute(
                    "UPDATE users SET rate_limit_count = rate_limit_count + 1 WHERE telegram_id = ?",
                    (telegram_id,)
                )
                await db.commit()
                return True
        except Exception as e:
            logger.error(f"Error checking rate limit for {telegram_id}: {e}")
            return True  # Allow on error

    @staticmethod
    async def log_user_activity(telegram_id: int, action: str,
                                details: Optional[str] = None,
                                ip_address: Optional[str] = None):
        """Append a row to ``user_activity`` for an existing user.

        Nothing is written when no user has ``telegram_id``. Errors are
        logged and never propagated.
        """
        try:
            async with db_pool.get_connection() as db:
                # user_activity references the internal users.id key.
                cursor = await db.execute(
                    "SELECT id FROM users WHERE telegram_id = ?",
                    (telegram_id,)
                )
                user_row = await cursor.fetchone()
                if user_row:
                    await db.execute(
                        """INSERT INTO user_activity
                           (user_id, action, details, ip_address)
                           VALUES (?, ?, ?, ?)""",
                        (user_row['id'], action, details, ip_address)
                    )
                    await db.commit()
        except Exception as e:
            logger.error(f"Error logging activity for {telegram_id}: {e}")

    @staticmethod
    async def add_session(user_telegram_id: int, phone_number: str, session_file: str,
                          device_info: Optional[Dict[str, str]] = None,
                          proxy_info: Optional[Dict[str, Any]] = None) -> bool:
        """Store a new session for an existing user and log ``session_added``.

        Args:
            user_telegram_id: Telegram id of the owning user.
            phone_number: Phone number stored with the session.
            session_file: Session file reference stored with the session.
            device_info: Optional dict read for ``name``, ``os`` and
                ``version``; missing values are stored as ``'Unknown'``.
            proxy_info: Optional dict read for ``host``, ``port``, ``user``
                and ``pass``; missing values are stored as NULL.

        Returns:
            ``True`` when the row was inserted and committed, ``False`` when
            the user does not exist or an error was logged.
        """
        device = device_info or {}
        proxy = proxy_info or {}
        try:
            async with db_pool.get_connection() as db:
                cursor = await db.execute(
                    "SELECT id FROM users WHERE telegram_id = ?",
                    (user_telegram_id,)
                )
                user_row = await cursor.fetchone()
                if not user_row:
                    return False

                await db.execute(
                    """
                    INSERT INTO sessions
                    (user_id, phone_number, session_file, device_name, device_os, app_version,
                     proxy_host, proxy_port, proxy_user, proxy_pass, is_valid, last_validated)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (user_row['id'], phone_number, session_file,
                     device.get('name', 'Unknown'),
                     device.get('os', 'Unknown'),
                     device.get('version', 'Unknown'),
                     proxy.get('host'), proxy.get('port'),
                     proxy.get('user'), proxy.get('pass'),
                     True, datetime.now())
                )
                # The insert must be committed explicitly; without this the
                # row stays in an open transaction on the pooled connection.
                await db.commit()
            await Database.log_user_activity(user_telegram_id, "session_added", phone_number)
            return True
        except Exception as e:
            logger.error(f"Error adding session for {user_telegram_id}: {e}")
            return False

    @staticmethod
    async def validate_session(session_id: int) -> bool:
        """Stamp ``last_validated`` on a session with the current time.

        Returns:
            ``True`` when the update ran (even if no row matched),
            ``False`` if an error was logged.
        """
        try:
            async with db_pool.get_connection() as db:
                await db.execute(
                    "UPDATE sessions SET last_validated = ? WHERE id = ?",
                    (datetime.now(), session_id)
                )
                await db.commit()
                return True
        except Exception as e:
            logger.error(f"Error validating session {session_id}: {e}")
            return False

    @staticmethod
    async def start_cleanup_history(session_id: int, user_telegram_id: int,
                                    cleanup_type: str) -> Optional[int]:
        """Insert a ``cleanup_history`` row with status ``'running'``.

        Returns:
            The new row id, or ``None`` when the user does not exist or an
            error was logged.
        """
        try:
            async with db_pool.get_connection() as db:
                cursor = await db.execute(
                    "SELECT id FROM users WHERE telegram_id = ?",
                    (user_telegram_id,)
                )
                user_row = await cursor.fetchone()
                if not user_row:
                    return None

                cursor = await db.execute(
                    """INSERT INTO cleanup_history
                       (session_id, user_id, cleanup_type, status)
                       VALUES (?, ?, ?, 'running')""",
                    (session_id, user_row['id'], cleanup_type)
                )
                await db.commit()
                return cursor.lastrowid
        except Exception as e:
            logger.error(f"Error starting cleanup history: {e}")
            return None

    @staticmethod
    async def update_cleanup_progress(history_id: int, items_processed: int, items_deleted: int):
        """Overwrite the processed/deleted counters of a cleanup record."""
        try:
            async with db_pool.get_connection() as db:
                await db.execute(
                    """UPDATE cleanup_history
                       SET items_processed = ?, items_deleted = ?
                       WHERE id = ?""",
                    (items_processed, items_deleted, history_id)
                )
                await db.commit()
        except Exception as e:
            logger.error(f"Error updating cleanup progress: {e}")

    @staticmethod
    async def complete_cleanup_history(history_id: int, status: str,
                                       error_message: Optional[str] = None):
        """Set the final status, error message and completion time of a cleanup record."""
        try:
            async with db_pool.get_connection() as db:
                await db.execute(
                    """UPDATE cleanup_history
                       SET status = ?, error_message = ?, completed_at = ?
                       WHERE id = ?""",
                    (status, error_message, datetime.now(), history_id)
                )
                await db.commit()
        except Exception as e:
            logger.error(f"Error completing cleanup history: {e}")

    @staticmethod
    async def get_user_sessions(telegram_id: int) -> List[Dict[str, Any]]:
        """Return all sessions of a user, newest first.

        Each dict holds every ``sessions`` column plus the owner's
        ``telegram_id``. Returns ``[]`` if an error was logged.
        """
        try:
            async with db_pool.get_connection() as db:
                cursor = await db.execute(
                    """
                    SELECT s.*, u.telegram_id FROM sessions s
                    JOIN users u ON s.user_id = u.id
                    WHERE u.telegram_id = ?
                    ORDER BY s.created_at DESC
                    """,
                    (telegram_id,)
                )
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]
        except Exception as e:
            logger.error(f"Error getting user sessions: {e}")
            return []

    @staticmethod
    async def get_session_by_id(session_id: int) -> Optional[Dict[str, Any]]:
        """Return one session row as a dict, or ``None`` if absent or on error."""
        try:
            async with db_pool.get_connection() as db:
                cursor = await db.execute(
                    "SELECT * FROM sessions WHERE id = ?",
                    (session_id,)
                )
                row = await cursor.fetchone()
                return dict(row) if row else None
        except Exception as e:
            logger.error(f"Error getting session {session_id}: {e}")
            return None

    @staticmethod
    async def delete_session(session_id: int, user_telegram_id: int) -> bool:
        """Delete a session, but only if it belongs to the given user.

        Logs a ``session_deleted`` activity entry when a row was removed.

        Returns:
            ``True`` when a row was deleted, ``False`` when nothing matched
            or an error was logged.
        """
        try:
            async with db_pool.get_connection() as db:
                cursor = await db.execute(
                    """
                    DELETE FROM sessions
                    WHERE id = ? AND user_id = (
                        SELECT id FROM users WHERE telegram_id = ?
                    )
                    """,
                    (session_id, user_telegram_id)
                )
                await db.commit()
                deleted = cursor.rowcount > 0
            if deleted:
                await Database.log_user_activity(user_telegram_id, "session_deleted", str(session_id))
            return deleted
        except Exception as e:
            logger.error(f"Error deleting session {session_id}: {e}")
            return False

    @staticmethod
    async def get_stats() -> Dict[str, int]:
        """Return aggregate counters about users, sessions and cleanups.

        Keys: ``users_count``, ``sessions_count``, ``banned_count``,
        ``active_sessions`` (valid and validated within the last 7 days)
        and ``cleanups_today``. Returns ``{}`` if an error was logged.
        """
        count_queries = (
            ('users_count', "SELECT COUNT(*) FROM users"),
            ('sessions_count', "SELECT COUNT(*) FROM sessions"),
            ('banned_count', "SELECT COUNT(*) FROM users WHERE is_banned = TRUE"),
            ('active_sessions',
             "SELECT COUNT(*) FROM sessions WHERE is_valid = TRUE AND last_validated > datetime('now', '-7 days')"),
            ('cleanups_today',
             "SELECT COUNT(*) FROM cleanup_history WHERE DATE(started_at) = DATE('now')"),
        )
        try:
            async with db_pool.get_connection() as db:
                stats = {}
                for key, query in count_queries:
                    cursor = await db.execute(query)
                    stats[key] = (await cursor.fetchone())[0]
                return stats
        except Exception as e:
            logger.error(f"Error getting stats: {e}")
            return {}