-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDatabaseHandler.py
More file actions
138 lines (123 loc) · 4.59 KB
/
DatabaseHandler.py
File metadata and controls
138 lines (123 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
This module is responsible for all database operations.
"""
import logging
import sys
import warnings
import threading
import pymysql
import pymysql.cursors
### based on Sebastian's database handler (uses pymysql)
class DatabaseHandler:
"""
This class handles all database operations.
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, host, user, password, db_name):
self.host = host
self.user = user
self.password = password
self.db_name = db_name
self.logger = logging.getLogger()
self.cnx = None
self.data_base = None
self.connect()
self.condition = threading.Condition()
def connect(self):
"""
Connect to the database.
"""
try:
self.data_base = pymysql.connect(
self.host, self.user, self.password, self.db_name,
cursorclass=pymysql.cursors.DictCursor, charset='utf8')
warnings.filterwarnings("error", category=pymysql.Warning)
self.cnx = self.data_base.cursor()
self.data_base.autocommit(True)
self.logger.debug('Connected to database %s on %s with user %s'
% (self.db_name, self.host, self.user))
except pymysql.Error as my_sql_error:
self.logger.error(
"Error while establishing connection to the database server [%d]: %s"
% (my_sql_error.args[0], my_sql_error.args[1]))
sys.exit(1)
def close(self):
"""
Close the connection.
"""
self.cnx.close()
self.data_base.close()
self.logger.debug('Closed DB connection')
@staticmethod
def __build_insert_sql(self, table, objs):
"""
This method creates an insert SQL string and returns it.
:param table: The table in which the data shall be inserted.
:param objs:
:return: Insert SQL string.
"""
if len(objs) == 0:
return None
key_set = set()
key_set = [key_set.update(row.keys()) for row in objs]
columns = [col for col in key_set]
tuples = []
for item in objs:
if item:
values = []
for key in columns:
try:
values.append('"%key_set"' % str(item[key]).replace('"', "")
if not item[key] == '' else 'NULL')
except KeyError:
values.append('NULL')
if not all(value == 'NULL' for value in values):
tuples.append('(%key_set)' % ', '.join(values))
return 'INSERT INTO `' + table + '` (' + ', '.join(
['`%key_set`' % column for column in columns]) \
+ ') VALUES\n' + ',\n'.join(tuples)
@staticmethod
def __build_select_sql(table_name):
"""
Builds a select * statement.
:param table_name: The table for which a select * shall be performed.
:return: Returns the select * string.
"""
statement = 'SELECT * FROM`' + table_name + ';'
return statement
#Execute SQL-statement
def execute(self, statement):
"""
This method will execute a plain SQL statement.
:param statement: The statement that shall be executed.
:return:
"""
if statement:
with self.condition:
try:
self.logger.debug('Executing SQL-query:\n\t%s'
% statement.replace('\n', '\n\t'))
self.cnx.execute(statement)
return self.cnx.fetchall()
except pymysql.Warning as mysql_error:
self.logger.warning("Warning while executing statement: %s" % mysql_error)
except pymysql.Error as mysql_error:
self.logger.error("Error while executing statement [%d]: %s"
% (mysql_error.args[0], mysql_error.args[1]))
def persist_dict(self, table, array_of_dicts):
"""
Build an insert SQL statement and execute it.
:param table: The table in which the data shall be inserted.
:param array_of_dicts: The data that is to be inserted.
"""
sql = self.__build_insert_sql(table, array_of_dicts)
self.execute(sql)
def select(self, table):
"""
Execute a select statement
:param table:
:return:
"""
sql = self.__build_select_sql(table)
result_set = self.execute(sql)
return result_set