-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_pandas.py
More file actions
58 lines (42 loc) · 1.98 KB
/
example_pandas.py
File metadata and controls
58 lines (42 loc) · 1.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import asyncio
from typing import TypedDict
import pandas as pd
from aidlab import AidlabManager, DataType, Device, DeviceDelegate, DisconnectReason
class EcgSeries(TypedDict):
    """Column-oriented buffer of ECG samples.

    The two lists are parallel: element ``i`` of ``ecg`` belongs to
    element ``i`` of ``timestamp``.
    """

    # Timestamps of the received ECG samples, in arrival order.
    timestamp: list[int]
    # ECG values, one per timestamp.
    ecg: list[float]
class RespirationSeries(TypedDict):
    """Column-oriented buffer of respiration samples.

    The two lists are parallel: element ``i`` of ``respiration`` belongs to
    element ``i`` of ``timestamp``.
    """

    # Timestamps of the received respiration samples, in arrival order.
    timestamp: list[int]
    # Respiration values, one per timestamp.
    respiration: list[float]
# Output location of the CSV export. save_to_csv() builds the target as
# PATH + filename (plain string concatenation), so PATH must end with a
# path separator.
PATH = "./"
filename = "output.csv"
class MainManager(DeviceDelegate):
    """Record ECG and respiration samples from a device and export them to CSV.

    The instance is handed to ``Device.connect`` as the delegate. Its
    ``did_receive_*`` callbacks buffer incoming samples in memory, and
    ``save_to_csv`` writes the buffered samples to ``PATH + filename``.
    """

    def __init__(self):
        """Create empty sample buffers."""
        super().__init__()
        self.signals_data_ecg: EcgSeries = {"timestamp": [], "ecg": []}
        self.signals_data_respiration: RespirationSeries = {"timestamp": [], "respiration": []}
        # Strong reference to the task started in did_connect(). The event
        # loop only keeps weak references to tasks, so an unreferenced task
        # may be garbage-collected before it completes.
        self._collect_task: "asyncio.Task | None" = None

    async def run(self):
        """Scan for devices, record from the first one for 10 seconds, then save.

        Returns without writing a file when the scan finds no device.
        """
        devices = await AidlabManager().scan()
        if not devices:
            print("No devices found.")
            return

        device = devices[0]
        print("Connecting to:", device.address)
        await device.connect(self)
        print("Going to save data to csv file after 10 seconds. Don't forget wear device.")
        await asyncio.sleep(10)
        self.save_to_csv()

    def did_connect(self, device: Device):
        """Start collecting ECG and respiration data once the device is connected.

        Must be called while an event loop is running, because it schedules
        ``device.collect`` as a task.
        """
        print("Connected to:", device.address)
        # Keep the task referenced so it cannot disappear mid-execution.
        self._collect_task = asyncio.create_task(
            device.collect([DataType.ECG, DataType.RESPIRATION], [])
        )

    def did_disconnect(self, device: Device, reason: DisconnectReason):
        """Report the disconnect and persist whatever has been buffered so far."""
        print("Disconnected from:", device.address, reason)
        self.save_to_csv()

    def did_receive_ecg(self, _: Device, timestamp: int, value: float):
        """Append one ECG sample to the in-memory buffer."""
        self.signals_data_ecg["timestamp"].append(timestamp)
        self.signals_data_ecg["ecg"].append(value)

    def did_receive_respiration(self, _: Device, timestamp: int, value: float):
        """Append one respiration sample to the in-memory buffer."""
        self.signals_data_respiration["timestamp"].append(timestamp)
        self.signals_data_respiration["respiration"].append(value)

    def save_to_csv(self):
        """Write the buffered samples to ``PATH + filename``, overwriting the file.

        The output has one row per ECG sample with the columns ``timestamp``,
        ``ecg`` and ``respiration``. Each row carries the most recent
        respiration value whose timestamp is not later than the ECG timestamp
        (the default backward ``pd.merge_asof`` match), or an empty field when
        there is none.
        """
        # merge_asof requires both frames to be sorted by the key; a stable
        # sort keeps arrival order for samples with equal timestamps.
        df_ecg = pd.DataFrame(self.signals_data_ecg).sort_values(
            "timestamp", kind="stable"
        )
        df_respiration = pd.DataFrame(self.signals_data_respiration).sort_values(
            "timestamp", kind="stable"
        )

        if df_ecg.empty or df_respiration.empty:
            # Empty buffers produce object-dtype columns, which merge_asof
            # rejects. Build the equivalent left-join result directly: every
            # ECG row (possibly none) with a missing respiration value.
            df_combined = df_ecg.assign(respiration=float("nan"))
        else:
            df_combined = pd.merge_asof(df_ecg, df_respiration, on="timestamp")

        df_combined.to_csv(PATH + filename, mode="w", index=False, header=True)
# Script entry point: create a MainManager and drive its run() coroutine to
# completion on a fresh event loop.
# NOTE(review): this statement executes whenever the module is loaded,
# including on import; consider an `if __name__ == "__main__":` guard.
asyncio.run(MainManager().run())