Skip to content

Commit aa2c31a

Browse files
committed
more tests and many bugfixes for sqlcache
1 parent 2c9cfed commit aa2c31a

2 files changed

Lines changed: 547 additions & 59 deletions

File tree

pytrackunit/sqlcache.py

Lines changed: 86 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
unit text,
6969
start int,
7070
end int,
71+
CONSTRAINT time CHECK (start < end),
7172
primary key(unit, start)
7273
)
7374
'''
@@ -85,6 +86,39 @@
8586
)
8687
'''
8788

89+
def error_item_to_sql_item(_x,meta):
90+
"""
91+
returns the error as a tuple and converts the time to unix timestamp (milliseconds)
92+
"""
93+
_id = meta['id']
94+
#print(x['time'])
95+
_time = int(get_datetime(_x['time']).timestamp()*1000)
96+
#print(_time,x['time'],get_datetime(x['time']))
97+
_spn = _x['spn']
98+
_fmi = _x['fmi']
99+
_oc = _x['occurrenceCount']
100+
if 'name' in _x:
101+
_name = _x['name']
102+
else:
103+
_name = 'none'
104+
if 'description' in _x:
105+
_desc = _x['description']
106+
else:
107+
_desc = 'none'
108+
return (_id,_time,_spn,_fmi,_oc,_name,_desc)
109+
def sql_item_to_error_item(obj):
110+
"""
111+
the operation error_item_to_sql_item reversed
112+
"""
113+
_x = {}
114+
_x['id'] = obj[0]
115+
_x['time'] = datetime.fromtimestamp(obj[1]/1000.0).isoformat()
116+
_x['spn'] = obj[2]
117+
_x['fmi'] = obj[3]
118+
_x['occurrenceCount'] = obj[4]
119+
_x['name'] = obj[5]
120+
_x['description'] = obj[6]
121+
return _x
88122
class SqlInsertIter:
89123
"""iterator for tucache data"""
90124
def __init__(self, sqliter, meta=None, _db=None):
@@ -103,29 +137,12 @@ def __aiter__(self):
103137
return self
104138

105139
async def __anext__(self):
106-
def map_f(_x,meta):
107-
_id = meta['id']
108-
#print(x['time'])
109-
_time = int(get_datetime(_x['time']).timestamp()*1000)
110-
#print(_time,x['time'],get_datetime(x['time']))
111-
_spn = _x['spn']
112-
_fmi = _x['fmi']
113-
_oc = _x['occurrenceCount']
114-
if 'name' in _x:
115-
_name = _x['name']
116-
else:
117-
_name = 'none'
118-
if 'description' in _x:
119-
_desc = _x['description']
120-
else:
121-
_desc = 'none'
122-
return (_id,_time,_spn,_fmi,_oc,_name,_desc)
123-
124140
try:
125141
data, meta = await self.sqliter.__anext__()
126142
if self._db is not None:
127143
#print(data)
128-
sqldata = map(lambda x: map_f(x,meta),data)
144+
data = list(data)
145+
sqldata = map(lambda x: error_item_to_sql_item(x,meta),data)
129146
self.cur.executemany("INSERT INTO error VALUES (?,?,?,?,?,?,?)",sqldata)
130147
return data, meta
131148
except StopAsyncIteration as exc:
@@ -136,7 +153,17 @@ def map_f(_x,meta):
136153
print("Committed")
137154
self._db.commit()
138155
raise StopAsyncIteration from exc
139-
156+
except sqlite3.IntegrityError as exc1:
157+
print("Integrety error with",meta)
158+
print("Try to find double entry (insert side)")
159+
sqldata = map(lambda x: error_item_to_sql_item(x,meta),data)
160+
for in_item in sqldata:
161+
try:
162+
self.cur.execute("INSERT INTO error VALUES (?,?,?,?,?,?,?)",in_item)
163+
except sqlite3.IntegrityError as exc2:
164+
print("Item throwing Integrity error",in_item)
165+
self._db.rollback()
166+
raise sqlite3.IntegrityError from exc1
140167
class SqlCache:
141168
"""Sql cache can cache trackunit data in Sqlite DB"""
142169
def __init__(self,auth=None,_dir=None,db_file="webdb.db",upstream_cache=None):
@@ -170,16 +197,16 @@ def get_history(self, unit, start):
170197
print(meta)
171198
except StopIteration:
172199
print("no element found")
173-
174200
def get_candata(self,unit,start):
175201
"""returns history data of a vehicle with start and end date"""
176202
#tbd
177-
def get_errors_upstream(self, veh_id, start_ts, end_ts, previter=None):
203+
def get_faults_upstream(self, veh_id, start_ts, end_ts, previter=None):
178204
"""gets errors from upstream cache"""
179205
cur = self._db.cursor()
180206

181207
cur.execute("INSERT INTO errormeta VALUES (?,?,?)",(veh_id,start_ts,end_ts))
182-
self._db.commit()
208+
# wait until data is in database and commit then
209+
#self._db.commit()
183210

184211
start = datetime.fromtimestamp(start_ts/1000.0)
185212
end = datetime.fromtimestamp(end_ts/1000.0)
@@ -200,48 +227,48 @@ def get_errors_upstream(self, veh_id, start_ts, end_ts, previter=None):
200227

201228
return previter, _len
202229

203-
def get_errors(self, veh_id, start, end, previter=None):
204-
"""returns error in between the given datetime objects"""
205-
start_ts = int(start.timestamp()*1000)
206-
end_ts = int(end.timestamp()*1000)
207-
return self.get_errors_unixts(veh_id, start_ts, end_ts, previter)
208-
209-
def get_errors_unixts(self, veh_id, start_ts, end_ts, previter=None):
230+
def get_faults_unixts(self, veh_id, start_ts, end_ts, previter=None):
210231
"""returns error in between the given datetime objects"""
211232

212233
cur = self._db.cursor()
213234

214235
# Query meta database to check whether there the data is in database
215236
try:
216-
_, me_start, me_end = next(iter(cur.execute("""
217-
select * from errormeta where unit = ? and
218-
(start <= ? and end >= ?) or
219-
(start <= ? and end >= ?) order by start
220-
""",(veh_id,start_ts,start_ts,end_ts,end_ts))))
237+
me_vehid, me_start, me_end = next(iter(cur.execute("""
238+
select * from errormeta where unit = ? and (
239+
(start <= ? and end > ?) or
240+
(start < ? and end >= ?) or
241+
(start >= ? and end <= ?)
242+
) order by start
243+
""",(veh_id,start_ts,start_ts,end_ts,end_ts,start_ts,end_ts))))
221244
me_start = int(float(me_start))
222245
me_end = int(float(me_end))
223246
except StopIteration:
224-
print("Stop iteration. Get errors from upstream now")
225-
return self.get_errors_upstream(veh_id,start_ts,end_ts,previter)
247+
print("Stop iteration. Didnt find",\
248+
veh_id,start_ts,end_ts,\
249+
"Get errors from upstream now")
250+
return self.get_faults_upstream(veh_id,start_ts,end_ts,previter)
251+
252+
print("Found block",me_vehid,me_start,me_end)
226253

227254
# Depending on the start and end of the next block, apply divide and conquer
228255
# by recursive calls of this function.
229256
if me_start <= start_ts:
230257
if me_end < end_ts:
231-
previter, cnt1 = self.get_errors_unixts(veh_id,start_ts,me_end,previter)
232-
previter, cnt2 = self.get_errors_unixts(veh_id,me_end+1,start_ts,previter)
258+
previter, cnt1 = self.get_faults_unixts(veh_id,start_ts,me_end,previter)
259+
previter, cnt2 = self.get_faults_unixts(veh_id,me_end+1,end_ts,previter)
233260
return previter,(cnt1+cnt2)
234-
return self.get_errors_sql(veh_id,start_ts,end_ts,previter)
261+
return self.get_faults_sql(veh_id,start_ts,end_ts,previter)
235262
if me_end >= end_ts:
236-
previter, cnt1 = self.get_errors_unixts(veh_id,start_ts,me_start-1,previter)
237-
previter, cnt2 = self.get_errors_unixts(veh_id,me_start,end_ts,previter)
263+
previter, cnt1 = self.get_faults_unixts(veh_id,start_ts,me_start-1,previter)
264+
previter, cnt2 = self.get_faults_unixts(veh_id,me_start,end_ts,previter)
238265
return previter,(cnt1+cnt2)
239-
previter, cnt1 = self.get_errors_unixts(veh_id,start_ts,me_start-1,previter)
240-
previter, cnt2 = self.get_errors_unixts(veh_id,me_start,me_end,previter)
241-
previter, cnt3 = self.get_errors_unixts(veh_id,me_end+1,end_ts,previter)
266+
previter, cnt1 = self.get_faults_unixts(veh_id,start_ts,me_start-1,previter)
267+
previter, cnt2 = self.get_faults_unixts(veh_id,me_start,me_end,previter)
268+
previter, cnt3 = self.get_faults_unixts(veh_id,me_end+1,end_ts,previter)
242269
return previter,(cnt1+cnt2+cnt3)
243270

244-
def get_errors_sql(self, veh_id, start_ts, end_ts, previter=None):
271+
def get_faults_sql(self, veh_id, start_ts, end_ts, previter=None):
245272
"""gets data of this period from db whether or not it was actually stored there"""
246273
cur = self._db.cursor()
247274

@@ -252,6 +279,8 @@ def get_errors_sql(self, veh_id, start_ts, end_ts, previter=None):
252279
order by time
253280
""",(veh_id,start_ts,end_ts)))[0]
254281

282+
print("found",cnt,"in sql")
283+
255284
meta = {}
256285
meta["id"] = veh_id
257286
meta["start"] = start_ts
@@ -261,9 +290,11 @@ def get_errors_sql(self, veh_id, start_ts, end_ts, previter=None):
261290
previter = TuIter()
262291

263292
if cnt > 0:
264-
previter.add(SqlIter(iter(cur.execute(\
293+
previter.add(SqlIter(iter(map(lambda x: [x],map(sql_item_to_error_item,cur.execute(\
265294
"select * from error where unit = ? and time >= ? and time <= ? order by time",\
266-
(veh_id,start_ts,end_ts))),meta))
295+
(veh_id,start_ts,end_ts))))),meta))
296+
else:
297+
print("could not find any item in block ", start_ts, end_ts,"for unit",veh_id)
267298

268299
return previter, cnt
269300
def get_faults(self,veh_id,tdelta=None,previter=None):
@@ -273,14 +304,16 @@ def get_faults(self,veh_id,tdelta=None,previter=None):
273304
else:
274305
end = self.tdelta_end
275306
end = end.replace(hour=0,minute=0,second=0,microsecond=0)
276-
if isinstance(tdelta,datetime):
277-
start = end+tdelta
307+
if isinstance(tdelta,timedelta):
308+
start = end-tdelta
278309
else:
279310
irange = int(tdelta)
280311
if irange <= 0:
281312
return []
282313
start = end-timedelta(days=irange)
283-
return self.get_errors(veh_id,start,end,previter)
314+
return self.get_faults_timedelta(veh_id,start,end,previter)
284315
def get_faults_timedelta(self,veh_id,start,end,previter=None):
285-
"""get_faults_timedelta"""
286-
return self.get_errors(veh_id,start,end,previter)
316+
"""returns error in between the given datetime objects"""
317+
start_ts = int(start.timestamp()*1000)
318+
end_ts = int(end.timestamp()*1000)
319+
return self.get_faults_unixts(veh_id, start_ts, end_ts, previter)

0 commit comments

Comments
 (0)