From 27887dd88c61a47a1602130cd2aeb26cef576a8d Mon Sep 17 00:00:00 2001 From: realteck-ky <32591276+realteck-ky@users.noreply.github.com> Date: Tue, 4 Jun 2024 06:55:27 +0900 Subject: [PATCH 1/5] Fix install error on Windows --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9031f20..31b38cd 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ def read_requirements(): name='gs2d', version='0.0.4', description='gs2d: The Library for Generic Serial-bus Servo Driver kr-sac001 for Python', - long_description=open('README.md').read(), + long_description=open('README.md', encoding="utf-8").read(), long_description_content_type='text/markdown', author='Karakuri Products', author_email='gs2d@krkrpro.com', From 393ebe8f8352e41ed17ba9d543062b07335f973c Mon Sep 17 00:00:00 2001 From: realteck-ky <32591276+realteck-ky@users.noreply.github.com> Date: Tue, 4 Jun 2024 06:59:46 +0900 Subject: [PATCH 2/5] Baseline of KONDO copied from Futaba --- gs2d/Kondo.py | 1423 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1423 insertions(+) create mode 100644 gs2d/Kondo.py diff --git a/gs2d/Kondo.py b/gs2d/Kondo.py new file mode 100644 index 0000000..ff5dd3e --- /dev/null +++ b/gs2d/Kondo.py @@ -0,0 +1,1423 @@ +import time +import logging + +from .ICommandHandler import ICommandHandler +from .ISerialInterface import ISerialInterface +from .Driver import Driver +from .Util import ReceiveDataTimeoutException, NotSupportException, BadInputParametersException, WrongCheckSumException +from .Util import InvalidResponseDataException +from .Util import get_printable_hex + +# ロガー +logger = logging.getLogger(__name__) + + +class Futaba(Driver): + """Futabaのシリアルサーボクラス + + """ + + # アドレス空間 + ADDR_MODEL_NUMBER_L = 0 # 0x00 + ADDR_FIRMWARE_VERSION = 2 # 0x02 + ADDR_SERVO_ID = 4 # 0x04 + ADDR_REVERSE = 5 # 0x05 + ADDR_BAUD_RATE = 6 # 0x06 + ADDR_RETURN_DELAY = 7 # 0x07 + ADDR_CW_ANGLE_LIMIT_L = 8 # 0x08 + ADDR_CCW_ANGLE_LIMIT_L = 10 # 0x0A + ADDR_TEMPERATURE_LIMIT_L = 14 # 0x0E + ADDR_TORQUE_IN_SILENCE = 22 # 0x16 + ADDR_WARM_UP_TIME = 23 # 0x17 + ADDR_CW_COMPLIANCE_MARGIN = 24 # 0x18 + ADDR_CCW_COMPLIANCE_MARGIN = 25 # 0x19 + ADDR_CW_COMPLIANCE_SLOPE = 26 # 0x1A + ADDR_CCW_COMPLIANCE_SLOPE = 27 # 0x1B + ADDR_PUNCH_L = 28 # 0x1C + + ADDR_GOAL_POSITION_L = 30 # 0x1E + ADDR_GOAL_TIME_L = 32 # 0x20 + ADDR_MAX_TORQUE = 35 # 0x23 + ADDR_TORQUE_ENABLE = 36 # 0x24 + ADDR_PID_COEFFICIENT = 38 # 0x26 + ADDR_PRESENT_POSITION_L = 42 # 0x2A + ADDR_PRESENT_TIME_L = 44 # 0x2C + ADDR_PRESENT_SPEED_L = 46 # 0x2E + ADDR_PRESENT_CURRENT_L = 48 # 0x30 + ADDR_TEMPERATURE_L = 50 # 0x32 + ADDR_VOLTAGE_L = 52 # 0x34 + + ADDR_WRITE_FLASH_ROM = 255 # 0xFF + ADDR_RESET_MEMORY = 255 # 0xFF + + # フラグ + FLAG4_RESET_MEMORY_MAP = 16 # 0x10 + FLAG30_NO_RETURN_PACKET = 0 + FLAG30_MEM_MAP_ACK = 1 + FLAG30_MEM_MAP_00_29 = 3 + FLAG30_MEM_MAP_30_59 = 5 + FLAG30_MEM_MAP_20_29 = 7 + FLAG30_MEM_MAP_42_59 = 9 + FLAG30_MEM_MAP_30_41 = 11 + FLAG30_MEM_MAP_60_127 = 13 + FLAG30_MEM_MAP_SELECT = 15 + + PACKET_DATA_INDEX = 7 + + # 通信速度のIDと実際の設定値 + BAUD_RATE_INDEX_9600 = 0x00 + BAUD_RATE_INDEX_14400 = 0x01 + BAUD_RATE_INDEX_19200 = 0x02 + BAUD_RATE_INDEX_28800 = 0x03 + BAUD_RATE_INDEX_38400 = 0x04 + BAUD_RATE_INDEX_57600 = 0x05 + BAUD_RATE_INDEX_76800 = 0x06 + BAUD_RATE_INDEX_115200 = 0x07 + BAUD_RATE_INDEX_153600 = 0x08 + BAUD_RATE_INDEX_230400 = 0x09 + + def __init__(self, serial_interface: ISerialInterface, command_handler_class: ICommandHandler = None): + """初期化 + """ + + super(Futaba, self).__init__(serial_interface, command_handler_class) + + def is_complete_response(self, response_data): + """レスポンスデータをすべて受信できたかチェック""" + # print('#########', response_data) + + # Header, ID, Flags, Address, Length, Count, Data, Sum で7バイトは最低限ある + if len(response_data) < 6: + return False + else: + # カウント取得 + count = response_data[5] + # print('####', count, len(response_data)) + return len(response_data) == (8 + count) + + @staticmethod + def __get_checksum(data): + """チェックサムを生成 + + :param data: + :return: + """ + + checksum = 0 + for i in range(2, len(data)): + checksum ^= data[i] + return checksum + + @staticmethod + def __check_sid(sid): + """Servo IDのレンジをチェック + + :param sid: + :return: + """ + + if sid < 1 or sid > 127: + raise BadInputParametersException('sid: %d がレンジ外です。1から127のIDを設定してください。' % sid) + + def __generate_command(self, sid, addr, data=None, flag=0, count=1, length=None): + """コマンド生成 + + :param sid: Servo ID + :param addr: + :param data: + :param flag: + :param count: + :param length: + :return: + """ + + # Header: パケットの先頭を表します。ショートパケットではFAAFに設定します。 + # ID: サーボのIDです。1~127(01H~7FH)までの値が使用できます。 + # ID:255を指定すると、全IDのサーボへの共通指令になります(リターンデータは取れません)。 + # Flag: サーボからのリターンデータ取得やデータ書き込み時の設定をします + # Address: メモリーマップ上のアドレスを指定します。 + # このアドレスから「Length」に指定した長さ分のデータをメモリーマップに書き込みます。 + # Length: データ1ブロックの長さを指定します。ショートパケットでは Dataのバイト数になります。 + # Count: サーボの数を表します。ショートパケットでメモリーマップに書き込む時は 1 に設定します。 + # Data: メモリーマップに書き込むデータです + # Checksum: 送信データの確認用のチェックサムで、パケットのIDからDataの末尾までを1バイトずつ + # XORした値を指定します。 + + command = [] + + # Header + command.extend([0xFA, 0xAF]) + + # ID + command.append(sid) + + # Flag + command.append(flag) + + # Address + command.append(addr) + + # Length + if length is not None: + command.append(length) + elif data is not None: + command.append(len(data)) + else: + command.append(0) + + # Count + command.append(count) + + # Data + if data is not None and len(data) > 0: + command.extend(data) + + # Checksum + command.append(self.__get_checksum(command)) + + return command + + def __generate_burst_command(self, addr, length, vid_data_dict): + """バーストコマンド生成 + + :param addr: + :param length: + :param vid_data_dict: + :return: + """ + + # Header: パケットの先頭を表します。ショートパケットではFAAFに設定します。 + # ID: 常に00 + # Flag: 常に00 + # Address: メモリーマップ上のアドレスを指定します。 + # このアドレスから「Length」に指定した長さ分のデータを指定した複数サーボのメモリーマップに書き込みます。 + # Length: サーボ1つ分のデータ (VIDData) のバイト数を指定します。 + # Count: データを送信する対象となるサーボの数を表します。この数分 VID と Data を送信します。 + # VID: データを送信する個々のサーボの ID を表します。VID と Data が一組でサーボの数分のデータを送信します。 + # Data: メモリーマップに書き込むサーボ一つ分のデータです。VID と Data が一組でサーボの数分のデータを送信します。 + # Sum: 送信データの確認用のチェックサムで、パケットのIDからDataの末尾までを1バイトずつ + # XORした値を指定します。 + + command = [] + + # Header + command.extend([0xFA, 0xAF]) + + # ID + command.append(0) + + # Flag + command.append(0) + + # Address + command.append(addr) + + # Length + command.append(length) + + # Count + command.append(len(vid_data_dict)) + + # Data + for sid, data in vid_data_dict.items(): + command.append(sid) + + if data is not None and len(data) > 0: + command.extend(data) + + # Checksum + command.append(self.__get_checksum(command)) + + return command + + def __get_function(self, address, flag, length, response_process, sid=1, callback=None): + """Get系の処理をまとめた関数 + + :param address: + :param flag: + :param length: + :param response_process: + :param sid: + :param callback: + :return: + """ + + # データ + data = None + + # 受信済みフラグ + is_received = False + + # チェックサム不正エラー + is_checksum_error = False + + def temp_recv_callback(response): + nonlocal data + nonlocal is_received + nonlocal is_checksum_error + + recv_data = None + + # レスポンスデータのチェックサムが正しいかチェック + if self.__get_checksum(response[:-1]) != response[-1]: + logger.debug('Check sum error: ' + get_printable_hex(response)) + is_checksum_error = True + return + + # 受信済み + is_received = True + + if len(response) > self.PACKET_DATA_INDEX + length: + response_data = response[self.PACKET_DATA_INDEX:self.PACKET_DATA_INDEX + length] + recv_data = response_process(response_data) + + if callback is not None: + callback(recv_data) + else: + data = recv_data + + command = self.__generate_command(sid, address, flag=flag, count=0, length=length) + self.command_handler.add_command(command, recv_callback=temp_recv_callback) + + # コールバックが設定できていたら、コールバックに受信データを渡す + if callback is None: + # 指定以内にサーボからデータを受信できたかをチェック + start = time.time() + while not is_received: + elapsed_time = time.time() - start + if elapsed_time > self.command_handler.RECEIVE_DATA_TIMEOUT_SEC: + raise ReceiveDataTimeoutException(str(self.command_handler.RECEIVE_DATA_TIMEOUT_SEC) + '秒以内にデータ受信できませんでした') + elif is_checksum_error: + raise WrongCheckSumException('受信したデータのチェックサムが不正です') + + return data + else: + return True + + def get_torque_enable(self, sid, callback=None): + """トルクON取得 + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 1: + return response_data[0] == 0x01 + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_TORQUE_ENABLE, self.FLAG30_MEM_MAP_SELECT, 1, response_process, + sid=sid, callback=callback) + + def get_torque_enable_async(self, sid, loop=None): + """トルクON取得async版 + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_torque_enable(sid, callback=callback) + return f + + def close(self, force=False): + """閉じる + + :param force: + :return: + """ + + if self.command_handler: + self.command_handler.close() + + def ping(self, sid, callback=None): + """サーボにPINGを送る + FutabaにはPINGコマンドはないので、get_servo_idで代用。 + + :param sid: + :param callback: + :return: { + 'model_no': bytearray (2bytes), + 'version_firmware': int (1byte) + } + """ + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 3: + model_no = int.from_bytes(response_data[0:2], 'little', signed=True) + version_firmware = response_data[2] + return { + 'model_no': model_no, + 'version_firmware': version_firmware + } + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_MODEL_NUMBER_L, self.FLAG30_MEM_MAP_SELECT, 3, response_process, + sid=sid, callback=callback) + + def ping_async(self, sid, loop=None): + """サーボにPINGを送る async版 + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.ping(sid, callback=callback) + return f + + def set_torque_enable(self, on_off, sid): + """トルクON/OFF設定 + + :param on_off: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # トルクデータ + torque_data = 0x01 if on_off else 0x00 + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_TORQUE_ENABLE, [torque_data]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_temperature(self, sid, callback=None): + """温度取得(単位: ℃。おおよそ±3℃程度の誤差あり) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + temperature = int.from_bytes(response_data, 'little', signed=True) + return temperature + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_TEMPERATURE_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_temperature_async(self, sid, loop=None): + """温度取得 async版(単位: ℃。おおよそ±3℃程度の誤差あり) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_temperature(sid, callback=callback) + return f + + def get_current(self, sid, callback=None): + """電流(現在の負荷)取得 (単位: mA) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + current = int.from_bytes(response_data, 'little', signed=False) + return current + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_PRESENT_CURRENT_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_current_async(self, sid, loop=None): + """電流(現在の負荷)取得 async版 (単位: mA) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_current(sid, callback=callback) + return f + + def get_target_position(self, sid, callback=None): + """指示位置取得 (単位: 度) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + # 単位は 0.1 度になっているので、度に変換 + position = int.from_bytes(response_data, 'little', signed=True) + position /= 10 + return -position + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_GOAL_POSITION_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_target_position_async(self, sid, loop=None): + """指示位置取得 async版 (単位: 度) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_target_position(sid, callback=callback) + return f + + def set_target_position(self, position_degree, sid=1): + """指示位置設定 (単位: 度。設定可能な範囲は-150.0 度~150.0 度) + + :param position_degree: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # 設定可能な範囲は-150.0 度~150.0 度 + if position_degree < -150: + position_degree = -150 + elif position_degree > 150: + position_degree = 150 + + position_hex = format(int(position_degree * -10) & 0xffff, '04x') + position_hex_h = int(position_hex[0:2], 16) + position_hex_l = int(position_hex[2:4], 16) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_GOAL_POSITION_L, [position_hex_l, position_hex_h]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_current_position(self, sid, callback=None): + """現在位置取得 (単位: 度) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + # 単位は 0.1 度になっているので、度に変換 + position = int.from_bytes(response_data, 'little', signed=True) + position /= 10 + return -position + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_PRESENT_POSITION_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_current_position_async(self, sid, loop=None): + """現在位置取得 async版 (単位: 度) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_current_position(sid, callback=callback) + return f + + def get_offset(self, sid, callback=None): + raise NotSupportException('Futabaではget_offsetに対応していません。') + + def get_offset_async(self, sid, loop=None): + raise NotSupportException('Futabaではget_offset_asyncに対応していません。') + + def set_offset(self, offset, sid): + raise NotSupportException('Futabaではset_offsetに対応していません。') + + def get_deadband(self, sid, callback=None): + raise NotSupportException('Futabaではget_deadbandに対応していません。') + + def get_deadband_async(self, sid, loop=None): + raise NotSupportException('Futabaではget_deadband_asyncに対応していません。') + + def set_deadband(self, deadband, sid): + raise NotSupportException('Futabaではset_deadbandに対応していません。') + + def get_voltage(self, sid, callback=None): + """電圧取得 (単位: V) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + voltage = int.from_bytes(response_data, 'little', signed=True) + voltage /= 100 + return voltage + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_VOLTAGE_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_voltage_async(self, sid, loop=None): + """電圧取得 async版 (単位: V) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_voltage(sid, callback=callback) + return f + + def get_target_time(self, sid, callback=None): + """目標位置までのサーボ移動時間を取得 (単位: 秒) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + # 単位は 10ms 単位になっているので、秒に変更 + speed = int.from_bytes(response_data, 'little', signed=False) + speed /= 100 + return speed + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_GOAL_TIME_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_target_time_async(self, sid, loop=None): + """目標位置までのサーボ移動時間を取得 async版 (単位: 秒) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_target_time(sid, callback=callback) + return f + + def set_target_time(self, speed_second, sid=1): + """目標位置までのサーボ移動時間を設定 (単位: 秒) + + :param speed_second: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # 設定範囲は 0 から 3FFFH。つまり0秒から163830ms=163.83seconds + if speed_second < 0: + speed_second = 0 + elif speed_second > 163.83: + speed_second = 163.83 + + # 10ms 単位で設定。この関数のパラメータは秒指定なので*100する + speed_hex = format(int(speed_second * 100) & 0xffff, '04x') + speed_hex_h = int(speed_hex[0:2], 16) + speed_hex_l = int(speed_hex[2:4], 16) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_GOAL_TIME_L, [speed_hex_l, speed_hex_h]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_accel_time(self, sid, callback=None): + raise NotSupportException('Futabaではget_accel_timeに対応していません。') + + def get_accel_time_async(self, sid, loop=None): + raise NotSupportException('Futabaではget_accel_time_asyncに対応していません。') + + def set_accel_time(self, speed_second, sid): + raise NotSupportException('Futabaではset_accel_timeに対応していません。') + + def get_pid_coefficient(self, sid, callback=None): + """モータの制御係数を取得 (単位: %) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 1: + coef = response_data[0] + return coef + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_PID_COEFFICIENT, self.FLAG30_MEM_MAP_SELECT, 1, response_process, + sid=sid, callback=callback) + + def get_pid_coefficient_async(self, sid, loop=None): + """モータの制御係数を取得 async版 (単位: %) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_pid_coefficient(sid, callback=callback) + return f + + def set_pid_coefficient(self, coef_percent, sid): + """モータの制御係数を設定 (単位: %) + + :param coef_percent: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # 100%のとき設定値は、64H となります。設定範囲は 01H~FFH までです。 + if coef_percent < 1: + coef_percent = 1 + elif coef_percent == 10: + # なぜか10に設定すると反応がおかしいので9にシフトさせる + coef_percent = 9 + elif coef_percent > 255: + coef_percent = 255 + + coef_hex = int(coef_percent) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_PID_COEFFICIENT, [coef_hex]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_p_gain(self, sid, callback=None): + """ pGainの取得(単位無し + + :param sid: + :param callback + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_p_gainに対応していません。') + + def get_p_gain_async(self, sid, loop=None): + """ pGainの取得(単位無し async版 + + :param sid: + :param loop: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_p_gain_asyncに対応していません。') + + def set_p_gain(self, gain, sid=1): + """ pGainの書き込み + + :param gain: + :param sid: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではset_p_gainに対応していません。') + + + def get_i_gain(self, sid, callback=None): + """ iGainの取得(単位無し + + :param sid: + :param callback + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_i_gainに対応していません。') + + + def get_i_gain_async(self, sid, loop=None): + """ iGainの取得(単位無し async版 + + :param sid: + :param loop: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_i_gain_asyncに対応していません。') + + def set_i_gain(self, gain, sid=1): + """ iGainの書き込み + + :param gain: + :param sid: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではset_i_gainに対応していません。') + + + def get_d_gain(self, sid, callback=None): + """ dGainの取得(単位無し + + :param sid: + :param callback + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_d_gainに対応していません。') + + + def get_d_gain_async(self, sid, loop=None): + """ dGainの取得(単位無し async版 + + :param sid: + :param loop: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_d_gain_asyncに対応していません。') + + def set_d_gain(self, gain, sid=1): + """ dGainの書き込み + + :param gain: + :param sid: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではset_d_gainに対応していません。') + + def get_max_torque(self, sid, callback=None): + """最大トルク取得 (%) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 1: + max_torque = response_data[0] + return max_torque + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_MAX_TORQUE, self.FLAG30_MEM_MAP_SELECT, 1, response_process, + sid=sid, callback=callback) + + def get_max_torque_async(self, sid, loop=None): + """最大トルク取得 async版 (%) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_max_torque(sid, callback=callback) + return f + + def set_max_torque(self, torque_percent, sid): + """最大トルク設定 (%) + + :param torque_percent: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # 0-100%で設定 + if torque_percent < 0: + torque_percent = 0 + elif torque_percent > 100: + torque_percent = 100 + + torque_hex = int(torque_percent) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_MAX_TORQUE, [torque_hex]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_speed(self, sid, callback=None): + """現在の回転速度を取得 (deg/s) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + speed = int.from_bytes(response_data, 'little', signed=True) + return speed + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_PRESENT_SPEED_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_speed_async(self, sid, loop=None): + """現在の回転速度を取得 async版 (deg/s) + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_speed(sid, callback=callback) + return f + + def set_speed(self, dps, sid): + """Futabaでは未サポート""" + raise NotSupportException('Futabaではset_speedに対応していません。set_target_time()で回転スピードを制御してください。') + + def get_servo_id(self, sid, callback=None): + """サーボIDを取得 + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 1: + servo_id = int.from_bytes(response_data, 'little', signed=False) + return servo_id + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_SERVO_ID, self.FLAG30_MEM_MAP_SELECT, 1, response_process, + sid=sid, callback=callback) + + def get_servo_id_async(self, sid, loop=None): + """サーボIDを取得 async版 + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_servo_id(sid, callback=callback) + return f + + def set_servo_id(self, new_sid, sid): + """サーボIDを設定 + + :param new_sid: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(new_sid) + self.__check_sid(sid) + + # 0-100%で設定 + if new_sid < 1: + new_sid = 1 + elif new_sid > 127: + new_sid = 127 + + new_sid_hex = int(new_sid) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_SERVO_ID, [new_sid_hex]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def save_rom(self, sid): + """フラッシュROMに書き込む + + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_WRITE_FLASH_ROM, flag=0x40, count=0) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def load_rom(self, sid): + raise NotSupportException('Futabaではload_romに対応していません。') + + def get_baud_rate(self, sid, callback=None): + """通信速度を取得 + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 1: + baud_rate_list = [9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200, 153600, 230400] + baud_rate_id = int.from_bytes(response_data, 'little', signed=False) + return baud_rate_list[baud_rate_id] + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_BAUD_RATE, self.FLAG30_MEM_MAP_SELECT, 1, response_process, + sid=sid, callback=callback) + + def get_baud_rate_async(self, sid, loop=None): + """通信速度を取得 async版 + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_baud_rate(sid, callback=callback) + return f + + def set_baud_rate(self, baud_rate, sid): + """通信速度を設定 + + :param baud_rate: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # 通信速度IDのチェック + baud_rate_list = [9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200, 153600, 230400] + try: + baud_rate_id = baud_rate_list.index(baud_rate) + except: + raise BadInputParametersException('baud_rate が不正な値です') + + baud_rate_id_hex = int(baud_rate_id) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_BAUD_RATE, [baud_rate_id_hex]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_limit_cw_position(self, sid, callback=None): + """右(時計回り)リミット角度の取得 + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + limit_position = int.from_bytes(response_data, 'little', signed=True) + limit_position /= 10 + return -limit_position + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_CW_ANGLE_LIMIT_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_limit_cw_position_async(self, sid, loop=None): + """右(時計回り)リミット角度の取得 async版 + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_limit_cw_position(sid, callback=callback) + return f + + def set_limit_cw_position(self, limit_position, sid): + """右(時計回り)リミット角度を設定 + + :param limit_position: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # リミット角度のチェック + if -150 < limit_position > 0: + raise BadInputParametersException('limit_position が不正な値です。-150〜0を設定してください。') + + limit_position = -limit_position; + + limit_position_hex = format(int(limit_position * 10) & 0xffff, '04x') + limit_position_hex_h = int(limit_position_hex[0:2], 16) + limit_position_hex_l = int(limit_position_hex[2:4], 16) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_CW_ANGLE_LIMIT_L, [limit_position_hex_l, limit_position_hex_h]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_limit_ccw_position(self, sid, callback=None): + """左(反時計回り)リミット角度の取得 + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + limit_position = int.from_bytes(response_data, 'little', signed=True) + limit_position /= 10 + return -limit_position + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_CCW_ANGLE_LIMIT_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_limit_ccw_position_async(self, sid, loop=None): + """左(反時計回り)リミット角度の取得 async版 + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_limit_ccw_position(sid, callback=callback) + return f + + def set_limit_ccw_position(self, limit_position, sid): + """左(反時計回り)リミット角度を設定 + + :param limit_position: + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # リミット角度のチェック + if 0 < limit_position > 150: + raise BadInputParametersException('limit_position が不正な値です。0〜150を設定してください。') + + limit_position = -limit_position; + + limit_position_hex = format(int((limit_position * 10) & 0xffff), '04x') + limit_position_hex_h = int(limit_position_hex[0:2], 16) + limit_position_hex_l = int(limit_position_hex[2:4], 16) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_CCW_ANGLE_LIMIT_L, + [limit_position_hex_l, limit_position_hex_h]) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_limit_temperature(self, sid, callback=None): + """温度リミットの取得 (℃) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == 2: + limit_temp = int.from_bytes(response_data, 'little', signed=True) + return limit_temp + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(self.ADDR_TEMPERATURE_LIMIT_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, + sid=sid, callback=callback) + + def get_limit_temperature_async(self, sid, loop=None): + """温度リミットの取得 (℃) async版 + + :param sid: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.get_limit_temperature(sid, callback=callback) + return f + + def set_limit_temperature(self, limit_temp, sid): + """Futabaでは未サポート""" + raise NotSupportException('Futabaではset_limit_temperatureに対応していません。') + + def get_limit_current(self, sid, callback=None): + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_limit_currentに対応していません。') + + def get_limit_current_async(self, sid, loop=None): + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_limit_current_asyncに対応していません。') + + def set_limit_current(self, limit_current, sid): + """Futabaでは未サポート""" + raise NotSupportException('Futabaではset_limit_currentに対応していません。') + + def get_drive_mode(self, sid, callback=None): + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_drive_modeに対応していません。') + def get_drive_mode_async(self, sid, loop=None): + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_drive_mode_asyncに対応していません。') + def set_drive_mode(self, drive_mode, sid): + """Futabaでは未サポート""" + raise NotSupportException('Futabaではset_drive_modeに対応していません。') + + + def set_burst_target_positions(self, sid_target_positions): + """複数のサーボの対象ポジションを一度に設定 + + :param sid_target_positions: + :return: + """ + + # データチェック & コマンドデータ生成 + vid_data = {} + for sid, position_degree in sid_target_positions.items(): + # サーボIDのチェック + self.__check_sid(sid) + + # 設定可能な範囲は-150.0 度~150.0 度 + if position_degree < -150: + position_degree = -150 + elif position_degree > 150: + position_degree = 150 + + position_hex = format(int(position_degree * -10) & 0xffff, '04x') + position_hex_h = int(position_hex[0:2], 16) + position_hex_l = int(position_hex[2:4], 16) + + vid_data[sid] = [position_hex_l, position_hex_h] + + # コマンド生成 + command = self.__generate_burst_command(self.ADDR_GOAL_POSITION_L, 3, vid_data) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def get_burst_positions(self, sids, callback=None): + """複数のサーボの現在のポジションを一気にリード + + :param sids: + :param callback: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_burst_positionsに対応していません。') + + def get_burst_positions_async(self, sids, loop=None): + """複数のサーボの現在のポジションを一気にリード async版 + + :param sids: + :param loop: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではget_burst_positions_asyncに対応していません。') + + def reset_memory(self, sid): + """ROMを工場出荷時のものに初期化する + + :param sid: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # コマンド生成 + command = self.__generate_command(sid, self.ADDR_RESET_MEMORY, flag=self.FLAG4_RESET_MEMORY_MAP, count=0) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def read(self, sid, address, length, callback=None): + """データを読み込む + + :param sid: + :param address: + :param length: + :param callback: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + def response_process(response_data): + if response_data is not None and len(response_data) == length: + return response_data + else: + raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + + return self.__get_function(address, self.FLAG30_MEM_MAP_SELECT, length, response_process, + sid=sid, callback=callback) + + def read_async(self, sid, address, length, loop=None): + """データを読み込む async版 + + :param sid: + :param address: + :param length: + :param loop: + :return: + """ + + f, callback = self.async_wrapper(loop) + self.read(sid, address, length, callback=callback) + return f + + def write(self, sid, address, data): + """データを書き込む + + :param sid: + :param address: + :param data: + :return: + """ + + # サーボIDのチェック + self.__check_sid(sid) + + # コマンド生成 + command = self.__generate_command(sid, address, data) + + # データ送信バッファに追加 + self.command_handler.add_command(command) + + def burst_read(self, address, length, sids, callback=None): + """複数サーボから一括でデータ読み取り + + :param address: + :param length: + :param sids: + :param callback: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではburst_readに対応していません。') + + def burst_read_async(self, address, length, sids, loop=None): + """複数サーボから一括でデータ読み取り async版 + + :param address: + :param length: + :param sids: + :param loop: + :return: + """ + """Futabaでは未サポート""" + raise NotSupportException('Futabaではburst_read_asyncに対応していません。') + + def burst_write(self, address, length, sid_data): + """複数サーボに一括で書き込み + + :param address: + :param length: + :param sid_data: + :return: + """ + + # データチェック & コマンドデータ生成 + vid_data = {} + for sid, data in sid_data.items(): + # サーボIDのチェック + self.__check_sid(sid) + + vid_data[sid] = data + + # コマンド生成 + command = self.__generate_burst_command(address, length, vid_data) + + # データ送信バッファに追加 + self.command_handler.add_command(command) From c7f96e28010d4e3caebc0110505de43252c55755 Mon Sep 17 00:00:00 2001 From: realteck-ky <32591276+realteck-ky@users.noreply.github.com> Date: Thu, 13 Jun 2024 08:56:15 +0900 Subject: [PATCH 3/5] Add KONDO ICS Driver --- gs2d/Kondo.py | 1273 +++++++++++++++++++++------------------------- gs2d/__init__.py | 3 +- 2 files changed, 574 insertions(+), 702 deletions(-) diff --git a/gs2d/Kondo.py b/gs2d/Kondo.py index ff5dd3e..0cb055a 100644 --- a/gs2d/Kondo.py +++ b/gs2d/Kondo.py @@ -1,5 +1,7 @@ import time import logging +from types import FunctionType +from packaging.version import Version from .ICommandHandler import ICommandHandler from .ISerialInterface import ISerialInterface @@ -12,287 +14,320 @@ logger = logging.getLogger(__name__) -class Futaba(Driver): - """Futabaのシリアルサーボクラス +class KondoICS(Driver): + """KONDO ICSのシリアルサーボクラス """ - # アドレス空間 - ADDR_MODEL_NUMBER_L = 0 # 0x00 - ADDR_FIRMWARE_VERSION = 2 # 0x02 - ADDR_SERVO_ID = 4 # 0x04 - ADDR_REVERSE = 5 # 0x05 - ADDR_BAUD_RATE = 6 # 0x06 - ADDR_RETURN_DELAY = 7 # 0x07 - ADDR_CW_ANGLE_LIMIT_L = 8 # 0x08 - ADDR_CCW_ANGLE_LIMIT_L = 10 # 0x0A - ADDR_TEMPERATURE_LIMIT_L = 14 # 0x0E - ADDR_TORQUE_IN_SILENCE = 22 # 0x16 - ADDR_WARM_UP_TIME = 23 # 0x17 - ADDR_CW_COMPLIANCE_MARGIN = 24 # 0x18 - ADDR_CCW_COMPLIANCE_MARGIN = 25 # 0x19 - ADDR_CW_COMPLIANCE_SLOPE = 26 # 0x1A - ADDR_CCW_COMPLIANCE_SLOPE = 27 # 0x1B - ADDR_PUNCH_L = 28 # 0x1C - - ADDR_GOAL_POSITION_L = 30 # 0x1E - ADDR_GOAL_TIME_L = 32 # 0x20 - ADDR_MAX_TORQUE = 35 # 0x23 - ADDR_TORQUE_ENABLE = 36 # 0x24 - ADDR_PID_COEFFICIENT = 38 # 0x26 - ADDR_PRESENT_POSITION_L = 42 # 0x2A - ADDR_PRESENT_TIME_L = 44 # 0x2C - ADDR_PRESENT_SPEED_L = 46 # 0x2E - ADDR_PRESENT_CURRENT_L = 48 # 0x30 - ADDR_TEMPERATURE_L = 50 # 0x32 - ADDR_VOLTAGE_L = 52 # 0x34 - - ADDR_WRITE_FLASH_ROM = 255 # 0xFF - ADDR_RESET_MEMORY = 255 # 0xFF - - # フラグ - FLAG4_RESET_MEMORY_MAP = 16 # 0x10 - FLAG30_NO_RETURN_PACKET = 0 - FLAG30_MEM_MAP_ACK = 1 - FLAG30_MEM_MAP_00_29 = 3 - FLAG30_MEM_MAP_30_59 = 5 - FLAG30_MEM_MAP_20_29 = 7 - FLAG30_MEM_MAP_42_59 = 9 - FLAG30_MEM_MAP_30_41 = 11 - FLAG30_MEM_MAP_60_127 = 13 - FLAG30_MEM_MAP_SELECT = 15 - - PACKET_DATA_INDEX = 7 + # コマンド + SHIFT_CMD = 5 + CMD_POSITION = 0b100 << SHIFT_CMD + CMD_READ = 0b101 << SHIFT_CMD + CMD_WRITE = 0b110 << SHIFT_CMD + CMD_ID = 0b111 << SHIFT_CMD + CMD_RECV = 0b001 << SHIFT_CMD # Not use for servo + + # コマンドヘッダ用マスク + MASK_CMDHDR_MC = 0b1110_0000 # メインコマンド + MASK_CMDHDR_ID = 0b0001_1111 # ID + MASK_CMDHDR_RECV = 0b0111_1111 # 受信用マスク (MBR=0) + + # サブコマンド + SC_EEPROM = 0x00 + SC_STRC = 0x01 + SC_SPD = 0x02 + SC_CUR = 0x03 # ICS>=3.5 + SC_TMP = 0x04 # ICS>=3.5 + SC_TCH = 0x05 # ICS>=3.6 + + SC_ID_READ = 0x00 + SC_ID_WRITE = 0x01 + + # EEPROMデータのバイト位置 + # Pythonが0はじまりなので、本来の値-1している + ROM_STRC = 2 # 偶数値のみ + ROM_SPD = 4 + ROM_PANCH = 6 + ROM_DEADBAND = 8 + ROM_DUMPING = 10 + ROM_SAFETIM = 12 + ROM_FLAG = 14 + ROM_PULSELIM_UP = 16 # 4bit x 4byte + ROM_PULSELIM_DOWN = 20 # 4bit x 4byte + ROM_BAUDRATE = 26 # 0x00, 0x01, 0x10 + ROM_TEMPLIM = 28 + ROM_CURLIM = 30 + ROM_OFFSET = 52 + ROM_ID = 56 + ROM_CHARA1 = 58 + ROM_CHARA2 = 60 + ROM_CHARA3 = 62 # 通信速度のIDと実際の設定値 - BAUD_RATE_INDEX_9600 = 0x00 - BAUD_RATE_INDEX_14400 = 0x01 - BAUD_RATE_INDEX_19200 = 0x02 - BAUD_RATE_INDEX_28800 = 0x03 - BAUD_RATE_INDEX_38400 = 0x04 - BAUD_RATE_INDEX_57600 = 0x05 - BAUD_RATE_INDEX_76800 = 0x06 - BAUD_RATE_INDEX_115200 = 0x07 - BAUD_RATE_INDEX_153600 = 0x08 - BAUD_RATE_INDEX_230400 = 0x09 - - def __init__(self, serial_interface: ISerialInterface, command_handler_class: ICommandHandler = None): + BAUD_RATE_INDEX_1250000 = 0x00 + BAUD_RATE_INDEX_625000 = 0x01 + BAUD_RATE_INDEX_115200 = 0x10 + + def __init__(self, serial_interface: ISerialInterface, command_handler_class: ICommandHandler = None, version: str = "3.6", loopback: bool = True, slave:bool=False): """初期化 """ - super(Futaba, self).__init__(serial_interface, command_handler_class) + self.loopback:bool = bool(loopback) + self.slave:bool = bool(slave) + self.baudrate = 115200 + + if version == "3.6": + self.version = Version("3.6") + elif version == "3.5": + self.version = Version("3.5") + elif version == "3.0": + self.version = Version("3.0") + elif version == "2.0": + self.version = Version("2.0") + else: + raise BadInputParametersException("ics_version: 2.0、3.0、3.5、3.6 のみ対応です。入力値:", version) def is_complete_response(self, response_data): """レスポンスデータをすべて受信できたかチェック""" # print('#########', response_data) - # Header, ID, Flags, Address, Length, Count, Data, Sum で7バイトは最低限ある - if len(response_data) < 6: + # パケット長のデータがないため、パケット解析して返却 + if response_data is None or len(response_data) == 0: return False - else: - # カウント取得 - count = response_data[5] - # print('####', count, len(response_data)) - return len(response_data) == (8 + count) + elif self.loopback: # 送信データも受信 + if (response_data[0] & self.MASK_CMDHDR_MC) == self.CMD_POSITION: + return len(response_data) == 6 + elif (response_data[0] & self.MASK_CMDHDR_MC) == self.CMD_READ: + if len(response_data) < 2: + return False + elif response_data[1] == self.SC_EEPROM: + return len(response_data) == 68 + elif response_data[1] == self.SC_STRC \ + or response_data[1] == self.SC_SPD \ + or response_data[1] == self.SC_CUR \ + or response_data[1] == self.SC_TMP: + return len(response_data) == 5 + elif response_data[1] == self.SC_TCH: + return len(response_data) == 6 + elif (response_data[0] & self.MASK_CMDHDR_MC) == self.CMD_WRITE: + if len(response_data) < 2: + return False + elif response_data[1] == self.SC_EEPROM: + return len(response_data) == 68 + elif response_data[1] == self.SC_STRC \ + or response_data[1] == self.SC_SPD \ + or response_data[1] == self.SC_CUR \ + or response_data[1] == self.SC_TMP: + return len(response_data) == 6 + elif (response_data[0] & self.MASK_CMDHDR_MC) == self.CMD_ID: + return len(response_data) == 5 + else: # 受信データのみ + if (response_data[0] & self.MASK_CMDHDR_MC) == (self.CMD_POSITION & self.MASK_CMDHDR_RECV) \ + or ((self.baudrate == 115_200 and (response_data[0] & self.MASK_CMDHDR_ID) == 0 and (response_data[0] & self.MASK_CMDHDR_MC) == self.CMD_POSITION)): # 従来のICS2.0との互換性のため、ID0で通信速度が115.2Kの場合、RXのMSBは1になります。 + return len(response_data) == (6-3) + + elif (response_data[0] & self.MASK_CMDHDR_MC) == (self.CMD_READ & self.MASK_CMDHDR_RECV): + if len(response_data) < 2: + return False + elif response_data[1] == self.SC_EEPROM: + return len(response_data) == (68-2) + elif response_data[1] == self.SC_STRC \ + or response_data[1] == self.SC_SPD \ + or response_data[1] == self.SC_CUR \ + or response_data[1] == self.SC_TMP: + return len(response_data) == (5-2) + elif response_data[1] == self.SC_TCH: + return len(response_data) == (6-2) + elif (response_data[0] & self.MASK_CMDHDR_MC) == (self.CMD_WRITE & self.MASK_CMDHDR_RECV): + if len(response_data) < 2: + return False + elif response_data[1] == self.SC_EEPROM: + return len(response_data) == (68-66) + elif response_data[1] == self.SC_STRC \ + or response_data[1] == self.SC_SPD \ + or response_data[1] == self.SC_CUR \ + or response_data[1] == self.SC_TMP: + return len(response_data) == (6-3) + # IDコマンドのみサーボからの返事でもMSBのマスクはありません。 + elif (response_data[0] & self.MASK_CMDHDR_MC) == self.CMD_ID: + return len(response_data) == (5-4) + + # TODO + # raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + return False @staticmethod - def __get_checksum(data): - """チェックサムを生成 - - :param data: - :return: - """ - - checksum = 0 - for i in range(2, len(data)): - checksum ^= data[i] - return checksum - - @staticmethod - def __check_sid(sid): + def _check_sid(sid): """Servo IDのレンジをチェック :param sid: :return: """ - if sid < 1 or sid > 127: - raise BadInputParametersException('sid: %d がレンジ外です。1から127のIDを設定してください。' % sid) - - def __generate_command(self, sid, addr, data=None, flag=0, count=1, length=None): - """コマンド生成 - - :param sid: Servo ID - :param addr: - :param data: - :param flag: - :param count: - :param length: - :return: - """ - - # Header: パケットの先頭を表します。ショートパケットではFAAFに設定します。 - # ID: サーボのIDです。1~127(01H~7FH)までの値が使用できます。 - # ID:255を指定すると、全IDのサーボへの共通指令になります(リターンデータは取れません)。 - # Flag: サーボからのリターンデータ取得やデータ書き込み時の設定をします - # Address: メモリーマップ上のアドレスを指定します。 - # このアドレスから「Length」に指定した長さ分のデータをメモリーマップに書き込みます。 - # Length: データ1ブロックの長さを指定します。ショートパケットでは Dataのバイト数になります。 - # Count: サーボの数を表します。ショートパケットでメモリーマップに書き込む時は 1 に設定します。 - # Data: メモリーマップに書き込むデータです - # Checksum: 送信データの確認用のチェックサムで、パケットのIDからDataの末尾までを1バイトずつ - # XORした値を指定します。 - - command = [] + if sid != 0xFF: + if sid < 0 or sid > 31: + raise BadInputParametersException('sid: %d がレンジ外です。1から31のIDを設定してください。' % sid) - # Header - command.extend([0xFA, 0xAF]) - - # ID - command.append(sid) - - # Flag - command.append(flag) - - # Address - command.append(addr) - - # Length - if length is not None: - command.append(length) - elif data is not None: - command.append(len(data)) - else: - command.append(0) - - # Count - command.append(count) + @staticmethod + def _calc_par2deg(data): + hex = ((data[0] & 0x7F) << 7) | (data[1] & 0x7F) + deg = (int(hex) - 7500) * 135 / 4000 + return deg - # Data - if data is not None and len(data) > 0: - command.extend(data) + @staticmethod + def _calc_deg2par(deg): + pos_hex = int(deg * 4000 / 135 + 7500) & 0x3FFF + pos_hex_h = (pos_hex >> 7) & 0x7F + pos_hex_l = pos_hex & 0x7F + return [pos_hex_h, pos_hex_l] - # Checksum - command.append(self.__get_checksum(command)) + @staticmethod + def _get_hex_from_2x4bit(data): + hex = ((data[0] & 0x0F) << 4) | (data[1] & 0x0F) + return hex - return command + @staticmethod + def _set_hex_to_2x4bit(hex): + data = [hex & 0xF0, hex & 0x0F] + return data - def __generate_burst_command(self, addr, length, vid_data_dict): - """バーストコマンド生成 + def _send_recv(self, sid, mc, sc=None, send_data=None, response_process=None, callback=None): + """Send -> Receiveする関数 + KondoICSはスレーブモード以外応答が返る - :param addr: - :param length: - :param vid_data_dict: + :param sid: サーボID + :param mc: メインコマンド + :param sc: サブコマンド + :param response_process: 受信データ処理関数 + :param callback: ユーザー指定の受信データ処理関数 :return: """ - # Header: パケットの先頭を表します。ショートパケットではFAAFに設定します。 - # ID: 常に00 - # Flag: 常に00 - # Address: メモリーマップ上のアドレスを指定します。 - # このアドレスから「Length」に指定した長さ分のデータを指定した複数サーボのメモリーマップに書き込みます。 - # Length: サーボ1つ分のデータ (VIDData) のバイト数を指定します。 - # Count: データを送信する対象となるサーボの数を表します。この数分 VID と Data を送信します。 - # VID: データを送信する個々のサーボの ID を表します。VID と Data が一組でサーボの数分のデータを送信します。 - # Data: メモリーマップに書き込むサーボ一つ分のデータです。VID と Data が一組でサーボの数分のデータを送信します。 - # Sum: 送信データの確認用のチェックサムで、パケットのIDからDataの末尾までを1バイトずつ - # XORした値を指定します。 - - command = [] - - # Header - command.extend([0xFA, 0xAF]) - - # ID - command.append(0) - - # Flag - command.append(0) - - # Address - command.append(addr) - - # Length - command.append(length) - - # Count - command.append(len(vid_data_dict)) - - # Data - for sid, data in vid_data_dict.items(): - command.append(sid) + # データ + ret_data = None - if data is not None and len(data) > 0: - command.extend(data) + # 受信済みフラグ + is_received = False - # Checksum - command.append(self.__get_checksum(command)) + # 引数チェック + if response_process is not None: + if not isinstance(response_process, FunctionType): + raise BadInputParametersException("response_processには関数を指定してください") + if callback is not None: + if not isinstance(callback, FunctionType): + raise BadInputParametersException("callbackには関数を指定してください") + if self.slave: + raise BadInputParametersException("スレーブモードではデータを受信しないため、callbackは使えません。") + + if mc == self.CMD_POSITION: + if send_data is None or len(send_data) != 2 \ + or (send_data[0] & 0b1000_0000) != 0 or (send_data[1] & 0b1000_0000) != 0: + raise BadInputParametersException("角度指定時の送信データが不正です。") + elif mc == self.CMD_READ: + if sc not in (self.SC_EEPROM, self.SC_STRC, self.SC_SPD, self.SC_CUR, self.SC_TMP, self.SC_TCH): + raise BadInputParametersException("sc: サブコマンド %x が不正です。" % sc) + if sc == self.SC_TCH and self.version != Version("3.6"): + raise BadInputParametersException("sc: サブコマンド %x は使用できません。" % sc) + elif mc == self.CMD_WRITE: + if sc not in (self.SC_EEPROM, self.SC_STRC, self.SC_SPD, self.SC_CUR, self.SC_TMP, self.SC_TCH): + raise BadInputParametersException("sc: サブコマンド %x が不正です。" % sc) + elif sc == self.SC_EEPROM: + if send_data is None or len(send_data) != 64: + raise BadInputParametersException("送信データが不正です。") + elif sc in (self.SC_STRC, self.SC_SPD, self.SC_CUR, self.SC_TMP): + if sc == self.SC_CUR and self.version < Version("3.0") : + raise BadInputParametersException("sc: サブコマンド %x は使用できません。" % sc) + if sc == self.SC_TMP and self.version < Version("3.5") : + raise BadInputParametersException("sc: サブコマンド %x は使用できません。" % sc) + if send_data is None or len(send_data) != 1: + raise BadInputParametersException("送信データが不正です。") + elif sc == self.SC_TCH: + if self.version < Version("3.6") : + raise BadInputParametersException("sc: サブコマンド %x は使用できません。" % sc) + if send_data is None or len(send_data) != 2: + raise BadInputParametersException("送信データが不正です。") + elif mc == self.CMD_ID: + if sc != self.SC_ID_READ and sc != self.SC_ID_WRITE: + raise BadInputParametersException("sc: サブコマンド %x が不正です。" % sc) + else: + raise BadInputParametersException("mc: メインコマンド %x が不正です。" % mc) - return command - def __get_function(self, address, flag, length, response_process, sid=1, callback=None): - """Get系の処理をまとめた関数 + # 送信コマンド生成 + send_command = [] - :param address: - :param flag: - :param length: - :param response_process: - :param sid: - :param callback: - :return: - """ + send_command.append(sid | mc) + if mc == self.CMD_POSITION: + send_command.extend(send_data) + elif mc == self.CMD_READ: + send_command.append(sc) + elif mc == self.CMD_WRITE: + send_command.append(sc) + send_command.extend(send_data) + elif mc == self.CMD_ID: + send_command.append(sc) + send_command.append(sc) + send_command.append(sc) - # データ - data = None - - # 受信済みフラグ - is_received = False - - # チェックサム不正エラー - is_checksum_error = False + print("send_command:", send_command) + # 受信処理 def temp_recv_callback(response): - nonlocal data + nonlocal ret_data nonlocal is_received - nonlocal is_checksum_error - - recv_data = None - - # レスポンスデータのチェックサムが正しいかチェック - if self.__get_checksum(response[:-1]) != response[-1]: - logger.debug('Check sum error: ' + get_printable_hex(response)) - is_checksum_error = True - return + nonlocal send_command + + recv_byte_list = list(bytes(response)) + + offset = 0 + if self.loopback: + offset = len(send_command) + if recv_byte_list[:offset] != send_command: + raise InvalidResponseDataException("送信コマンドを正しくループバックできていません。") + recv_command = recv_byte_list[offset:] + + print("recv_command:", recv_command) + + send_mc = send_command[0] & self.MASK_CMDHDR_MC + recv_mc = recv_command[0] & self.MASK_CMDHDR_MC + # if (send_mc == self.CMD_ID and send_mc != recv_mc) or (send_mc & self.MASK_CMDHDR_RECV != recv_mc): + # raise InvalidResponseDataException("受信コマンドヘッダが不正です。") + + send_id = send_command[0] & self.MASK_CMDHDR_ID + recv_id = recv_command[0] & self.MASK_CMDHDR_ID + if send_id != 0xFF and send_id != recv_id: + raise InvalidResponseDataException("受信したサーボIDが不正です。") + + if callback is None: + if response_process is None: + ret_data = recv_command + else: + ret_data = response_process(recv_command) + else: + callback(recv_command) # 受信済み is_received = True - if len(response) > self.PACKET_DATA_INDEX + length: - response_data = response[self.PACKET_DATA_INDEX:self.PACKET_DATA_INDEX + length] - recv_data = response_process(response_data) - - if callback is not None: - callback(recv_data) - else: - data = recv_data - - command = self.__generate_command(sid, address, flag=flag, count=0, length=length) - self.command_handler.add_command(command, recv_callback=temp_recv_callback) - - # コールバックが設定できていたら、コールバックに受信データを渡す - if callback is None: - # 指定以内にサーボからデータを受信できたかをチェック - start = time.time() - while not is_received: - elapsed_time = time.time() - start - if elapsed_time > self.command_handler.RECEIVE_DATA_TIMEOUT_SEC: - raise ReceiveDataTimeoutException(str(self.command_handler.RECEIVE_DATA_TIMEOUT_SEC) + '秒以内にデータ受信できませんでした') - elif is_checksum_error: - raise WrongCheckSumException('受信したデータのチェックサムが不正です') - - return data - else: + # コマンド処理キューに追加 + if self.slave: + # No waiting data receive + self.command_handler.add_command(send_command) return True + else: + self.command_handler.add_command(send_command, recv_callback=temp_recv_callback) + + # コールバックが設定できていたら、コールバックに受信データを渡す + if callback is None: + # 指定以内にサーボからデータを受信できたかをチェック + start = time.time() + while not is_received: + elapsed_time = time.time() - start + if elapsed_time > self.command_handler.RECEIVE_DATA_TIMEOUT_SEC: + raise ReceiveDataTimeoutException(str(self.command_handler.RECEIVE_DATA_TIMEOUT_SEC) + '秒以内にデータ受信できませんでした') + return ret_data + else: + return True def get_torque_enable(self, sid, callback=None): """トルクON取得 @@ -303,16 +338,14 @@ def get_torque_enable(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): - if response_data is not None and len(response_data) == 1: - return response_data[0] == 0x01 - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + data = response_data[2:] + # EEPROMのFREEの有効ビットを確認し、反対の結果を返す(トルク有効ならFREE==0) + return (data[self.ROM_FLAG+1] & 0b0000_0010) != 0b0000_0010 - return self.__get_function(self.ADDR_TORQUE_ENABLE, self.FLAG30_MEM_MAP_SELECT, 1, response_process, - sid=sid, callback=callback) + return self._send_recv(sid, self.CMD_READ, self.SC_EEPROM, response_process=response_process) def get_torque_enable_async(self, sid, loop=None): """トルクON取得async版 @@ -336,33 +369,27 @@ def close(self, force=False): if self.command_handler: self.command_handler.close() - def ping(self, sid, callback=None): + def ping(self, sid=0xFF, callback=None): """サーボにPINGを送る - FutabaにはPINGコマンドはないので、get_servo_idで代用。 + KondoICSにはPINGコマンドはないので、IDの取得で代用。 :param sid: :param callback: - :return: { - 'model_no': bytearray (2bytes), - 'version_firmware': int (1byte) - } + :return: """ + # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): - if response_data is not None and len(response_data) == 3: - model_no = int.from_bytes(response_data[0:2], 'little', signed=True) - version_firmware = response_data[2] - return { - 'model_no': model_no, - 'version_firmware': version_firmware - } - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + servo_id = int(response_data[0] & self.MASK_CMDHDR_ID) + try: + self._check_sid(servo_id) + except BadInputParametersException as e: + return False + return True - return self.__get_function(self.ADDR_MODEL_NUMBER_L, self.FLAG30_MEM_MAP_SELECT, 3, response_process, - sid=sid, callback=callback) + return self._send_recv(sid, self.CMD_ID, response_process, callback=callback) def ping_async(self, sid, loop=None): """サーボにPINGを送る async版 @@ -385,40 +412,43 @@ def set_torque_enable(self, on_off, sid): """ # サーボIDのチェック - self.__check_sid(sid) - - # トルクデータ - torque_data = 0x01 if on_off else 0x00 + self._check_sid(sid) - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_TORQUE_ENABLE, [torque_data]) - - # データ送信バッファに追加 - self.command_handler.add_command(command) + if on_off: + # KondoICSではトルクONコマンドが存在しないため、現在の位置を指定しなおす + pos = self.get_current_position(sid) + self.set_target_position(pos, sid=sid) + else: + # Free状態を送信 + self._send_recv(sid, self.CMD_POSITION, data=[0x00, 0x00]) def get_temperature(self, sid, callback=None): - """温度取得(単位: ℃。おおよそ±3℃程度の誤差あり) + """温度取得 (単位: ℃。おおよそ±3℃程度の誤差あり) :param sid: :param callback: :return: """ + # バージョンのチェック + if self.version < Version("3.5"): + raise NotSupportException('KondoICSのバージョン3.5未満ではget_temperatureに対応していません。') + # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): - if response_data is not None and len(response_data) == 2: - temperature = int.from_bytes(response_data, 'little', signed=True) - return temperature - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + # 温度パラメーターは0から127までの値で、小さいほど温度が高いことを示します。 + # 目安として、パラメーター60で温度が約80度、パラメーター30で約100度です。 + data = response_data[2] + if data < 0 or data > 127: + raise InvalidResponseDataException("受信データが不正です。温度パラメータが %d で0~127の範囲外です。" % data) + return data * (80-100)/(60-30) + 120 - return self.__get_function(self.ADDR_TEMPERATURE_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + return self._send_recv(sid, self.CMD_READ, self.SC_TMP, response_process=response_process) def get_temperature_async(self, sid, loop=None): - """温度取得 async版(単位: ℃。おおよそ±3℃程度の誤差あり) + """温度取得 async版 (単位: ℃。おおよそ±3℃程度の誤差あり) :param sid: :param loop: @@ -437,18 +467,26 @@ def get_current(self, sid, callback=None): :return: """ + # バージョンのチェック + if self.version < Version("3.0"): + raise NotSupportException('KondoICSのバージョン3.0未満ではget_currentに対応していません。') + # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): - if response_data is not None and len(response_data) == 2: - current = int.from_bytes(response_data, 'little', signed=False) - return current + # 設定範囲 正転 (Low) 0 ~ 63 (High) + # 逆転 (Low) 64 ~ 127 (High) + # 電流値 0.1A = 設定値 1 + data = response_data[2] + if data < 0 or data > 127: + raise InvalidResponseDataException("受信データが不正です。電流パラメータが %d で0~127の範囲外です。" % data) + if data < 64: + return 1000 * data / 10 else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + return -1000 * (data-64) / 10 - return self.__get_function(self.ADDR_PRESENT_CURRENT_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + return self._send_recv(sid, self.CMD_READ, self.SC_TMP, response_process=response_process) def get_current_async(self, sid, loop=None): """電流(現在の負荷)取得 async版 (単位: mA) @@ -463,42 +501,13 @@ def get_current_async(self, sid, loop=None): return f def get_target_position(self, sid, callback=None): - """指示位置取得 (単位: 度) - - :param sid: - :param callback: - :return: - """ - - # サーボIDのチェック - self.__check_sid(sid) - - def response_process(response_data): - if response_data is not None and len(response_data) == 2: - # 単位は 0.1 度になっているので、度に変換 - position = int.from_bytes(response_data, 'little', signed=True) - position /= 10 - return -position - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') - - return self.__get_function(self.ADDR_GOAL_POSITION_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + raise NotSupportException('KondoICSではget_target_positionに対応していません。') def get_target_position_async(self, sid, loop=None): - """指示位置取得 async版 (単位: 度) - - :param sid: - :param loop: - :return: - """ - - f, callback = self.async_wrapper(loop) - self.get_target_position(sid, callback=callback) - return f + raise NotSupportException('KondoICSではget_target_position_asyncに対応していません。') def set_target_position(self, position_degree, sid=1): - """指示位置設定 (単位: 度。設定可能な範囲は-150.0 度~150.0 度) + """指示位置設定 (単位: 度。設定可能な範囲は-135.0 度~135.0 度) :param position_degree: :param sid: @@ -506,23 +515,17 @@ def set_target_position(self, position_degree, sid=1): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) - # 設定可能な範囲は-150.0 度~150.0 度 - if position_degree < -150: - position_degree = -150 - elif position_degree > 150: - position_degree = 150 + # 設定可能な範囲は-135.0 度~135.0 度 + if position_degree < -135: + position_degree = -135 + elif position_degree > 135: + position_degree = 135 - position_hex = format(int(position_degree * -10) & 0xffff, '04x') - position_hex_h = int(position_hex[0:2], 16) - position_hex_l = int(position_hex[2:4], 16) + position_data = self._calc_deg2par(position_degree) - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_GOAL_POSITION_L, [position_hex_l, position_hex_h]) - - # データ送信バッファに追加 - self.command_handler.add_command(command) + self._send_recv(sid, self.CMD_POSITION, send_data=position_data) def get_current_position(self, sid, callback=None): """現在位置取得 (単位: 度) @@ -533,19 +536,24 @@ def get_current_position(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) - def response_process(response_data): - if response_data is not None and len(response_data) == 2: - # 単位は 0.1 度になっているので、度に変換 - position = int.from_bytes(response_data, 'little', signed=True) - position /= 10 - return -position - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + if self.version == Version("3.6"): + def response_process(response_data): + return self._calc_par2deg(response_data[2:]) - return self.__get_function(self.ADDR_PRESENT_POSITION_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + return self._send_recv(sid, self.CMD_READ, self.SC_TCH, response_process=response_process, callback=callback) + else: + def response_process(response_data): + return self._calc_par2deg(response_data[1:]) + + is_torque_on = self.get_torque_enable(sid) + pos = self._send_recv(sid, self.CMD_POSITION, send_data=[0x00, 0x00], response_process=response_process, callback=callback) + + if is_torque_on: + self.set_target_position(pos) + + return pos def get_current_position_async(self, sid, loop=None): """現在位置取得 async版 (単位: 度) @@ -560,25 +568,7 @@ def get_current_position_async(self, sid, loop=None): return f def get_offset(self, sid, callback=None): - raise NotSupportException('Futabaではget_offsetに対応していません。') - - def get_offset_async(self, sid, loop=None): - raise NotSupportException('Futabaではget_offset_asyncに対応していません。') - - def set_offset(self, offset, sid): - raise NotSupportException('Futabaではset_offsetに対応していません。') - - def get_deadband(self, sid, callback=None): - raise NotSupportException('Futabaではget_deadbandに対応していません。') - - def get_deadband_async(self, sid, loop=None): - raise NotSupportException('Futabaではget_deadband_asyncに対応していません。') - - def set_deadband(self, deadband, sid): - raise NotSupportException('Futabaではset_deadbandに対応していません。') - - def get_voltage(self, sid, callback=None): - """電圧取得 (単位: V) + """オフセット角度取得 (単位: 度) :param sid: :param callback: @@ -586,21 +576,24 @@ def get_voltage(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): - if response_data is not None and len(response_data) == 2: - voltage = int.from_bytes(response_data, 'little', signed=True) - voltage /= 100 - return voltage - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + rom_data = response_data[2:] + data = rom_data[self.ROM_OFFSET:self.ROM_OFFSET+2] + pos_hex = self._get_hex_from_2x4bit(data) - return self.__get_function(self.ADDR_VOLTAGE_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + # パラメータ範囲は-127~127 + deg = int(pos_hex & 0x7F) * 135 / 4000 + # 負の値 + if pos_hex & 0b1000_0000: + deg = -deg + return deg - def get_voltage_async(self, sid, loop=None): - """電圧取得 async版 (単位: V) + return self._send_recv(sid, self.CMD_READ, self.SC_EEPROM, response_process=response_process) + + def get_offset_async(self, sid, loop=None): + """オフセット角度取得 async版 (単位: 度) :param sid: :param loop: @@ -608,11 +601,41 @@ def get_voltage_async(self, sid, loop=None): """ f, callback = self.async_wrapper(loop) - self.get_voltage(sid, callback=callback) + self.get_offset(sid, callback=callback) return f - def get_target_time(self, sid, callback=None): - """目標位置までのサーボ移動時間を取得 (単位: 秒) + def set_offset(self, offset, sid): + """オフセット角度指定 (単位: 度) + + :param sid: + :param callback: + :return: + """ + + # サーボIDのチェック + self._check_sid(sid) + + # 設定可能なパラメータ範囲は-127~127 + offset_abs = 0x7f * 135 / 4000 + if offset > offset_abs: + offset_hex = 0x7f + elif offset < -offset_abs: + offset_hex = 0xff + else: + offset_hex = int(offset * 4000 / 135) + if offset < 0: + # 負の値 + offset_hex |= 0b1000_0000 + + rom_data = self.load_rom(sid) + data = self._set_hex_to_2x4bit(offset_hex) + rom_data[self.ROM_OFFSET] = data[0] + rom_data[self.ROM_OFFSET+1] = data[1] + + self._send_recv(sid, self.CMD_WRITE, self.SC_EEPROM, send_data=rom_data) + + def get_deadband(self, sid, callback=None): + """デッドバンド角度取得 (単位: 度) :param sid: :param callback: @@ -620,22 +643,21 @@ def get_target_time(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): - if response_data is not None and len(response_data) == 2: - # 単位は 10ms 単位になっているので、秒に変更 - speed = int.from_bytes(response_data, 'little', signed=False) - speed /= 100 - return speed - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + rom_data = response_data[2:] + data = rom_data[self.ROM_DEADBAND:self.ROM_DEADBAND+2] - return self.__get_function(self.ADDR_GOAL_TIME_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + # TODO おそらくデッドバンド幅の片側の値のため2倍して返却 + pos_hex = self._get_hex_from_2x4bit(data) + deg = int(pos_hex) * 2 * 135 / 4000 + return deg - def get_target_time_async(self, sid, loop=None): - """目標位置までのサーボ移動時間を取得 async版 (単位: 秒) + return self._send_recv(sid, self.CMD_READ, self.SC_EEPROM, response_process=response_process) + + def get_deadband_async(self, sid, loop=None): + """デッドバンド角度取得 async版 (単位: 度) :param sid: :param loop: @@ -643,126 +665,98 @@ def get_target_time_async(self, sid, loop=None): """ f, callback = self.async_wrapper(loop) - self.get_target_time(sid, callback=callback) + self.get_deadband(sid, callback=callback) return f - def set_target_time(self, speed_second, sid=1): - """目標位置までのサーボ移動時間を設定 (単位: 秒) + def set_deadband(self, deadband, sid): + """デッドバンド角度指定 (単位: 度) - :param speed_second: :param sid: + :param callback: :return: """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) - # 設定範囲は 0 から 3FFFH。つまり0秒から163830ms=163.83seconds - if speed_second < 0: - speed_second = 0 - elif speed_second > 163.83: - speed_second = 163.83 + # TODO おそらくデッドバンド幅の片側の値のため1/2して設定 + deadband_abs = abs(deadband) + deadband_hex = int(deadband_abs * 135 / 4000 / 2) - # 10ms 単位で設定。この関数のパラメータは秒指定なので*100する - speed_hex = format(int(speed_second * 100) & 0xffff, '04x') - speed_hex_h = int(speed_hex[0:2], 16) - speed_hex_l = int(speed_hex[2:4], 16) + if deadband_hex > 0x05: + deadband_hex = 0x05 - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_GOAL_TIME_L, [speed_hex_l, speed_hex_h]) + rom_data = self.load_rom(sid) + data = self._set_hex_to_2x4bit(deadband_hex) + rom_data[self.ROM_DEADBAND] = data[0] + rom_data[self.ROM_DEADBAND+1] = data[1] - # データ送信バッファに追加 - self.command_handler.add_command(command) + self._send_recv(sid, self.CMD_WRITE, self.SC_EEPROM, send_data=rom_data) - def get_accel_time(self, sid, callback=None): - raise NotSupportException('Futabaではget_accel_timeに対応していません。') - - def get_accel_time_async(self, sid, loop=None): - raise NotSupportException('Futabaではget_accel_time_asyncに対応していません。') - - def set_accel_time(self, speed_second, sid): - raise NotSupportException('Futabaではset_accel_timeに対応していません。') - - def get_pid_coefficient(self, sid, callback=None): - """モータの制御係数を取得 (単位: %) + def get_voltage(self, sid, callback=None): + """電圧取得 (単位: V) :param sid: :param callback: :return: """ + raise NotSupportException('KondoICSではget_voltageに対応していません。') - # サーボIDのチェック - self.__check_sid(sid) - - def response_process(response_data): - if response_data is not None and len(response_data) == 1: - coef = response_data[0] - return coef - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') - - return self.__get_function(self.ADDR_PID_COEFFICIENT, self.FLAG30_MEM_MAP_SELECT, 1, response_process, - sid=sid, callback=callback) - - def get_pid_coefficient_async(self, sid, loop=None): - """モータの制御係数を取得 async版 (単位: %) + def get_voltage_async(self, sid, loop=None): + """電圧取得 async版 (単位: V) :param sid: :param loop: :return: """ + raise NotSupportException('KondoICSではget_voltageに対応していません。') - f, callback = self.async_wrapper(loop) - self.get_pid_coefficient(sid, callback=callback) - return f + def get_target_time(self, sid, callback=None): + raise NotSupportException('KondoICSではget_target_timeに対応していません。') - def set_pid_coefficient(self, coef_percent, sid): - """モータの制御係数を設定 (単位: %) + def get_target_time_async(self, sid, loop=None): + raise NotSupportException('KondoICSではget_target_time_asyncに対応していません。') - :param coef_percent: - :param sid: - :return: - """ + def set_target_time(self, speed_second, sid=1): + raise NotSupportException('KondoICSではset_target_timeに対応していません。') - # サーボIDのチェック - self.__check_sid(sid) + def get_accel_time(self, sid, callback=None): + raise NotSupportException('KondoICSではget_accel_timeに対応していません。') - # 100%のとき設定値は、64H となります。設定範囲は 01H~FFH までです。 - if coef_percent < 1: - coef_percent = 1 - elif coef_percent == 10: - # なぜか10に設定すると反応がおかしいので9にシフトさせる - coef_percent = 9 - elif coef_percent > 255: - coef_percent = 255 + def get_accel_time_async(self, sid, loop=None): + raise NotSupportException('KondoICSではget_accel_time_asyncに対応していません。') - coef_hex = int(coef_percent) + def set_accel_time(self, speed_second, sid): + raise NotSupportException('KondoICSではset_accel_timeに対応していません。') - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_PID_COEFFICIENT, [coef_hex]) + def get_pid_coefficient(self, sid, callback=None): + raise NotSupportException('KondoICSではget_pid_coefficientに対応していません。') - # データ送信バッファに追加 - self.command_handler.add_command(command) + def get_pid_coefficient_async(self, sid, loop=None): + raise NotSupportException('KondoICSではget_pid_coefficient_asyncに対応していません。') + + def set_pid_coefficient(self, coef_percent, sid): + raise NotSupportException('KondoICSではset_pid_coefficientに対応していません。') def get_p_gain(self, sid, callback=None): - """ pGainの取得(単位無し + """ pGainの取得 (単位無し) :param sid: :param callback :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_p_gainに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_p_gainに対応していません。') def get_p_gain_async(self, sid, loop=None): - """ pGainの取得(単位無し async版 + """ pGainの取得 (単位無し async版) :param sid: :param loop: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_p_gain_asyncに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_p_gain_asyncに対応していません。') def set_p_gain(self, gain, sid=1): """ pGainの書き込み @@ -771,30 +765,30 @@ def set_p_gain(self, gain, sid=1): :param sid: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではset_p_gainに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではset_p_gainに対応していません。') def get_i_gain(self, sid, callback=None): - """ iGainの取得(単位無し + """ iGainの取得 (単位無し) :param sid: :param callback :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_i_gainに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_i_gainに対応していません。') def get_i_gain_async(self, sid, loop=None): - """ iGainの取得(単位無し async版 + """ iGainの取得 (単位無し) async版 :param sid: :param loop: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_i_gain_asyncに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_i_gain_asyncに対応していません。') def set_i_gain(self, gain, sid=1): """ iGainの書き込み @@ -803,30 +797,30 @@ def set_i_gain(self, gain, sid=1): :param sid: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではset_i_gainに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではset_i_gainに対応していません。') def get_d_gain(self, sid, callback=None): - """ dGainの取得(単位無し + """ dGainの取得 (単位無し) :param sid: :param callback :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_d_gainに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_d_gainに対応していません。') def get_d_gain_async(self, sid, loop=None): - """ dGainの取得(単位無し async版 + """ dGainの取得 (単位無し) async版 :param sid: :param loop: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_d_gain_asyncに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_d_gain_asyncに対応していません。') def set_d_gain(self, gain, sid=1): """ dGainの書き込み @@ -835,8 +829,8 @@ def set_d_gain(self, gain, sid=1): :param sid: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではset_d_gainに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではset_d_gainに対応していません。') def get_max_torque(self, sid, callback=None): """最大トルク取得 (%) @@ -845,19 +839,8 @@ def get_max_torque(self, sid, callback=None): :param callback: :return: """ - - # サーボIDのチェック - self.__check_sid(sid) - - def response_process(response_data): - if response_data is not None and len(response_data) == 1: - max_torque = response_data[0] - return max_torque - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') - - return self.__get_function(self.ADDR_MAX_TORQUE, self.FLAG30_MEM_MAP_SELECT, 1, response_process, - sid=sid, callback=callback) + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_max_torqueに対応していません。') def get_max_torque_async(self, sid, loop=None): """最大トルク取得 async版 (%) @@ -866,10 +849,8 @@ def get_max_torque_async(self, sid, loop=None): :param loop: :return: """ - - f, callback = self.async_wrapper(loop) - self.get_max_torque(sid, callback=callback) - return f + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_max_torque_asyncに対応していません。') def set_max_torque(self, torque_percent, sid): """最大トルク設定 (%) @@ -878,23 +859,8 @@ def set_max_torque(self, torque_percent, sid): :param sid: :return: """ - - # サーボIDのチェック - self.__check_sid(sid) - - # 0-100%で設定 - if torque_percent < 0: - torque_percent = 0 - elif torque_percent > 100: - torque_percent = 100 - - torque_hex = int(torque_percent) - - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_MAX_TORQUE, [torque_hex]) - - # データ送信バッファに追加 - self.command_handler.add_command(command) + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではset_max_torqueに対応していません。') def get_speed(self, sid, callback=None): """現在の回転速度を取得 (deg/s) @@ -903,19 +869,8 @@ def get_speed(self, sid, callback=None): :param callback: :return: """ - - # サーボIDのチェック - self.__check_sid(sid) - - def response_process(response_data): - if response_data is not None and len(response_data) == 2: - speed = int.from_bytes(response_data, 'little', signed=True) - return speed - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') - - return self.__get_function(self.ADDR_PRESENT_SPEED_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_speedに対応していません。') def get_speed_async(self, sid, loop=None): """現在の回転速度を取得 async版 (deg/s) @@ -924,14 +879,12 @@ def get_speed_async(self, sid, loop=None): :param loop: :return: """ - - f, callback = self.async_wrapper(loop) - self.get_speed(sid, callback=callback) - return f + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_speed_asyncに対応していません。') def set_speed(self, dps, sid): - """Futabaでは未サポート""" - raise NotSupportException('Futabaではset_speedに対応していません。set_target_time()で回転スピードを制御してください。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではset_speedに対応していません。') def get_servo_id(self, sid, callback=None): """サーボIDを取得 @@ -942,17 +895,19 @@ def get_servo_id(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): - if response_data is not None and len(response_data) == 1: - servo_id = int.from_bytes(response_data, 'little', signed=False) - return servo_id - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + recv_id = int(response_data[0] & self.MASK_CMDHDR_ID) + if recv_id == 0xFF: + raise InvalidResponseDataException("受信IDが不正です。受信IDは %d で0~127の範囲外です。" % recv_id) + try: + self._check_sid(recv_id) + except BadInputParametersException as e: + raise InvalidResponseDataException("受信IDが不正です。受信IDは %d で0~127の範囲外です。" % recv_id) + return recv_id - return self.__get_function(self.ADDR_SERVO_ID, self.FLAG30_MEM_MAP_SELECT, 1, response_process, - sid=sid, callback=callback) + return self._send_recv(sid, self.CMD_ID, response_process, callback=callback) def get_servo_id_async(self, sid, loop=None): """サーボIDを取得 async版 @@ -975,22 +930,23 @@ def set_servo_id(self, new_sid, sid): """ # サーボIDのチェック - self.__check_sid(new_sid) - self.__check_sid(sid) + if sid == 0xFF: + raise BadInputParametersException("指定したサーボIDは %d で0~31の範囲外です。" % sid) + self._check_sid(sid) - # 0-100%で設定 - if new_sid < 1: - new_sid = 1 - elif new_sid > 127: - new_sid = 127 + if new_sid == 0xFF: + raise BadInputParametersException("新たに指定したサーボIDは %d で0~31の範囲外です。" % sid) + self._check_sid(new_sid) new_sid_hex = int(new_sid) # コマンド生成 - command = self.__generate_command(sid, self.ADDR_SERVO_ID, [new_sid_hex]) + rom_data = self.load_rom(sid) + data = self._set_hex_to_2x4bit(new_sid) + rom_data[self.ROM_ID] = data[0] + rom_data[self.ROM_ID+1] = data[1] - # データ送信バッファに追加 - self.command_handler.add_command(command) + self._send_recv(sid, self.CMD_ID, self.SC_EEPROM, send_data=rom_data) def save_rom(self, sid): """フラッシュROMに書き込む @@ -998,18 +954,24 @@ def save_rom(self, sid): :param sid: :return: """ + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではsave_romに対応していません。') - # サーボIDのチェック - self.__check_sid(sid) + def load_rom(self, sid): + """フラッシュROMを読み込む - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_WRITE_FLASH_ROM, flag=0x40, count=0) + :param sid: + :param callback: + :return: + """ - # データ送信バッファに追加 - self.command_handler.add_command(command) + # サーボIDのチェック + self._check_sid(sid) - def load_rom(self, sid): - raise NotSupportException('Futabaではload_romに対応していません。') + def response_process(response_data): + return response_data[2:] + + return self._send_recv(sid, self.CMD_READ, self.SC_EEPROM, response_process=response_process) def get_baud_rate(self, sid, callback=None): """通信速度を取得 @@ -1020,18 +982,19 @@ def get_baud_rate(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): - if response_data is not None and len(response_data) == 1: - baud_rate_list = [9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200, 153600, 230400] - baud_rate_id = int.from_bytes(response_data, 'little', signed=False) + data = response_data[2:] + baud_rate_hex = self._get_hex_from_2x4bit(data) + if baud_rate_hex in (0x00,0x01,0x10): + baud_rate_list = [1_250_000, 625_000, 115_200] + baud_rate_id = int(baud_rate_hex) return baud_rate_list[baud_rate_id] else: raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') - return self.__get_function(self.ADDR_BAUD_RATE, self.FLAG30_MEM_MAP_SELECT, 1, response_process, - sid=sid, callback=callback) + return self._send_recv(sid, self.CMD_READ, self.SC_EEPROM, response_process=response_process) def get_baud_rate_async(self, sid, loop=None): """通信速度を取得 async版 @@ -1054,10 +1017,10 @@ def set_baud_rate(self, baud_rate, sid): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) # 通信速度IDのチェック - baud_rate_list = [9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200, 153600, 230400] + baud_rate_list = [1_250_000, 625_000, 115_200] try: baud_rate_id = baud_rate_list.index(baud_rate) except: @@ -1065,11 +1028,12 @@ def set_baud_rate(self, baud_rate, sid): baud_rate_id_hex = int(baud_rate_id) - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_BAUD_RATE, [baud_rate_id_hex]) + rom_data = self.load_rom(sid) + data = self._set_hex_to_2x4bit(baud_rate_id_hex) + rom_data[self.ROM_BAUDRATE] = data[0] + rom_data[self.ROM_BAUDRATE+1] = data[1] - # データ送信バッファに追加 - self.command_handler.add_command(command) + self._send_recv(sid, self.CMD_WRITE, self.SC_EEPROM, send_data=rom_data) def get_limit_cw_position(self, sid, callback=None): """右(時計回り)リミット角度の取得 @@ -1080,18 +1044,9 @@ def get_limit_cw_position(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) - def response_process(response_data): - if response_data is not None and len(response_data) == 2: - limit_position = int.from_bytes(response_data, 'little', signed=True) - limit_position /= 10 - return -limit_position - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') - - return self.__get_function(self.ADDR_CW_ANGLE_LIMIT_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + pass # TODO def get_limit_cw_position_async(self, sid, loop=None): """右(時計回り)リミット角度の取得 async版 @@ -1114,23 +1069,9 @@ def set_limit_cw_position(self, limit_position, sid): """ # サーボIDのチェック - self.__check_sid(sid) - - # リミット角度のチェック - if -150 < limit_position > 0: - raise BadInputParametersException('limit_position が不正な値です。-150〜0を設定してください。') + self._check_sid(sid) - limit_position = -limit_position; - - limit_position_hex = format(int(limit_position * 10) & 0xffff, '04x') - limit_position_hex_h = int(limit_position_hex[0:2], 16) - limit_position_hex_l = int(limit_position_hex[2:4], 16) - - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_CW_ANGLE_LIMIT_L, [limit_position_hex_l, limit_position_hex_h]) - - # データ送信バッファに追加 - self.command_handler.add_command(command) + pass # TODO def get_limit_ccw_position(self, sid, callback=None): """左(反時計回り)リミット角度の取得 @@ -1141,18 +1082,9 @@ def get_limit_ccw_position(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) - - def response_process(response_data): - if response_data is not None and len(response_data) == 2: - limit_position = int.from_bytes(response_data, 'little', signed=True) - limit_position /= 10 - return -limit_position - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') + self._check_sid(sid) - return self.__get_function(self.ADDR_CCW_ANGLE_LIMIT_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + pass # TODO def get_limit_ccw_position_async(self, sid, loop=None): """左(反時計回り)リミット角度の取得 async版 @@ -1175,24 +1107,9 @@ def set_limit_ccw_position(self, limit_position, sid): """ # サーボIDのチェック - self.__check_sid(sid) - - # リミット角度のチェック - if 0 < limit_position > 150: - raise BadInputParametersException('limit_position が不正な値です。0〜150を設定してください。') + self._check_sid(sid) - limit_position = -limit_position; - - limit_position_hex = format(int((limit_position * 10) & 0xffff), '04x') - limit_position_hex_h = int(limit_position_hex[0:2], 16) - limit_position_hex_l = int(limit_position_hex[2:4], 16) - - # コマンド生成 - command = self.__generate_command(sid, self.ADDR_CCW_ANGLE_LIMIT_L, - [limit_position_hex_l, limit_position_hex_h]) - - # データ送信バッファに追加 - self.command_handler.add_command(command) + pass # TODO def get_limit_temperature(self, sid, callback=None): """温度リミットの取得 (℃) @@ -1203,17 +1120,9 @@ def get_limit_temperature(self, sid, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) - def response_process(response_data): - if response_data is not None and len(response_data) == 2: - limit_temp = int.from_bytes(response_data, 'little', signed=True) - return limit_temp - else: - raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') - - return self.__get_function(self.ADDR_TEMPERATURE_LIMIT_L, self.FLAG30_MEM_MAP_SELECT, 2, response_process, - sid=sid, callback=callback) + pass # TODO def get_limit_temperature_async(self, sid, loop=None): """温度リミットの取得 (℃) async版 @@ -1228,30 +1137,26 @@ def get_limit_temperature_async(self, sid, loop=None): return f def set_limit_temperature(self, limit_temp, sid): - """Futabaでは未サポート""" - raise NotSupportException('Futabaではset_limit_temperatureに対応していません。') + pass # TODO def get_limit_current(self, sid, callback=None): - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_limit_currentに対応していません。') + pass # TODO def get_limit_current_async(self, sid, loop=None): - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_limit_current_asyncに対応していません。') + pass # TODO def set_limit_current(self, limit_current, sid): - """Futabaでは未サポート""" - raise NotSupportException('Futabaではset_limit_currentに対応していません。') + pass # TODO def get_drive_mode(self, sid, callback=None): - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_drive_modeに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_drive_modeに対応していません。') def get_drive_mode_async(self, sid, loop=None): - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_drive_mode_asyncに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_drive_mode_asyncに対応していません。') def set_drive_mode(self, drive_mode, sid): - """Futabaでは未サポート""" - raise NotSupportException('Futabaではset_drive_modeに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではset_drive_modeに対応していません。') def set_burst_target_positions(self, sid_target_positions): @@ -1260,30 +1165,8 @@ def set_burst_target_positions(self, sid_target_positions): :param sid_target_positions: :return: """ - - # データチェック & コマンドデータ生成 - vid_data = {} - for sid, position_degree in sid_target_positions.items(): - # サーボIDのチェック - self.__check_sid(sid) - - # 設定可能な範囲は-150.0 度~150.0 度 - if position_degree < -150: - position_degree = -150 - elif position_degree > 150: - position_degree = 150 - - position_hex = format(int(position_degree * -10) & 0xffff, '04x') - position_hex_h = int(position_hex[0:2], 16) - position_hex_l = int(position_hex[2:4], 16) - - vid_data[sid] = [position_hex_l, position_hex_h] - - # コマンド生成 - command = self.__generate_burst_command(self.ADDR_GOAL_POSITION_L, 3, vid_data) - - # データ送信バッファに追加 - self.command_handler.add_command(command) + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではset_burst_target_positionsに対応していません。') def get_burst_positions(self, sids, callback=None): """複数のサーボの現在のポジションを一気にリード @@ -1292,8 +1175,8 @@ def get_burst_positions(self, sids, callback=None): :param callback: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_burst_positionsに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_burst_positionsに対応していません。') def get_burst_positions_async(self, sids, loop=None): """複数のサーボの現在のポジションを一気にリード async版 @@ -1302,8 +1185,8 @@ def get_burst_positions_async(self, sids, loop=None): :param loop: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではget_burst_positions_asyncに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではget_burst_positions_asyncに対応していません。') def reset_memory(self, sid): """ROMを工場出荷時のものに初期化する @@ -1313,7 +1196,7 @@ def reset_memory(self, sid): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) # コマンド生成 command = self.__generate_command(sid, self.ADDR_RESET_MEMORY, flag=self.FLAG4_RESET_MEMORY_MAP, count=0) @@ -1332,7 +1215,7 @@ def read(self, sid, address, length, callback=None): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) def response_process(response_data): if response_data is not None and len(response_data) == length: @@ -1340,7 +1223,7 @@ def response_process(response_data): else: raise InvalidResponseDataException('サーボからのレスポンスデータが不正です') - return self.__get_function(address, self.FLAG30_MEM_MAP_SELECT, length, response_process, + return self._send_recv(address, self.FLAG30_MEM_MAP_SELECT, length, response_process, sid=sid, callback=callback) def read_async(self, sid, address, length, loop=None): @@ -1367,7 +1250,7 @@ def write(self, sid, address, data): """ # サーボIDのチェック - self.__check_sid(sid) + self._check_sid(sid) # コマンド生成 command = self.__generate_command(sid, address, data) @@ -1384,8 +1267,8 @@ def burst_read(self, address, length, sids, callback=None): :param callback: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではburst_readに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではburst_readに対応していません。') def burst_read_async(self, address, length, sids, loop=None): """複数サーボから一括でデータ読み取り async版 @@ -1396,8 +1279,8 @@ def burst_read_async(self, address, length, sids, loop=None): :param loop: :return: """ - """Futabaでは未サポート""" - raise NotSupportException('Futabaではburst_read_asyncに対応していません。') + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではburst_read_asyncに対応していません。') def burst_write(self, address, length, sid_data): """複数サーボに一括で書き込み @@ -1407,17 +1290,5 @@ def burst_write(self, address, length, sid_data): :param sid_data: :return: """ - - # データチェック & コマンドデータ生成 - vid_data = {} - for sid, data in sid_data.items(): - # サーボIDのチェック - self.__check_sid(sid) - - vid_data[sid] = data - - # コマンド生成 - command = self.__generate_burst_command(address, length, vid_data) - - # データ送信バッファに追加 - self.command_handler.add_command(command) + """KondoICSでは未サポート""" + raise NotSupportException('KondoICSではburst_writeに対応していません。') diff --git a/gs2d/__init__.py b/gs2d/__init__.py index c5a20d9..4d4db45 100644 --- a/gs2d/__init__.py +++ b/gs2d/__init__.py @@ -1,4 +1,5 @@ -# from .Futaba import Futaba +from .Futaba import Futaba +from .Kondo import KondoICS from .RobotisP20 import RobotisP20 from .SerialInterface import SerialInterface from .DefaultCommandHandler import DefaultCommandHandler From fd21f081fc416e47f1c08a2dafcf570f43f87c56 Mon Sep 17 00:00:00 2001 From: realteck-ky <32591276+realteck-ky@users.noreply.github.com> Date: Thu, 13 Jun 2024 08:57:36 +0900 Subject: [PATCH 4/5] Rename SerialInterface method to set serial port setting --- gs2d/Kondo.py | 5 +++++ gs2d/SerialInterface.py | 10 +++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/gs2d/Kondo.py b/gs2d/Kondo.py index 0cb055a..b262df7 100644 --- a/gs2d/Kondo.py +++ b/gs2d/Kondo.py @@ -1,5 +1,6 @@ import time import logging +import serial from types import FunctionType from packaging.version import Version @@ -72,9 +73,13 @@ def __init__(self, serial_interface: ISerialInterface, command_handler_class: IC """初期化 """ + super(KondoICS, self).__init__(serial_interface, command_handler_class) self.loopback:bool = bool(loopback) self.slave:bool = bool(slave) self.baudrate = 115200 + if isinstance(self.command_handler.serial_interface.ser, serial.Serial): + self.baudrate = self.command_handler.serial_interface.ser.baudrate + self.command_handler.serial_interface.ser.parity = serial.PARITY_EVEN if version == "3.6": self.version = Version("3.6") diff --git a/gs2d/SerialInterface.py b/gs2d/SerialInterface.py index 3c71489..576ceac 100644 --- a/gs2d/SerialInterface.py +++ b/gs2d/SerialInterface.py @@ -53,7 +53,7 @@ def __init__(self, device=None, baudrate=None): logger.debug('Device name: {}, Baudrate: {}'.format(device, baudrate)) # シリアルデバイス生成 - self.__ser = serial.Serial(device, baudrate, timeout=0.1) + self.ser = serial.Serial(device, baudrate, timeout=0.1) def write(self, data): """データ送信 @@ -62,7 +62,7 @@ def write(self, data): :return: """ - return self.__ser.write(data) + return self.ser.write(data) def read(self): """サーボからのデータ1文字受信 @@ -70,7 +70,7 @@ def read(self): :return: """ - return self.__ser.read() + return self.ser.read() def is_open(self): """シリアルインタフェースがオープンされているかチェック @@ -78,7 +78,7 @@ def is_open(self): :return: """ - return self.__ser.is_open + return self.ser.is_open def close(self): """シリアルインタフェースをクローズ @@ -86,4 +86,4 @@ def close(self): :return: """ - self.__ser.close() + self.ser.close() From 316acb9385c6b22cd0177517dfc2866b72aca4ce Mon Sep 17 00:00:00 2001 From: realteck-ky <32591276+realteck-ky@users.noreply.github.com> Date: Fri, 14 Jun 2024 00:28:23 +0900 Subject: [PATCH 5/5] Add packaging library --- Pipfile | 1 + requirements.txt | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Pipfile b/Pipfile index 1a72192..f68200c 100644 --- a/Pipfile +++ b/Pipfile @@ -7,6 +7,7 @@ verify_ssl = true [packages] pyserial = "*" +packaging = "*" [requires] python_version = "3.8.1" diff --git a/requirements.txt b/requirements.txt index 4b0f250..5a11e95 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ pyserial - +packaging