Skip to content

Commit cc9f46d

Browse files
authored
Merge pull request #1 from einsteinmaster/asyncio
Changed from multithreading to asyncio
2 parents 58c0829 + 63110b5 commit cc9f46d

12 files changed

Lines changed: 297 additions & 164 deletions

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ requires = [
44
"setuptools",
55
"wheel",
66
"matplotlib",
7-
"requests",
7+
"aiohttp",
8+
"aiofiles",
89
"pytest"
910
]
1011
build-backend = "setuptools.build_meta"

pytrackunit/sqlcache.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
"""Module for caching data in sql db"""
2+
3+
import sqlite3
4+
import os.path
5+
6+
CREATE_HISTORY_META = '''
7+
create table histmeta(
8+
unit text, start text, end text
9+
)
10+
'''
11+
12+
CREATE_HISTORY_TABLE = '''
13+
CREATE TABLE history
14+
(unit text not null, time text not null, event int, keyId text, location text, address text, heading int, speed real,
15+
km real, run1 real,run2 real,run3 real,run4 real,runOdo real,temperature1 real,temperature2 real,
16+
input1 int,input2 int,input3 int,input4 int,input5 int,input6 int,input7 int,input8 int,input9 int,input10 int,
17+
output1 int,output2 int,output3 int,output4 int,output5 int,
18+
analogInput1 real,analogInput2 real,analogInput4 real,
19+
Input1ChangeCounter INT,Input2ChangeCounter INT,Input3ChangeCounter INT,Input4ChangeCounter INT,
20+
batteryLevel real,externalPower real,
21+
primary key (unit,time))
22+
'''
23+
24+
class SqlCache:
25+
"""Sql cache can cache trackunit data in Sqlite DB"""
26+
def __init__(self,db_file="webdb.db"):
27+
create_tables = not os.path.isfile(db_file)
28+
self.web_db_path = db_file
29+
self._db = sqlite3.connect(self.web_db_path)
30+
if create_tables:
31+
cur = self._db.cursor()
32+
cur.execute(CREATE_HISTORY_TABLE)
33+
cur.execute(CREATE_HISTORY_META)
34+
self._db.commit()
35+
def get_history(self, unit, start):
36+
"""returns history data of a vehicle with start and end date"""
37+
cur = self._db.cursor()
38+
meta = next(cur.execute(
39+
f"select * from histmeta where unit='{unit}' and start<='{start}' order by start"))
40+
print(meta)
41+
def get_candata(self,unit,start):
42+
"""returns history data of a vehicle with start and end date"""
43+
#tbd
44+

pytrackunit/trackunit.py

Lines changed: 26 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
import json
44
import os.path
5-
from math import ceil
6-
from multiprocessing import Pool
7-
from datetime import timedelta,datetime
5+
import asyncio
86
from .tucache import TuCache
97

108
class TrackUnit:
@@ -23,8 +21,7 @@ def __init__(self,config_filename=None,api_key=None,verbose=False):
2321
with open(config["apikey-location"],encoding="utf8") as file_apikey:
2422
api_key = file_apikey.readline()
2523
self.cache = TuCache(('API',api_key),_dir=config["webcache-location"],verbose=verbose)
26-
self.req_period = 30
27-
self.tdelta_end = None
24+
2825
@property
2926
def verbose(self):
3027
"""returns verbose mode value. in verbose mode, diagnostic output is printed to console."""
@@ -33,62 +30,36 @@ def verbose(self):
3330
def verbose(self, value):
3431
"""sets the verbose mode. in verbose mode, diagnostic output is printed to console."""
3532
self.cache.cache.verbose = value
36-
def get(self,req):
37-
"""get method"""
38-
url = r'https://api.trackunit.com/public/'+req
39-
data = self.cache.get(url)
40-
if data is None:
41-
raise Exception("no data: "+str(data))
42-
return data
33+
4334
def get_unitlist(self,_type=None,sort_by_hours=True):
4435
"""unitList method"""
45-
data = self.get('Unit')
36+
data = asyncio.run(self.cache.get_url('Unit'))
4637
if _type is not None:
4738
data = list(filter(lambda x: " " in x['name'] and _type in x['name'],data))
4839
if sort_by_hours:
4940
data.sort(key=lambda x: (x['run1'] if 'run1' in x else 0),reverse=True)
5041
return data
51-
def general_daydiff_get(self,furl,tdelta,threads=1):
52-
"""returns data for timedependant requests for a given daydelta"""
53-
if self.tdelta_end is None:
54-
end = datetime.now()
55-
else:
56-
end = self.tdelta_end
57-
end = end.replace(hour=0,minute=0,second=0,microsecond=0)
58-
if isinstance(tdelta,datetime):
59-
start = end+tdelta
60-
else:
61-
irange = int(tdelta)
62-
if irange <= 0:
63-
return []
64-
start = end-timedelta(days=irange)
65-
return self.general_time_range_get(furl,start,end,threads)
66-
def general_time_range_get(self,furl,start=None,end=None,threads=1):
67-
"""returns data for timedependant requests for a start and enddate"""
68-
total_data = []
69-
days = (end-start).days
70-
requests = []
71-
for week in range(ceil(days/self.req_period)):
72-
wstart = start+timedelta(days=week*self.req_period)
73-
wend = wstart+timedelta(days=min(self.req_period,(end-wstart).days))
74-
requests.append(furl(\
75-
wstart.strftime("%Y-%m-%dT%H:%M:%S"),\
76-
wend.strftime("%Y-%m-%dT%H:%M:%S")))
77-
if threads > 1:
78-
with Pool(threads) as _p:
79-
map_result = _p.map(self.get,requests)
80-
else:
81-
map_result = map(self.get,requests)
82-
for _r in map_result:
83-
total_data += _r
84-
return total_data
85-
def get_history(self,veh_id,tdelta,threads=1):
42+
43+
async def _a_get_history(self,veh_id,tdelta):
44+
"""async getHistory method"""
45+
data = []
46+
_it, _ = self.cache.get_history(veh_id,tdelta)
47+
async for _d in _it:
48+
data += _d
49+
return data
50+
51+
async def _a_get_candata(self,veh_id,tdelta=None):
52+
"""async getCanData method"""
53+
data = []
54+
_it, _ = self.cache.get_candata(veh_id,tdelta)
55+
async for _d in _it:
56+
data += _d
57+
return data
58+
59+
def get_history(self,veh_id,tdelta):
    """getHistory method — blocking wrapper that runs the async fetch to completion."""
    history_coro = self._a_get_history(veh_id, tdelta)
    return asyncio.run(history_coro)
62+
63+
def get_candata(self,veh_id,tdelta=None):
    """getCanData method — blocking wrapper that runs the async fetch to completion."""
    candata_coro = self._a_get_candata(veh_id, tdelta)
    return asyncio.run(candata_coro)

pytrackunit/tucache.py

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,69 @@
11
"""tucache module"""
22

3-
import sqlite3
4-
import os.path
3+
from datetime import datetime, timedelta
4+
from math import ceil
55
from .webcache import WebCache
6+
from .tuiter import ReqIter, TuIter
7+
8+
URL_BASE = r'https://api.trackunit.com/public/'
69

710
class TuCache:
811
"""tucache class"""
9-
def __init__(self,auth=None,_dir=None,use_sqlite=False,verbose=False):
12+
def __init__(self,auth=None,_dir=None,verbose=False):
1013
self.cache = WebCache(auth=auth,_dir=_dir,verbose=verbose)
11-
self.use_sqlite = use_sqlite
12-
if self.use_sqlite:
13-
self.web_db_path = os.path.join(_dir,"webdb.db")
14-
self._db = sqlite3.connect(self.web_db_path)
15-
cur = self._db.cursor()
16-
_res = list(cur.execute(
17-
'''
18-
SELECT name FROM sqlite_master WHERE type="table" AND name="history"
19-
'''))
20-
if len(_res) == 0:
21-
cur.execute('''CREATE TABLE history
22-
(unit text not null, time text not null, event int, keyId text, location text, address text, heading int, speed real,
23-
km real, run1 real,run2 real,run3 real,run4 real,runOdo real,temperature1 real,temperature2 real,
24-
input1 int,input2 int,input3 int,input4 int,input5 int,input6 int,input7 int,input8 int,input9 int,input10 int,
25-
output1 int,output2 int,output3 int,output4 int,output5 int,
26-
analogInput1 real,analogInput2 real,analogInput4 real,
27-
Input1ChangeCounter INT,Input2ChangeCounter INT,Input3ChangeCounter INT,Input4ChangeCounter INT,
28-
batteryLevel real,externalPower real,
29-
primary key (unit,time))''')
30-
self._db.commit()
14+
self.req_period = 30
15+
self.tdelta_end = None
3116
def clean(self):
3217
"""deletes all cached data"""
3318
self.cache.clean()
34-
def get(self,url):
19+
async def get_url(self,url):
3520
"""takes the data from cache if possible. otherwise data is loaded from web"""
36-
data = self.cache.get(url)
21+
data = await self.cache.get(URL_BASE+url)
3722
if self.cache.return_only_cache_files:
3823
return [data]
3924
if self.cache.dont_return_data:
4025
return []
4126
if self.cache.dont_read_files and len(data) == 0:
4227
return []
43-
# if self.use_sqlite and "api.trackunit.com/public/Report/UnitHistory" in url:
44-
# cur = self.db.cursor()
45-
# _data = list(map(lambda x: (x["time"])))
4628
return data.get('list')
29+
def general_daydiff_get(self,furl,tdelta,previter=None):
30+
"""returns data for timedependant requests for a given daydelta"""
31+
if self.tdelta_end is None:
32+
end = datetime.now()
33+
else:
34+
end = self.tdelta_end
35+
end = end.replace(hour=0,minute=0,second=0,microsecond=0)
36+
if isinstance(tdelta,datetime):
37+
start = end+tdelta
38+
else:
39+
irange = int(tdelta)
40+
if irange <= 0:
41+
return []
42+
start = end-timedelta(days=irange)
43+
return self.general_time_range_get(furl,start,end,previter)
44+
def general_time_range_get(self,furl,start=None,end=None,previter=None):
45+
"""returns data for timedependant requests for a start and enddate"""
46+
days = (end-start).days
47+
requests = []
48+
for week in range(ceil(days/self.req_period)):
49+
wstart = start+timedelta(days=week*self.req_period)
50+
wend = wstart+timedelta(days=min(self.req_period,(end-wstart).days))
51+
requests.append(furl(\
52+
wstart.strftime("%Y-%m-%dT%H:%M:%S"),\
53+
wend.strftime("%Y-%m-%dT%H:%M:%S")))
54+
internal_iter = ReqIter(self,iter(requests))
55+
if previter is None:
56+
previter = TuIter()
57+
previter.add(internal_iter)
58+
return previter,len(requests)
59+
60+
def get_history(self,veh_id,tdelta,previter=None):
61+
"""getHistory method"""
62+
return self.general_daydiff_get(lambda t1,t2: \
63+
'Report/UnitHistory?unitId='+veh_id+'&from='+t1+'.0000001Z&to='+t2+'.0000000Z',\
64+
tdelta,previter)
65+
def get_candata(self,veh_id,tdelta=None,previter=None):
66+
"""getCanData method"""
67+
return self.general_daydiff_get(lambda t1,t2: \
68+
'Report/UnitExtendedInfo?Id='+veh_id+'&from='+t1+'.0000001Z&to='+t2+'.0000000Z',\
69+
tdelta,previter)

pytrackunit/tuiter.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""iterator for tucache data"""
2+
3+
class ReqIter:
    """Async iterator that resolves queued web (url) requests one at a time."""
    def __init__(self,cache,requests):
        self.cache = cache          # object exposing an async get_url(url)
        self.requests = requests    # plain (sync) iterator of url strings
        self.iter_started = False

    def __aiter__(self):
        # Single-shot: the underlying request iterator cannot be rewound.
        if self.iter_started:
            raise Exception("cant start tuiter more than once")
        self.iter_started = True
        return self

    async def __anext__(self):
        try:
            url = next(self.requests)
        except StopIteration as exc:
            # Translate sync exhaustion into the async-iteration protocol.
            raise StopAsyncIteration from exc
        return await self.cache.get_url(url)
21+
22+
class SqlIter:
    """Async wrapper presenting a synchronous sql-row iterator as an async one."""
    def __init__(self, sqliter):
        self.sqliter = sqliter      # plain (sync) iterator of cached rows
        self.iter_started = False

    def __aiter__(self):
        # Single-shot: the wrapped iterator cannot be rewound.
        if self.iter_started:
            raise Exception("cant start tuiter more than once")
        self.iter_started = True
        return self

    async def __anext__(self):
        try:
            item = next(self.sqliter)
        except StopIteration as exc:
            # Translate sync exhaustion into the async-iteration protocol.
            raise StopAsyncIteration from exc
        return item
39+
40+
class TuIter:
    """iterator holding all internal iterators

    Chains multiple internal async iterators (ReqIter/SqlIter) and yields
    their items in the order the iterators were added.
    """
    def __init__(self) -> None:
        self.iterators = []        # internal async iterators, drained in order
        self.iterator_pos = 0      # index of the iterator currently drained
        self.iter_started = False

    def add(self,_iter):
        """addes an internal iterator to this iterators list"""
        self.iterators.append(_iter)

    def __aiter__(self):
        # Single-shot: the internal iterators cannot be rewound.
        if self.iter_started:
            raise Exception("cant start tuiter more than once")
        self.iter_started = True
        return self

    async def __anext__(self):
        # BUGFIX: the previous version recursed into __anext__ for every
        # exhausted sub-iterator (stack overflow risk with many empty ones)
        # and raised IndexError instead of StopAsyncIteration when the
        # iterator list was empty. Skip exhausted iterators iteratively.
        while self.iterator_pos < len(self.iterators):
            try:
                return await self.iterators[self.iterator_pos].__anext__()
            except StopAsyncIteration:
                self.iterator_pos += 1
        raise StopAsyncIteration

0 commit comments

Comments
 (0)