-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQT_BC_3.0.py
More file actions
300 lines (239 loc) · 11.9 KB
/
QT_BC_3.0.py
File metadata and controls
300 lines (239 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
import math
import sys
def readpoints(infile):
    """Read labelled points from a whitespace-delimited text file.

    Each non-blank line describes one point. Two layouts are accepted and
    may be mixed within a file:

    * Lines whose first token starts with "point" (case-insensitive): that
      token is used as the label (its first lowercase "p" is capitalised)
      and the remaining tokens are the coordinates.
    * Lines without such a token: every token is a coordinate and the label
      is generated as "Point<k>", where k is the 1-based position of the
      point among the points read so far.

    Args:
        infile: Path of the file to read.

    Returns:
        A list of tuples ``(label, coord_1, coord_2, ...)`` with float
        coordinates, in file order.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a coordinate token cannot be converted to float.
    """
    point_list = []
    with open(infile) as file:
        for raw_line in file:
            fields = raw_line.split()
            # Blank lines (e.g. a trailing empty line) carry no point;
            # indexing fields[0] on them would raise IndexError.
            if not fields:
                continue
            if fields[0].lower().startswith("point"):
                # File provides its own index column.
                label = fields[0].replace("p", "P", 1)
                values = fields[1:]
            else:
                # No index column: number points from 1 in reading order.
                label = f"Point{len(point_list) + 1}"
                values = fields
            point_list.append((label, *(float(value) for value in values)))
    return point_list
def euclidean_dist(pointA, pointB):
    """Return the Euclidean distance between two labelled points.

    Args:
        pointA: Sequence whose first element is a label and whose remaining
            elements are numeric coordinates.
        pointB: Second point, in the same layout as ``pointA``.

    Returns:
        The Euclidean distance between the coordinate parts as a float.

    Raises:
        ValueError: If the two points have a different number of coordinates.
    """
    # Element 0 is the label; only the rest takes part in the distance.
    coords1 = pointA[1:]
    coords2 = pointB[1:]
    if len(coords1) != len(coords2):
        raise ValueError("Points must have the same number of dimensions")
    # Pair the coordinates up directly instead of indexing both sequences.
    square_sum = sum((b - a) ** 2 for a, b in zip(coords1, coords2))
    return math.sqrt(square_sum)
def generate_all_candidate_clusters(point_data, distance_matrix, threshold):
    """Build one greedy candidate cluster around every point.

    For each point taken as a center, the cluster is grown one point at a
    time: at every step the point that yields the smallest cluster diameter
    (largest pairwise distance) is added, as long as that diameter does not
    exceed ``threshold``. Ties are resolved in favour of the lowest index.

    Args:
        point_data: Sequence of points; only its length is used here.
        distance_matrix: Mapping ``(i, j) -> distance`` with ``i < j`` for
            every pair of point indices.
        threshold: Maximum allowed cluster diameter.

    Returns:
        A list with one cluster per point. Each cluster is a list of point
        indices whose first element is the center, followed by the other
        members in the order they were added.

    Raises:
        KeyError: If a required pair is missing from ``distance_matrix``.
    """
    n_points = len(point_data)

    def pair_distance(a, b):
        # The matrix stores each unordered pair once, smaller index first.
        return distance_matrix[(a, b) if a < b else (b, a)]

    all_candidates = []
    for center_idx in range(n_points):
        cluster = [center_idx]
        diameter = 0
        # reach[p] is the largest distance from outside point p to any
        # current member. Adding p would make the cluster diameter
        # max(diameter, reach[p]), so the full pairwise scan of the
        # tentative cluster is never needed. Dicts keep insertion order, so
        # candidates are still visited by ascending index.
        reach = {
            point_idx: pair_distance(center_idx, point_idx)
            for point_idx in range(n_points)
            if point_idx != center_idx
        }
        while reach:
            best_point = None
            best_diameter = float("inf")
            for point_idx, farthest in reach.items():
                new_diameter = max(diameter, farthest)
                # Strict "<" keeps the first (lowest-index) point on ties.
                if new_diameter <= threshold and new_diameter < best_diameter:
                    best_point = point_idx
                    best_diameter = new_diameter
            if best_point is None:
                # No remaining point fits within the threshold.
                break
            cluster.append(best_point)
            diameter = best_diameter
            del reach[best_point]
            # Account for the new member in every remaining point's reach.
            for point_idx in reach:
                dist = pair_distance(best_point, point_idx)
                if dist > reach[point_idx]:
                    reach[point_idx] = dist
        all_candidates.append(cluster)
    return all_candidates
def find_best_cluster(candidate_clusters, distance_matrix):
    """Select the best candidate cluster.

    The best cluster is the one with the most points. When several clusters
    share that size, the one with the smallest diameter (largest pairwise
    distance between its members) wins; if diameters also tie, the cluster
    that appears first in ``candidate_clusters`` is returned.

    Args:
        candidate_clusters: List of clusters, each a list of point indices.
        distance_matrix: Mapping ``(i, j) -> distance`` with ``i < j``.

    Returns:
        The selected cluster (the same list object held in
        ``candidate_clusters``), or ``[]`` when there are no candidates.

    Raises:
        KeyError: If a pair needed for a tie-break is missing from
            ``distance_matrix``.
    """
    if not candidate_clusters:
        return []

    max_points = max(len(cluster) for cluster in candidate_clusters)
    largest_clusters = [
        cluster for cluster in candidate_clusters if len(cluster) == max_points
    ]
    # A unique largest cluster needs no diameter computation at all.
    if len(largest_clusters) == 1:
        return largest_clusters[0]

    def diameter(cluster):
        # Largest distance over all unordered member pairs; a single-point
        # cluster has no pairs and therefore diameter 0.
        return max(
            (
                distance_matrix[(a, b) if a < b else (b, a)]
                for i, a in enumerate(cluster)
                for b in cluster[i + 1:]
            ),
            default=0,
        )

    # min() returns the first minimal element, i.e. earliest cluster on ties.
    return min(largest_clusters, key=diameter)
def _regrow_cluster(center_point, pool, distance_matrix, threshold):
    """Greedily grow a cluster around ``center_point`` using points from ``pool``.

    At every step the pool point giving the smallest cluster diameter is
    added, provided the diameter stays within ``threshold``; ties go to the
    point that comes first in ``pool``. A point whose distance to any member
    is missing from ``distance_matrix`` is never added.
    """

    def pair_distance(a, b):
        # Pairs are stored once, smaller index first; None means "missing".
        return distance_matrix.get((a, b) if a < b else (b, a))

    cluster = [center_point]
    diameter = 0
    # reach[p]: largest distance from pool point p to any current member.
    # Adding p would make the diameter max(diameter, reach[p]).
    reach = {}
    for point_idx in pool:
        dist = pair_distance(center_point, point_idx)
        if dist is not None:
            reach[point_idx] = dist

    while reach:
        best_point = None
        best_diameter = float("inf")
        for point_idx, farthest in reach.items():
            new_diameter = max(diameter, farthest)
            # Strict "<" keeps the earliest pool point on ties.
            if new_diameter <= threshold and new_diameter < best_diameter:
                best_point = point_idx
                best_diameter = new_diameter
        if best_point is None:
            # Nothing left fits within the threshold.
            break
        cluster.append(best_point)
        diameter = best_diameter
        del reach[best_point]
        # Fold the new member into the remaining points' reach. Iterate over
        # a copy because points with a missing distance are dropped.
        for point_idx in list(reach):
            dist = pair_distance(best_point, point_idx)
            if dist is None:
                del reach[point_idx]
            elif dist > reach[point_idx]:
                reach[point_idx] = dist
    return cluster


def update_candidate_clusters(candidate_clusters, best_cluster, distance_matrix, threshold, used_points):
    """Rebuild the candidate list after ``best_cluster`` has been selected.

    The points of ``best_cluster`` are added to ``used_points`` (mutated in
    place). Every candidate whose center is now used is discarded. For each
    remaining center a new cluster is grown greedily from the points that
    are still unused, and it is kept only if it has at least two points.

    Args:
        candidate_clusters: Current candidates; each is a list of point
            indices whose first element is the cluster center.
        best_cluster: The cluster that was just selected.
        distance_matrix: Mapping ``(i, j) -> distance`` with ``i < j``.
        threshold: Maximum allowed cluster diameter.
        used_points: Set of indices already assigned to a final cluster;
            updated in place with the members of ``best_cluster``.

    Returns:
        The new list of candidate clusters (possibly empty).
    """
    used_points.update(best_cluster)

    # Nothing to rebuild; also avoids calling max() on an empty sequence.
    if not candidate_clusters:
        return []

    # The point universe is inferred from the highest index that still
    # appears in any candidate.
    total_points = max(max(cluster) for cluster in candidate_clusters) + 1
    available_points = [i for i in range(total_points) if i not in used_points]

    updated_candidates = []
    for cluster in candidate_clusters:
        center_point = cluster[0]
        # Centers already assigned (including those in best_cluster, which
        # was merged into used_points above) can no longer seed a cluster.
        if center_point in used_points:
            continue
        pool = [p for p in available_points if p != center_point]
        new_cluster = _regrow_cluster(center_point, pool, distance_matrix, threshold)
        # A lone center is not a useful candidate.
        if len(new_cluster) >= 2:
            updated_candidates.append(new_cluster)
    return updated_candidates
def optimized_qt_clustering(point_data, distance_matrix, threshold):
    """Run QT clustering driven by a pre-computed candidate list.

    Candidate clusters are generated once for every point. Then, repeatedly,
    the best candidate is moved to the result and the candidate list is
    updated for the points that remain. Points that end up in no cluster
    are returned as single-point clusters.

    Args:
        point_data: Sequence of points.
        distance_matrix: Mapping ``(i, j) -> distance`` with ``i < j``.
        threshold: Maximum allowed cluster diameter.

    Returns:
        A list of clusters, each a list of point indices.
    """
    total = len(point_data)
    assigned = set()
    result = []

    pending = generate_all_candidate_clusters(point_data, distance_matrix, threshold)

    # Stop when candidates run out or every point has been assigned.
    while pending and len(assigned) < total:
        winner = find_best_cluster(pending, distance_matrix)
        if not winner:
            break
        result.append(winner)
        # update_candidate_clusters also records the winner's points in
        # `assigned`.
        pending = update_candidate_clusters(
            pending, winner, distance_matrix, threshold, assigned
        )

    # Leftover points become clusters of their own, in index order.
    result.extend([idx] for idx in range(total) if idx not in assigned)
    return result
# Main execution
if __name__ == "__main__":
    # Use the path given as the single command-line argument; otherwise
    # fall back to the default input file.
    if len(sys.argv) == 2:
        infile = sys.argv[1]
    else:
        infile = "data/point1000.lst"
    print(f"Reading points from: {infile}")

    point_data = readpoints(infile)

    # Store each unordered pair once, keyed (i, j) with i < j.
    distance_matrix = {}
    for i in range(len(point_data)):
        for j in range(i + 1, len(point_data)):
            distance_matrix[(i, j)] = euclidean_dist(point_data[i], point_data[j])

    # With fewer than two points there are no pairs; treat the diameter as
    # 0 instead of letting max() fail on an empty sequence.
    max_distance = max(distance_matrix.values(), default=0.0)

    # The quality threshold is 30% of the data set's diameter.
    quality_threshold = 0.3 * max_distance
    print(f"Maximum distance: {max_distance}")
    print(f"Quality Threshold (30% of diameter): {quality_threshold}")

    clusters = optimized_qt_clustering(point_data, distance_matrix, quality_threshold)

    print(f"\nFound {len(clusters)} clusters:")
    for i, cluster in enumerate(clusters):
        # Report members by label rather than by index.
        point_names = [point_data[idx][0] for idx in cluster]
        print(f"Cluster {i+1}: {len(cluster)} points - {point_names}")