-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcache.py
More file actions
117 lines (87 loc) · 3.04 KB
/
cache.py
File metadata and controls
117 lines (87 loc) · 3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os
import time
import threading
from dotenv import load_dotenv
load_dotenv()
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
USE_REDIS = False
redis_client = None
# Attempt import + connection
try:
import redis
redis_client = redis.StrictRedis.from_url(REDIS_URL, decode_responses=True)
# Try pinging Redis
try:
redis_client.ping()
USE_REDIS = True
print("Redis connected successfully.")
except redis.exceptions.ConnectionError:
print("Redis server not reachable. Falling back to in-memory cache.")
USE_REDIS = False
except ImportError:
print("Redis library not installed. Using in-memory fallback.")
USE_REDIS = False
class CacheBackend:
"""Unified interface for any cache backend (Redis or in-memory)."""
def get(self, key: str):
"""Return value for key, or None if not present."""
raise NotImplementedError
def set(self, key: str, value: str, ttl: int):
"""Set value with TTL (in seconds)."""
raise NotImplementedError
def delete(self, key: str):
"""Delete a key from the cache."""
raise NotImplementedError
def exists(self, key: str) -> bool:
"""Return whether the key exists."""
raise NotImplementedError
class RedisCache(CacheBackend):
def __init__(self,client):
self.client = client
def get(self, key: str):
return self.client.get(key)
def set(self, key: str, value: str, ttl: int):
self.client.set(key, value, ex=ttl)
def delete(self, key: str):
self.client.delete(key)
def exists(self, key: str) -> bool:
return self.client.exists(key) == 1
class MemoryTTLCache(CacheBackend):
def __init__(self):
self.store = {}
self.lock = threading.Lock()
def _purge_expired(self):
""" Remove expired items"""
now = time.time()
expired_keys = []
for key, (_, expire_at) in self.store.items():
if expire_at is not None and expire_at < now:
expired_keys.append(key)
for key in expired_keys:
del self.store[key]
def get(self,key: str):
with self.lock:
self._purge_expired()
if key not in self.store:
return None
value, expire_at = self.store[key]
if expire_at is not None and expire_at < time.time():
del self.store[key]
return None
return value
def set(self, key: str, value: str, ttl: int):
expire_at = time.time() + ttl if ttl is not None else None
with self.lock:
self.store[key] = (value, expire_at)
def delete (self, key: str):
with self.lock:
if key in self.store:
del self.store[key]
def exists(self,key:str):
with self.lock:
self._purge_expired()
return key in self.store
if USE_REDIS:
cache = RedisCache(redis_client)
else:
cache = MemoryTTLCache()