"""database.py: SQLite persistence for questions, recorded responses, and session progress."""
import sqlite3
from contextlib import closing
class KnowledgeDB:
    """SQLite-backed store for questions, their responses, and session progress.

    Three tables are managed:

    * ``questions`` - question text, category and a sequential ``order_index``.
    * ``responses`` - transcriptions (plus optional audio path and duration)
      linked to a question by ``question_id``.
    * ``session_metadata`` - a single row (``id = 1``) holding the index of
      the current question and a running response counter.

    Every public method opens its own short-lived connection and always
    closes it, even when a statement raises.

    NOTE: SQLite does not enforce ``FOREIGN KEY`` clauses by default unless
    ``PRAGMA foreign_keys = ON`` is issued on the connection. This class does
    not enable it, so ``save_response`` accepts any ``question_id``; use
    ``question_exists`` first when validation is needed.
    """

    def __init__(self, db_path='knowledge.db'):
        """Remember the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_db()

    def get_connection(self):
        """Open a new connection whose rows support access by column name.

        Returns:
            A ``sqlite3.Connection`` using ``sqlite3.Row`` as row factory.
            The caller is responsible for closing it.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @staticmethod
    def _read_current_index(cursor):
        """Return the stored current question index, or 0 if the row is missing."""
        cursor.execute(
            'SELECT current_question_index FROM session_metadata WHERE id = 1'
        )
        row = cursor.fetchone()
        return row['current_question_index'] if row is not None else 0

    @staticmethod
    def _normalize_question(item):
        """Split one import item into ``(stripped question text, category)``.

        A plain string is the question itself with category ``'General'``.
        Any other item is read through ``.get``: the text comes from
        ``'question'`` (falling back to ``'text'``), the category from
        ``'category'``. ``None`` values are treated as missing so they are
        never stored as the literal string ``'None'``.
        """
        if isinstance(item, str):
            return item.strip(), 'General'

        raw_text = item.get('question')
        if raw_text is None:
            raw_text = item.get('text')
        if raw_text is None:
            raw_text = ''

        category = item.get('category')
        if category is None:
            category = 'General'

        return str(raw_text).strip(), category

    def init_db(self):
        """Initialize the database with required tables.

        Safe to call repeatedly: tables are created only when absent and the
        progress row is inserted only when ``id = 1`` does not exist yet.
        """
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()

            # Questions table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS questions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    question_text TEXT NOT NULL,
                    category TEXT,
                    order_index INTEGER,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Responses table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS responses (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    question_id INTEGER NOT NULL,
                    audio_path TEXT,
                    transcription TEXT,
                    duration_seconds REAL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (question_id) REFERENCES questions (id)
                )
            ''')

            # Metadata table for tracking progress
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS session_metadata (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    current_question_index INTEGER DEFAULT 0,
                    total_responses INTEGER DEFAULT 0,
                    last_session_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Every other method addresses the progress row as id = 1, so
            # guarantee exactly that row exists instead of "any row".
            cursor.execute(
                'INSERT OR IGNORE INTO session_metadata '
                '(id, current_question_index) VALUES (1, 0)'
            )

            conn.commit()

    def import_questions(self, questions_list):
        """Import a list of questions into the database.

        New questions are appended after the highest existing
        ``order_index``. Items whose text is empty after stripping are
        skipped. All inserts are committed together at the end.

        Args:
            questions_list: Iterable of strings, or of dicts with 'question'
                (or 'text') and optional 'category' (default 'General').

        Returns:
            The number of questions actually inserted.
        """
        imported_count = 0
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute(
                'SELECT COALESCE(MAX(order_index), -1) as max_order FROM questions'
            )
            start_index = cursor.fetchone()['max_order'] + 1

            for item in questions_list:
                question_text, category = self._normalize_question(item)
                if not question_text:
                    continue
                cursor.execute('''
                    INSERT INTO questions (question_text, category, order_index)
                    VALUES (?, ?, ?)
                ''', (question_text, category, start_index + imported_count))
                imported_count += 1

            conn.commit()
        return imported_count

    def get_question_by_index(self, index):
        """Get a question by its order index.

        Args:
            index: The ``order_index`` value to look up.

        Returns:
            A dict with 'id', 'question_text', 'category' and 'order_index',
            or None when no question has that index. If several rows share
            the index, the one with the lowest id is returned.
        """
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT id, question_text, category, order_index
                FROM questions
                WHERE order_index = ?
                ORDER BY id ASC
                LIMIT 1
            ''', (index,))
            row = cursor.fetchone()
        return dict(row) if row else None

    def get_current_question(self):
        """Get the current question based on session progress.

        Returns:
            The question dict from ``get_question_by_index`` extended with
            'current_index' and 'total_questions', or None when no question
            exists at the current index.
        """
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            index = self._read_current_index(cursor)
            cursor.execute('SELECT COUNT(*) as total FROM questions')
            total = cursor.fetchone()['total']

        # Looked up after the connection above is closed so that two
        # connections are never open at the same time.
        question = self.get_question_by_index(index)
        if question:
            question['current_index'] = index
            question['total_questions'] = total
        return question

    def save_response(self, question_id, transcription, audio_path=None, duration=None):
        """Save a transcribed response and update the session counters.

        The insert and the metadata update are committed together.

        Args:
            question_id: Id of the question being answered.
            transcription: Transcribed text of the response.
            audio_path: Optional path of the recorded audio.
            duration: Optional duration in seconds.

        Returns:
            The id of the newly inserted response row.
        """
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO responses (question_id, transcription, audio_path, duration_seconds)
                VALUES (?, ?, ?, ?)
            ''', (question_id, transcription, audio_path, duration))
            response_id = cursor.lastrowid

            # Update metadata
            cursor.execute('''
                UPDATE session_metadata
                SET total_responses = total_responses + 1,
                    last_session_date = CURRENT_TIMESTAMP
                WHERE id = 1
            ''')
            conn.commit()
        return response_id

    def question_exists(self, question_id):
        """Check if a question exists by ID.

        Returns:
            True when a row with that id is present in ``questions``.
        """
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute(
                'SELECT 1 FROM questions WHERE id = ? LIMIT 1', (question_id,)
            )
            return cursor.fetchone() is not None

    def advance_to_next_question(self):
        """Move to the next question by incrementing the stored index.

        The index is not bounded by the number of questions; once it passes
        the last one, ``get_current_question`` returns None.
        """
        with closing(self.get_connection()) as conn:
            conn.execute('''
                UPDATE session_metadata
                SET current_question_index = current_question_index + 1
                WHERE id = 1
            ''')
            conn.commit()

    def get_all_responses(self, question_id=None):
        """Get all responses, optionally filtered by question.

        Args:
            question_id: When truthy, only responses to that question are
                returned; any falsy value (None, 0, '') means no filter.

        Returns:
            A list of dicts containing every ``responses`` column plus the
            question's 'question_text' and 'category', newest first.
        """
        query = '''
            SELECT r.*, q.question_text, q.category
            FROM responses r
            JOIN questions q ON r.question_id = q.id
        '''
        params = ()
        if question_id:
            query += ' WHERE r.question_id = ?'
            params = (question_id,)
        # CURRENT_TIMESTAMP has one-second resolution, so break ties on the
        # monotonically increasing id to keep "newest first" deterministic.
        query += ' ORDER BY r.created_at DESC, r.id DESC'

        with closing(self.get_connection()) as conn:
            rows = conn.execute(query, params).fetchall()
        return [dict(row) for row in rows]

    def get_stats(self):
        """Get overall statistics.

        Returns:
            A dict with 'total_questions', 'total_responses',
            'current_question_index' and 'completion_percentage'. The
            percentage is the share of existing questions that have at least
            one response, so repeated answers to the same question cannot
            push it above 100; it is 0 when there are no questions.
        """
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()

            cursor.execute('SELECT COUNT(*) as total FROM questions')
            total_questions = cursor.fetchone()['total']

            cursor.execute('SELECT COUNT(*) as total FROM responses')
            total_responses = cursor.fetchone()['total']

            # Count each answered question once, and only if it still exists.
            cursor.execute('''
                SELECT COUNT(DISTINCT r.question_id) as answered
                FROM responses r
                JOIN questions q ON q.id = r.question_id
            ''')
            answered_questions = cursor.fetchone()['answered']

            current_index = self._read_current_index(cursor)

        if total_questions > 0:
            completion = answered_questions / total_questions * 100
        else:
            completion = 0.0

        return {
            'total_questions': total_questions,
            'total_responses': total_responses,
            'current_question_index': current_index,
            'completion_percentage': completion,
        }

    def reset_progress(self):
        """Reset the current question index to start over."""
        with closing(self.get_connection()) as conn:
            conn.execute(
                'UPDATE session_metadata SET current_question_index = 0 WHERE id = 1'
            )
            conn.commit()