-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodel.py
More file actions
179 lines (142 loc) · 7.43 KB
/
model.py
File metadata and controls
179 lines (142 loc) · 7.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from transformers import AutoTokenizer, AutoModelForCausalLM
import google.generativeai as genai
import os
import logging
import time
import re
# Set up logging
# NOTE(review): basicConfig at import time configures the ROOT logger for the
# whole process, at DEBUG level — fine for a script, surprising for a library
# module imported elsewhere; confirm intended usage.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the MSG class below.
logger = logging.getLogger(__name__)
class MSG:
    """Summarize HTML source with a local Gemma model or the Gemini API, then
    score the summary against a set of topics with a Gemini model.

    The summarizing backend is chosen from the prefix of
    ``summarizing_model_name``: names starting with ``'gemma'`` are loaded
    locally via transformers; names starting with ``'gemini'`` go through the
    ``google.generativeai`` API (authorized from the ``GOOGLE_API_KEY``
    environment variable).

    Result attributes (``None`` until the corresponding call succeeds):
        summary: last summary produced by :meth:`summarize`.
        score, reason: last results produced by :meth:`scoring`.
    """

    def __init__(self, summarizing_model_name, scoring_model_name, device):
        """Initialize the summarizing backend and remember the scoring model.

        Args:
            summarizing_model_name: Model id; prefix selects the backend
                (``'gemma'`` = local transformers model, ``'gemini'`` = API).
            scoring_model_name: Gemini model name used by :meth:`scoring`.
            device: Device the local Gemma model is moved to (e.g. ``'cuda'``).
        """
        logger.info("Initializing MSG class")
        start_time = time.time()
        self.summarizing_model_name = summarizing_model_name
        self.scoring_model_name = scoring_model_name
        # Pre-set result attributes so summarize()/scoring() can safely
        # return them even when an earlier step raised (the original code
        # hit AttributeError in its finally-return on the failure path).
        self.summary = None
        self.score = None
        self.reason = None
        if self.summarizing_model_name.startswith('gemma'):
            try:
                # Load model and tokenizer locally.
                logger.info(f"Loading model and tokenizer for {self.summarizing_model_name}, on {device}")
                self.tokenizer = AutoTokenizer.from_pretrained(self.summarizing_model_name)
                logger.debug(f"Tokenizer loaded: {self.tokenizer.__class__.__name__}")
                self.model = AutoModelForCausalLM.from_pretrained(self.summarizing_model_name).to(device)
                logger.debug(f"Model loaded: {self.model.__class__.__name__}")
            except Exception as e:
                logger.exception(f"An error occurred: {str(e)}")
            finally:
                total_time = time.time() - start_time
                logger.info(f"ModelHandler initialized in {total_time:.2f} seconds")
        elif self.summarizing_model_name.startswith('gemini'):
            try:
                logger.info(f"Authorizing API for {self.summarizing_model_name}")
                genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
            except Exception as e:
                logger.exception(f"An error occurred: {str(e)}")
            finally:
                total_time = time.time() - start_time
                logger.info(f"Gemini API initialized in {total_time:.2f} seconds")
        else:
            # Previously this case was silently ignored, leaving an unusable
            # instance; make the misconfiguration visible.
            logger.warning(
                "Unsupported summarizing model name %r: expected a 'gemma' or "
                "'gemini' prefix", self.summarizing_model_name,
            )

    def summarize(self, text_input):
        """Summarize ``text_input`` (raw HTML source) in three lines or less.

        Args:
            text_input: The HTML source text to summarize.

        Returns:
            The summary string, or ``None`` when generation/extraction failed
            or the backend is unsupported. Also stored on ``self.summary``.
        """
        logger.info("Starting summarizing function")
        start_time = time.time()
        # Reset so a failed call does not return a stale summary from an
        # earlier invocation.
        self.summary = None
        if self.summarizing_model_name.startswith('gemma'):
            try:
                # Prepare chat and prompt via the model's chat template.
                logger.debug("Preparing chat and prompt")
                chat = [
                    {"role": "user", "content": f"Please summarize the following html source in three lines or less.:{text_input}"},
                ]
                prompt = self.tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
                logger.debug("Prompt created.")
                # Tokenize input
                logger.debug("Tokenizing input")
                input_ids = self.tokenizer(prompt, return_tensors="pt")
                logger.debug(f"Input tokenized. Shape: {input_ids.input_ids.shape}")
                # Generate output
                logger.info("Generating summary")
                generation_start_time = time.time()
                outputs = self.model.generate(
                    **input_ids,
                    # NOTE(review): max_length caps prompt + completion
                    # combined; a long HTML prompt can leave no room for the
                    # summary — consider max_new_tokens. Kept as-is to
                    # preserve behavior.
                    max_length=2000,
                    do_sample=True,
                    top_p=0.95,
                    temperature=0.7,
                    repetition_penalty=1.1,
                )
                generation_time = time.time() - generation_start_time
                logger.info(f"Summary generated in {generation_time:.2f} seconds")
                # Decode output
                logger.debug("Decoding output")
                raw_summary = self.tokenizer.decode(outputs[0])
                logger.debug(f"Raw summary length: {len(raw_summary)} characters")
                # Gemma chat output wraps the reply in turn markers; extract
                # just the model's turn.
                logger.debug("Extracting summary")
                pattern = r'<start_of_turn>model\s*(.*?)\s*<end_of_turn>'
                match = re.search(pattern, raw_summary, re.DOTALL)
                if match:
                    self.summary = match.group(1).strip()
                    logger.info("Summary successfully extracted")
                    logger.info(f"Summary:\n{self.summary}")
                else:
                    logger.error("Failed to extract summary from model output")
            except Exception as e:
                logger.exception(f"An error occurred: {str(e)}")
            finally:
                total_time = time.time() - start_time
                logger.info(f"Summary function completed in {total_time:.2f} seconds")
            # Return AFTER finally: a `return` inside `finally` would swallow
            # any in-flight exception (e.g. KeyboardInterrupt).
            return self.summary
        elif self.summarizing_model_name.startswith('gemini'):
            try:
                # BUG FIX: original read `self.summaring_model`, an attribute
                # that was never assigned (AttributeError on every call);
                # the stored model name is what GenerativeModel expects.
                self.summarizing_model = genai.GenerativeModel(self.summarizing_model_name)
                prompt = f"Please summarize the following html source in three lines or less.: {text_input}"
                response = self.summarizing_model.generate_content(prompt)
                self.summary = response.text
                logger.info("Summary successfully extracted")
            except Exception as e:
                logger.exception(f"An error occurred: {str(e)}")
            finally:
                total_time = time.time() - start_time
                logger.info(f"Summary function completed in {total_time:.2f} seconds")
            return self.summary
        # Unsupported backend prefix: nothing was generated.
        return self.summary

    def scoring(self, topics):
        """Score ``self.summary`` against ``topics`` using the scoring model.

        Args:
            topics: Topic list rendered as a string, appended to the prompt.

        Returns:
            ``(score, reason)`` — both strings extracted from the model's
            tagged response, or ``None`` for any part that could not be
            extracted or when the request failed.
        """
        logger.info("Start scoring function")
        start_time = time.time()
        # Reset so a failed call does not return stale results.
        self.score = None
        self.reason = None
        try:
            generation_start_time = time.time()
            self.scoring_model = genai.GenerativeModel(self.scoring_model_name)
            prompt = """Compare given sentences and topics and score the sentences between 1~100.
The criteria are as following:
- Is the sentence relevant to at least 1 topic among the given topics?
- Is there any topic well-representing the sentence?
- Is the sentence properly matches at least 1 topic?
Answer in this format: <score>put_score_in_integer</score><reason>put_reason_in_2_or_3_sentences</reason>"
Sentences: """ + self.summary + '\nTopics: ' + topics
            response = self.scoring_model.generate_content(prompt)
            generation_time = time.time() - generation_start_time
            logger.info(f"Score and reason generated in {generation_time:.2f} seconds")
            # Extract score & reason from the <score>/<reason> tags the
            # prompt instructed the model to emit.
            logger.debug("Processing..")
            score_pattern = r'<score>(.*?)</score>'
            reason_pattern = r'<reason>(.*?)</reason>'
            score_match = re.search(score_pattern, response.text)
            reason_match = re.search(reason_pattern, response.text, re.DOTALL)
            if score_match:
                self.score = score_match.group(1)
                logger.info("Score successfully extracted")
                logger.info(f"Score: {self.score}")
            else:
                logger.error("Failed to extract score from model output")
            if reason_match:
                self.reason = reason_match.group(1)
                logger.info("Reason successfully extracted")
                logger.info(f"Reason: {self.reason}")
            else:
                logger.error("Failed to extract reason from model output")
        except Exception as e:
            logger.exception(f"An error occurred: {str(e)}")
        finally:
            total_time = time.time() - start_time
            logger.info(f"Scoring function completed in {total_time:.2f} seconds")
        # Return after finally so exceptions are never silently swallowed.
        return self.score, self.reason