-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
84 lines (72 loc) · 2.83 KB
/
database.py
File metadata and controls
84 lines (72 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import pandas as pd
import hashlib
from datetime import datetime
from sqlalchemy import create_engine
# global variables shared by save(), current_data() and load()
# name of the database that holds one table per saved data frame
data_db_name = "data"
# name of the database that holds the meta-data about saved data frames
runs_db_name = "runs"
# name of the meta-data table inside the runs database
runs_table_name = "meta"
def save(df: pd.DataFrame,
         table_name: str):
    """
    Save the data frame to the database and register it in the meta-data table.

    The frame is first written to a new table in the data database. A row
    describing it (table name, hierarchy, time stamp and a SHA-1 hash of the
    content) is then appended to the meta-data table in the runs database.

    :param df: Dataframe to save
    :param table_name: name of the table to create in the data database
    :raises KeyError: if DB_USER, DB_PASSWORD, DB_HOST or DB_PORT is not set in the environment
    :raises ValueError: if a table named ``table_name`` already exists (pandas ``if_exists="fail"``)
    """
    # 1: create a table with data and save it
    engine = create_engine(f'mysql+mysqlconnector://{os.environ["DB_USER"]}:{os.environ["DB_PASSWORD"]}@'
                           f'{os.environ["DB_HOST"]}:{os.environ["DB_PORT"]}/{data_db_name}')
    try:
        # the context manager closes the connection even when the write fails
        with engine.connect() as conn:
            df.to_sql(name=table_name,
                      con=conn,
                      if_exists="fail")
    finally:
        # release the connection pool owned by this short-lived engine
        engine.dispose()

    # 2: save meta_data
    # NOTE: the two writes go to different databases and are not atomic; if this
    # step fails the data table stays in place without a meta-data entry.
    df_meta = pd.DataFrame(data={"table_name": table_name,
                                 "hierarchy": "data",
                                 "time_stamp": str(datetime.today()),
                                 "hash": hashlib.sha1(pd.util.hash_pandas_object(df).values).hexdigest()},
                           index=[0])
    engine = create_engine(f'mysql+mysqlconnector://{os.environ["DB_USER"]}:{os.environ["DB_PASSWORD"]}@'
                           f'{os.environ["DB_HOST"]}:{os.environ["DB_PORT"]}/{runs_db_name}')
    try:
        with engine.connect() as conn:
            df_meta.to_sql(name=runs_table_name,
                           con=conn,
                           index=False,
                           if_exists="append")
    finally:
        engine.dispose()
def current_data():
    """
    Shows the current data saved in the database.

    Reads the complete meta-data table from the runs database.

    :return: pd.Dataframe with the content of the meta-data table
    :raises KeyError: if DB_USER, DB_PASSWORD, DB_HOST or DB_PORT is not set in the environment
    """
    engine = create_engine(f'mysql+mysqlconnector://{os.environ["DB_USER"]}:{os.environ["DB_PASSWORD"]}@'
                           f'{os.environ["DB_HOST"]}:{os.environ["DB_PORT"]}/{runs_db_name}')
    try:
        # the context manager closes the connection even when the read fails
        with engine.connect() as conn:
            return pd.read_sql_table(table_name=runs_table_name, con=conn)
    finally:
        # release the connection pool owned by this short-lived engine
        engine.dispose()
def load(table_name: str):
    """
    Load the data from the database.

    The meta-data table is consulted first: the table must be listed there,
    and a warning is printed when it is listed more than once.

    :param table_name: name of the table to read from the data database
    :return: pd.Dataframe
    :raises ValueError: if ``table_name`` is not listed in the meta-data table
    :raises KeyError: if DB_USER, DB_PASSWORD, DB_HOST or DB_PORT is not set in the environment
    """
    # 1: load meta-data and perform checks
    df_meta = current_data()
    # filter once and reuse the result for both checks
    entries = df_meta[df_meta['table_name'] == table_name]
    # check that the table exists
    if entries.empty:
        raise ValueError("Table not found in the database")
    if len(entries) > 1:
        print(f"Warning, multiple tables with the name '{table_name}' found in the meta-data database")

    # 2: load data
    engine = create_engine(f'mysql+mysqlconnector://{os.environ["DB_USER"]}:{os.environ["DB_PASSWORD"]}@'
                           f'{os.environ["DB_HOST"]}:{os.environ["DB_PORT"]}/{data_db_name}')
    try:
        # the context manager closes the connection even when the read fails
        with engine.connect() as conn:
            # NOTE(review): assumes the table has an "index" column, which is what
            # DataFrame.to_sql writes by default for an unnamed index - confirm
            # for frames saved with a named index or a MultiIndex.
            df = pd.read_sql_table(table_name=table_name, con=conn, index_col="index")
    finally:
        # release the connection pool owned by this short-lived engine
        engine.dispose()

    # treatment of data loaded from container database
    df = df.infer_objects()
    return df