-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQT_BC.py
More file actions
208 lines (167 loc) · 7.86 KB
/
QT_BC.py
File metadata and controls
208 lines (167 loc) · 7.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import math
import sys
def readpoints(infile):
    """Read points from a whitespace-separated text file.

    Each non-blank line describes one point. If the first field starts with
    "point" (case-insensitive) it is used as the label, with the first
    lowercase "p" capitalised, and the remaining fields are the coordinates.
    Otherwise every field is a coordinate and the label "Point<k>" is
    generated, where k is the 1-based position of the point among the points
    read so far.

    Args:
        infile: Path of the file to read.

    Returns:
        A list of tuples ``(label, coord_1, coord_2, ...)`` with float
        coordinates, in file order.

    Raises:
        ValueError: If a coordinate field cannot be converted to float.
        OSError: If the file cannot be opened.
    """
    point_list = []
    with open(infile) as file:
        for line in file:
            fields = line.split()
            # Blank lines (e.g. a trailing newline at end of file) carry no point
            if not fields:
                continue
            if fields[0].lower().startswith("point"):
                # File provides its own index column
                label = fields[0].replace("p", "P", 1)
                values = fields[1:]
            else:
                # No index column: number the points from 1 in order of appearance
                label = f"Point{len(point_list) + 1}"
                values = fields
            point_list.append((label, *(float(value) for value in values)))
    return point_list
def euclidean_dist(pointA, pointB):
    """Return the Euclidean distance between two labelled points.

    Both arguments are sequences whose first element is a label; only the
    elements after the label are treated as coordinates.

    Raises:
        ValueError: If the two points have a different number of coordinates.
    """
    first, second = pointA[1:], pointB[1:]
    if len(first) != len(second):
        raise ValueError("Points must have the same number of dimensions")
    # Sum of squared per-axis differences, then the square root
    return math.sqrt(sum((q - p) ** 2 for p, q in zip(first, second)))
def generate_all_candidate_clusters(point_data, distance_matrix, threshold):
"""
Generates all possible candidate clusters for every point as a center.
Returns a list of lists where each inner list contains the indices of points in a cluster.
The first point in each inner list is always the center point.
"""
all_candidates = []
# For each potential center point
for center_idx in range(len(point_data)):
# Start with the center point
cluster = [center_idx]
# Make a list of all other points
available_points = [i for i in range(len(point_data)) if i != center_idx]
# Continue adding points as long as possible
while available_points:
best_point = None
best_max_dist = float("inf")
# Try each available point
for point_idx in available_points:
# Create a temporary cluster with this point added
temp_cluster = cluster + [point_idx]
# Calculate maximum distance in this temporary cluster
max_dist = 0
for i in range(len(temp_cluster)):
for j in range(i + 1, len(temp_cluster)):
a, b = temp_cluster[i], temp_cluster[j]
key = (a, b) if a < b else (b, a)
dist = distance_matrix[key]
max_dist = max(max_dist, dist)
# If this point keeps the cluster within threshold and has the smallest diameter
if max_dist <= threshold and max_dist < best_max_dist:
best_point = point_idx
best_max_dist = max_dist
# If we found a point to add, add it and remove from available points
if best_point is not None:
cluster.append(best_point)
available_points.remove(best_point)
else:
# If no point can be added without exceeding threshold, we're done
break
# Add this candidate cluster to our list of all candidates
if len(cluster) >= 2: # Only include clusters with at least 2 points
all_candidates.append(cluster)
return all_candidates
def find_best_cluster(candidate_clusters):
    """Pick the largest candidate cluster.

    Returns the candidate with the most members; when several share the
    largest size, the earliest one in the list is returned. An empty
    candidate list yields an empty list.
    """
    return max(candidate_clusters, key=len) if candidate_clusters else []
def update_candidate_clusters(candidate_clusters, best_cluster):
    """Remove the points of the chosen cluster from the candidate list.

    For every candidate (whose first element is its center):
      * it is dropped if its center belongs to ``best_cluster`` (this also
        drops the chosen cluster itself);
      * if it shares other points with ``best_cluster``, those points are
        filtered out and the remainder is kept only when at least 2 points
        (center included) are left;
      * otherwise it is kept unchanged.

    Note that affected candidates are only filtered; they are not regrown
    with other points.

    Returns a new list; the input list is not modified.
    """
    taken = set(best_cluster)  # set for constant-time membership tests
    survivors = []
    for candidate in candidate_clusters:
        if candidate[0] in taken:
            continue
        remaining = [member for member in candidate if member not in taken]
        if len(remaining) == len(candidate):
            # Nothing was removed: keep the original cluster as is
            survivors.append(candidate)
        elif len(remaining) >= 2:
            survivors.append(remaining)
    return survivors
def optimized_qt_clustering(point_data, distance_matrix, threshold):
    """Cluster the points with a precomputed-candidates variant of QT clustering.

    All candidate clusters are generated once. Then, repeatedly, the best
    candidate is moved to the result and the remaining candidates are updated
    to exclude its points, until no candidates are left or every point has
    been assigned. Points that end up in no cluster are returned as
    single-point clusters.

    Returns a list of clusters, each a list of indices into ``point_data``.
    """
    total = len(point_data)
    pending = generate_all_candidate_clusters(point_data, distance_matrix, threshold)
    assigned = set()
    result = []
    while pending and len(assigned) < total:
        winner = find_best_cluster(pending)
        if not winner:
            break
        result.append(winner)
        assigned.update(winner)
        pending = update_candidate_clusters(pending, winner)
    # Whatever was never picked up becomes its own one-point cluster
    result.extend([idx] for idx in range(total) if idx not in assigned)
    return result
# Main execution
if __name__ == "__main__":
    # Use the file named on the command line, otherwise the default sample file
    if len(sys.argv) == 2:
        infile = sys.argv[1]
    else:
        infile = "data/point100.lst"
    print(f"Reading points from: {infile}")
    # Load data
    point_data = readpoints(infile)
    # Distances between all pairs of points, keyed by (i, j) with i < j
    distance_matrix = {}
    for i in range(len(point_data)):
        for j in range(i + 1, len(point_data)):
            distance_matrix[(i, j)] = euclidean_dist(point_data[i], point_data[j])
    # Maximum pairwise distance (diameter); with fewer than two points there
    # are no pairs, so fall back to 0.0 instead of failing on an empty max()
    max_distance = max(distance_matrix.values(), default=0.0)
    # Set quality threshold to 30% of maximum distance
    quality_threshold = 0.3 * max_distance
    print(f"Maximum distance: {max_distance}")
    print(f"Quality Threshold (30% of diameter): {quality_threshold}")
    # Run the optimized QT clustering algorithm
    clusters = optimized_qt_clustering(point_data, distance_matrix, quality_threshold)
    # Display results
    print(f"\nFound {len(clusters)} clusters:")
    for i, cluster in enumerate(clusters):
        # Convert indices to point names
        point_names = [point_data[idx][0] for idx in cluster]
        print(f"Cluster {i+1}: {len(cluster)} points - {point_names}")