6868 unit text,
6969 start int,
7070 end int,
71+ CONSTRAINT time CHECK (start < end),
7172 primary key(unit, start)
7273)
7374'''
8586)
8687'''
8788
89+ def error_item_to_sql_item (_x ,meta ):
90+ """
91+ returns the error as a tuple and converts the time to unix timestamp (milliseconds)
92+ """
93+ _id = meta ['id' ]
94+ #print(x['time'])
95+ _time = int (get_datetime (_x ['time' ]).timestamp ()* 1000 )
96+ #print(_time,x['time'],get_datetime(x['time']))
97+ _spn = _x ['spn' ]
98+ _fmi = _x ['fmi' ]
99+ _oc = _x ['occurrenceCount' ]
100+ if 'name' in _x :
101+ _name = _x ['name' ]
102+ else :
103+ _name = 'none'
104+ if 'description' in _x :
105+ _desc = _x ['description' ]
106+ else :
107+ _desc = 'none'
108+ return (_id ,_time ,_spn ,_fmi ,_oc ,_name ,_desc )
109+ def sql_item_to_error_item (obj ):
110+ """
111+ the operation error_item_to_sql_item reversed
112+ """
113+ _x = {}
114+ _x ['id' ] = obj [0 ]
115+ _x ['time' ] = datetime .fromtimestamp (obj [1 ]/ 1000.0 ).isoformat ()
116+ _x ['spn' ] = obj [2 ]
117+ _x ['fmi' ] = obj [3 ]
118+ _x ['occurrenceCount' ] = obj [4 ]
119+ _x ['name' ] = obj [5 ]
120+ _x ['description' ] = obj [6 ]
121+ return _x
88122class SqlInsertIter :
89123 """iterator for tucache data"""
90124 def __init__ (self , sqliter , meta = None , _db = None ):
@@ -103,29 +137,12 @@ def __aiter__(self):
103137 return self
104138
105139 async def __anext__ (self ):
106- def map_f (_x ,meta ):
107- _id = meta ['id' ]
108- #print(x['time'])
109- _time = int (get_datetime (_x ['time' ]).timestamp ()* 1000 )
110- #print(_time,x['time'],get_datetime(x['time']))
111- _spn = _x ['spn' ]
112- _fmi = _x ['fmi' ]
113- _oc = _x ['occurrenceCount' ]
114- if 'name' in _x :
115- _name = _x ['name' ]
116- else :
117- _name = 'none'
118- if 'description' in _x :
119- _desc = _x ['description' ]
120- else :
121- _desc = 'none'
122- return (_id ,_time ,_spn ,_fmi ,_oc ,_name ,_desc )
123-
124140 try :
125141 data , meta = await self .sqliter .__anext__ ()
126142 if self ._db is not None :
127143 #print(data)
128- sqldata = map (lambda x : map_f (x ,meta ),data )
144+ data = list (data )
145+ sqldata = map (lambda x : error_item_to_sql_item (x ,meta ),data )
129146 self .cur .executemany ("INSERT INTO error VALUES (?,?,?,?,?,?,?)" ,sqldata )
130147 return data , meta
131148 except StopAsyncIteration as exc :
@@ -136,7 +153,17 @@ def map_f(_x,meta):
136153 print ("Committed" )
137154 self ._db .commit ()
138155 raise StopAsyncIteration from exc
139-
156+ except sqlite3 .IntegrityError as exc1 :
157+ print ("Integrety error with" ,meta )
158+ print ("Try to find double entry (insert side)" )
159+ sqldata = map (lambda x : error_item_to_sql_item (x ,meta ),data )
160+ for in_item in sqldata :
161+ try :
162+ self .cur .execute ("INSERT INTO error VALUES (?,?,?,?,?,?,?)" ,in_item )
163+ except sqlite3 .IntegrityError as exc2 :
164+ print ("Item throwing Integrity error" ,in_item )
165+ self ._db .rollback ()
166+ raise sqlite3 .IntegrityError from exc1
140167class SqlCache :
141168 """Sql cache can cache trackunit data in Sqlite DB"""
142169 def __init__ (self ,auth = None ,_dir = None ,db_file = "webdb.db" ,upstream_cache = None ):
@@ -170,16 +197,16 @@ def get_history(self, unit, start):
170197 print (meta )
171198 except StopIteration :
172199 print ("no element found" )
173-
174200 def get_candata (self ,unit ,start ):
175201 """returns history data of a vehicle with start and end date"""
176202 #tbd
177- def get_errors_upstream (self , veh_id , start_ts , end_ts , previter = None ):
203+ def get_faults_upstream (self , veh_id , start_ts , end_ts , previter = None ):
178204 """gets errors from upstream cache"""
179205 cur = self ._db .cursor ()
180206
181207 cur .execute ("INSERT INTO errormeta VALUES (?,?,?)" ,(veh_id ,start_ts ,end_ts ))
182- self ._db .commit ()
208+ # wait until data is in database and commit then
209+ #self._db.commit()
183210
184211 start = datetime .fromtimestamp (start_ts / 1000.0 )
185212 end = datetime .fromtimestamp (end_ts / 1000.0 )
@@ -200,48 +227,48 @@ def get_errors_upstream(self, veh_id, start_ts, end_ts, previter=None):
200227
201228 return previter , _len
202229
203- def get_errors (self , veh_id , start , end , previter = None ):
204- """returns error in between the given datetime objects"""
205- start_ts = int (start .timestamp ()* 1000 )
206- end_ts = int (end .timestamp ()* 1000 )
207- return self .get_errors_unixts (veh_id , start_ts , end_ts , previter )
208-
209- def get_errors_unixts (self , veh_id , start_ts , end_ts , previter = None ):
230+ def get_faults_unixts (self , veh_id , start_ts , end_ts , previter = None ):
210231 """returns error in between the given datetime objects"""
211232
212233 cur = self ._db .cursor ()
213234
214235 # Query meta database to check whether there the data is in database
215236 try :
216- _ , me_start , me_end = next (iter (cur .execute ("""
217- select * from errormeta where unit = ? and
218- (start <= ? and end >= ?) or
219- (start <= ? and end >= ?) order by start
220- """ ,(veh_id ,start_ts ,start_ts ,end_ts ,end_ts ))))
237+ me_vehid , me_start , me_end = next (iter (cur .execute ("""
238+ select * from errormeta where unit = ? and (
239+ (start <= ? and end > ?) or
240+ (start < ? and end >= ?) or
241+ (start >= ? and end <= ?)
242+ ) order by start
243+ """ ,(veh_id ,start_ts ,start_ts ,end_ts ,end_ts ,start_ts ,end_ts ))))
221244 me_start = int (float (me_start ))
222245 me_end = int (float (me_end ))
223246 except StopIteration :
224- print ("Stop iteration. Get errors from upstream now" )
225- return self .get_errors_upstream (veh_id ,start_ts ,end_ts ,previter )
247+ print ("Stop iteration. Didnt find" ,\
248+ veh_id ,start_ts ,end_ts ,\
249+ "Get errors from upstream now" )
250+ return self .get_faults_upstream (veh_id ,start_ts ,end_ts ,previter )
251+
252+ print ("Found block" ,me_vehid ,me_start ,me_end )
226253
227254 # Depending on the start and end of the next block, apply divide and conquer
228255 # by recursive calls of this function.
229256 if me_start <= start_ts :
230257 if me_end < end_ts :
231- previter , cnt1 = self .get_errors_unixts (veh_id ,start_ts ,me_end ,previter )
232- previter , cnt2 = self .get_errors_unixts (veh_id ,me_end + 1 ,start_ts ,previter )
258+ previter , cnt1 = self .get_faults_unixts (veh_id ,start_ts ,me_end ,previter )
259+ previter , cnt2 = self .get_faults_unixts (veh_id ,me_end + 1 ,end_ts ,previter )
233260 return previter ,(cnt1 + cnt2 )
234- return self .get_errors_sql (veh_id ,start_ts ,end_ts ,previter )
261+ return self .get_faults_sql (veh_id ,start_ts ,end_ts ,previter )
235262 if me_end >= end_ts :
236- previter , cnt1 = self .get_errors_unixts (veh_id ,start_ts ,me_start - 1 ,previter )
237- previter , cnt2 = self .get_errors_unixts (veh_id ,me_start ,end_ts ,previter )
263+ previter , cnt1 = self .get_faults_unixts (veh_id ,start_ts ,me_start - 1 ,previter )
264+ previter , cnt2 = self .get_faults_unixts (veh_id ,me_start ,end_ts ,previter )
238265 return previter ,(cnt1 + cnt2 )
239- previter , cnt1 = self .get_errors_unixts (veh_id ,start_ts ,me_start - 1 ,previter )
240- previter , cnt2 = self .get_errors_unixts (veh_id ,me_start ,me_end ,previter )
241- previter , cnt3 = self .get_errors_unixts (veh_id ,me_end + 1 ,end_ts ,previter )
266+ previter , cnt1 = self .get_faults_unixts (veh_id ,start_ts ,me_start - 1 ,previter )
267+ previter , cnt2 = self .get_faults_unixts (veh_id ,me_start ,me_end ,previter )
268+ previter , cnt3 = self .get_faults_unixts (veh_id ,me_end + 1 ,end_ts ,previter )
242269 return previter ,(cnt1 + cnt2 + cnt3 )
243270
244- def get_errors_sql (self , veh_id , start_ts , end_ts , previter = None ):
271+ def get_faults_sql (self , veh_id , start_ts , end_ts , previter = None ):
245272 """gets data of this period from db whether or not it was actually stored there"""
246273 cur = self ._db .cursor ()
247274
@@ -252,6 +279,8 @@ def get_errors_sql(self, veh_id, start_ts, end_ts, previter=None):
252279 order by time
253280 """ ,(veh_id ,start_ts ,end_ts )))[0 ]
254281
282+ print ("found" ,cnt ,"in sql" )
283+
255284 meta = {}
256285 meta ["id" ] = veh_id
257286 meta ["start" ] = start_ts
@@ -261,9 +290,11 @@ def get_errors_sql(self, veh_id, start_ts, end_ts, previter=None):
261290 previter = TuIter ()
262291
263292 if cnt > 0 :
264- previter .add (SqlIter (iter (cur .execute (\
293+ previter .add (SqlIter (iter (map ( lambda x : [ x ], map ( sql_item_to_error_item , cur .execute (\
265294 "select * from error where unit = ? and time >= ? and time <= ? order by time" ,\
266- (veh_id ,start_ts ,end_ts ))),meta ))
295+ (veh_id ,start_ts ,end_ts ))))),meta ))
296+ else :
297+ print ("could not find any item in block " , start_ts , end_ts ,"for unit" ,veh_id )
267298
268299 return previter , cnt
269300 def get_faults (self ,veh_id ,tdelta = None ,previter = None ):
@@ -273,14 +304,16 @@ def get_faults(self,veh_id,tdelta=None,previter=None):
273304 else :
274305 end = self .tdelta_end
275306 end = end .replace (hour = 0 ,minute = 0 ,second = 0 ,microsecond = 0 )
276- if isinstance (tdelta ,datetime ):
277- start = end + tdelta
307+ if isinstance (tdelta ,timedelta ):
308+ start = end - tdelta
278309 else :
279310 irange = int (tdelta )
280311 if irange <= 0 :
281312 return []
282313 start = end - timedelta (days = irange )
283- return self .get_errors (veh_id ,start ,end ,previter )
314+ return self .get_faults_timedelta (veh_id ,start ,end ,previter )
284315 def get_faults_timedelta (self ,veh_id ,start ,end ,previter = None ):
285- """get_faults_timedelta"""
286- return self .get_errors (veh_id ,start ,end ,previter )
316+ """returns error in between the given datetime objects"""
317+ start_ts = int (start .timestamp ()* 1000 )
318+ end_ts = int (end .timestamp ()* 1000 )
319+ return self .get_faults_unixts (veh_id , start_ts , end_ts , previter )
0 commit comments