-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathClassifier.py
More file actions
229 lines (184 loc) · 10.2 KB
/
Copy pathClassifier.py
File metadata and controls
229 lines (184 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
from ollama import chat
from ollama import ChatResponse
import re
import pandas as pd
import sys
import time
# Ollama model tag passed to chat(); the commented-out line is an alternative model.
# MODEL = 'phi4'
MODEL = 'llama3.1:8b'
# Path of the input CSV (create_classification_csv reads the `type` and `posts` columns of its input).
INPUT_FILE = 'data.csv'
# Run number; only used to make the output file name unique per run.
RUN = 6
# Number of rows randomly sampled from the input for classification.
SAMPLE_SIZE = 400
# Base name (no extension) of the files written for this run.
# NOTE(review): the model tag contains ':', which ends up in the file name and is
# not a legal file-name character on Windows -- confirm the target platform.
OUTPUT_FILE_NAME = f"results_{MODEL}_{RUN}"
"""
A script to classify tweets based on the Myers-Briggs Personality Type scale. You need Ollama installed locally to run this script.
"""
# def generate(prompt, data):
# #
# """
# Give the model a task and content, generate a response from the model and return the result.
# """
# response: ChatResponse = chat(model=MODEL, stream=False, messages=[
# {
# 'role': 'user',
# 'content': f"{prompt}{data}"
# },
# ])
# return strip_think_tags(response.message.content)
def generate(prompt, data):
    """
    Send a task prompt plus the text to analyse to the model and return its reply.

    The conversation is built as: one system message, one user message holding
    `prompt`, then one user message per slice of `data`. The whole thread is sent
    in a single non-streaming chat() call, and the reply is passed through
    strip_think_tags() before being returned.

    Args:
        prompt (str): The instruction for the model.
        data (str): The text to classify; long text is cut into several messages.

    Returns:
        str: The model's reply with any <think>...</think> section removed.
    """
    # The budget figures are nominal token counts, but the slicing below is done
    # on characters, so each message carries at most `slice_length` characters.
    token_limit = 2048
    prompt_allowance = 50
    slice_length = token_limit - prompt_allowance - 50  # extra 50 kept free for response and tags

    thread = [
        {
            'role': 'system',
            'content': 'You are an expert personality classifier based on the Myers-Briggs Personality Type scale. Your task is to analyze text input and classify personality traits accurately. Do not provide explanations, context, or additional text—respond with the acronym only.',
        },
        {'role': 'user', 'content': prompt},
    ]
    # One user message per consecutive slice of the input text.
    thread.extend(
        {'role': 'user', 'content': data[start:start + slice_length]}
        for start in range(0, len(data), slice_length)
    )

    response: ChatResponse = chat(model=MODEL, stream=False, messages=thread)
    return strip_think_tags(response.message.content)
def strip_think_tags(response_text):
    """
    Remove every <think>...</think> section from a model reply.

    Each section is matched non-greedily and may span several lines. The
    remaining text is returned with leading and trailing whitespace trimmed.
    """
    think_section = re.compile(r"<think>.*?</think>", re.DOTALL)
    without_reasoning = think_section.sub("", response_text)
    return without_reasoning.strip()
def load_csv(file_path):
    """
    Read a CSV file with pandas and return it as a DataFrame.

    Args:
        file_path (str): Path to the CSV file.

    Returns:
        pandas.DataFrame: The parsed contents of the file.
    """
    return pd.read_csv(file_path)
def classify(data):
    """
    Ask the model for a four-letter Myers-Briggs type for the given text.

    URLs are stripped from the text before it is sent to the model. The first
    MBTI acronym found in the reply is returned in upper case.

    Args:
        data (str): The text (tweets) to classify.

    Returns:
        str: One of the sixteen MBTI acronyms, or "UNKNOWN" when the input is
        not a string (e.g. a missing CSV cell) or the reply contains no acronym.
    """
    # A missing cell arrives from pandas as NaN (a float); there is nothing to
    # send to the model, and re.sub would raise on it.
    if not isinstance(data, str):
        return "UNKNOWN"

    mbti_regex = r'\b(?:INTJ|INTP|ENTJ|ENTP|INFJ|INFP|ENFJ|ENFP|ISTJ|ISFJ|ESTJ|ESFJ|ISTP|ISFP|ESTP|ESFP)\b'
    prompt = "Predict what this person’s personality type using the Myers-Briggs Personality Type scale. In your response, provide only the acronym of the personality type you think the person is. Again, only provide a four letter response based on Myers-Briggs. Here are their last 50 Tweets:"

    cleaned_data = re.sub(r'http[s]?://\S+', '', data)
    response = generate(prompt, cleaned_data).strip()

    # Prefer an exact upper-case acronym so existing results are unchanged; only
    # when none is present fall back to a case-insensitive search, which rescues
    # replies such as "infp" or "Infp" that used to be reported as UNKNOWN.
    match = re.search(mbti_regex, response)
    if match is None:
        match = re.search(mbti_regex, response, flags=re.IGNORECASE)
    return match.group(0).upper() if match else "UNKNOWN"
def _ask_preference(prompt, data, letters):
    """Ask one dichotomy question; return the answered letter, or "-" if the reply is not one of `letters`."""
    response = generate(prompt, data).strip()
    print(response)
    # The reply must be exactly one letter belonging to THIS dichotomy; a valid
    # letter from another dichotomy (e.g. "N" for the E/I question) is rejected.
    candidate = response.upper()
    if len(candidate) == 1 and candidate in letters:
        return candidate
    return "-"


def classify_by_preference(data):
    """
    Classify the text one Myers-Briggs preference at a time.

    The model is asked four separate single-letter questions, in the order
    E/I, N/S, T/F, J/P. Each raw reply is printed. A reply that is not exactly
    one of the two letters of its dichotomy is recorded as "-".

    Args:
        data (str): The text (tweets) to classify.

    Returns:
        str: A four-character string, one character per dichotomy, e.g. "INTJ"
        or "E-F-". Non-string input (e.g. a missing CSV cell) yields "----"
        without querying the model.
    """
    if not isinstance(data, str):
        return "----"

    # (prompt, accepted letters) for each dichotomy, in output order.
    questions = [
        (
            "Predict if this person is an Extravert (E) or Introvert (I) using the Myers-Briggs Personality Type scale. Provide only a single letter: E for Extraversion or I for Introversion. Do not include any additional explanation, context, or text. Here are their last 50 Tweets:",
            "EI",
        ),
        (
            "Predict if this person is an Intuitive (N) or Sensing (S) type using the Myers-Briggs Personality Type scale. Provide only a single letter: N for Intuitive or S for Sensing. Do not include any additional explanation, context, or text. Here are their last 50 Tweets:",
            "NS",
        ),
        (
            "Predict if this person is a Thinking (T) or Feeling (F) type using the Myers-Briggs Personality Type scale. Provide only a single letter: T for Thinking or F for Feeling. Do not include any additional explanation, context, or text. Here are their last 50 Tweets:",
            "TF",
        ),
        (
            "Predict if this person is a Judging (J) or Perceiving (P) type using the Myers-Briggs Personality Type scale. Provide only a single letter: J for Judging or P for Perceiving. Do not include any additional explanation, context, or text. Here are their last 50 Tweets:",
            "JP",
        ),
    ]
    return "".join(_ask_preference(prompt, data, letters) for prompt, letters in questions)
def _letter_matches(actual_type, predicted_type):
    """Return four booleans (E/I, S/N, T/F, J/P) telling which letters of the prediction match the known type."""
    # Only a four-character prediction lines up with the four dichotomies. The
    # "UNKNOWN" fallback must not be compared by position: its second letter is
    # 'N', which would count as a correct S/N call for every intuitive type.
    comparable = (
        isinstance(actual_type, str)
        and isinstance(predicted_type, str)
        and len(actual_type) == 4
        and len(predicted_type) == 4
    )
    if not comparable:
        return (False, False, False, False)
    return tuple(known == guessed for known, guessed in zip(actual_type, predicted_type))


def create_classification_csv(input_csv, output_csv):
    """
    Classifies data from the input CSV, compares predictions with known types,
    and saves results to a new CSV file.

    A random sample of up to SAMPLE_SIZE rows is classified. For every row the
    known type, the prediction, whole-type correctness and per-letter
    correctness are recorded, and a progress line is printed.

    Args:
        input_csv (str): Path to the input CSV file; must have `type` and `posts` columns.
        output_csv (str): Path to save the output CSV file.
    """
    df = pd.read_csv(input_csv)
    # DataFrame.sample raises when asked for more rows than exist, so cap the
    # sample at the size of the data set.
    sampled_df = df.sample(n=min(SAMPLE_SIZE, len(df)))

    predictions = []
    for index, row in sampled_df.iterrows():
        actual_type = row['type']
        predicted_type = classify(row['posts'])
        print(f"Index: {index}, Type: {actual_type}, Predicted: {predicted_type}")

        EvI, SvN, TvF, JvP = _letter_matches(actual_type, predicted_type)
        predictions.append({
            'Index': index,
            'Type': actual_type,
            'Prediction': predicted_type,
            'Correct': actual_type == predicted_type,
            'EvI': EvI,
            'SvN': SvN,
            'TvF': TvF,
            'JvP': JvP,
            'Model': MODEL,
            'Posts': row['posts'],
        })

    # Create a new DataFrame from the predictions and save to a CSV
    results_df = pd.DataFrame(predictions)
    results_df.to_csv(output_csv, index=False)
def calculate_accuracy(input_csv):
    """
    Calculate and print the accuracy of the model from a results CSV.

    The file must contain the boolean columns `Correct`, `EvI`, `SvN`, `TvF`
    and `JvP`. One line per metric is printed to standard output, as a
    percentage of all rows.

    Args:
        input_csv (str): Path to the results CSV file.

    Returns:
        float: The whole-type accuracy of the model as a percentage.

    Raises:
        ValueError: If the file contains no prediction rows.
    """
    df = pd.read_csv(input_csv)
    total_predictions = len(df)
    if total_predictions == 0:
        # Avoid a meaningless divide-by-zero / NaN report.
        raise ValueError(f"{input_csv} contains no predictions to score")

    # (label used in the printed report, column holding the per-row booleans)
    metrics = [
        ('Total', 'Correct'),
        ('EvI', 'EvI'),
        ('SvN', 'SvN'),
        ('TvF', 'TvF'),
        ('JvP', 'JvP'),
    ]
    # Compute everything before printing so a missing column fails before any output.
    accuracies = {
        label: float(df[column].sum()) / total_predictions * 100
        for label, column in metrics
    }
    for label, _ in metrics:
        print(f"{label} Accuracy: {accuracies[label]:.2f}%")
    return accuracies['Total']
# df = load_csv('data.csv')
# # print(df.loc[1, 'posts'])
# print(f"Known Type: {df.loc[2, 'type']}")
# print(classify(df.loc[2, 'posts']))
# # print(df.info())
# Driver: classify a sample of the input file, then write an accuracy summary.
start_time = time.time()  # Record the start time
# Use the INPUT_FILE constant rather than repeating its value as a literal.
create_classification_csv(INPUT_FILE, OUTPUT_FILE_NAME + '.csv')
end_time = time.time()  # Record the end time (classification only, summary excluded)

results_summary = OUTPUT_FILE_NAME + '_summary.txt'
with open(results_summary, 'w') as f:
    # calculate_accuracy reports via print(), so point stdout at the summary
    # file while it runs. Restore the previous stream in `finally` so a failure
    # inside calculate_accuracy cannot leave stdout bound to a closed file.
    previous_stdout = sys.stdout
    sys.stdout = f
    try:
        calculate_accuracy(OUTPUT_FILE_NAME + '.csv')
    finally:
        sys.stdout = previous_stdout

elapsed_time = end_time - start_time  # Calculate the elapsed time
print(f"Elapsed Time: {elapsed_time:.2f} seconds")