-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathretail.py
More file actions
1257 lines (1139 loc) · 51.4 KB
/
retail.py
File metadata and controls
1257 lines (1139 loc) · 51.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import argparse
import logging
import matplotlib
matplotlib.use('TkAgg')
import pyrealsense2 as rs
import numpy as np
import cv2
import os
import mediapipe as mp
import time
import datetime
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
import matplotlib.patches as mpatches
from openpyxl import Workbook, load_workbook
import queue
import threading
from collections import deque
##########################################################
# Global Cropping Size Settings (Method 1)
##########################################################
# Size in pixels of the centered crop that main() cuts out of every color frame.
# main() only crops when both values are > 0; otherwise the full frame is used.
CROP_WIDTH: int = 640  # Fixed crop width
CROP_HEIGHT: int = 480  # Fixed crop height
##########################################################
# (A) Define log handler to store recent logs in a deque
##########################################################
MAX_LOG_LINES = 20  # Number of most recent log lines kept for the on-screen log window
log_deque = deque(maxlen=MAX_LOG_LINES)

# Shared by the console handler and the in-memory handler so both render identically.
_LOG_FORMAT = '%(asctime)s - [%(levelname)s] - %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'


class Cv2LogHandler(logging.Handler):
    """Logging handler that appends each formatted record to ``log_deque``.

    ``log_deque`` is bounded by ``MAX_LOG_LINES``, so only the newest lines
    are retained.
    """

    def emit(self, record):
        """Format ``record`` and store the resulting line.

        A record that cannot be formatted (for example mismatched %-args) is
        reported through ``handleError`` instead of raising into the code
        that issued the logging call.
        """
        try:
            log_deque.append(self.format(record))
        except Exception:
            self.handleError(record)


logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
)

cv2_log_handler = Cv2LogHandler()
cv2_log_handler.setLevel(logging.INFO)
cv2_log_handler.setFormatter(logging.Formatter(_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT))
logging.getLogger().addHandler(cv2_log_handler)
##########################################################
# (B) Excel Writing Functions and Scheduler Thread
##########################################################
FLUSH_INTERVAL: float = 2.0  # Seconds flush_thread_func waits on flush_event before flushing anyway
FLUSH_BATCH_SIZE: int = 10  # Queue size at which log_intersection_2d_async sets flush_event
excel_queue: queue.Queue = queue.Queue()  # Pending (timestamp, face_id, plane_name, u, v) records
flush_event = threading.Event()  # Set by the producer once FLUSH_BATCH_SIZE records are queued
stop_event = threading.Event()  # When set, flush_thread_func leaves its loop and does a final flush
def log_intersection_2d_async(face_id, plane_name, u, v, timestamp):
    """Queue one intersection record for the background Excel writer.

    The record is stored as ``(timestamp, face_id, plane_name, u, v)``. Per
    the original note, callers pass ``u`` and ``v`` already converted to
    feet. Once the queue holds at least ``FLUSH_BATCH_SIZE`` records the
    flush event is set so the writer does not wait for its timeout.
    """
    excel_queue.put((timestamp, face_id, plane_name, u, v))
    batch_ready = excel_queue.qsize() >= FLUSH_BATCH_SIZE
    if batch_ready:
        flush_event.set()
def flush_excel_queue(xlsx_path):
    """Append every queued intersection record to the workbook at ``xlsx_path``.

    The queue is drained first; nothing is touched on disk when it is empty.
    An existing workbook is opened (keeping VBA) and gets a header row if its
    first cell is not the expected header; otherwise a new workbook is
    created, including its parent directory.

    Errors are logged, never raised. On an ``OSError`` (for example the file
    is locked or the path is not writable) the drained records are put back
    on the queue so a later flush can retry them. Any other failure drops the
    batch, as before, so a single unwritable record cannot block all future
    writes.
    """
    pending = []
    while True:
        try:
            pending.append(excel_queue.get_nowait())
        except queue.Empty:
            break
    if not pending:
        return

    headers = ["Timestamp", "Face_ID", "Plane_Name", "u_local", "v_local"]
    try:
        if os.path.exists(xlsx_path):
            wb = load_workbook(xlsx_path, keep_vba=True)
            ws = wb.active
            # Covers both an empty sheet (A1 is None) and a sheet whose data
            # starts without a header: the header always ends up in row 1.
            if ws['A1'].value != headers[0]:
                ws.insert_rows(1)
                for col_idx, val in enumerate(headers, start=1):
                    ws.cell(row=1, column=col_idx, value=val)
        else:
            parent_dir = os.path.dirname(xlsx_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            wb = Workbook()
            ws = wb.active
            ws.append(headers)

        for record in pending:
            ws.append(list(record))
        wb.save(xlsx_path)
        logging.info(f"[Excel] Wrote {len(pending)} records to {xlsx_path}")
    except OSError as e:
        # Transient I/O problem: keep the data for the next flush.
        for record in pending:
            excel_queue.put(record)
        logging.error(
            f"Failed to write to Excel {xlsx_path}, error: {e}; "
            f"{len(pending)} records re-queued for retry"
        )
    except Exception as e:
        logging.error(f"Failed to write to Excel {xlsx_path}, error: {e}")
def flush_thread_func(xlsx_path):
    """Background loop that periodically writes queued records to Excel.

    Until ``stop_event`` is set, the loop waits for ``flush_event`` or for
    ``FLUSH_INTERVAL`` seconds, whichever comes first, and then flushes. One
    last flush runs after the loop so records queued just before shutdown
    are still written.
    """
    while not stop_event.is_set():
        flush_event.wait(timeout=FLUSH_INTERVAL)
        # Clear before flushing: a wake-up raised while the flush is running
        # then survives and triggers the next pass immediately.
        flush_event.clear()
        flush_excel_queue(xlsx_path)
    flush_excel_queue(xlsx_path)
def get_max_face_id_from_xlsm(xlsx_path):
    """Return the largest integer in the ``Face_ID`` column of a workbook.

    Row 1 of the active sheet is treated as the header row. Cells that are
    empty are skipped; cells that cannot be converted with ``int()`` are
    logged and skipped without aborting the scan.

    Returns:
        The maximum Face_ID found, or -1 when the file is missing, has no
        data rows, lacks a ``Face_ID`` column, or cannot be read.
    """
    if not os.path.exists(xlsx_path):
        logging.warning(f"Excel file not found: {xlsx_path}")
        return -1

    wb = None
    try:
        wb = load_workbook(xlsx_path, keep_vba=True)
        ws = wb.active
        if ws.max_row < 2:
            logging.warning("No valid data row in Excel (only header or empty).")
            return -1

        header_row = [cell.value for cell in ws[1]]
        if "Face_ID" not in header_row:
            logging.warning("Column 'Face_ID' not found in Excel.")
            return -1
        face_id_col_idx = header_row.index("Face_ID") + 1

        max_id = -1
        for row_idx in range(2, ws.max_row + 1):
            val = ws.cell(row=row_idx, column=face_id_col_idx).value
            if val is None:
                continue
            try:
                face_id_int = int(val)
            except (TypeError, ValueError, OverflowError):
                # TypeError: e.g. a date cell; OverflowError: an infinite float.
                logging.warning(f"Row {row_idx} Face_ID is not an integer: {val}")
                continue
            max_id = max(max_id, face_id_int)
        return max_id
    except Exception as e:
        logging.error(f"Cannot read {xlsx_path}, error: {e}")
        return -1
    finally:
        # Close on every path, including the error path.
        if wb is not None:
            wb.close()
##########################################################
# Placeholder for ransac_weighted_kabsch: a single weighted Kabsch fit with no RANSAC iterations
##########################################################
def ransac_weighted_kabsch(ref_points, det_points, weights, threshold, max_iterations):
    """Fit a rigid transform from ``ref_points`` to ``det_points``.

    This is a single weighted Kabsch solve; ``threshold`` and
    ``max_iterations`` are accepted but not used, and every point is
    reported as an inlier.

    Args:
        ref_points: Reference points, one point per column.
        det_points: Detected points, one point per column.
        weights: One weight per point.
        threshold: Unused.
        max_iterations: Unused.

    Returns:
        ``((R, t), inliers)`` where ``det ≈ R @ ref + t`` and ``inliers`` is
        an all-True boolean array with one entry per detected point.
    """
    weight_sum = np.sum(weights)

    def weighted_centroid(points):
        return np.sum(points * weights, axis=1, keepdims=True) / weight_sum

    mu_ref = weighted_centroid(ref_points)
    mu_det = weighted_centroid(det_points)

    cross_cov = (ref_points - mu_ref) @ np.diag(weights) @ (det_points - mu_det).T
    U, _, Vt = np.linalg.svd(cross_cov)

    rotation = Vt.T @ U.T
    if np.linalg.det(rotation) < 0:
        # Reflection case: flip the last right-singular vector and rebuild.
        Vt[-1, :] *= -1
        rotation = Vt.T @ U.T

    translation = mu_det - rotation @ mu_ref
    all_inliers = np.ones(det_points.shape[1], dtype=bool)
    return (rotation, translation), all_inliers
##########################################################
# Simplified SORT-style tracking: KalmanBoxTracker (stores the last bbox, no Kalman filtering) and Sort (greedy IoU matching)
##########################################################
class KalmanBoxTracker:
    """Per-object track record that remembers the last bounding box.

    Despite the name, no Kalman filtering is performed: the state is simply
    the most recent box passed to ``update``. IDs are handed out from the
    class-level ``count``.
    """

    count = 0  # Next ID to assign; shared by all instances

    def __init__(self, bbox):
        self.bbox = bbox
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.hits = 1
        self.age = 0
        self.time_since_update = 0

    def update(self, bbox):
        """Advance the track by one frame.

        ``bbox`` is the matched detection, or ``None`` when the track was
        not matched in this frame.
        """
        self.age += 1
        if bbox is None:
            self.time_since_update += 1
            return
        self.bbox = bbox
        self.hits += 1
        self.time_since_update = 0

    def get_state(self):
        """Return the last stored bounding box."""
        return self.bbox
class Sort:
    """Greedy IoU tracker that assigns persistent IDs to bounding boxes.

    Each ``update`` call matches the new detections to the existing trackers
    (highest IoU first), starts a tracker for every unmatched detection and
    retires trackers that stayed unmatched for more than ``max_age``
    consecutive updates.

    Args:
        max_age: Number of consecutive missed updates a tracker may have
            before it is removed.
        min_hits: Minimum ``hits`` a tracker needs before it is reported.
        iou_threshold: Pairs whose IoU is below this value are never matched.
    """

    def __init__(self, max_age=10, min_hits=3, iou_threshold=0.1):
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []

    def update(self, dets):
        """Feed one frame of detections and return the confirmed tracks.

        Args:
            dets: Sequence of ``[x1, y1, x2, y2]`` boxes.

        Returns:
            A list of ``[x1, y1, x2, y2, track_id]`` for every tracker that
            was matched in this frame and has at least ``min_hits`` hits,
            newest tracker first.
        """
        matched, unmatched_dets, unmatched_trks = self.associate_detections_to_trackers(dets, self.trackers)
        for det_idx, trk_idx in matched:
            self.trackers[trk_idx].update(dets[det_idx])
        # New trackers are appended, so the indices in unmatched_trks stay valid.
        for det_idx in unmatched_dets:
            self.trackers.append(KalmanBoxTracker(dets[det_idx]))
        for trk_idx in unmatched_trks:
            self.trackers[trk_idx].update(None)

        ret = []
        # Iterate over a reversed copy so stale trackers can be removed safely.
        for trk in self.trackers[::-1]:
            box = trk.get_state()
            if trk.hits >= self.min_hits and trk.time_since_update < 1:
                ret.append([box[0], box[1], box[2], box[3], trk.id])
            if trk.time_since_update > self.max_age:
                self.trackers.remove(trk)
        return ret

    def associate_detections_to_trackers(self, dets, trackers):
        """Greedily pair detections with trackers by descending IoU.

        Returns:
            ``(matched, unmatched_dets, unmatched_trks)`` where ``matched``
            is a list of ``[det_index, tracker_index]`` pairs and the other
            two are lists of the indices left without a partner.
        """
        unmatched_dets = list(range(len(dets)))
        unmatched_trks = list(range(len(trackers)))
        if len(dets) == 0 or len(trackers) == 0:
            return [], unmatched_dets, unmatched_trks

        iou_matrix = np.zeros((len(dets), len(trackers)), dtype=np.float32)
        for d, det in enumerate(dets):
            for t, trk in enumerate(trackers):
                iou_matrix[d, t] = self.iou(det, trk.get_state())

        matched_indices = []
        for _ in range(min(len(dets), len(trackers))):
            if np.max(iou_matrix) < self.iou_threshold:
                break
            d_ind, t_ind = np.unravel_index(np.argmax(iou_matrix), iou_matrix.shape)
            d_ind, t_ind = int(d_ind), int(t_ind)
            matched_indices.append([d_ind, t_ind])
            # Knock out the used row and column so neither is picked again.
            iou_matrix[d_ind, :] = -1
            iou_matrix[:, t_ind] = -1
            unmatched_dets.remove(d_ind)
            unmatched_trks.remove(t_ind)
        return matched_indices, unmatched_dets, unmatched_trks

    def iou(self, bb_test, bb_gt):
        """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

        Returns 0.0 when the union has no area (for example two zero-size
        boxes) instead of dividing by zero; a NaN here would otherwise be
        selected by the greedy matcher.
        """
        xx1 = np.maximum(bb_test[0], bb_gt[0])
        yy1 = np.maximum(bb_test[1], bb_gt[1])
        xx2 = np.minimum(bb_test[2], bb_gt[2])
        yy2 = np.minimum(bb_test[3], bb_gt[3])
        w = np.maximum(0., xx2 - xx1)
        h = np.maximum(0., yy2 - yy1)
        wh = w * h
        union = (
            (bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) +
            (bb_gt[2] - bb_gt[0]) * (bb_gt[3] - bb_gt[1]) - wh
        )
        if union <= 0:
            return 0.0
        return wh / union
##########################################################
# Global Parameter Settings
##########################################################
# Workbook that main() hands to the Excel flush thread and scans for the highest existing Face_ID.
XLSX_PATH: str = r"./log/intersection_log_3.xlsm"
# Folder that main() joins with the four calibration file names below.
IMAGE_FOLDER: str = "./data"
RVEC_PATH: str = "rotation_vectors.npy"
TVEC_PATH: str = "translation_vectors.npy"
CAMERA_MATRIX_PATH: str = "rgb_intrinsic_matrix.npy"
DIST_COEFFS_PATH: str = "dist_coeffs.npy"
# NOTE(review): the two forward lengths are not used in the part of the file reviewed here;
# presumably the drawn length of the nose vector - confirm.
FORWARD_LENGTH_3D: float = 0.8  # m
FORWARD_LENGTH_2D: float = 0.2  # m
# Axis ranges applied to the optional 3D plot in main().
X_LIMITS: tuple = (-10, 10)  # ft
Y_LIMITS: tuple = (-16, 16)  # ft
Z_LIMITS: tuple = (-10, 3)  # ft
# NOTE(review): presumably the threshold / max_iterations arguments of ransac_weighted_kabsch,
# whose current implementation ignores both - confirm against the caller.
RANSAC_THRESHOLD: float = 0.01
RANSAC_MAX_ITERATIONS: int = 150
FRAMERATE: float = 30.0
FRAME_INTERVAL: float = 1.0 / FRAMERATE  # Seconds main() sleeps after an iteration it skips
# Confidence thresholds passed to MediaPipe FaceMesh in main().
MIN_DETECTION_CONFIDENCE: float = 0.5
MIN_TRACKING_CONFIDENCE: float = 0.5
# NOTE(review): both lists hold the same eight indices, presumably MediaPipe face-mesh
# landmark ids - confirm.
REFERENCE_LANDMARK_INDICES: list = [4, 33, 133, 263, 362, 61, 291, 13]
STABLE_POINTS_SET: list = [4, 33, 133, 263, 362, 61, 291, 13]
# NOTE(review): not referenced in the reviewed part of the file; purpose inferred from the names only.
SMOOTH_WINDOW_SIZE: int = 3
INITIALIZATION_FRAMES: int = 10
###############################################
# Unit Conversion Definitions: The original CUBES_INFO data is in feet
###############################################
FT_TO_M = 0.3048  # 1 ft = 0.3048 m
M_TO_FT = 3.28084  # 1 m ≈ 3.28084 ft


def _plane(name, color, face_points, depth_point, invert_x=True, invert_y=True, record_face=0):
    """Build one CUBES_INFO entry; all coordinates are given in feet."""
    return {
        "name": name,
        "color": color,
        "face_points": face_points,
        "depth_point": depth_point,
        "invert_x": invert_x,
        "invert_y": invert_y,
        "record_face": record_face,
    }


def _feet_to_meters(points_ft):
    """Scale a nested list of coordinates from feet to meters."""
    return (np.array(points_ft, dtype=float) * FT_TO_M).tolist()


# CUBES_INFO is written in feet here and converted to meters right below.
CUBES_INFO = [
    _plane("Plane1", "red",
           [[4, 7, -5], [10, 7, -5], [4, 7, -4]],
           [0, 7.5, 0]),
    _plane("Plane2", "blue",
           [[-1, 5, -1], [4, 5, -1], [-1, 5, 3]],
           [0, 6, 0]),
    _plane("Plane3", "green",
           [[-4.5, 5, -2], [-3, 5, -2], [-4.5, 5, 3]],
           [0, 13, 0]),
    _plane("Plane4", "yellow",
           [[-6.5, -2.5, -5], [-6.5, 18.5, -5], [-6.5, -2.5, 3]],
           [-6, 0, 0],
           record_face=1),
    _plane("Plane5", "purple",
           [[-6, 18.5, -5], [8, 18.5, -5], [-6, 18.5, 3]],
           [0, 19, 0]),
    _plane("Plane6", "brown",
           [[5.5, 5, -0.5], [8, 5, -0.5], [5.5, 5, 3]],
           [0, 13, 0]),
    _plane("Plane7", "black",
           [[10.5, -2.5, -5], [10.5, 18, -5], [10.5, -2.5, 3]],
           [10, 0, 0],
           invert_y=False, record_face=1),
    # Disabled wall definitions, kept for reference:
    # _plane("Wall1", "cyan",
    #        [[-1.75, 3, -3], [1.75, 3, -3], [-1.75, 3, 0]],
    #        [0, 5, 0]),
    # _plane("Wall2", "red",
    #        [[-2, -3, -3], [-2, 3, -3], [-2, -3, 0]],
    #        [-1.75, 0, 0],
    #        record_face=1),
    # _plane("Wall3", "green",
    #        [[2, -3, -3], [2, 3, -3], [2, -3, 0]],
    #        [1.75, 5, 0],
    #        invert_y=False, record_face=1),
]

# At program start, convert every point from feet to meters for internal calculations.
for cube in CUBES_INFO:
    cube["face_points"] = _feet_to_meters(cube["face_points"])
    cube["depth_point"] = _feet_to_meters(cube["depth_point"])
##########################################################
# Improved plot_cube_with_index: Convert m back to ft before plotting (Method A)
##########################################################
def plot_cube_with_index(ax, face_points, depth_point, color='r', alpha=0.5):
    """Build the six faces of a box from one face and a depth point.

    The front face is the parallelogram spanned by the three ``face_points``
    (``p0``, ``p1``, ``p2``); the back face is the front face shifted along
    the face normal by the signed distance of ``depth_point`` from the front
    plane. Inputs are in meters; when ``ax`` is given the faces are scaled
    by ``M_TO_FT`` and drawn in feet.

    Args:
        ax: 3D axes to draw on, or ``None`` to only compute the geometry.
        face_points: Three corner points; any array-like of shape (3, 3),
            including plain lists and integer arrays.
        depth_point: A point on the back plane; any array-like of length 3.
        color: Face color used for drawing.
        alpha: Face transparency used for drawing.

    Returns:
        A list of ``(index, [v0, v1, v2, v3])`` tuples: 0 is the front face,
        1 the back face and 2-5 the four sides. Vertices are float arrays in
        meters.

    Raises:
        ValueError: If the three face points are collinear (or there are not
            exactly three of them).
    """
    # Coerce to float so lists and integer arrays work and the normal can be
    # normalized without an integer-casting error.
    p0, p1, p2 = np.asarray(face_points, dtype=float)
    depth_point = np.asarray(depth_point, dtype=float)

    edge_u = p1 - p0
    edge_v = p2 - p0
    normal = np.cross(edge_u, edge_v)
    if np.allclose(normal, 0):
        raise ValueError("Invalid face points (collinear).")
    normal = normal / np.linalg.norm(normal)

    p3 = p1 + edge_v
    front_face = [p0, p1, p3, p2]  # perimeter order
    offset = np.dot(depth_point - p0, normal) * normal
    back_face = [vertex + offset for vertex in front_face]

    sides = [
        [front_face[i], front_face[(i + 1) % 4], back_face[(i + 1) % 4], back_face[i]]
        for i in range(4)
    ]
    cube_faces = list(enumerate([front_face, back_face] + sides))

    if ax is not None:
        # Method A: convert all meter data to feet before plotting.
        poly3d_list = [
            [(np.array(pt) * M_TO_FT).tolist() for pt in face]
            for _, face in cube_faces
        ]
        poly = Poly3DCollection(poly3d_list, facecolors=color, edgecolors='k', alpha=alpha)
        ax.add_collection3d(poly)
    return cube_faces
##########################################################
# New Function: Plot each vertex of the same face (four vertices) in different colors (converted to ft first)
##########################################################
def plot_face_points_with_colors(ax, face, colors=None, s=80):
    """Mark each vertex of ``face`` on ``ax`` with its own color and index.

    Vertices are scaled by ``M_TO_FT`` before plotting. Colors are cycled
    when there are more vertices than colors; the default palette is red,
    green, blue, yellow. ``s`` is the scatter marker size.
    """
    palette = ['red', 'green', 'blue', 'yellow'] if colors is None else colors
    vertices_ft = [(np.array(vertex) * M_TO_FT).tolist() for vertex in face]
    for vertex_idx, vertex in enumerate(vertices_ft):
        marker_color = palette[vertex_idx % len(palette)]
        ax.scatter(vertex[0], vertex[1], vertex[2], color=marker_color, s=s)
        ax.text(vertex[0], vertex[1], vertex[2], f'{vertex_idx}', color='black', fontsize=10)
##########################################################
# Geometry / Detection Related Functions
##########################################################
def load_distortion_coeffs(dist_coeffs_path):
    """Load lens distortion coefficients as a float32 array.

    When the file does not exist a warning is logged and a (5, 1) array of
    zeros is returned, i.e. no distortion is assumed.
    """
    if os.path.exists(dist_coeffs_path):
        return np.load(dist_coeffs_path).astype(np.float32)
    logging.warning(f"Distortion file not found: {dist_coeffs_path}. Assuming no distortion.")
    return np.zeros((5, 1), dtype=np.float32)
def load_vectors(rvec_path, tvec_path):
    """Load the rotation and translation vectors from ``.npy`` files.

    Each array is squeezed and, when it ends up with shape ``(3,)``,
    reshaped to a ``(3, 1)`` column. Both results are float32.

    Raises:
        FileNotFoundError: If either file does not exist (the rotation file
            is checked first).
    """
    if not os.path.exists(rvec_path):
        raise FileNotFoundError(f"Rotation vector not found: {rvec_path}")
    if not os.path.exists(tvec_path):
        raise FileNotFoundError(f"Translation vector not found: {tvec_path}")

    def _load_as_column(path):
        vec = np.load(path).astype(np.float32).squeeze()
        if vec.shape == (3,):
            vec = vec.reshape(3, 1)
        return vec.astype(np.float32)

    rotation = _load_as_column(rvec_path)
    translation = _load_as_column(tvec_path)
    return rotation, translation
def initialize_realsense():
    """Start a RealSense pipeline with 1280x720 depth and color at 30 fps.

    Returns:
        ``(pipeline, depth_scale, intrinsics, distortion_coeffs)`` where
        ``depth_scale`` comes from the first depth sensor, ``intrinsics``
        are those of the color stream, and ``distortion_coeffs`` is the
        color stream's coefficient list as a (5, 1) float32 array.
    """
    logging.info("Initializing RealSense pipeline...")
    pipeline = rs.pipeline()
    stream_config = rs.config()
    requested_streams = (
        (rs.stream.depth, rs.format.z16),
        (rs.stream.color, rs.format.bgr8),
    )
    for stream_kind, pixel_format in requested_streams:
        stream_config.enable_stream(stream_kind, 1280, 720, pixel_format, 30)
    active_profile = pipeline.start(stream_config)

    depth_scale = active_profile.get_device().first_depth_sensor().get_depth_scale()
    logging.info(f"Depth scale: {depth_scale} meters/unit.")

    color_profile = active_profile.get_stream(rs.stream.color).as_video_stream_profile()
    intrinsics = color_profile.get_intrinsics()
    distortion_coeffs = np.array(intrinsics.coeffs, dtype=np.float32).reshape(5, 1)
    return pipeline, depth_scale, intrinsics, distortion_coeffs
def draw_axes(img, origin, imgpts):
    """Draw one line from ``origin`` to each point in ``imgpts``.

    The first three points get the colors (0, 0, 255), (0, 255, 0) and
    (255, 0, 0) respectively, i.e. red, green, blue if ``img`` is BGR. The
    image returned by the last ``cv2.line`` call is returned.
    """
    axis_colors = [(0, 0, 255), (0, 255, 0), (255, 0, 0)]
    endpoints = imgpts.astype(int)
    for axis_idx, endpoint in enumerate(endpoints):
        tip = tuple(endpoint.ravel())
        img = cv2.line(img, origin, tip, axis_colors[axis_idx], 3)
    return img
def reorder_face_points4(face_points):
    """Return the four face points with the last three sorted by distance.

    The first point stays first; the remaining points follow in order of
    increasing Euclidean distance from it (ties keep their input order).
    The result is a float array with one point per row.
    """
    anchor = face_points[0]
    rest = face_points[1:]
    # sorted() is stable, so equally distant points keep their input order.
    by_distance = sorted(
        range(len(rest)),
        key=lambda k: np.linalg.norm(rest[k] - anchor),
    )
    nearest = rest[by_distance[0]]
    middle = rest[by_distance[1]]
    farthest = rest[by_distance[2]]
    return np.array([anchor, nearest, middle, farthest], dtype=float)
def face_local_uv(intersection_point, face_points, invert_x=False, invert_y=False):
    """Express a point in the 2D coordinates of a face.

    The face points are reordered with ``reorder_face_points4``. The u axis
    runs from the first point to its nearest neighbour and the v axis to the
    second nearest. With ``invert_x`` / ``invert_y`` the coordinate is
    measured from the opposite edge instead.

    Returns:
        ``(u_local, v_local, u_length, v_length)`` in the units of the input.
    """
    origin, u_corner, v_corner, _ = reorder_face_points4(face_points)

    u_edge = u_corner - origin
    v_edge = v_corner - origin
    u_length = np.linalg.norm(u_edge)
    v_length = np.linalg.norm(v_edge)

    # The small epsilon keeps a degenerate (zero-length) edge from dividing by zero.
    u_unit = u_edge / (u_length + 1e-12)
    v_unit = v_edge / (v_length + 1e-12)

    offset = intersection_point - origin
    u_local = np.dot(offset, u_unit)
    v_local = np.dot(offset, v_unit)

    u_local = u_length - u_local if invert_x else u_local
    v_local = v_length - v_local if invert_y else v_local
    return (u_local, v_local, u_length, v_length)
def intersect_line_with_plane(p0, dir_vec, plane_points):
    """Return the ray parameter at which a ray meets a plane.

    The plane is defined by the first three entries of ``plane_points``.
    All inputs may be any array-like (lists, integer or float arrays); they
    are converted to float so the normal can be normalized safely.

    Args:
        p0: Ray origin.
        dir_vec: Ray direction.
        plane_points: Points on the plane; the first three must not be
            collinear.

    Returns:
        ``d`` such that ``p0 + d * dir_vec`` lies on the plane, or ``None``
        when the three points are (nearly) collinear, the ray is (nearly)
        parallel to the plane, or the plane lies behind the origin
        (``d < 0``).
    """
    p0 = np.asarray(p0, dtype=float)
    dir_vec = np.asarray(dir_vec, dtype=float)
    p1, p2, p3 = (np.asarray(pt, dtype=float) for pt in plane_points[:3])

    normal = np.cross(p2 - p1, p3 - p1)
    nrm = np.linalg.norm(normal)
    if nrm < 1e-12:
        return None
    normal = normal / nrm

    denom = np.dot(normal, dir_vec)
    if abs(denom) < 1e-9:
        return None

    d = np.dot(normal, p1 - p0) / denom
    if d < 0:
        return None
    return d
def area_of_triangle(a, b, c):
    """Return the area of the triangle with vertices ``a``, ``b`` and ``c``."""
    edge_ab = b - a
    edge_ac = c - a
    # Half the magnitude of the cross product of two edges.
    return np.linalg.norm(np.cross(edge_ab, edge_ac)) / 2.0
def point_in_polygon(point, polygon):
    """Tell whether ``point`` lies inside the quadrilateral ``polygon``.

    The quadrilateral's area (split into two triangles) is compared with the
    summed area of the four triangles that ``point`` forms with each edge;
    the two agree, within ``np.isclose`` using ``atol=1e-9``, when the point
    is inside.
    """
    p1, p2, p3, p4 = polygon
    quad_area = area_of_triangle(p1, p2, p3) + area_of_triangle(p1, p3, p4)

    edges = ((p1, p2), (p2, p3), (p3, p4), (p4, p1))
    fan_area = sum(area_of_triangle(point, start, end) for start, end in edges)
    return np.isclose(quad_area, fan_area, atol=1e-9)
def intersect_line_with_polygon(p0, dir_vec, polygon):
    """Return where the ray from ``p0`` along ``dir_vec`` hits ``polygon``.

    Returns the intersection point, or ``None`` when the ray misses the
    polygon's plane or meets the plane outside the polygon.
    """
    distance = intersect_line_with_plane(p0, dir_vec, polygon)
    if distance is not None:
        hit_point = p0 + distance * dir_vec
        if point_in_polygon(hit_point, polygon):
            return hit_point
    return None
def find_frontmost_intersection(p0, dir_vec, all_cubes):
    """Find the face hit closest to ``p0`` by the ray along ``dir_vec``.

    Args:
        p0: Ray origin.
        dir_vec: Ray direction.
        all_cubes: Iterable of ``(cube_data, cube_faces)`` where
            ``cube_faces`` is an iterable of ``(face_idx, face_points)``.

    Returns:
        ``(cube_data, face_idx, face_points, intersection_point)`` for the
        nearest hit, or ``None`` when no face is hit.
    """
    nearest_hit = None
    nearest_dist = float('inf')
    for cube_data, cube_faces in all_cubes:
        for face_idx, face_points in cube_faces:
            hit_point = intersect_line_with_polygon(p0, dir_vec, face_points)
            if hit_point is None:
                continue
            hit_dist = np.linalg.norm(hit_point - p0)
            if hit_dist < nearest_dist:
                nearest_dist = hit_dist
                nearest_hit = (cube_data, face_idx, face_points, hit_point)
    return nearest_hit
def stable_nose_direction(R_local, old_nose_dir, nose_world_3d, camera_world_position,
                          angle_threshold_deg=60.0):
    """Keep the z column of ``R_local`` from flipping away from the camera.

    When a previous direction exists and the new z axis is within
    ``angle_threshold_deg`` of it, ``R_local`` is returned untouched. In
    every other case the z column is negated if it points away from the
    camera. ``R_local`` is modified in place and also returned.
    """
    new_z = R_local[:, 2]

    if old_nose_dir is not None:
        cos_angle = np.clip(np.dot(old_nose_dir, new_z), -1.0, 1.0)
        if np.degrees(np.arccos(cos_angle)) < angle_threshold_deg:
            return R_local

    to_camera = (camera_world_position - nose_world_3d).ravel()
    if np.dot(new_z, to_camera) < 0:
        R_local[:, 2] = -new_z
    return R_local
def clear_all_3d_objects(ax, track_draw_data, intersection_draw_data):
    """Remove every tracked artist from the plot and empty both registries.

    For each track, the ``"scatter"``, ``"nose_line"`` and ``"nose_head"``
    artists are removed (when not ``None``), followed by the track's
    intersection artist if one is stored. Both dictionaries are cleared
    afterwards. ``ax`` is accepted but not used.
    """
    for track_id, artists in track_draw_data.items():
        handles = [artists["scatter"], artists["nose_line"], artists["nose_head"]]
        for handle in handles:
            if handle is not None:
                handle.remove()
        intersection_artist = intersection_draw_data.get(track_id)
        if intersection_artist is not None:
            intersection_artist.remove()
    track_draw_data.clear()
    intersection_draw_data.clear()
def show_log_window():
    """Render the lines currently in ``log_deque`` into the "Log Window".

    Lines are drawn top to bottom in white on an 800x400 black canvas, 18
    pixels apart; drawing stops once a line would fall below the canvas.
    """
    canvas_width, canvas_height = 800, 400
    left_margin, first_baseline, line_step = 10, 20, 18
    canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)

    # Draw from a snapshot so the deque can keep receiving records meanwhile.
    for row, text in enumerate(list(log_deque)):
        baseline = first_baseline + row * line_step
        if baseline + 5 > canvas_height:
            break
        cv2.putText(canvas, text, (left_margin, baseline), cv2.FONT_HERSHEY_SIMPLEX,
                    0.5, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.imshow("Log Window", canvas)
def define_local_coordinate_system(points_world_dict, stable_points_set, initial_R_local=None):
    """Derive a local frame from a set of stable 3D points via SVD.

    The origin is the centroid of the selected points; the axes are the
    right-singular vectors of the centered points, ordered by decreasing
    singular value.

    Args:
        points_world_dict: Mapping from point id to its 3D position.
        stable_points_set: Ids of the points to use.
        initial_R_local: Optional earlier frame; when its z column points
            against the new z column, the whole new matrix is negated.

    Returns:
        ``(origin, R_local)`` with ``origin`` of shape (3, 1) and the axes
        as the columns of ``R_local``, or ``(None, None)`` when any
        requested id is missing from ``points_world_dict``.
    """
    collected = []
    for point_id in stable_points_set:
        if point_id not in points_world_dict:
            return None, None
        collected.append(points_world_dict[point_id].ravel())
    cloud = np.array(collected)

    centroid = np.mean(cloud, axis=0, keepdims=True)
    _, _, Vt = np.linalg.svd(cloud - centroid, full_matrices=False)

    axes = [Vt[row, :].reshape(3, 1) for row in range(3)]
    for axis in axes:
        axis /= np.linalg.norm(axis)
    R_local = np.hstack(axes)

    # Force a non-negative determinant by flipping the z column.
    if np.linalg.det(R_local) < 0:
        R_local[:, 2] = -R_local[:, 2]

    if initial_R_local is not None:
        # NOTE(review): negating all three columns also flips the sign of the
        # determinant - confirm this is what downstream code expects.
        if np.dot(initial_R_local[:, 2], R_local[:, 2]) < 0:
            R_local = -R_local

    return centroid.T, R_local
##########################################################
# Added: Import MediaPipe drawing utilities for face mesh landmarks
##########################################################
# Module-level aliases for the MediaPipe drawing helpers used by main().
mp_drawing = mp.solutions.drawing_utils  # draw_landmarks / DrawingSpec for the face-mesh overlay
mp_drawing_styles = mp.solutions.drawing_styles  # Provides the default face-mesh tesselation connection style
##########################################################
# Main Program
##########################################################
def main():
logging.info("Program started.")
flush_thread = threading.Thread(target=flush_thread_func, args=(XLSX_PATH,), daemon=True)
flush_thread.start()
reference_points = np.array([
[0.0, 0.0, 0.0],
[-0.03, 0.02, 0.0],
[-0.02, 0.02, 0.0],
[0.03, 0.02, 0.0],
[0.02, 0.02, 0.0],
[-0.02, -0.02, 0.0],
[0.02, -0.02, 0.0],
[0.0, -0.03, 0.0]
], dtype=np.float32).T
existing_max_id = get_max_face_id_from_xlsm(XLSX_PATH)
if existing_max_id < 0:
existing_max_id = -1
KalmanBoxTracker.count = existing_max_id + 1
logging.info(f"Loaded existing max Face_ID = {existing_max_id}, next new ID = {KalmanBoxTracker.count}")
# Parse command line arguments (crop parameters removed)
parser = argparse.ArgumentParser()
parser.add_argument("--enable_log", action="store_true", default=True,
help="Whether to log intersection data into Excel. (default: True)")
parser.add_argument("--enable_3d_plot", action="store_true", default=False,
help="Whether to display a 3D plot. (default: False)")
parser.add_argument("--enable_display", action="store_true", default=False,
help="Whether to show real-time video in an OpenCV window. (default: False)")
parser.add_argument("--rotate180", action="store_false", default=True,
help="Flip camera image 180 deg by default; specify --rotate180 to disable flipping.")
args = parser.parse_args()
runtime_flip180 = args.rotate180
rvec_path = os.path.join(IMAGE_FOLDER, RVEC_PATH)
tvec_path = os.path.join(IMAGE_FOLDER, TVEC_PATH)
camera_matrix_path = os.path.join(IMAGE_FOLDER, CAMERA_MATRIX_PATH)
dist_coeffs_path = os.path.join(IMAGE_FOLDER, DIST_COEFFS_PATH)
if not os.path.exists(camera_matrix_path):
logging.error(f"Camera intrinsic matrix file not found: {camera_matrix_path}")
raise FileNotFoundError(f"Camera intrinsic matrix file not found: {camera_matrix_path}")
camera_matrix = np.load(camera_matrix_path).astype(np.float32)
dist_coeffs = load_distortion_coeffs(dist_coeffs_path)
rvec, tvec = load_vectors(rvec_path, tvec_path)
R_loaded, _ = cv2.Rodrigues(rvec)
R_inv = R_loaded.T
t_wc = -R_loaded.T @ tvec
pipeline, depth_scale, intrinsics_realsense, distortion_coeffs_realsense = initialize_realsense()
fx_realsense = intrinsics_realsense.fx
fy_realsense = intrinsics_realsense.fy
cx_realsense = intrinsics_realsense.ppx
cy_realsense = intrinsics_realsense.ppy
spatial = rs.spatial_filter()
spatial.set_option(rs.option.holes_fill, 3)
spatial.set_option(rs.option.filter_smooth_alpha, 0.5)
spatial.set_option(rs.option.filter_smooth_delta, 20)
spatial.set_option(rs.option.filter_magnitude, 2)
temporal = rs.temporal_filter()
temporal.set_option(rs.option.filter_smooth_alpha, 0.4)
temporal.set_option(rs.option.filter_smooth_delta, 20)
hole_filling = rs.hole_filling_filter(2)
align_to = rs.stream.color
align = rs.align(align_to)
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=10,
refine_landmarks=True,
min_detection_confidence=MIN_DETECTION_CONFIDENCE,
min_tracking_confidence=MIN_TRACKING_CONFIDENCE
)
logging.info("MediaPipe FaceMesh initialized.")
fig = None
ax = None
if args.enable_3d_plot:
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection='3d')
ax.set_proj_type('persp')
# Here X_LIMITS, Y_LIMITS, Z_LIMITS are in ft range
ax.set_xlim(X_LIMITS)
ax.set_ylim(Y_LIMITS)
ax.set_zlim(Z_LIMITS)
ax.set_xlabel('X (ft)')
ax.set_ylabel('Y (ft)')
ax.set_zlabel('Z (ft)')
ax.set_title('Multi-Face Nose Vector in 3D')
ax.grid(True)
ax.invert_zaxis()
ax.invert_yaxis()
plt.show(block=False)
all_cubes = []
if ax is not None:
legend_patches = []
for cube in CUBES_INFO:
face_points = np.array(cube["face_points"], dtype=float)
depth_point = np.array(cube["depth_point"], dtype=float)
# Call improved plot_cube_with_index; it converts m to ft for plotting
faces = plot_cube_with_index(ax, face_points, depth_point, color=cube["color"], alpha=0.6)
# Also plot the front face vertex points in ft
front_face = faces[0][1]
plot_face_points_with_colors(ax, front_face, colors=['red', 'green', 'blue', 'yellow'], s=80)
all_cubes.append((cube, faces))
patch = mpatches.Patch(color=cube["color"], label=cube["name"])
legend_patches.append(patch)
ax.legend(handles=legend_patches, loc='upper right')
else:
for cube in CUBES_INFO:
face_points = np.array(cube["face_points"], dtype=float)
depth_point = np.array(cube["depth_point"], dtype=float)
faces = []
try:
faces = plot_cube_with_index(None, face_points, depth_point, color='gray', alpha=0.0)
except:
pass
all_cubes.append((cube, faces))
track_draw_data = {}
intersection_draw_data = {}
mot_tracker = Sort(max_age=10, min_hits=2)
fps_frame_count = 0
fps_start_time = time.time()
fps = 0.0
logging.info("Entering main loop.")
try:
while True:
# Retrieve RealSense raw frames
frames = pipeline.wait_for_frames()
if not frames:
logging.warning("No frames retrieved, continue next loop.")
show_log_window()
cv2.waitKey(1)
time.sleep(FRAME_INTERVAL)
continue
color_frame_original = frames.get_color_frame()
if not color_frame_original:
logging.warning("No color frame retrieved, skip iteration.")
show_log_window()
cv2.waitKey(1)
time.sleep(FRAME_INTERVAL)
continue
color_image_original = np.asanyarray(color_frame_original.get_data())
if runtime_flip180:
color_image_original = cv2.rotate(color_image_original, cv2.ROTATE_180)
orig_height, orig_width = color_image_original.shape[:2]
# Crop the image
if CROP_WIDTH > 0 and CROP_HEIGHT > 0:
crop_width = CROP_WIDTH
crop_height = CROP_HEIGHT
crop_x = (orig_width - crop_width) // 2
crop_y = (orig_height - crop_height) // 2
color_image_cropped = color_image_original[crop_y:crop_y + crop_height,
crop_x:crop_x + crop_width].copy()
detection_input = color_image_cropped.copy()
else:
detection_input = color_image_original.copy()
crop_x = 0
crop_y = 0
crop_width = orig_width
crop_height = orig_height
# Adjust camera intrinsics (only modify principal point)
new_camera_matrix = camera_matrix.copy()
if CROP_WIDTH > 0 and CROP_HEIGHT > 0:
new_camera_matrix[0, 2] -= crop_x
new_camera_matrix[1, 2] -= crop_y
detection_input_rgb = cv2.cvtColor(detection_input, cv2.COLOR_BGR2RGB)
results = face_mesh.process(detection_input_rgb)
img_vis = detection_input.copy() if args.enable_display else None
face_bboxes = []
face_landmarks_dict = []
if results and results.multi_face_landmarks:
for fm_idx, face_landmarks in enumerate(results.multi_face_landmarks):
px = []
py = []
for lm in face_landmarks.landmark:
x = lm.x * crop_width
y = lm.y * crop_height
px.append(x)
py.append(y)
x1 = max(0, np.min(px))
y1 = max(0, np.min(py))
x2 = min(crop_width, np.max(px))
y2 = min(crop_height, np.max(py))
face_bboxes.append([x1, y1, x2, y2])
face_landmarks_dict.append(face_landmarks)
if img_vis is not None:
for face_landmarks in results.multi_face_landmarks:
mp_drawing.draw_landmarks(
image=img_vis,
landmark_list=face_landmarks,
connections=mp_face_mesh.FACEMESH_TESSELATION,
landmark_drawing_spec=mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1,
circle_radius=1),
connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_tesselation_style()
)
else:
if ax is not None:
clear_all_3d_objects(ax, track_draw_data, intersection_draw_data)
if img_vis is not None:
fps_frame_count += 1
elapsed_time = time.time() - fps_start_time
if elapsed_time >= 1.0:
fps = fps_frame_count / elapsed_time
fps_frame_count = 0
fps_start_time = time.time()
cv2.putText(img_vis, f"FPS: {fps:.2f}", (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)
cv2.putText(img_vis, "Press 'r' to flip 180", (50, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 200, 200), 2)
cv2.imshow("Cropped - Frontmost Intersection", img_vis)
show_log_window()
key = cv2.waitKey(1)
if key & 0xFF == ord('r'):
runtime_flip180 = not runtime_flip180
logging.info(f"Toggled runtime_flip180 => {runtime_flip180}")
if key & 0xFF in [ord('q'), 27]:
logging.info("User pressed q/ESC, exiting main loop.")
break
if ax is not None:
fig.canvas.draw()
fig.canvas.flush_events()
time.sleep(FRAME_INTERVAL)
continue
aligned_frames = align.process(frames)
depth_frame = aligned_frames.get_depth_frame()
color_frame_aligned = aligned_frames.get_color_frame()
if not depth_frame or not color_frame_aligned:
show_log_window()
cv2.waitKey(1)
time.sleep(FRAME_INTERVAL)
continue
color_image_aligned = np.asanyarray(color_frame_aligned.get_data())
if runtime_flip180:
color_image_aligned = cv2.rotate(color_image_aligned, cv2.ROTATE_180)
depth_image = np.asanyarray(depth_frame.get_data()) * depth_scale
if runtime_flip180:
depth_image = cv2.rotate(depth_image, cv2.ROTATE_180)
color_image = color_image_aligned[crop_y:crop_y + crop_height, crop_x:crop_x + crop_width].copy()
depth_image = depth_image[crop_y:crop_y + crop_height, crop_x:crop_x + crop_width].copy()
if args.enable_display:
img_vis = color_image.copy()
trackers_result = mot_tracker.update(face_bboxes)
current_active_ids = set()
for trk_data in trackers_result:
x1, y1, x2, y2, tid = trk_data
current_active_ids.add(tid)
if img_vis is not None:
cv2.rectangle(img_vis, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
cv2.putText(img_vis, f"ID: {tid}", (int(x1), int(y1) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
best_iou = -1
best_fm_idx = -1
for fm_idx, fb in enumerate(face_bboxes):
iou_val = mot_tracker.iou([x1, y1, x2, y2], fb)
if iou_val > best_iou:
best_iou = iou_val
best_fm_idx = fm_idx
if best_fm_idx == -1:
continue