Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 54 additions & 73 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,116 +5,97 @@ A teleoperation system for OpenArm robots using KER (Kinematic Equivalent Replic
## Features

- **Joint mapping**: Flexible configuration-based mapping from leader to follower joints
- **Serial communication**: Interface with M5Stack CoreS3 over UART
- **USB communication**: Interface with M5Stack CoreS3 via USB vendor mode

## Quick start
## Quick Start

### Install
### 1. Install system dependencies

```bash
pip install openarm_ker
sudo apt install libusb-1.0-0-dev
```

### Serial device permissions
### 2. Set up udev rules (run once)

On Linux, serial devices such as `/dev/ttyACM0` are usually owned by the
`dialout` group. Add your user to that group, then log out and log back in.
If you run the examples from VS Code or another terminal, restart that program
so it picks up the new group permissions.
**USB vendor mode only (normal use):**

```bash
sudo usermod -aG dialout "$USER"
echo 'SUBSYSTEM=="usb", ATTRS{idVendor}=="303a", MODE="0666"' | sudo tee /etc/udev/rules.d/99-m5stack.rules
sudo udevadm control --reload-rules && sudo udevadm trigger
```

For a temporary test, you can also relax the permission of the current device
node directly:
**If you also want to flash firmware (adds stable device name `/dev/m5_ker_485`):**

Put M5Stack into flashing mode (hold RST 3 seconds until green LED lights up), then run:

```bash
sudo chmod 666 /dev/ttyACM0
SERIAL=$(udevadm info -q property -n /dev/ttyACM0 | grep ID_SERIAL_SHORT | cut -d= -f2)
sudo tee /etc/udev/rules.d/99-m5stack.rules << EOF
# USB vendor mode (normal operation)
SUBSYSTEM=="usb", ATTRS{idVendor}=="303a", MODE="0666"

# Serial mode (flashing) with stable device name
SUBSYSTEM=="tty", ATTRS{idVendor}=="303a", ATTRS{idProduct}=="1001", ATTRS{serial}=="$SERIAL", MODE="0666", SYMLINK+="m5_ker_485"
EOF
sudo udevadm control --reload-rules && sudo udevadm trigger
```

This usually resets after the device is unplugged or the device node is
recreated. Use this only as a short-lived local test because it makes the device
writable by every local user. For regular use, prefer the `dialout` group or a
udev rule with `MODE="0660"`.
Press RST once to reboot normally.

To use a stable device name such as `/dev/m5_ker_485`, create a udev rule.
First inspect the device properties:
### 3. Install

```bash
udevadm info -q property -n /dev/ttyACM0
uv pip install openarm_ker
```

Record fields like these:

```bash
ID_VENDOR_ID=xxxx
ID_MODEL_ID=yyyy
ID_SERIAL_SHORT=zzzz
```
### 4. Connect M5Stack and verify

Create a rule file:
Plug the M5Stack CoreS3 into your PC via USB and run:

```bash
sudo nano /etc/udev/rules.d/99-openarm-ker.rules
openarm-ker-cli ping
```

Add a rule like this, replacing `xxxx`, `yyyy`, and `zzzz` with your device's
actual values:
Expected output:

```udev
SUBSYSTEM=="tty", ENV{ID_VENDOR_ID}=="xxxx", ENV{ID_MODEL_ID}=="yyyy", ENV{ID_SERIAL_SHORT}=="zzzz", SYMLINK+="m5_ker_485", GROUP="dialout", MODE="0660"
```json
{
"fw": "v1.0.0",
"hw": "KER-v1.0.0",
"updated": "2026-05-25"
}
```

If you do not need to distinguish between multiple devices of the same model,
you can omit `ENV{ID_SERIAL_SHORT}=="zzzz"`. For a personal development machine,
you can instead use `MODE="0666"` and omit `GROUP` to allow all local users to
access it.
### 5. Sample usage

Reload the rules:

```bash
sudo udevadm control --reload-rules
sudo udevadm trigger
```python
from openarm_ker import KERStream

with KERStream(transport="usb") as stream:
data = stream.latest()
if data is not None:
ts = data["timestamp"]
angles = data["angles"]
enc_val = data["encoder_value"]
enc_btn = data["encoder_button"]
angles_str = " | ".join([f"CH{i+1:02d}: {a:8.2f}°" for i, a in enumerate(angles)])
print(f"TS: {ts:10d} | {angles_str} | ENC: {enc_val:4d} (Btn: {int(enc_btn)})", end='\r')
```

Then reconnect the device and check that the stable device name was created:
## CLI

```bash
ls -l /dev/m5_ker_485
```
# Check device connection and fetch schema
openarm-ker-cli ping

You can then use `/dev/m5_ker_485` as the serial device path.
# Stream raw data to terminal
openarm-ker-cli stream

### Sample usage

```python
import numpy as np
import openarm_ker

m5_port = openarm_ker.M5Port("/dev/ttyACM0")

leader_joint_names = [f"right_arm_joint{i}" for i in range(1, 9)]
mapper = openarm_ker.Mapper(
mappingyaml_path="mapping_m5.yaml",
leader_joint_names=leader_joint_names,
mapping_key="right_arm_mappings",
)

m5_port.fetch_present_status_bulk()
leader_position = m5_port.present_position
follower_position = mapper.map(np.deg2rad(leader_position))
# Serial transport
openarm-ker-cli stream --transport serial --port /dev/m5_ker_485 --baud 2000000
```

### Mapper config

The main M5 mapping file is `src/openarm_ker/config/mapping_m5.yaml` in this
repository. It is bundled in the installed package under `openarm_ker/config/`,
so you can pass the bundled filename `mapping_m5.yaml`, or pass an explicit path
to a custom YAML file. For the left arm, use `left_arm_joint*` leader names with
`mapping_key="left_arm_mappings"`.

## Related links
## Related Links

- 📚 Read the [documentation](https://docs.openarm.dev/software/can/)
- 💬 Join the community on [Discord](https://discord.gg/FsZaZ4z3We)
Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ version = "0.1.0"
authors = [{name = "Enactic, Inc."}]
license = {text = "Apache-2.0"}
readme = "README.md"
requires-python = ">=3.10"
requires-python = ">=3.11"

dependencies = [
"numpy>=1.20.0",
"PyYAML>=5.4.0",
"pyusb",
"pyserial",
"openarm-can>=0.1.0",
]

[project.scripts]
openarm-ker-cli = "openarm_ker.cli:main"

[project.urls]
changelog = "https://github.com/enactic/openarm_ker/releases"
issues = "https://github.com/enactic/openarm_ker/issues"
Expand Down
8 changes: 2 additions & 6 deletions src/openarm_ker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@

"""OpenArm KER - Teleoperation system for OpenArm robots."""

from .m5_port import (
M5Port as M5Port,
)

from .mapper import (
Mapper as Mapper,
from .ker_stream import (
KERStream as KERStream,
)
98 changes: 98 additions & 0 deletions src/openarm_ker/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright 2026 Enactic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Command-line interface utilities for OpenArm KER."""

import argparse
import json
import sys
import time
from typing import NoReturn

from .ker_stream import KERStream


def main() -> NoReturn | None:
"""Run the KER CLI.

Provides diagnostic utilities such as pinging the device and raw streaming.
"""
parser = argparse.ArgumentParser(
description="KERStream Command-Line Interface (CLI) Utility",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"command",
choices=["ping", "stream"],
help="Command to execute: 'ping' to fetch schema and device metadata, 'stream' to test continuous data reception.",
)
parser.add_argument(
"--transport",
type=str,
default="usb",
choices=["usb", "serial"],
help="Transport protocol connection type.",
)
parser.add_argument(
"--port",
type=str,
default="/dev/ttyACM0",
help="Serial port path (only applicable when transport is set to 'serial').",
)
parser.add_argument(
"--baud",
type=int,
default=2000000,
help="Baud rate speed (only applicable when transport is set to 'serial').",
)
args = parser.parse_args()

stream = KERStream(transport=args.transport, port=args.port, baud=args.baud)

if args.command == "ping":
metadata = stream.ping_only()
if metadata:
print(json.dumps(metadata, indent=2))
sys.exit(0)
else:
print(
"Error: Failed to fetch metadata or no response from the device.",
file=sys.stderr,
)
sys.exit(1)

elif args.command == "stream":
print(f"[Info] Starting data stream via {args.transport.upper()}...")
print("[Info] Press Ctrl+C to terminate the stream safely.\n")
try:
with stream:
while stream.is_connected:
data = stream.latest()
if data:
print(f"\r[Stream Data] {data}", end="", flush=True)
time.sleep(0.01)
print("\n[Warning] Stream loop terminated. Device connection lost.")
except KeyboardInterrupt:
print("\n[Info] Stream terminated by user. Cleaning up resources.")
sys.exit(0)
except Exception as e:
print(
f"\n[Critical] Stream crashed with an unexpected error: {e}",
file=sys.stderr,
)
sys.exit(1)


if __name__ == "__main__":
main()
Loading