-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun_example.py
More file actions
181 lines (156 loc) · 6.45 KB
/
run_example.py
File metadata and controls
181 lines (156 loc) · 6.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
Launcher for the dual-gem5 Crazyflie light-chase simulation.

Starts:
  1. Bridge helper server (this process)
  2. Webots (one subprocess; runs leader + chaser + supervisor controllers)
  3. gem5-leader (one subprocess; runs leader.elf)
  4. gem5-chaser (one subprocess; runs chaser.elf)

The bridge helper matches the two Webots controllers by robot name to
the two gem5 server names:
    Crazyflie_Leader <-> gem5-leader
    Crazyflie_Chaser <-> gem5-chaser
"""
import argparse
import os
import subprocess
import time
from pathlib import Path
import bridge._bridge as br
# Directory containing this launcher; every default path below is relative to it.
SCRIPT_DIR = Path(__file__).resolve().parents[0]

# Default locations of the gem5 config script, the Webots world and the two
# firmware images inside the example tree.
DEFAULT_GEM5_SCRIPT = SCRIPT_DIR.joinpath("gem5", "gem5_script.py")
DEFAULT_WORLD = SCRIPT_DIR.joinpath("webots", "worlds", "crazyflie_dual_gem5.wbt")
DEFAULT_LEADER_ELF = SCRIPT_DIR.joinpath("gem5", "firmware", "build", "leader.elf")
DEFAULT_CHASER_ELF = SCRIPT_DIR.joinpath("gem5", "firmware", "build", "chaser.elf")

# Command-line interface of the launcher. Path-like defaults are stored as
# plain strings so that parsed values have the same type whether or not the
# user supplied them.
parser = argparse.ArgumentParser(
    description="Launch the dual-gem5 Crazyflie light-chase simulation.",
)

# -- Required executables --
parser.add_argument(
    "--gem5-path",
    required=True,
    help="Path to gem5 executable (e.g. .../build/ARM/gem5.opt)",
)
parser.add_argument(
    "--webots-path",
    required=True,
    help="Path to the Webots executable",
)

# -- Optional inputs / outputs (defaults live next to this script) --
parser.add_argument(
    "--gem5-script",
    default=str(DEFAULT_GEM5_SCRIPT),
    help="Path to the gem5 simulation script",
)
parser.add_argument(
    "--gem5-leader-binary",
    default=str(DEFAULT_LEADER_ELF),
    help="Leader firmware ELF",
)
parser.add_argument(
    "--gem5-chaser-binary",
    default=str(DEFAULT_CHASER_ELF),
    help="Chaser firmware ELF",
)
parser.add_argument(
    "--webots-world",
    default=str(DEFAULT_WORLD),
    help="Path to the Webots .wbt world file",
)
parser.add_argument(
    "--output-dir",
    default=str(SCRIPT_DIR.joinpath("output")),
    help="Directory for gem5 m5out trees",
)

# -- Webots display / speed control --
parser.add_argument(
    "--webots-mode",
    choices=["pause", "realtime", "fast"],
    default="realtime",
    help=(
        "Webots simulation mode (default: realtime). "
        "'fast' runs as fast as the host can compute "
        "and implies no rendering."
    ),
)
parser.add_argument(
    "--no-rendering",
    action="store_true",
    help=(
        "Disable Webots 3D rendering (physics still runs). "
        "Big speed win. "
        "Implied by --webots-mode=fast."
    ),
)
parser.add_argument(
    "--no-minimize",
    action="store_true",
    help="Don't start the Webots window minimized (default: minimized).",
)
parser.add_argument(
    "--batch",
    action="store_true",
    help="Run Webots in batch mode (no save dialogs / quit prompts).",
)
parser.add_argument(
    "--headless",
    action="store_true",
    help=(
        "Shortcut for: --webots-mode=fast --no-rendering --batch. "
        "Webots still needs an X display "
        "— use xvfb-run for true headless."
    ),
)
def start_executable(path, args, friendly_name):
    """Validate *path* and launch it as a child process.

    Args:
        path: Location of the program to run (string or path-like).
        args: Iterable of command-line arguments. Every item is converted
            with ``str`` so tuples, generators and ``Path`` objects are
            accepted in addition to a list of strings.
        friendly_name: Human-readable label used in log and error messages.

    Returns:
        The ``subprocess.Popen`` handle of the started process.

    Raises:
        FileNotFoundError: If *path* does not exist.
        PermissionError: If *path* exists but is not executable.
    """
    exe = Path(path)
    if not exe.exists():
        raise FileNotFoundError(f"{friendly_name} not found at {path}")
    if not os.access(str(exe), os.X_OK):
        raise PermissionError(f"{friendly_name} at {path} is not executable")

    # Normalise once so that both the log line and the Popen call work for
    # any iterable and any element type (list concatenation and str.join
    # would otherwise raise TypeError on tuples / non-string items).
    argv = [str(arg) for arg in args]

    print(f"[launcher] Starting {friendly_name}: {path} {' '.join(argv)}")
    return subprocess.Popen([str(path), *argv])
def _webots_cli_args(args):
    """Build the Webots argument list from the parsed launcher options.

    ``--headless`` is expanded here: it forces fast mode and turns on both
    ``--no-rendering`` and ``--batch``. Fast mode on its own also disables
    rendering.
    """
    mode = "fast" if args.headless else args.webots_mode

    cli = []
    if not args.no_minimize:
        cli.append("--minimize")
    cli.append(f"--mode={mode}")
    if args.no_rendering or args.headless or mode == "fast":
        cli.append("--no-rendering")
    if args.batch or args.headless:
        cli.append("--batch")
    cli += [args.webots_world, "--stdout", "--stderr"]
    return cli


def _shutdown_processes(procs):
    """Stop every still-running child in *procs* and report its exit code.

    Each process gets ``terminate()`` and five seconds to exit before it is
    killed. Errors for one process are logged and do not prevent the
    remaining processes from being stopped.
    """
    for name, proc in procs:
        try:
            if proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()
            print(f"[launcher] {name} exited with code {proc.returncode}")
        except Exception as e:
            # Best effort: keep going so the other children are still stopped.
            print(f"[launcher] Error terminating {name}: {e}")


def main():
    """Launch Webots and both gem5 instances, then run the bridge helper.

    Sequence:
      1. Parse the command line and verify that every input path exists.
      2. Open the bridge helper listening socket.
      3. Start Webots, then one gem5 process per firmware image.
      4. Run the bridge helper loop with the robot-name -> gem5-server map.
      5. Stay alive while any child process is running, or until Ctrl+C.
      6. Terminate whatever children are still running.

    Child processes and the helper socket are released on every exit path,
    including a failed launch or an error inside the bridge loop. A single
    Ctrl+C at any point after the socket is opened shuts everything down.

    Raises:
        FileNotFoundError: If a required executable, script, firmware image
            or world file does not exist.
        PermissionError: If an executable is not marked executable
            (raised by ``start_executable``).
    """
    args = parser.parse_args()

    # Robot name (Webots side) -> gem5 server name
    client_to_server = {
        "Crazyflie_Leader": "gem5-leader",
        "Crazyflie_Chaser": "gem5-chaser",
    }

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Fail early with a labelled message before anything is spawned.
    for label, path in [
        ("gem5", args.gem5_path),
        ("gem5 script", args.gem5_script),
        ("leader firmware", args.gem5_leader_binary),
        ("chaser firmware", args.gem5_chaser_binary),
        ("webots", args.webots_path),
        ("world", args.webots_world),
    ]:
        if not Path(path).exists():
            raise FileNotFoundError(f"{label} not found at {path}")

    print("[launcher] Starting bridge helper server...")
    listen_fd = br.bridge_setup_helper_server_socket()
    print(f"[launcher] Helper listening on fd {listen_fd}")

    procs = []  # (name, Popen) for every child started so far
    interrupted = False
    try:
        try:
            procs.append(
                ("webots",
                 start_executable(args.webots_path, _webots_cli_args(args), "webots"))
            )
            time.sleep(0.5)

            for server_name, binary in [
                ("gem5-leader", args.gem5_leader_binary),
                ("gem5-chaser", args.gem5_chaser_binary),
            ]:
                m5out = output_dir / f"{server_name}-m5out"
                gem5_args = [
                    "-re",  # gem5: redirect stdout/stderr into the outdir
                    "-d", str(m5out),
                    args.gem5_script,
                    "--binary", binary,
                    "--server-name", server_name,
                ]
                procs.append(
                    (server_name,
                     start_executable(args.gem5_path, gem5_args, server_name))
                )

            print("[launcher] All processes started. Entering bridge helper loop...")
            print("[launcher] Press Ctrl+C to stop.")
            br.bridge_helper_server_loop(listen_fd, client_to_server)
        except KeyboardInterrupt:
            # Remember the request so the keep-alive phase below is skipped;
            # otherwise the user would have to press Ctrl+C a second time.
            interrupted = True
            print("\n[launcher] Interrupted, shutting down...")
        finally:
            br.bridge_close_helper_server_socket(listen_fd)

        if not interrupted:
            print("[launcher] Bridge matchmaking done. Keeping launcher alive...")
            try:
                # Nothing left to supervise once every child has exited.
                while any(proc.poll() is None for _, proc in procs):
                    time.sleep(1)
            except KeyboardInterrupt:
                print("\n[launcher] Interrupted, shutting down...")
    finally:
        # Runs on normal completion, Ctrl+C and errors alike so that no
        # Webots/gem5 process outlives the launcher.
        _shutdown_processes(procs)

    print("[launcher] Done.")
# Script entry point: run the launcher only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()