-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQT_DM.py
More file actions
245 lines (201 loc) · 9.23 KB
/
QT_DM.py
File metadata and controls
245 lines (201 loc) · 9.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
##16:52 min
import math
import sys
def readpoints(infile):
point_list = []
n = 0
with open(infile) as file:
for line in file:
n += 1
line = line.strip().split()
# Files with index column starting with "point"
if line[0].lower().startswith("point"):
point_list.append((line[0].replace("p", "P", 1), *[float(value) for value in line[1:]]))
# For files with no index column
else:
point_list.append((f"Point{1+n}", *[float(value) for value in line]))
return point_list
def euclidean_dist(pointA, pointB):
"""Takes two points as input and calculates the euclidean distance between them"""
coords1 = pointA[1:]
coords2 = pointB[1:]
# Verify both points have the same number of dimensions
if len(coords1) != len(coords2):
raise ValueError("Points must have the same number of dimensions")
# Calculate square sum of differences
square_sum = sum((coords2[i] - coords1[i]) ** 2 for i in range(len(coords1)))
# Get the euclidean distance
distance = math.sqrt(square_sum)
return distance
def create_filtered_distance_matrix(point_data, quality_threshold):
"""
Calculate distances between all points and filter out pairs that exceed
the quality threshold.
Returns:
- distance_matrix: Dictionary with only distances <= quality_threshold
- point_neighbors: Dictionary mapping each point to its potential neighbors
"""
distance_matrix = {}
point_neighbors = {i: set() for i in range(len(point_data))}
for i in range(len(point_data)):
for j in range(i + 1, len(point_data)):
dist = euclidean_dist(point_data[i], point_data[j])
# Only keep distances that are within the threshold
if dist <= quality_threshold:
distance_matrix[(i, j)] = dist
# Add to the neighbors list for both points
point_neighbors[i].add(j)
point_neighbors[j].add(i)
return distance_matrix, point_neighbors
def candidate_cluster(center_idx, point_data, distance_matrix, threshold, used_set, point_neighbors):
"""
Forms a candidate cluster starting from center_idx.
Only considers points that are neighbors (within threshold distance).
"""
# Start with the center point
cluster = [center_idx]
# Make a list of all unused neighbors of the center point
available_points = [i for i in point_neighbors[center_idx] if i not in used_set]
# Continue adding points as long as possible
while available_points:
best_point = None
best_max_dist = float("inf")
# Try each available point
for point_idx in available_points:
# Check if this point is a neighbor of all points in the current cluster
valid_point = True
for existing_point in cluster:
key = (existing_point, point_idx) if existing_point < point_idx else (point_idx, existing_point)
if key not in distance_matrix:
valid_point = False
break
if not valid_point:
continue
# Create a temporary cluster with this point added
temp_cluster = cluster + [point_idx]
# Calculate maximum distance in this temporary cluster
max_dist = 0
for i in range(len(temp_cluster)):
for j in range(i + 1, len(temp_cluster)):
a, b = temp_cluster[i], temp_cluster[j]
key = (a, b) if a < b else (b, a)
dist = distance_matrix[key]
max_dist = max(max_dist, dist)
# If this point keeps the cluster within threshold and has the smallest diameter
if max_dist <= threshold and max_dist < best_max_dist:
best_point = point_idx
best_max_dist = max_dist
# If we found a point to add, add it and remove from available points
if best_point is not None:
cluster.append(best_point)
available_points.remove(best_point)
# Update available points to only include neighbors of all cluster points
new_available = []
for point in available_points:
valid = True
for cluster_point in cluster:
key = (cluster_point, point) if cluster_point < point else (point, cluster_point)
if key not in distance_matrix:
valid = False
break
if valid:
new_available.append(point)
available_points = new_available
else:
# If no point can be added without exceeding threshold, we're done
break
return cluster
def best_cluster(point_data, distance_matrix, threshold, used_set, point_neighbors):
"""
Tries to form a cluster from each unused point.
Returns the best cluster found: the one with the most points (and tightest if tied).
Only considers points that have potential neighbors.
"""
best_cluster = []
best_diameter = float("inf")
# Try to form a candidate cluster from every unused point
for i in range(len(point_data)):
if i in used_set:
continue # Skip if point has already been used
# Skip points that don't have enough neighbors to form a cluster
if len(point_neighbors[i] - used_set) == 0:
continue
# Form a candidate cluster starting from point i
cluster = candidate_cluster(i, point_data, distance_matrix, threshold, used_set, point_neighbors)
# Only consider clusters that have more than 1 point
if len(cluster) < 2:
continue
# Compute the diameter of this cluster
max_dist = 0
for a in range(len(cluster)):
for b in range(a + 1, len(cluster)):
p1, p2 = cluster[a], cluster[b]
key = (p1, p2) if p1 < p2 else (p2, p1)
d = distance_matrix[key]
max_dist = max(max_dist, d)
# Compare this cluster with the current best one
if (len(cluster) > len(best_cluster)) or (len(cluster) == len(best_cluster) and max_dist < best_diameter):
best_cluster = cluster
best_diameter = max_dist
# Return after checking ALL potential clusters
return best_cluster
def qt_clustering(point_data, distance_matrix, threshold, point_neighbors):
"""
Main QT clustering algorithm.
Repeatedly finds and removes the best cluster until all points are used.
"""
used_points = set()
clusters = []
while len(used_points) < len(point_data):
# Find the best cluster from remaining points
best = best_cluster(point_data, distance_matrix, threshold, used_points, point_neighbors)
# If no valid cluster with at least 2 points could be formed
if len(best) < 2:
# Add remaining points as single-point clusters
for i in range(len(point_data)):
if i not in used_points:
clusters.append([i])
used_points.add(i)
break # We're done
else:
# Add this cluster and update used points
clusters.append(best)
used_points.update(best)
return clusters
# Main execution
if __name__ == "__main__":
# Parse command line arguments
if len(sys.argv) == 2:
infile = sys.argv[1]
else:
infile = "data/point1000.lst"
print(f"Reading points from: {infile}")
# Load data
point_data = readpoints(infile)
# Calculate ALL distances for finding the maximum distance
all_distances = {}
for i in range(len(point_data)):
for j in range(i + 1, len(point_data)):
dist = euclidean_dist(point_data[i], point_data[j])
all_distances[(i, j)] = dist
# Find maximum distance (diameter)
max_distance = max(all_distances.values())
# Set quality threshold to 30% of maximum distance
quality_threshold = 0.3 * max_distance
print(f"Maximum distance: {max_distance}")
print(f"Quality Threshold (30% of diameter): {quality_threshold}")
# Create a filtered distance matrix that only contains distances <= threshold
filtered_distance_matrix, point_neighbors = create_filtered_distance_matrix(
point_data, quality_threshold
)
print(f"Full distance matrix size: {len(all_distances)}")
print(f"Filtered distance matrix size: {len(filtered_distance_matrix)}")
print(f"Reduction: {100 * (1 - len(filtered_distance_matrix) / len(all_distances)):.2f}%")
# Run the QT clustering algorithm with the filtered matrix
clusters = qt_clustering(point_data, filtered_distance_matrix, quality_threshold, point_neighbors)
# Display results
print(f"\nFound {len(clusters)} clusters:")
for i, cluster in enumerate(clusters):
# Convert indices to point names
point_names = [point_data[idx][0] for idx in cluster]
print(f"Cluster {i+1}: {len(cluster)} points - {point_names}")