From 95ffdcdb709094a19f7ccba383b0389ac228360a Mon Sep 17 00:00:00 2001 From: Alessio D'Ambrosio Date: Wed, 13 May 2026 08:50:25 +0200 Subject: [PATCH 1/4] bump version to 1.2.1 Signed-off-by: Alessio D'Ambrosio --- requirements.txt | 4 ++-- src/python_st3215/__init__.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index e9bc495..4dbee3e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -pyserial>=3.5 -types-pyserial>=3.5 +pyserial>=3.4 +types-pyserial>=3.4 diff --git a/src/python_st3215/__init__.py b/src/python_st3215/__init__.py index 98616e4..77e23a5 100644 --- a/src/python_st3215/__init__.py +++ b/src/python_st3215/__init__.py @@ -14,7 +14,7 @@ from .servo import Servo from .st3215 import ST3215 -__version__ = "1.2.0" +__version__ = "1.2.1" __all__ = [ "__version__", From 2bc31416014c9b5ea697df3199c2bc986d85fbe3 Mon Sep 17 00:00:00 2001 From: Alessio D'Ambrosio Date: Wed, 13 May 2026 09:05:53 +0200 Subject: [PATCH 2/4] fix registers to match memory table/documentation, a couple of bugfixes and updated examples. Signed-off-by: Alessio D'Ambrosio --- examples/01_scan_servos.py | 18 ++--- examples/02_movement.py | 7 +- examples/03_read_position.py | 30 +++++---- examples/04_speed_control.py | 24 +++---- examples/05_torque_limit.py | 11 ++-- examples/06_angle_limit.py | 12 ++-- examples/07_temperature_monitor.py | 53 ++++++++------- examples/08_movement_wait.py | 13 ++-- examples/09_registered_write.py | 33 +++++----- examples/10_constant_speed_mode.py | 29 +++++--- examples/11_sweep_motion.py | 30 ++++----- examples/12_position_correction.py | 17 +++-- examples/13_load_sensing.py | 58 ++++++++-------- examples/14_change_id.py | 102 ++++++++++++++--------------- examples/15_factory_reset.py | 26 ++++---- examples/16_sync_read.py | 68 +++++++++---------- examples/17_sync_write.py | 56 +++++++--------- examples/18_network_port.py | 45 ++++++------- src/python_st3215/__init__.py | 4 +- src/python_st3215/decorators.py | 2 +- src/python_st3215/registers.py | 52 ++++++++++----- src/python_st3215/st3215.py | 60 ++++++++--------- 22 files changed, 394 insertions(+), 356 deletions(-) diff --git a/examples/01_scan_servos.py b/examples/01_scan_servos.py index 2a5bcaf..dc1f69e 100644 --- a/examples/01_scan_servos.py +++ b/examples/01_scan_servos.py @@ -6,9 +6,9 @@ from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: print("Scanning for servos...\n") servos = controller.list_servos() @@ -18,24 +18,26 @@ print(f"Found {len(servos)} servo(s)\n") print("=" * 80) + mode_names = {0: "Position", 1: "Constant Speed", 2: "PWM", 3: "Stepper"} + for servo_id in servos: servo = controller.wrap_servo(servo_id) + voltage_raw = servo.sram.read_current_voltage() + voltage_str = f"{voltage_raw / 10:.1f}V" if voltage_raw is not None else "N/A" + + mode = servo.eeprom.read_operating_mode() + print(f"\nServo ID: {servo_id}") print( f" Firmware: v{servo.eeprom.read_firmware_major_version()}.{servo.eeprom.read_firmware_minor_version()}" ) print(f" Position: {servo.sram.read_current_location()}") print(f" Temperature: {servo.sram.read_current_temperature()}°C") - print(f" Voltage: {servo.sram.read_current_voltage() / 10:.1f}V") + print(f" Voltage: {voltage_str}") print( f" Min/Max Angle: {servo.eeprom.read_min_angle_limit()} / {servo.eeprom.read_max_angle_limit()}" ) - - mode = servo.eeprom.read_operating_mode() - mode_names = {0: "Position", 1: "Constant Speed", 2: "PWM", 3: "Stepper"} print(f" Operating Mode: {mode_names.get(mode, 'Unknown')}") print("\n" + "=" * 80) -finally: - controller.close() diff --git a/examples/02_movement.py b/examples/02_movement.py index 710ee2b..6c4a49f 100644 --- a/examples/02_movement.py +++ b/examples/02_movement.py @@ -4,11 +4,12 @@ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) servo.sram.torque_enable() @@ -25,5 +26,3 @@ time.sleep(1) servo.sram.torque_disable() -finally: - controller.close() diff --git a/examples/03_read_position.py b/examples/03_read_position.py index 29459ce..61ae3f6 100644 --- a/examples/03_read_position.py +++ b/examples/03_read_position.py @@ -7,22 +7,24 @@ from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) servo.sram.torque_disable() print("Manually move the servo. Press Ctrl+C to exit.") - while True: - position = servo.sram.read_current_location() - speed = servo.sram.read_current_speed() - load = servo.sram.read_current_load() - print( - f"Position: {position:5d} | Speed: {speed:5d} | Load: {load:4d}", end="\r" - ) - time.sleep(0.1) -except KeyboardInterrupt: - print("\nStopped.") -finally: - controller.close() + try: + while True: + position = servo.sram.read_current_location() + speed = servo.sram.read_current_speed() + load = servo.sram.read_current_load() + + pos_str = f"{position:5d}" if position is not None else " N/A" + spd_str = f"{speed:5d}" if speed is not None else " N/A" + load_str = f"{load:4d}" if load is not None else " N/A" + + print(f"Position: {pos_str} | Speed: {spd_str} | Load: {load_str}", end="\r") + time.sleep(0.1) + except KeyboardInterrupt: + print("\nStopped.") diff --git a/examples/04_speed_control.py b/examples/04_speed_control.py index f3b4d77..bccaa18 100644 --- a/examples/04_speed_control.py +++ b/examples/04_speed_control.py @@ -1,32 +1,34 @@ """ -Control servo movement speed using acceleration parameter. +Control servo movement speed using the acceleration parameter. + +Acceleration unit: 100 steps/s² per unit (e.g. 10 = 1000 steps/s²). +Lower values = slower ramp-up, smoother motion. """ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) servo.sram.torque_enable() - print("Slow movement (acceleration = 10)...") + print("Slow movement (acceleration = 10 → 1000 steps/s²)...") servo.sram.write_acceleration(10) servo.sram.write_target_location(3000) - time.sleep(2) + time.sleep(3) - print("Fast movement (acceleration = 100)...") - servo.sram.write_acceleration(100) + print("Medium movement (acceleration = 50 → 5000 steps/s²)...") + servo.sram.write_acceleration(50) servo.sram.write_target_location(1000) time.sleep(2) - print("Very fast movement (acceleration = 254)...") + print("Fast movement (acceleration = 254 → 25400 steps/s²)...") servo.sram.write_acceleration(254) servo.sram.write_target_location(2048) - time.sleep(2) + time.sleep(1) servo.sram.torque_disable() -finally: - controller.close() diff --git a/examples/05_torque_limit.py b/examples/05_torque_limit.py index fedc184..3cc36ff 100644 --- a/examples/05_torque_limit.py +++ b/examples/05_torque_limit.py @@ -1,14 +1,18 @@ """ Demonstrate torque limiting to reduce servo force. + +Torque limit unit: 0.1% per unit (0-1000, where 1000 = 100%). +This affects SRAM only; it resets to max_torque (EEPROM) on power-up. """ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) servo.sram.torque_enable() @@ -27,7 +31,6 @@ servo.sram.write_target_location(3000) time.sleep(1.5) + # Restore full torque before disabling so the next power-on is predictable servo.sram.write_torque_limit(1000) servo.sram.torque_disable() -finally: - controller.close() diff --git a/examples/06_angle_limit.py b/examples/06_angle_limit.py index 98b1556..7fb3c90 100644 --- a/examples/06_angle_limit.py +++ b/examples/06_angle_limit.py @@ -1,14 +1,18 @@ """ Set minimum and maximum angle limits to restrict servo range. + +Limits are stored in EEPROM and persist across power cycles. +The servo will clamp movement to stay within the configured limits. """ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) print("Setting angle limits: 1500 to 2500") @@ -33,10 +37,8 @@ time.sleep(1) print(f"Actual position: {servo.sram.read_current_location()}") - print("\nRestoring full range...") + print("\nRestoring full range (0-4095)...") servo.eeprom.write_min_angle_limit(0) servo.eeprom.write_max_angle_limit(4095) servo.sram.torque_disable() -finally: - controller.close() diff --git a/examples/07_temperature_monitor.py b/examples/07_temperature_monitor.py index 56d0c76..e882b4a 100644 --- a/examples/07_temperature_monitor.py +++ b/examples/07_temperature_monitor.py @@ -1,5 +1,5 @@ """ -Monitor servo temperature and voltage during operation. +Monitor servo temperature, voltage, and current during operation. """ import os @@ -7,36 +7,39 @@ from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) servo.sram.torque_enable() servo.sram.write_acceleration(50) - print("Monitoring temperature and voltage. Press Ctrl+C to exit.") - print("Moving servo continuously to generate heat.. .\n") + print("Monitoring temperature, voltage, and current. Press Ctrl+C to exit.") + print("Moving servo continuously to generate load...\n") positions = [1000, 3000] pos_index = 0 - while True: - temp = servo.sram.read_current_temperature() - voltage = servo.sram.read_current_voltage() / 10 - current = servo.sram.read_current_current() * 6.5 - - print( - f"Temp: {temp:2d}°C | Voltage: {voltage:4.1f}V | Current: {current:6.1f}mA", - end="\r", - ) - - servo.sram.write_target_location(positions[pos_index]) - pos_index = (pos_index + 1) % 2 - time.sleep(1) - -except KeyboardInterrupt: - print("\nStopped.") - servo.sram.torque_disable() - -finally: - controller.close() + try: + while True: + temp = servo.sram.read_current_temperature() + voltage_raw = servo.sram.read_current_voltage() + current_raw = servo.sram.read_current_current() + + # voltage unit: 0.1V; current unit: 6.5mA + voltage_str = f"{voltage_raw / 10:4.1f}V" if voltage_raw is not None else " N/A " + current_str = f"{current_raw * 6.5:6.1f}mA" if current_raw is not None else " N/A " + temp_str = f"{temp:2d}°C" if temp is not None else "N/A " + + print( + f"Temp: {temp_str} | Voltage: {voltage_str} | Current: {current_str}", + end="\r", + ) + + servo.sram.write_target_location(positions[pos_index]) + pos_index = (pos_index + 1) % 2 + time.sleep(1) + + except KeyboardInterrupt: + print("\nStopped.") + servo.sram.torque_disable() diff --git a/examples/08_movement_wait.py b/examples/08_movement_wait.py index ae0e9e8..4750246 100644 --- a/examples/08_movement_wait.py +++ b/examples/08_movement_wait.py @@ -4,11 +4,14 @@ import os import time + from python_st3215 import ST3215 -def wait_for_stop(servo, timeout=5): - """Wait until servo stops moving or timeout.""" +def wait_for_stop(servo, timeout=5.0): + """Wait until servo stops moving or timeout expires.""" + # Brief delay so the servo has time to set the moving flag before we poll + time.sleep(0.05) start_time = time.time() while servo.sram.is_moving(): if time.time() - start_time > timeout: @@ -18,9 +21,9 @@ def wait_for_stop(servo, timeout=5): return True -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) servo.sram.torque_enable() servo.sram.write_acceleration(30) @@ -41,5 +44,3 @@ def wait_for_stop(servo, timeout=5): print("Arrived!") servo.sram.torque_disable() -finally: - controller.close() diff --git a/examples/09_registered_write.py b/examples/09_registered_write.py index afa1bf0..8623ef8 100644 --- a/examples/09_registered_write.py +++ b/examples/09_registered_write.py @@ -1,29 +1,36 @@ """ Use registered write to prepare multiple servos, then execute simultaneously. + +REG_WRITE stages the command in each servo's buffer without executing it. +ACTION then fires all buffered commands at once across all servos. """ import os +import sys import time from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: - servos = controller.list_servos() +with ST3215(PORT) as controller: + print("Scanning for servos...") + servo_ids = controller.list_servos() + if not servo_ids: + print("No servos found!") + sys.exit(1) + print(f"Found {len(servo_ids)} servo(s): {servo_ids}") - servo_objects = [controller.wrap_servo(sid) for sid in servos] + servo_objects = [controller.wrap_servo(sid) for sid in servo_ids] for servo in servo_objects: servo.sram.torque_enable() servo.sram.write_acceleration(50) - print("Preparing movements with registered write...") + print("\nPreparing movements with registered write...") for i, servo in enumerate(servo_objects): - if i % 2 == 0: - servo.sram.write_target_location(1000, reg=True) - else: - servo.sram.write_target_location(3000, reg=True) + target = 1000 if i % 2 == 0 else 3000 + servo.sram.write_target_location(target, reg=True) print("Executing all movements simultaneously!") controller.broadcast.action() @@ -31,10 +38,8 @@ print("Preparing opposite movements...") for i, servo in enumerate(servo_objects): - if i % 2 == 0: - servo.sram.write_target_location(3000, reg=True) - else: - servo.sram.write_target_location(1000, reg=True) + target = 3000 if i % 2 == 0 else 1000 + servo.sram.write_target_location(target, reg=True) print("Executing!") controller.broadcast.action() @@ -42,5 +47,3 @@ for servo in servo_objects: servo.sram.torque_disable() -finally: - controller.close() diff --git a/examples/10_constant_speed_mode.py b/examples/10_constant_speed_mode.py index 4c3ae13..8ea3295 100644 --- a/examples/10_constant_speed_mode.py +++ b/examples/10_constant_speed_mode.py @@ -1,38 +1,47 @@ """ Use constant speed mode to make servo rotate continuously. + +Operating mode is stored in EEPROM, so the original mode is saved and +restored at the end. Torque must be disabled before switching modes. """ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) + original_mode = servo.eeprom.read_operating_mode() + print(f"Original operating mode: {original_mode}") + + # Torque must be off before changing operating mode + servo.sram.torque_disable() + print("Switching to constant speed mode (mode 1)...") servo.eeprom.write_operating_mode(1) servo.sram.torque_enable() - print("Rotating clockwise at speed 500...") + print("Rotating clockwise at speed 500 steps/s...") servo.sram.write_running_speed(500) time.sleep(3) - print("Rotating counter-clockwise at speed -500...") + print("Rotating counter-clockwise at speed -500 steps/s...") servo.sram.write_running_speed(-500) time.sleep(3) - print("Fast rotation at speed 1500...") + print("Fast clockwise rotation at 1500 steps/s...") servo.sram.write_running_speed(1500) time.sleep(2) print("Stopping...") servo.sram.write_running_speed(0) - time.sleep(1) + time.sleep(0.5) - print("Switching back to position control mode (mode 0)...") + # Torque must be off before changing operating mode servo.sram.torque_disable() - servo.eeprom.write_operating_mode(0) -finally: - controller.close() + print(f"Restoring original mode ({original_mode})...") + servo.eeprom.write_operating_mode(original_mode) diff --git a/examples/11_sweep_motion.py b/examples/11_sweep_motion.py index c5c7e37..5602411 100644 --- a/examples/11_sweep_motion.py +++ b/examples/11_sweep_motion.py @@ -4,11 +4,12 @@ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) servo.sram.torque_enable() servo.sram.write_acceleration(30) @@ -18,17 +19,14 @@ print("Sweeping back and forth. Press Ctrl+C to exit.") - while True: - servo.sram.write_target_location(max_pos) - time.sleep(1.5) - - servo.sram.write_target_location(min_pos) - time.sleep(1.5) - -except KeyboardInterrupt: - print("\nStopped.") - servo.sram.write_target_location(2048) - time.sleep(1) - servo.sram.torque_disable() -finally: - controller.close() + try: + while True: + servo.sram.write_target_location(max_pos) + time.sleep(1.5) + servo.sram.write_target_location(min_pos) + time.sleep(1.5) + except KeyboardInterrupt: + print("\nStopped.") + servo.sram.write_target_location(2048) + time.sleep(1) + servo.sram.torque_disable() diff --git a/examples/12_position_correction.py b/examples/12_position_correction.py index b169d2e..e74686b 100644 --- a/examples/12_position_correction.py +++ b/examples/12_position_correction.py @@ -1,14 +1,19 @@ """ Use position correction to offset the zero position. + +Position correction is stored in EEPROM and shifts all position readings +and targets by the given number of steps (-2047 to +2047). +The original correction is saved and restored at the end. """ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) original_correction = servo.eeprom.read_position_correction() @@ -23,21 +28,19 @@ time.sleep(1.5) print(f"Actual position: {servo.sram.read_current_location()}") - print("\nApplying +500 correction...") + print("\nApplying +500 step correction...") servo.eeprom.write_position_correction(500) servo.sram.write_target_location(2048) time.sleep(1.5) print(f"Actual position: {servo.sram.read_current_location()}") - print("\nApplying -500 correction...") + print("\nApplying -500 step correction...") servo.eeprom.write_position_correction(-500) servo.sram.write_target_location(2048) time.sleep(1.5) print(f"Actual position: {servo.sram.read_current_location()}") - print("\nRestoring original correction...") + print(f"\nRestoring original correction ({original_correction})...") servo.eeprom.write_position_correction(original_correction) servo.sram.torque_disable() -finally: - controller.close() diff --git a/examples/13_load_sensing.py b/examples/13_load_sensing.py index 0cf922a..03b3aa2 100644 --- a/examples/13_load_sensing.py +++ b/examples/13_load_sensing.py @@ -1,14 +1,17 @@ """ -Read servo load to detect when it's being blocked or resisted. +Read servo load to detect when it is being blocked or resisted. + +Load unit: 0.1% per unit (0-1000, where 1000 = 100% duty cycle). """ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) servo.sram.torque_enable() servo.sram.write_acceleration(50) @@ -19,26 +22,29 @@ positions = [1000, 3000] pos_index = 0 - while True: - servo.sram.write_target_location(positions[pos_index]) - pos_index = (pos_index + 1) % 2 - - for _ in range(15): - load = servo.sram.read_current_load() - load_percent = load / 10 - position = servo.sram.read_current_location() - - bar_length = int(load_percent / 2) - bar = "█" * bar_length + "░" * (50 - bar_length) - - print( - f"Pos: {position:4d} | Load: {load_percent:5.1f}% [{bar}]\033[K", - end="\r", - ) - time.sleep(0.1) - -except KeyboardInterrupt: - print("\nStopped.") - servo.sram.torque_disable() -finally: - controller.close() + try: + while True: + servo.sram.write_target_location(positions[pos_index]) + pos_index = (pos_index + 1) % 2 + + for _ in range(15): + load = servo.sram.read_current_load() + position = servo.sram.read_current_location() + + if load is not None: + load_percent = load / 10 + # Clamp bar to [0, 50] in case load briefly exceeds 100% + bar_length = max(0, min(50, int(load_percent / 2))) + bar = "█" * bar_length + "░" * (50 - bar_length) + load_str = f"{load_percent:5.1f}%" + else: + bar = "░" * 50 + load_str = " N/A " + + pos_str = f"{position:4d}" if position is not None else " N/A" + print(f"Pos: {pos_str} | Load: {load_str} [{bar}]\033[K", end="\r") + time.sleep(0.1) + + except KeyboardInterrupt: + print("\nStopped.") + servo.sram.torque_disable() diff --git a/examples/14_change_id.py b/examples/14_change_id.py index 7f3c973..5d9c36d 100644 --- a/examples/14_change_id.py +++ b/examples/14_change_id.py @@ -1,60 +1,60 @@ """ -Change a servo's ID. USE WITH CAUTION - only one servo should be connected! +Change a servo's ID. + +WARNING: Only one servo should be connected to avoid ID conflicts. +The new ID is written to EEPROM and is permanent across power cycles. """ import os +import sys + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") + +print("=" * 60) +print("CHANGE SERVO ID") +print("=" * 60) +print("\nWARNING: Only connect ONE servo to avoid ID conflicts!") try: - print("=" * 60) - print("CHANGE SERVO ID") - print("=" * 60) - print("\nWARNING: Only connect ONE servo to avoid ID conflicts!") - - try: - old_id_input = input("\nEnter CURRENT Servo ID (default 1): ").strip() - OLD_ID = int(old_id_input) if old_id_input else 1 - - new_id_input = input("Enter NEW Servo ID: ").strip() - if not new_id_input: - print("New ID is required. Exiting.") - exit(1) - NEW_ID = int(new_id_input) - except ValueError: - print("Invalid input. Please enter numeric IDs.") - exit(1) - - print(f"\nThis will change servo ID from {OLD_ID} to {NEW_ID}") - - response = input("\nType 'yes' to continue: ") - if response.lower() != "yes": - print("Cancelled.") + old_id_input = input("\nEnter CURRENT Servo ID (default 1): ").strip() + OLD_ID = int(old_id_input) if old_id_input else 1 + + new_id_input = input("Enter NEW Servo ID (0-253): ").strip() + if not new_id_input: + print("New ID is required. Exiting.") + sys.exit(1) + NEW_ID = int(new_id_input) +except ValueError: + print("Invalid input. Please enter numeric IDs.") + sys.exit(1) + +if not (0 <= NEW_ID <= 253): + print(f"Invalid ID {NEW_ID}. Must be 0-253.") + sys.exit(1) + +print(f"\nThis will change servo ID from {OLD_ID} to {NEW_ID}") +response = input("\nType 'yes' to continue: ") +if response.lower() != "yes": + print("Cancelled.") + sys.exit(0) + +with ST3215(PORT) as controller: + servo = controller.wrap_servo(OLD_ID) + print(f"\nCurrent ID confirmed: {servo.eeprom.read_id()}") + + # EEPROM is unlocked by default; unlock explicitly to be safe + servo.sram.unlock() + print(f"Writing new ID {NEW_ID}...") + servo.eeprom.write_id(NEW_ID) + servo.sram.lock() + + print("Verifying change...") + new_servo = controller.wrap_servo(NEW_ID) + confirmed_id = new_servo.eeprom.read_id() + + if confirmed_id == NEW_ID: + print(f"Successfully changed to ID {NEW_ID}") else: - servo = controller.wrap_servo(OLD_ID) - - print(f"\nCurrent ID: {servo.eeprom.read_id()}") - print(f"Changing to ID {NEW_ID}...") - - # Unlock EEPROM to allow saving changes - print("Unlocking EEPROM...") - servo.sram.unlock() - - print("Writing new ID...") - servo.eeprom.write_id(NEW_ID) - - # Lock EEPROM again (optional but recommended) - print("Locking EEPROM...") - servo.sram.lock() - - print("\nVerifying change...") - new_servo = controller.wrap_servo(NEW_ID) - confirmed_id = new_servo.eeprom.read_id() - - if confirmed_id == NEW_ID: - print(f"✓ Successfully changed to ID {NEW_ID}") - else: - print(f"✗ Failed to change ID (read: {confirmed_id})") -finally: - controller.close() + print(f"Failed to change ID (read back: {confirmed_id})") diff --git a/examples/15_factory_reset.py b/examples/15_factory_reset.py index bf1f2a6..38a28ea 100644 --- a/examples/15_factory_reset.py +++ b/examples/15_factory_reset.py @@ -1,33 +1,35 @@ """ -Example script to reset an ST3215 servo to factory settings. +Reset an ST3215 servo to factory settings. + +After reset the servo reboots briefly. This example waits for it to +come back online and confirms it responds to a ping. + +NOTE: Factory reset restores default ID (1) and baudrate (1,000,000). """ import os import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: servo = controller.wrap_servo(1) - servo.sram.torque_enable() - print("Resetting servo to factory settings...") + print("Sending factory reset command...") servo.reset() - print("Servo has been reset to factory settings.") + print("Reset command sent. Waiting for servo to reboot...") - print("Waiting for servo to come back online...") - timeout = 5 + timeout = 5.0 start = time.time() while time.time() - start < timeout: try: if servo.ping(): - print("Servo is back!") + print("Servo is back online.") break except Exception: pass time.sleep(0.2) else: - print("Timeout: Servo did not come back online within 5 seconds.") -finally: - controller.close() + print("Timeout: servo did not come back online within 5 seconds.") diff --git a/examples/16_sync_read.py b/examples/16_sync_read.py index 8148813..78dc04b 100644 --- a/examples/16_sync_read.py +++ b/examples/16_sync_read.py @@ -1,63 +1,55 @@ """ Use SYNC READ to query multiple servos simultaneously. -This is more efficient than reading from each servo individually. +More efficient than reading from each servo individually. """ import os +import sys import time + from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: print("Scanning for servos...") servo_ids = controller.list_servos() if not servo_ids: print("No servos found!") - exit(1) + sys.exit(1) print(f"Found {len(servo_ids)} servo(s): {servo_ids}") - servos = [] for servo_id in servo_ids: - servo = controller.wrap_servo(servo_id) - servos.append(servo) - print(f"Connected to servo {servo_id}") - - for servo in servos: - servo.sram.torque_disable() + controller.wrap_servo(servo_id).sram.torque_disable() print("\nManually move the servos. Press Ctrl+C to exit.") - print("Reading positions, speeds, and loads from all servos simultaneously.. .\n") - - active_servo_ids = [servo.id for servo in servos] + print("Reading positions, speeds, and loads from all servos simultaneously...\n") - while True: - positions = controller.broadcast.sram.sync_read_current_location( - active_servo_ids - ) - speeds = controller.broadcast.sram.sync_read_current_speed(active_servo_ids) - loads = controller.broadcast.sram.sync_read_current_load(active_servo_ids) + try: + while True: + positions = controller.broadcast.sram.sync_read_current_location(servo_ids) + speeds = controller.broadcast.sram.sync_read_current_speed(servo_ids) + loads = controller.broadcast.sram.sync_read_current_load(servo_ids) - print("\033[H\033[J") - print("=" * 60) - print(f"{'ID':<6} {'Position':<12} {'Speed':<12} {'Load':<12}") - print("=" * 60) + # \033[H\033[J moves cursor to top-left and clears screen for in-place refresh + print("\033[H\033[J", end="") + print("=" * 60) + print(f"{'ID':<6} {'Position':<12} {'Speed':<12} {'Load':<12}") + print("=" * 60) - for servo_id in active_servo_ids: - pos = positions.get(servo_id, "N/A") - spd = speeds.get(servo_id, "N/A") - load = loads.get(servo_id, "N/A") + for servo_id in servo_ids: + pos = positions.get(servo_id) + spd = speeds.get(servo_id) + load = loads.get(servo_id) - pos_str = f"{pos:5d}" if isinstance(pos, int) else str(pos) - spd_str = f"{spd:5d}" if isinstance(spd, int) else str(spd) - load_str = f"{load:4d}" if isinstance(load, int) else str(load) + pos_str = f"{pos:5d}" if pos is not None else " N/A" + spd_str = f"{spd:5d}" if spd is not None else " N/A" + load_str = f"{load:4d}" if load is not None else " N/A" - print(f"{servo_id:<6} {pos_str:<12} {spd_str:<12} {load_str:<12}") + print(f"{servo_id:<6} {pos_str:<12} {spd_str:<12} {load_str:<12}") - print("=" * 60) - time.sleep(0.1) + print("=" * 60) + time.sleep(0.1) -except KeyboardInterrupt: - print("\n\nStopped.") -finally: - controller.close() + except KeyboardInterrupt: + print("\n\nStopped.") diff --git a/examples/17_sync_write.py b/examples/17_sync_write.py index f7e9a80..c32d78b 100644 --- a/examples/17_sync_write.py +++ b/examples/17_sync_write.py @@ -1,60 +1,52 @@ """ Use SYNC WRITE to control multiple servos simultaneously. -This is more efficient than writing to each servo individually. +More efficient than writing to each servo individually. """ import os +import sys import time from python_st3215 import ST3215 -controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) +PORT = os.environ.get("ST3215_PORT", "/dev/ttyUSB0") -try: +with ST3215(PORT) as controller: print("Scanning for servos...") servo_ids = controller.list_servos() if not servo_ids: print("No servos found!") - exit(1) + sys.exit(1) print(f"Found {len(servo_ids)} servo(s): {servo_ids}") - servos = [] - for servo_id in servo_ids: - servo = controller.wrap_servo(servo_id) - servos.append(servo) - print(f"Connected to servo {servo_id}") + servo_objects = [controller.wrap_servo(sid) for sid in servo_ids] - for servo in servos: - servo.sram.torque_enable() + # Enable torque on all servos using SYNC WRITE + controller.broadcast.sram.sync_write_torque_switch({s.id: 1 for s in servo_objects}) - print("\nSetting acceleration for all servos using SYNC WRITE...") - acceleration_data = {servo.id: 50 for servo in servos} - controller.broadcast.sram.sync_write_acceleration(acceleration_data) + print("\nSetting acceleration for all servos via SYNC WRITE...") + controller.broadcast.sram.sync_write_acceleration({s.id: 50 for s in servo_objects}) time.sleep(0.5) - print("Moving all servos to position 2048 simultaneously...") - target_positions = {servo.id: 2048 for servo in servos} - controller.broadcast.sram.sync_write_target_location(target_positions) + print("Moving all servos to center (2048) simultaneously...") + controller.broadcast.sram.sync_write_target_location({s.id: 2048 for s in servo_objects}) time.sleep(2) - print("Moving servos to different positions...") - target_positions = {} - for i, servo in enumerate(servos): - target_positions[servo.id] = 1000 + (i * 500) - controller.broadcast.sram.sync_write_target_location(target_positions) + print("Moving servos to staggered positions...") + # Spread servos evenly between 1000 and 3000, clamped to valid range + n = len(servo_objects) + targets = {} + for i, servo in enumerate(servo_objects): + pos = 1000 + int((i / max(n - 1, 1)) * 2000) if n > 1 else 2048 + targets[servo.id] = max(0, min(4095, pos)) + controller.broadcast.sram.sync_write_target_location(targets) time.sleep(2) - print("Returning to center position...") - target_positions = {servo.id: 2048 for servo in servos} - controller.broadcast.sram.sync_write_target_location(target_positions) + print("Returning all servos to center...") + controller.broadcast.sram.sync_write_target_location({s.id: 2048 for s in servo_objects}) time.sleep(2) - for servo in servos: - servo.sram.torque_disable() + # Disable torque on all servos using SYNC WRITE + controller.broadcast.sram.sync_write_torque_switch({s.id: 0 for s in servo_objects}) print("\nDone!") - -except KeyboardInterrupt: - print("\nStopped.") -finally: - controller.close() diff --git a/examples/18_network_port.py b/examples/18_network_port.py index 20e848d..accb527 100644 --- a/examples/18_network_port.py +++ b/examples/18_network_port.py @@ -1,16 +1,15 @@ """ -Scan for all servos on the bus and display their information. -Supports either a direct serial device (e.g. /dev/ttyUSB0) or a network -socket URL (e.g. socket://:). +Connect to an ST3215 controller over a network socket instead of a direct +serial port. Useful when the servo controller is on a remote machine. Env vars: -- ST3215_URL : full pyserial URL (takes precedence) -- ST3215_HOST : IP/hostname for socket connection (default: 100.122.96.71) -- ST3215_PORT : TCP port for socket connection (default: 2000) -- ST3215_DEV : fallback serial device (default: /dev/ttyUSB0) + ST3215_URL Full pyserial URL (takes precedence over host/port) + ST3215_HOST IP or hostname of the remote machine (default: st3215-host) + ST3215_TCP_PORT TCP port on the remote machine (default: 2000) -Run this on the host connected to the ST3215 driver board (make sure you have socat installed and set up udev rules for /dev/ttyACM0): - `stty -F /dev/ttyACM0 1000000 raw -echo && socat -d -d TCP4-LISTEN:2000,bind=0.0.0.0,reuseaddr,fork,nodelay FILE:/dev/ttyACM0,b1000000,raw,echo=0` +On the remote host (requires socat and the servo connected to /dev/ttyACM0): + stty -F /dev/ttyACM0 1000000 raw -echo + socat TCP4-LISTEN:2000,bind=0.0.0.0,reuseaddr,fork,nodelay FILE:/dev/ttyACM0,b1000000,raw,echo=0 """ import os @@ -22,21 +21,23 @@ url_env = os.environ.get("ST3215_URL") host = os.environ.get("ST3215_HOST", "st3215-host") -port = os.environ.get("ST3215_PORT", "2000") +# ST3215_TCP_PORT is the TCP port on the remote machine, not the local serial device +tcp_port = os.environ.get("ST3215_TCP_PORT", "2000") if url_env: - TARGET_URL = url_env -elif host and port: - TARGET_URL = f"socket://{host}:{port}" + target_url = url_env +elif host: + target_url = f"socket://{host}:{tcp_port}" else: + print("Set ST3215_URL or ST3215_HOST to connect.") sys.exit(1) -print(f"Connecting to ST3215 via: {TARGET_URL}") +print(f"Connecting to ST3215 via: {target_url}") -ser = serial.serial_for_url(TARGET_URL, timeout=0.02) -controller = ST3215(ser=ser, read_timeout=0.02) +# Network latency is higher than USB, so use a larger timeout +ser = serial.serial_for_url(target_url, timeout=0.02) -try: +with ST3215(ser=ser, read_timeout=0.02) as controller: print("Scanning for servos...\n") servos = controller.list_servos(timeout=0.02) @@ -51,20 +52,20 @@ for servo_id in servos: servo = controller.wrap_servo(servo_id) + voltage_raw = servo.sram.read_current_voltage() + voltage_str = f"{voltage_raw / 10:.1f}V" if voltage_raw is not None else "N/A" + mode = servo.eeprom.read_operating_mode() + print(f"\nServo ID: {servo_id}") print( f" Firmware: v{servo.eeprom.read_firmware_major_version()}.{servo.eeprom.read_firmware_minor_version()}" ) print(f" Position: {servo.sram.read_current_location()}") print(f" Temperature: {servo.sram.read_current_temperature()}°C") - print(f" Voltage: {servo.sram.read_current_voltage() / 10:.1f}V") + print(f" Voltage: {voltage_str}") print( f" Min/Max Angle: {servo.eeprom.read_min_angle_limit()} / {servo.eeprom.read_max_angle_limit()}" ) - - mode = servo.eeprom.read_operating_mode() print(f" Operating Mode: {mode_names.get(mode, 'Unknown')}") print("\n" + "=" * 80) -finally: - controller.close() diff --git a/src/python_st3215/__init__.py b/src/python_st3215/__init__.py index 77e23a5..69825f5 100644 --- a/src/python_st3215/__init__.py +++ b/src/python_st3215/__init__.py @@ -11,6 +11,7 @@ ServoStatusError, ST3215Error, ) +from .instructions import Instruction from .servo import Servo from .st3215 import ST3215 @@ -19,6 +20,8 @@ __all__ = [ "__version__", "ST3215", + "Instruction", + "Servo", "ST3215Error", "ServoNotRespondingError", "InvalidInstructionError", @@ -30,5 +33,4 @@ "BroadcastOperationError", "InvalidIDError", "ServoStatusError", - "Servo", ] diff --git a/src/python_st3215/decorators.py b/src/python_st3215/decorators.py index 170f0a8..8c86b6c 100644 --- a/src/python_st3215/decorators.py +++ b/src/python_st3215/decorators.py @@ -53,7 +53,7 @@ def encode_signed_word(value: int) -> tuple[int, int]: Tuple of (low_byte, high_byte) """ if value < -32767 or value > 32767: - raise ValueError + raise ValueError(f"encode_signed_word: value {value} out of range [-32767, 32767]") low, high = abs(value) & 0xFF, (abs(value) >> 8) & 0x7F if value < 0: high |= 0x80 diff --git a/src/python_st3215/registers.py b/src/python_st3215/registers.py index 86332cd..f9c9fb2 100644 --- a/src/python_st3215/registers.py +++ b/src/python_st3215/registers.py @@ -122,11 +122,11 @@ def read_baudrate(self) -> Optional[int]: 0 = 1,000,000 baud 1 = 500,000 baud 2 = 250,000 baud - 3 = 115,200 baud - 4 = 57,600 baud - 5 = 38,400 baud - 6 = 19,200 baud - 7 = 9,600 baud + 3 = 128,000 baud + 4 = 115,200 baud + 5 = 76,800 baud + 6 = 57,600 baud + 7 = 38,400 baud Returns None if read fails """ return read_byte(self.servo, 0x06) @@ -230,7 +230,7 @@ def read_max_angle_limit(self) -> Optional[int]: Read the maximum angle limit. Returns: - int: Maximum position in steps (1-4095), or None if read fails. + int: Maximum position in steps (0-4095), or None if read fails. Must be greater than min_angle_limit. """ return read_word(self.servo, 0x0B) @@ -243,7 +243,7 @@ def write_max_angle_limit( Set the maximum angle limit. Args: - value (int): Maximum position in steps (1-4095). + value (int): Maximum position in steps (0-4095). Must be greater than min_angle_limit. reg (bool): If True, use registered write mode. @@ -919,6 +919,26 @@ def write_torque_switch( """ return write_byte(self.servo, 0x28, value, reg) + def sync_write_torque_switch(self, servo_data: dict[int, int]) -> None: + """ + SYNC WRITE torque switch to multiple servos. + + Args: + servo_data: Dictionary mapping servo_id to torque switch value + (0 = off, 1 = on, 128 = correct position to 2048). + + Returns: + None (broadcast operation, no response) + """ + if self.servo.id != 254: + raise BroadcastOperationError( + "sync_write_torque_switch can only be called on the broadcast servo (ID 254)." + ) + formatted_data: dict[int, list[int]] = {} + for servo_id, value in servo_data.items(): + formatted_data[servo_id] = [value & 0xFF] + return self.servo._sync_write(0x28, 1, formatted_data) # type: ignore + def torque_enable(self, reg: bool = False) -> dict[str, object] | None: """ Enable servo torque (motor power on). @@ -1054,7 +1074,7 @@ def read_runtime(self) -> Optional[int]: Read the runtime setting (PWM open-loop mode). Returns: - int: Runtime value (0-1000). BIT10 indicates direction. + int: Runtime value (0-2047). BIT10 indicates direction. Used for PWM open-loop speed control. Returns None if read fails. """ @@ -1066,7 +1086,7 @@ def write_runtime(self, value: int, reg: bool = False) -> dict[str, object] | No Set the runtime (PWM open-loop mode). Args: - value (int): Runtime value (0-1000). BIT10 indicates direction. + value (int): Runtime value (0-2047). BIT10 indicates direction. reg (bool): If True, use registered write mode. Returns: @@ -1225,9 +1245,9 @@ def read_current_location(self) -> Optional[int]: Read the current position (feedback). Returns: - int: Current position in steps. Returns None if read fails. + int: Current position in steps (signed). Returns None if read fails. """ - return read_word(self.servo, 0x38) + return read_word(self.servo, 0x38, signed=True) def sync_read_current_location( self, servo_ids: list[int] @@ -1252,7 +1272,7 @@ def sync_read_current_location( for servo_id, response in responses.items(): if response and isinstance(response, dict) and response.get("parameters"): data: bytes = response["parameters"] - results[servo_id] = data[0] | (data[1] << 8) + results[servo_id] = decode_signed_word(data[0] | (data[1] << 8)) else: results[servo_id] = None return results @@ -1286,12 +1306,8 @@ def sync_read_current_speed(self, servo_ids: list[int]) -> dict[int, Optional[in results: dict[int, int | None] = {} for servo_id, response in responses.items(): if response and isinstance(response, dict) and response.get("parameters"): - data: list[int] | bytes = response["parameters"] - if isinstance(data, (bytes, bytearray)): - raw = data[0] | (data[1] << 8) - else: - raw = data[0] | (data[1] << 8) - results[servo_id] = decode_signed_word(raw) + data: bytes = response["parameters"] + results[servo_id] = decode_signed_word(data[0] | (data[1] << 8)) else: results[servo_id] = None return results diff --git a/src/python_st3215/st3215.py b/src/python_st3215/st3215.py index 873ad89..f3351b4 100644 --- a/src/python_st3215/st3215.py +++ b/src/python_st3215/st3215.py @@ -444,29 +444,25 @@ def list_servos( found = [] total = end_id - start_id + 1 - old_retry_count = self.retry_count - self.retry_count = 1 - try: - for i, servo_id in enumerate(range(start_id, end_id + 1)): - if progress_callback: - progress_callback(i + 1, total) - try: - packet = self.send_instruction(servo_id, Instruction.PING) - response = self.read_response(packet, timeout=timeout) - if response: - parsed = self.parse_response(response) - if parsed and parsed.get("error") == 0: - found.append(servo_id) - self.logger.info(f"Found servo at ID {servo_id}") - except ( - serial.SerialException, - ChecksumError, - CommunicationTimeoutError, - ): - continue - finally: - self.retry_count = old_retry_count + for i, servo_id in enumerate(range(start_id, end_id + 1)): + if progress_callback: + progress_callback(i + 1, total) + try: + packet = self.send_instruction(servo_id, Instruction.PING) + response = self.read_response(packet, timeout=timeout) + if response: + parsed = self.parse_response(response) + if parsed and parsed.get("error") == 0: + found.append(servo_id) + self.logger.info(f"Found servo at ID {servo_id}") + except ( + serial.SerialException, + ChecksumError, + CommunicationTimeoutError, + ): + continue + self.logger.info(f"Scan complete. Found {len(found)} servo(s): {found}") return found @@ -504,18 +500,22 @@ def _sync_read( if rx is None: return responses b = 0 - while b+3 < len(rx) and rx[b]==0xFF and rx[b+1]==0xFF: - servo_id = rx[b+2] - paramlen = rx[b+3] + while b + 3 < len(rx) and rx[b] == 0xFF and rx[b + 1] == 0xFF: + servo_id = rx[b + 2] + paramlen = rx[b + 3] pkglen = paramlen + 4 if paramlen != data_length + 2: self.logger.warning( - f"Servo {servo_id}: no valid response for SYNC READ" + f"Servo {servo_id}: unexpected paramlen {paramlen} in SYNC READ, skipping" + ) + b += 4 + continue + if b + pkglen <= len(rx): + responses[servo_id] = self.parse_response(rx[b : b + pkglen]) + else: + self.logger.warning( + f"Servo {servo_id}: truncated packet in SYNC READ response, dropping" ) - responses[servo_id] = None - break - if b+pkglen <= len(rx): - responses[servo_id] = self.parse_response(rx[b:b+pkglen]) b += pkglen return responses From d84746a54da5d7ae3a42e6e9fff891f47a0cec89 Mon Sep 17 00:00:00 2001 From: Alessio D'Ambrosio Date: Wed, 13 May 2026 09:22:05 +0200 Subject: [PATCH 3/4] add unit tests, update workflows and fix mypy warnings Signed-off-by: Alessio D'Ambrosio --- .github/workflows/compile.yml | 10 +- .github/workflows/release.yml | 19 ++ .github/workflows/test.yml | 40 +++ .github/workflows/type-check.yml | 10 +- README.md | 3 +- pyproject.toml | 8 +- src/python_st3215/errors.py | 3 + src/python_st3215/servo.py | 4 +- tests/__init__.py | 0 tests/test_st3215.py | 469 +++++++++++++++++++++++++++++++ 10 files changed, 553 insertions(+), 13 deletions(-) create mode 100644 .github/workflows/test.yml create mode 100644 tests/__init__.py create mode 100644 tests/test_st3215.py diff --git a/.github/workflows/compile.yml b/.github/workflows/compile.yml index b4699ba..a7d980e 100644 --- a/.github/workflows/compile.yml +++ b/.github/workflows/compile.yml @@ -16,14 +16,18 @@ on: jobs: compile: - name: Library Compilation + name: Library Compilation (Python ${{ matrix.python-version }}) runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] steps: - uses: actions/checkout@v4 - - name: Set up Python + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: "3.x" + python-version: ${{ matrix.python-version }} - name: Install build tools run: | pip install --upgrade pip diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index fec1981..62eaf51 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -5,8 +5,27 @@ on: branches: [main] jobs: + test: + name: Run tests before release + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.x" + - name: Install dependencies + run: | + pip install --upgrade pip + pip install -r requirements.txt + pip install pytest + pip install -e . + - name: Run tests + run: pytest tests/ -v + build: name: Build distribution + needs: test runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..e993e34 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,40 @@ +name: Tests + +on: + push: + paths: + - "src/**/*.py" + - "tests/**/*.py" + - "pyproject.toml" + - "requirements.txt" + - ".github/workflows/test.yml" + pull_request: + paths: + - "src/**/*.py" + - "tests/**/*.py" + - "pyproject.toml" + - "requirements.txt" + - ".github/workflows/test.yml" + +jobs: + test: + name: Unit Tests (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + pip install --upgrade pip + pip install -r requirements.txt + pip install pytest + pip install -e . + - name: Run tests + run: pytest tests/ -v diff --git a/.github/workflows/type-check.yml b/.github/workflows/type-check.yml index c21a384..664e304 100644 --- a/.github/workflows/type-check.yml +++ b/.github/workflows/type-check.yml @@ -16,14 +16,18 @@ on: jobs: type-check: - name: Type Checking with mypy + name: Type Checking with mypy (Python ${{ matrix.python-version }}) runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] steps: - uses: actions/checkout@v4 - - name: Set up Python + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: "3.x" + python-version: ${{ matrix.python-version }} - name: Install dependencies run: | pip install --upgrade pip diff --git a/README.md b/README.md index 01a3490..fab5d04 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ # Python-ST3215 -[![wakatime](https://wakatime.com/badge/user/b67f4ae3-1ee8-40ea-b8d8-196354064008/project/2d953882-4dfb-4c53-8a18-c698dc275f82.svg)](https://wakatime.com/badge/user/b67f4ae3-1ee8-40ea-b8d8-196354064008/project/2d953882-4dfb-4c53-8a18-c698dc275f82) [![PyPI Version](https://img.shields.io/pypi/v/python-st3215)](https://pypi.org/project/python-st3215/) ![Python Versions](https://img.shields.io/pypi/pyversions/python-st3215) ![License](https://img.shields.io/github/license/alessiodam/python-st3215) @@ -61,7 +60,7 @@ Hover over classes and functions in your editor to see type hints, parameter des | Brand | SKU | Product | |-----------|-------|------------------------------------------------------------------------------| | Waveshare | 22414 | [ST3215 Series Serial Bus Servo](https://www.waveshare.com/st3215-servo.htm) | -| | | USB to RS485 Serial Converter | +| Waveshare | 25514 | [Bus Servo Adapter (A)](https://www.waveshare.com/bus-servo-adapter-a.htm) | ## Memory Table diff --git a/pyproject.toml b/pyproject.toml index 275967d..1181a4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ authors = [ ] description = "A Python library for interfacing with the ST3215 smart servo." readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.9" dependencies = [ "pyserial >= 3.4", ] @@ -21,7 +21,6 @@ classifiers = [ "Topic :: System :: Hardware", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -45,8 +44,11 @@ build-backend = "hatchling.build" [tool.hatch.version] path = "src/python_st3215/__init__.py" +[tool.pytest.ini_options] +testpaths = ["tests"] + [tool.mypy] -python_version = "3.13" +python_version = "3.9" packages = ["python_st3215"] exclude = ["examples"] strict = true diff --git a/src/python_st3215/errors.py b/src/python_st3215/errors.py index 938ce51..8cc247a 100644 --- a/src/python_st3215/errors.py +++ b/src/python_st3215/errors.py @@ -1,3 +1,6 @@ +from __future__ import annotations + + class ST3215Error(Exception): """Base exception for all ST3215-related errors.""" diff --git a/src/python_st3215/servo.py b/src/python_st3215/servo.py index 5cf06eb..2ce5212 100644 --- a/src/python_st3215/servo.py +++ b/src/python_st3215/servo.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Sequence, cast +from typing import TYPE_CHECKING, Any, Optional, Sequence, cast if TYPE_CHECKING: from .st3215 import ST3215 @@ -60,7 +60,7 @@ def send( def ping(self) -> dict[str, object] | None: """Send PING command to the servo to check if it is responsive.""" - return cast(dict[str, object] | None, self.controller.ping(self.id)) + return cast(Optional[dict[str, object]], self.controller.ping(self.id)) def action(self) -> dict[str, object] | None: """Send ACTION command to the servo to execute all registered commands.""" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_st3215.py b/tests/test_st3215.py new file mode 100644 index 0000000..068121a --- /dev/null +++ b/tests/test_st3215.py @@ -0,0 +1,469 @@ +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from python_st3215.decorators import ( + decode_signed_word, + encode_signed_word, + encode_unsigned_word, +) +from python_st3215.errors import ( + BroadcastOperationError, + ChecksumError, + CommunicationTimeoutError, + InvalidIDError, + InvalidInstructionError, + ServoNotRespondingError, + ServoStatusError, +) +from python_st3215.instructions import Instruction +from python_st3215.st3215 import ST3215 + + +def _make_ctrl(read_data: bytes = b"") -> tuple[ST3215, MagicMock]: + """Return (controller, mock_serial) with ser.read returning read_data.""" + mock_ser = MagicMock() + mock_ser.is_open = True + mock_ser.timeout = 0.002 + mock_ser.read.return_value = read_data + ctrl = ST3215(ser=mock_ser) + return ctrl, mock_ser + + +def _valid_response(servo_id: int, params: bytes = b"") -> bytes: + """Build a well-formed response packet.""" + length = len(params) + 2 + error = 0 + checksum_base = servo_id + length + error + sum(params) + checksum = (~checksum_base) & 0xFF + return bytes([0xFF, 0xFF, servo_id, length, error]) + params + bytes([checksum]) + + +def _ping_response(servo_id: int) -> bytes: + return _valid_response(servo_id) + + +class TestInit: + def test_raises_without_port_or_ser(self): + with pytest.raises(ValueError): + ST3215() + + def test_accepts_mock_ser(self): + ctrl, _ = _make_ctrl() + assert ctrl.is_connected() + + def test_broadcast_servo_created(self): + ctrl, _ = _make_ctrl() + assert ctrl.broadcast.id == 254 + + def test_close(self): + ctrl, mock_ser = _make_ctrl() + ctrl.close() + mock_ser.close.assert_called_once() + + def test_context_manager(self): + mock_ser = MagicMock() + mock_ser.is_open = True + mock_ser.timeout = 0.002 + mock_ser.read.return_value = b"" + with ST3215(ser=mock_ser) as ctrl: + assert ctrl.is_connected() + mock_ser.close.assert_called() + + def test_is_connected_false_when_closed(self): + ctrl, mock_ser = _make_ctrl() + mock_ser.is_open = False + assert not ctrl.is_connected() + + +class TestBuildPacket: + def test_basic_ping_packet(self): + ctrl, _ = _make_ctrl() + pkt = ctrl.build_packet(1, Instruction.PING) + assert pkt[0:2] == b"\xff\xff" + assert pkt[2] == 1 # servo id + assert pkt[3] == 2 # length = 0 params + 2 + assert pkt[4] == Instruction.PING + + def test_checksum_correctness(self): + ctrl, _ = _make_ctrl() + pkt = ctrl.build_packet(1, Instruction.PING) + checksum_base = pkt[2] + pkt[3] + pkt[4] + expected = (~checksum_base) & 0xFF + assert pkt[-1] == expected + + def test_packet_with_params(self): + ctrl, _ = _make_ctrl() + params = [0x2A, 0x02] + pkt = ctrl.build_packet(5, Instruction.READ, params) + assert pkt[3] == len(params) + 2 + + def test_invalid_id_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(InvalidIDError): + ctrl.build_packet(255, Instruction.PING) + + def test_invalid_instruction_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(InvalidInstructionError): + ctrl.build_packet(1, 0xFF) + + def test_broadcast_id_allowed(self): + ctrl, _ = _make_ctrl() + pkt = ctrl.build_packet(254, Instruction.SYNC_WRITE, [0x28, 0x01, 0x01, 0x01]) + assert pkt[2] == 254 + + +class TestParseResponse: + def test_valid_response(self): + ctrl, _ = _make_ctrl() + data = _valid_response(1, b"\x05\x0a") + parsed = ctrl.parse_response(data) + assert parsed is not None + assert parsed["id"] == 1 + assert parsed["error"] == 0 + assert parsed["checksum_valid"] is True + assert parsed["parameters"] == b"\x05\x0a" + + def test_too_short_returns_none(self): + ctrl, _ = _make_ctrl() + assert ctrl.parse_response(b"\xff\xff\x01") is None + + def test_checksum_mismatch_raises(self): + ctrl, _ = _make_ctrl() + data = bytearray(_valid_response(1)) + data[-1] ^= 0xFF # corrupt checksum + with pytest.raises(ChecksumError): + ctrl.parse_response(bytes(data)) + + def test_error_status_no_raise_by_default(self): + ctrl, _ = _make_ctrl() + servo_id = 1 + length = 2 + error = 0x04 + checksum = (~(servo_id + length + error)) & 0xFF + data = bytes([0xFF, 0xFF, servo_id, length, error, checksum]) + parsed = ctrl.parse_response(data) + assert parsed is not None + assert parsed["error"] == 0x04 + + def test_error_status_raises_when_flag_set(self): + ctrl, _ = _make_ctrl() + servo_id = 1 + length = 2 + error = 0x04 + checksum = (~(servo_id + length + error)) & 0xFF + data = bytes([0xFF, 0xFF, servo_id, length, error, checksum]) + with pytest.raises(ServoStatusError): + ctrl.parse_response(data, raise_on_error=True) + + def test_no_params_empty_bytes(self): + ctrl, _ = _make_ctrl() + data = _valid_response(3) + parsed = ctrl.parse_response(data) + assert parsed["parameters"] == b"" + + +class TestReadResponse: + def test_strips_sent_packet_echo(self): + sent = b"\xff\xff\x01\x02\x01\xfb" + reply = _valid_response(1) + ctrl, mock_ser = _make_ctrl(read_data=sent + reply) + result = ctrl.read_response(sent) + assert result == reply + + def test_no_echo_returns_raw(self): + reply = _valid_response(1) + ctrl, mock_ser = _make_ctrl(read_data=reply) + sent = b"\xff\xff\x01\x02\x01\xfb" + result = ctrl.read_response(sent) + assert result == reply + + def test_empty_read_returns_none(self): + ctrl, _ = _make_ctrl(read_data=b"") + result = ctrl.read_response(b"\xff\xff\x01\x02\x01\xfb") + assert result is None + + +class TestPing: + def test_ping_returns_parsed_response(self): + servo_id = 1 + reply = _ping_response(servo_id) + ctrl, mock_ser = _make_ctrl(read_data=reply) + result = ctrl.ping(servo_id, use_retry=False) + assert result is not None + assert result["id"] == servo_id + + def test_ping_broadcast_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(BroadcastOperationError): + ctrl.ping(254) + + def test_ping_no_response_returns_none(self): + ctrl, _ = _make_ctrl(read_data=b"") + result = ctrl.ping(1, use_retry=False) + assert result is None + + def test_ping_with_retry_success(self): + servo_id = 2 + reply = _ping_response(servo_id) + ctrl, mock_ser = _make_ctrl() + mock_ser.read.side_effect = [b"", b"", reply] + result = ctrl.ping(servo_id, use_retry=True) + assert result is not None + + +class TestWrapServo: + def test_wrap_servo_with_verify(self): + servo_id = 3 + reply = _ping_response(servo_id) + ctrl, mock_ser = _make_ctrl(read_data=reply) + servo = ctrl.wrap_servo(servo_id, verify=True) + assert servo.id == servo_id + + def test_wrap_servo_no_response_raises(self): + ctrl, _ = _make_ctrl(read_data=b"") + with pytest.raises(ServoNotRespondingError): + ctrl.wrap_servo(5, verify=True) + + def test_wrap_servo_no_verify(self): + ctrl, _ = _make_ctrl() + servo = ctrl.wrap_servo(7, verify=False) + assert servo.id == 7 + + def test_wrap_servo_broadcast_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(BroadcastOperationError): + ctrl.wrap_servo(254) + + +class TestListServos: + def test_finds_responding_servo(self): + servo_id = 1 + reply = _ping_response(servo_id) + ctrl, mock_ser = _make_ctrl() + + def read_side_effect(size): + last_write = mock_ser.write.call_args[0][0] + if last_write[2] == servo_id: + return reply + return b"" + + mock_ser.read.side_effect = read_side_effect + found = ctrl.list_servos(start_id=0, end_id=5) + assert servo_id in found + + def test_invalid_start_id_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(InvalidIDError): + ctrl.list_servos(start_id=254) + + def test_invalid_end_id_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(InvalidIDError): + ctrl.list_servos(end_id=254) + + def test_start_greater_than_end_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(ValueError): + ctrl.list_servos(start_id=10, end_id=5) + + def test_progress_callback_called(self): + ctrl, mock_ser = _make_ctrl(read_data=b"") + calls = [] + ctrl.list_servos( + start_id=0, end_id=2, progress_callback=lambda c, t: calls.append((c, t)) + ) + assert len(calls) == 3 + assert calls[0] == (1, 3) + assert calls[-1] == (3, 3) + + +class TestSyncWrite: + def test_sync_write_sends_instruction(self): + ctrl, mock_ser = _make_ctrl() + ctrl._sync_write(0x28, 1, {1: [0x01], 2: [0x01]}) + mock_ser.write.assert_called_once() + pkt = mock_ser.write.call_args[0][0] + assert pkt[4] == Instruction.SYNC_WRITE + + def test_sync_write_wrong_data_length_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(ValueError): + ctrl._sync_write(0x28, 2, {1: [0x01]}) # expects 2 bytes, got 1 + + +class TestEncodeDecode: + @pytest.mark.parametrize("val", [0, 1, 100, 32767, -1, -100, -32767]) + def test_signed_word_roundtrip(self, val): + low, high = encode_signed_word(val) + raw = low | (high << 8) + assert decode_signed_word(raw) == val + + def test_encode_signed_word_out_of_range(self): + with pytest.raises(ValueError): + encode_signed_word(32768) + with pytest.raises(ValueError): + encode_signed_word(-32768) + + @pytest.mark.parametrize("val", [0, 1, 255, 1000, 65535]) + def test_unsigned_word_roundtrip(self, val): + low, high = encode_unsigned_word(val) + assert (low | (high << 8)) == val + + def test_decode_negative(self): + raw = 0x8064 # sign bit set, magnitude 100 + assert decode_signed_word(raw) == -100 + + def test_decode_positive(self): + assert decode_signed_word(0x0064) == 100 + + +class TestValidateValueRange: + def test_write_id_in_range(self): + servo_id = 1 + reply = _valid_response(servo_id) + ctrl, _ = _make_ctrl(read_data=reply) + servo = ctrl.wrap_servo(servo_id, verify=False) + ctrl.ser.read.return_value = reply + servo.eeprom.write_id(10) + + def test_write_id_out_of_range_raises(self): + ctrl, _ = _make_ctrl() + servo = ctrl.wrap_servo(1, verify=False) + with pytest.raises(ValueError): + servo.eeprom.write_id(254) + + def test_write_baudrate_out_of_range_raises(self): + ctrl, _ = _make_ctrl() + servo = ctrl.wrap_servo(1, verify=False) + with pytest.raises(ValueError): + servo.eeprom.write_baudrate(8) + + +class TestValidateServoId: + def test_broadcast_ping_raises(self): + ctrl, _ = _make_ctrl() + with pytest.raises(BroadcastOperationError): + ctrl.ping(254) + + def test_valid_id_passes(self): + ctrl, _ = _make_ctrl(read_data=_ping_response(1)) + result = ctrl.ping(1, use_retry=False) + assert result is not None + + +class TestServoRegisters: + def _ctrl_with_reply(self, servo_id: int, params: bytes): + reply = _valid_response(servo_id, params) + ctrl, mock_ser = _make_ctrl(read_data=reply) + return ctrl, mock_ser + + def test_read_id(self): + ctrl, _ = self._ctrl_with_reply(1, bytes([0x01])) + servo = ctrl.wrap_servo(1, verify=False) + assert servo.eeprom.read_id() == 1 + + def test_read_torque_switch(self): + ctrl, _ = self._ctrl_with_reply(1, bytes([0x01])) + servo = ctrl.wrap_servo(1, verify=False) + assert servo.sram.read_torque_switch() == 1 + + def test_is_moving_true(self): + ctrl, _ = self._ctrl_with_reply(1, bytes([0x01])) + servo = ctrl.wrap_servo(1, verify=False) + assert servo.sram.is_moving() is True + + def test_is_moving_false(self): + ctrl, _ = self._ctrl_with_reply(1, bytes([0x00])) + servo = ctrl.wrap_servo(1, verify=False) + assert servo.sram.is_moving() is False + + def test_read_target_location_positive(self): + val = 500 + low, high = encode_signed_word(val) + ctrl, _ = self._ctrl_with_reply(1, bytes([low, high])) + servo = ctrl.wrap_servo(1, verify=False) + assert servo.sram.read_target_location() == val + + def test_read_target_location_negative(self): + val = -300 + low, high = encode_signed_word(val) + ctrl, _ = self._ctrl_with_reply(1, bytes([low, high])) + servo = ctrl.wrap_servo(1, verify=False) + assert servo.sram.read_target_location() == val + + def test_write_target_location_out_of_range_raises(self): + ctrl, _ = _make_ctrl() + servo = ctrl.wrap_servo(1, verify=False) + with pytest.raises(ValueError): + servo.sram.write_target_location(99999) + + +class TestBroadcastSync: + def test_sync_write_torque_from_non_broadcast_raises(self): + ctrl, _ = _make_ctrl() + servo = ctrl.wrap_servo(1, verify=False) + with pytest.raises(BroadcastOperationError): + servo.sram.sync_write_torque_switch({1: 1}) + + def test_sync_write_torque_from_broadcast(self): + ctrl, mock_ser = _make_ctrl() + ctrl.broadcast.sram.sync_write_torque_switch({1: 1, 2: 0}) + mock_ser.write.assert_called_once() + + def test_sync_read_current_location_from_non_broadcast_raises(self): + ctrl, _ = _make_ctrl() + servo = ctrl.wrap_servo(1, verify=False) + with pytest.raises(BroadcastOperationError): + servo.sram.sync_read_current_location([1, 2]) + + def test_sync_read_current_location_no_response(self): + ctrl, _ = _make_ctrl(read_data=b"") + result = ctrl.broadcast.sram.sync_read_current_location([1, 2]) + assert result == {1: None, 2: None} + + +class TestRetryOperation: + def test_returns_on_first_success(self): + ctrl, _ = _make_ctrl() + call_count = 0 + + def op(): + nonlocal call_count + call_count += 1 + return {"ok": True} + + result = ctrl._retry_operation(op, "test") + assert result == {"ok": True} + assert call_count == 1 + + def test_retries_on_none(self): + ctrl, _ = _make_ctrl() + ctrl.retry_delay = 0 + ctrl.retry_count = 3 + call_count = 0 + + def op(): + nonlocal call_count + call_count += 1 + if call_count < 3: + return None + return {"ok": True} + + result = ctrl._retry_operation(op, "test") + assert result == {"ok": True} + assert call_count == 3 + + def test_raises_after_all_retries(self): + ctrl, _ = _make_ctrl() + ctrl.retry_delay = 0 + ctrl.retry_count = 2 + + def op(): + raise ChecksumError("bad checksum") + + with pytest.raises(CommunicationTimeoutError): + ctrl._retry_operation(op, "test") From b9ac5c058a47bd31d15037403ddb977fa07411e2 Mon Sep 17 00:00:00 2001 From: Alessio D'Ambrosio Date: Wed, 13 May 2026 09:25:25 +0200 Subject: [PATCH 4/4] update release workflow Signed-off-by: Alessio D'Ambrosio --- .github/workflows/release.yml | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 62eaf51..fe4056a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -2,7 +2,7 @@ name: Build and Publish to PyPi on: push: - branches: [main] + tags: ["*.*.*"] jobs: test: @@ -36,11 +36,7 @@ jobs: with: python-version: "3.x" - name: Install pypa/build - run: >- - python3 -m - pip install - build - --user + run: python3 -m pip install build --user - name: Build a binary wheel and a source tarball run: python3 -m build - name: Store the distribution packages @@ -49,11 +45,26 @@ jobs: name: python-package-distributions path: dist/ + github-release: + name: Upload wheels to GitHub Release + needs: build + runs-on: ubuntu-latest + permissions: + contents: write + steps: + - name: Download all the dists + uses: actions/download-artifact@v4 + with: + name: python-package-distributions + path: dist/ + - name: Upload to GitHub Release + uses: softprops/action-gh-release@v2 + with: + files: dist/* + publish-to-pypi: name: Publish Python distribution to PyPI - if: startsWith(github.ref, 'refs/tags/') - needs: - - build + needs: build runs-on: ubuntu-latest environment: name: pypi @@ -66,5 +77,5 @@ jobs: with: name: python-package-distributions path: dist/ - - name: Publish distribution 📦 to PyPI + - name: Publish distribution to PyPI uses: pypa/gh-action-pypi-publish@release/v1