From d6c4dcf0f4eb1c7c270aa4a7a8ac4f8ae88c167d Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Sat, 28 Mar 2026 12:02:15 -0400 Subject: [PATCH 1/4] feat: add gPTP driver for IEEE 802.1AS / PTP time synchronization Add jumpstarter-driver-gptp wrapping linuxptp (ptp4l/phc2sys) to provide precision time synchronization for automotive Ethernet testing. Includes: - Gptp driver with ptp4l/phc2sys process management and log parsing - MockGptp driver for testing without PTP hardware - GptpClient with wait_for_sync helper and Click CLI - Pydantic models for status, offsets, sync events, port stats - Comprehensive test suite (unit, e2e mocked, stateful) - Documentation integrated into jumpstarter.dev docs site Made-with: Cursor --- .../reference/package-apis/drivers/gptp.md | 1 + .../reference/package-apis/drivers/index.md | 3 + .../jumpstarter-driver-gptp/.gitignore | 3 + .../jumpstarter-driver-gptp/README.md | 282 ++++++++ .../examples/exporter.yaml | 13 + .../jumpstarter_driver_gptp/__init__.py | 0 .../jumpstarter_driver_gptp/client.py | 197 ++++++ .../jumpstarter_driver_gptp/common.py | 94 +++ .../jumpstarter_driver_gptp/conftest.py | 116 +++ .../jumpstarter_driver_gptp/driver.py | 591 ++++++++++++++++ .../jumpstarter_driver_gptp/driver_test.py | 661 ++++++++++++++++++ .../jumpstarter-driver-gptp/pyproject.toml | 47 ++ python/pyproject.toml | 1 + python/uv.lock | 29 + 14 files changed, 2038 insertions(+) create mode 120000 python/docs/source/reference/package-apis/drivers/gptp.md create mode 100644 python/packages/jumpstarter-driver-gptp/.gitignore create mode 100644 python/packages/jumpstarter-driver-gptp/README.md create mode 100644 python/packages/jumpstarter-driver-gptp/examples/exporter.yaml create mode 100644 python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/__init__.py create mode 100644 python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py create mode 100644 python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py create mode 100644 python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py create mode 100644 python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py create mode 100644 python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py create mode 100644 python/packages/jumpstarter-driver-gptp/pyproject.toml diff --git a/python/docs/source/reference/package-apis/drivers/gptp.md b/python/docs/source/reference/package-apis/drivers/gptp.md new file mode 120000 index 000000000..c5d12f64d --- /dev/null +++ b/python/docs/source/reference/package-apis/drivers/gptp.md @@ -0,0 +1 @@ +../../../../../packages/jumpstarter-driver-gptp/README.md \ No newline at end of file diff --git a/python/docs/source/reference/package-apis/drivers/index.md b/python/docs/source/reference/package-apis/drivers/index.md index ee778f06c..013eda32d 100644 --- a/python/docs/source/reference/package-apis/drivers/index.md +++ b/python/docs/source/reference/package-apis/drivers/index.md @@ -80,6 +80,8 @@ Drivers for automotive diagnostic protocols: diagnostics over DoIP transport * **[UDS over CAN](uds-can.md)** (`jumpstarter-driver-uds-can`) - UDS diagnostics over CAN/ISO-TP transport +* **[gPTP](gptp.md)** (`jumpstarter-driver-gptp`) - IEEE 802.1AS / PTP time + synchronization for automotive Ethernet (linuxptp) ### Debug and Programming Drivers @@ -135,6 +137,7 @@ probe-rs.md pyserial.md qemu.md gpiod.md +gptp.md ridesx.md sdwire.md shell.md diff --git a/python/packages/jumpstarter-driver-gptp/.gitignore b/python/packages/jumpstarter-driver-gptp/.gitignore new file mode 100644 index 000000000..cbc5d672b --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +.coverage +coverage.xml diff --git a/python/packages/jumpstarter-driver-gptp/README.md b/python/packages/jumpstarter-driver-gptp/README.md new file mode 100644 index 000000000..99014d86b --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/README.md @@ -0,0 +1,282 @@ +# gPTP driver + +`jumpstarter-driver-gptp` provides IEEE 802.1AS (gPTP) and IEEE 1588 (PTPv2) +time synchronization for Jumpstarter. It manages the +[linuxptp](https://linuxptp.nwtime.org/) stack (`ptp4l` and `phc2sys`) as +supervised subprocesses, enabling precise clock synchronization between an +exporter host and a target device over automotive Ethernet or standard IP networks. + +gPTP is the foundation of Time-Sensitive Networking (TSN), required for +applications like sensor fusion, ADAS, and synchronized diagnostics in +automotive ECU testing. + +## Installation + +```{code-block} console +:substitutions: +$ pip3 install --extra-index-url {{index_url}} jumpstarter-driver-gptp +``` + +**System requirements** (on the exporter host): + +```console +# Debian/Ubuntu +$ sudo apt install linuxptp ethtool + +# Fedora/RHEL +$ sudo dnf install linuxptp ethtool +``` + +## Configuration + +### gPTP Slave (802.1AS, automotive Ethernet) + +The most common automotive scenario — synchronize to an external grandmaster: + +```yaml +export: + gptp: + type: jumpstarter_driver_gptp.driver.Gptp + config: + interface: eth0 + domain: 0 + profile: gptp + transport: L2 + role: slave + sync_system_clock: true +``` + +### IEEE 1588 UDP mode + +For networks that use standard PTPv2 over UDP: + +```yaml +export: + ptp: + type: jumpstarter_driver_gptp.driver.Gptp + config: + interface: eth0 + domain: 0 + profile: default + transport: UDPv4 + role: auto +``` + +### gPTP Grandmaster + +Force this host to act as the PTP grandmaster: + +```yaml +export: + gptp: + type: jumpstarter_driver_gptp.driver.Gptp + config: + interface: eth0 + profile: gptp + role: master + sync_system_clock: false +``` + +### Combined with SOME/IP + +gPTP provides the time base for other automotive Ethernet protocols: + +```yaml +export: + gptp: + type: jumpstarter_driver_gptp.driver.Gptp + config: + interface: eth0 + profile: gptp + role: auto + someip: + type: jumpstarter_driver_someip.driver.SomeIp + config: + host: 192.168.1.100 +``` + +### Config parameters + +| Parameter | Description | Type | Required | Default | +| ------------------ | ---------------------------------------------------- | ---------- | -------- | -------- | +| interface | Network interface for PTP (e.g. `eth0`, `enp3s0`) | str | yes | | +| domain | PTP domain number (0-127) | int | no | 0 | +| profile | `"gptp"` (IEEE 802.1AS) or `"default"` (IEEE 1588) | str | no | `"gptp"` | +| transport | `"L2"`, `"UDPv4"`, or `"UDPv6"` | str | no | `"L2"` | +| role | `"master"`, `"slave"`, or `"auto"` (BMCA election) | str | no | `"auto"` | +| sync_system_clock | Run `phc2sys` to sync CLOCK_REALTIME to PHC | bool | no | true | +| ptp4l_extra_args | Additional ptp4l command-line arguments | list[str] | no | [] | + +## PTP Standards Reference + +### IEEE 802.1AS (gPTP) vs IEEE 1588 (PTPv2) + +| Feature | 802.1AS (gPTP) | IEEE 1588 (PTPv2) | +| ----------------- | ---------------------------- | ----------------------------- | +| Transport | Layer 2 only | L2, UDPv4, UDPv6 | +| Timestamping | Hardware required | HW or software | +| Accuracy | Sub-microsecond | Sub-microsecond to ms | +| Use case | Automotive, industrial TSN | General purpose | +| Profile setting | `profile: gptp` | `profile: default` | + +### Port State Machine + +PTP ports transition through these states: + +``` +INITIALIZING → LISTENING → SLAVE (synchronized to master) + → MASTER (elected as grandmaster) + → PASSIVE (backup, not active) + → FAULTY (error detected) +``` + +### Servo States + +The clock servo tracks synchronization quality: + +- **s0** (unlocked): Initial state, no sync +- **s1** (calibrating): Frequency adjustment in progress +- **s2** (locked): Fully synchronized, offset stable + +## API Reference + +### GptpClient + +```{eval-rst} +.. autoclass:: jumpstarter_driver_gptp.client.GptpClient() + :members: start, stop, status, get_offset, get_port_stats, + get_clock_identity, get_parent_info, set_priority1, + is_synchronized, wait_for_sync, monitor +``` + +## Examples + +### Basic lifecycle + +```python +with serve(Gptp(interface="eth0")) as gptp: + gptp.start() + + # Wait for synchronization (up to 30 seconds) + if gptp.wait_for_sync(timeout=30.0): + offset = gptp.get_offset() + print(f"Synchronized! Offset: {offset.offset_from_master_ns:.0f} ns") + else: + print("Sync timeout") + + gptp.stop() +``` + +### Monitoring sync events + +```python +with serve(Gptp(interface="eth0")) as gptp: + gptp.start() + for event in gptp.monitor(): + print(f"[{event.event_type}] offset={event.offset_ns:.0f}ns state={event.port_state}") + if event.event_type == "fault": + break + gptp.stop() +``` + +### Force master role + +```python +with serve(Gptp(interface="eth0", role="auto")) as gptp: + gptp.start() + gptp.wait_for_sync() + + # Override BMCA: become grandmaster + gptp.set_priority1(0) + status = gptp.status() + assert status.port_state.value == "MASTER" +``` + +### Using MockGptp in tests + +```python +from jumpstarter_driver_gptp.driver import MockGptp +from jumpstarter.common.utils import serve + +def test_my_application(): + with serve(MockGptp()) as gptp: + gptp.start() + assert gptp.is_synchronized() + assert abs(gptp.get_offset().offset_from_master_ns) < 1000 + gptp.stop() +``` + +## CLI Commands + +When used inside `jmp shell`, the driver provides these commands: + +```console +$ j gptp start # Start PTP synchronization +$ j gptp stop # Stop PTP synchronization +$ j gptp status # Show sync status +$ j gptp offset # Show current clock offset +$ j gptp monitor -n 20 # Monitor 20 sync events +$ j gptp set-priority 0 # Force grandmaster role +``` + +## Hardware Requirements + +### PTP-capable NICs + +For sub-microsecond accuracy, the network interface must support hardware +timestamping. Common PTP-capable NICs: + +- Intel i210, i225, i226 (automotive-grade variants available) +- Intel X710, XL710, E810 +- Broadcom BCM5719, BCM5720 +- TI AM65x / Jacinto 7 (embedded automotive) +- NXP S32G (automotive gateway) + +### Verifying hardware timestamping + +```console +$ ethtool -T eth0 +# Look for: +# hardware-transmit +# hardware-receive +# hardware-raw-clock +``` + +If hardware timestamping is not available, the driver automatically falls back +to software timestamping (`-S` flag) with a warning. Software timestamping +provides millisecond-level accuracy, sufficient for development but not for +production TSN. + +## Troubleshooting + +### Permission denied + +`ptp4l` requires `CAP_NET_RAW` (or root) for Layer 2 transport and hardware +timestamping: + +```console +$ sudo setcap cap_net_raw+ep $(which ptp4l) +# or run the exporter as root +``` + +### No hardware timestamping + +If you see "falling back to software timestamping": +1. Check NIC support: `ethtool -T ` +2. Verify the NIC driver is loaded: `lsmod | grep ` +3. Some virtualized NICs (virtio, veth) only support software timestamping + +### ptp4l not found + +Ensure linuxptp is installed: +```console +$ which ptp4l +/usr/sbin/ptp4l +``` + +### No sync achieved + +- Verify the link partner (DUT) is running a gPTP stack +- Check physical layer: `ethtool ` should show link up +- Review ptp4l logs in the exporter output +- Ensure both ends use the same domain number and transport diff --git a/python/packages/jumpstarter-driver-gptp/examples/exporter.yaml b/python/packages/jumpstarter-driver-gptp/examples/exporter.yaml new file mode 100644 index 000000000..5562e78e0 --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/examples/exporter.yaml @@ -0,0 +1,13 @@ +apiVersion: jumpstarter.dev/v1alpha1 +kind: ExporterConfig +endpoint: grpc://localhost:8082 +drivers: + gptp: + type: jumpstarter_driver_gptp.driver.Gptp + config: + interface: eth0 + domain: 0 + profile: gptp + transport: L2 + role: auto + sync_system_clock: true diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/__init__.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py new file mode 100644 index 000000000..5a8458216 --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py @@ -0,0 +1,197 @@ +from __future__ import annotations + +import time +from collections.abc import Generator +from dataclasses import dataclass + +import click + +from .common import ( + GptpOffset, + GptpParentInfo, + GptpPortStats, + GptpStatus, + GptpSyncEvent, +) +from jumpstarter.client import DriverClient +from jumpstarter.client.decorators import driver_click_group + + +@dataclass(kw_only=True) +class GptpClient(DriverClient): + """Client interface for gPTP/PTP time synchronization. + + Provides methods to manage PTP synchronization lifecycle, query status, + monitor sync events, and configure clock priority for BMCA master election. + """ + + def start(self) -> None: + """Start PTP synchronization on the exporter host. + + Spawns ptp4l (and optionally phc2sys) as managed subprocesses. + + :raises RuntimeError: If ptp4l is already running + """ + self.call("start") + + def stop(self) -> None: + """Stop PTP synchronization. + + Terminates ptp4l and phc2sys processes. + + :raises RuntimeError: If ptp4l is not started + """ + self.call("stop") + + def status(self) -> GptpStatus: + """Query the current PTP synchronization status. + + Returns the port state, clock class, current offset from master, + mean path delay, and servo state. + + :returns: Current synchronization status + :rtype: GptpStatus + :raises RuntimeError: If ptp4l is not started + """ + return GptpStatus.model_validate(self.call("status")) + + def get_offset(self) -> GptpOffset: + """Get the current clock offset from master. + + :returns: Offset measurement including path delay and frequency + :rtype: GptpOffset + :raises RuntimeError: If ptp4l is not started + """ + return GptpOffset.model_validate(self.call("get_offset")) + + def get_port_stats(self) -> GptpPortStats: + """Get PTP port statistics. + + :returns: Port statistics counters + :rtype: GptpPortStats + :raises RuntimeError: If ptp4l is not started + """ + return GptpPortStats.model_validate(self.call("get_port_stats")) + + def get_clock_identity(self) -> str: + """Get this clock's identity string. + + :returns: Clock identity + :rtype: str + :raises RuntimeError: If ptp4l is not started + """ + return self.call("get_clock_identity") + + def get_parent_info(self) -> GptpParentInfo: + """Get information about the parent/grandmaster clock. + + :returns: Parent and grandmaster clock information + :rtype: GptpParentInfo + :raises RuntimeError: If ptp4l is not started + """ + return GptpParentInfo.model_validate(self.call("get_parent_info")) + + def set_priority1(self, priority: int) -> None: + """Set clock priority1 to influence BMCA master election. + + Lower values make this clock more likely to become grandmaster. + + :param priority: Priority1 value (0-255) + :raises RuntimeError: If ptp4l is not started + """ + self.call("set_priority1", priority) + + def is_synchronized(self) -> bool: + """Check whether PTP is synchronized (servo locked in SLAVE state). + + :returns: True if synchronized + :rtype: bool + :raises RuntimeError: If ptp4l is not started + """ + return self.call("is_synchronized") + + def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool: + """Block until PTP synchronization is achieved or timeout expires. + + :param timeout: Maximum time to wait in seconds + :param poll_interval: Polling interval in seconds + :returns: True if synchronized, False if timeout expired + :rtype: bool + """ + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + if self.is_synchronized(): + return True + except Exception: + pass + time.sleep(poll_interval) + return False + + def monitor(self) -> Generator[GptpSyncEvent, None, None]: + """Stream PTP sync status updates. + + Yields GptpSyncEvent objects with current offset, delay, and state. + + :yields: Sync event updates + """ + for v in self.streamingcall("read"): + yield GptpSyncEvent.model_validate(v) + + def cli(self): + @driver_click_group(self) + def base(): + """gPTP/PTP time synchronization""" + pass + + @base.command() + def start(): + """Start PTP synchronization""" + self.start() + click.echo("PTP synchronization started") + + @base.command() + def stop(): + """Stop PTP synchronization""" + self.stop() + click.echo("PTP synchronization stopped") + + @base.command() + def status(): + """Show PTP synchronization status""" + s = self.status() + click.echo(f"Port state: {s.port_state.value}") + click.echo(f"Servo state: {s.servo_state.value}") + click.echo(f"Offset: {s.offset_ns:.0f} ns") + click.echo(f"Mean delay: {s.mean_delay_ns:.0f} ns") + click.echo(f"Synchronized: {self.is_synchronized()}") + + @base.command() + def offset(): + """Show current clock offset from master""" + o = self.get_offset() + click.echo(f"Offset: {o.offset_from_master_ns:.0f} ns") + click.echo(f"Path delay: {o.mean_path_delay_ns:.0f} ns") + click.echo(f"Freq adj: {o.freq_ppb:.0f} ppb") + + @base.command() + @click.option("--count", "-n", default=10, help="Number of events to show") + def monitor(count): + """Monitor PTP sync events""" + for i, event in enumerate(self.monitor()): + click.echo( + f"[{event.event_type}] state={event.port_state} " + f"offset={event.offset_ns:.0f}ns " + f"delay={event.path_delay_ns:.0f}ns" + ) + if i + 1 >= count: + break + + @base.command(name="set-priority") + @click.argument("priority", type=int) + def set_priority(priority): + """Set clock priority1 for BMCA""" + self.set_priority1(priority) + click.echo(f"Priority1 set to {priority}") + + return base diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py new file mode 100644 index 000000000..db50ee48d --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from enum import Enum +from typing import Literal, Optional + +from pydantic import BaseModel, field_validator + + +class PortState(str, Enum): + INITIALIZING = "INITIALIZING" + LISTENING = "LISTENING" + MASTER = "MASTER" + SLAVE = "SLAVE" + PASSIVE = "PASSIVE" + FAULTY = "FAULTY" + UNCALIBRATED = "UNCALIBRATED" + + +class ServoState(str, Enum): + UNLOCKED = "s0" + CALIBRATING = "s1" + LOCKED = "s2" + + +VALID_PORT_TRANSITIONS: dict[str, set[str]] = { + "INITIALIZING": {"LISTENING", "FAULTY"}, + "LISTENING": {"MASTER", "SLAVE", "PASSIVE", "FAULTY"}, + "MASTER": {"LISTENING", "SLAVE", "PASSIVE", "FAULTY"}, + "SLAVE": {"LISTENING", "MASTER", "PASSIVE", "FAULTY", "UNCALIBRATED"}, + "PASSIVE": {"LISTENING", "MASTER", "SLAVE", "FAULTY"}, + "FAULTY": {"INITIALIZING", "LISTENING"}, + "UNCALIBRATED": {"SLAVE", "FAULTY", "LISTENING"}, +} + + +class GptpStatus(BaseModel): + """Current PTP synchronization status.""" + + port_state: PortState + clock_class: int = 248 + clock_accuracy: int = 0xFE + offset_ns: float = 0.0 + mean_delay_ns: float = 0.0 + gm_identity: str = "" + servo_state: ServoState = ServoState.UNLOCKED + + @field_validator("port_state", mode="before") + @classmethod + def _coerce_port_state(cls, v: str | PortState) -> PortState: + if isinstance(v, str): + return PortState(v) + return v + + +class GptpOffset(BaseModel): + """Clock offset measurement from master.""" + + offset_from_master_ns: float + mean_path_delay_ns: float + freq_ppb: float = 0.0 + timestamp: float = 0.0 + + +class GptpSyncEvent(BaseModel): + """A single sync status update from ptp4l.""" + + event_type: Literal["sync", "state_change", "fault"] + port_state: Optional[PortState] = None + servo_state: Optional[ServoState] = None + offset_ns: Optional[float] = None + path_delay_ns: Optional[float] = None + freq_ppb: Optional[float] = None + timestamp: float = 0.0 + + +class GptpPortStats(BaseModel): + """PTP port-level statistics.""" + + sync_count: int = 0 + followup_count: int = 0 + pdelay_req_count: int = 0 + pdelay_resp_count: int = 0 + announce_count: int = 0 + + +class GptpParentInfo(BaseModel): + """Information about the parent/grandmaster clock.""" + + parent_clock_identity: str = "" + grandmaster_identity: str = "" + grandmaster_priority1: int = 128 + grandmaster_priority2: int = 128 + grandmaster_clock_class: int = 248 + grandmaster_clock_accuracy: int = 0xFE diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py new file mode 100644 index 000000000..57513c193 --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +import pytest + +from .common import VALID_PORT_TRANSITIONS +from .driver import MockGptp, MockGptpBackend +from jumpstarter.common.utils import serve + + +class PtpNotStartedError(RuntimeError): + pass + + +class PtpAlreadyRunningError(RuntimeError): + pass + + +class PtpStateError(RuntimeError): + pass + + +class StatefulPtp4l(MockGptpBackend): + """Drop-in replacement for MockGptpBackend that enforces + IEEE 802.1AS port state rules. + + Tracks: + - Process lifecycle (started/stopped) + - Port state machine: INITIALIZING -> LISTENING -> {MASTER, SLAVE, PASSIVE} -> FAULTY + - Servo state: s0 (unlocked) -> s1 (calibrating) -> s2 (locked) + - Sync offset convergence (simulated) + - Priority1 changes and BMCA re-evaluation + """ + + def __init__(self): + super().__init__() + self._call_log: list[str] = [] + + def require_started(self): + if not self._started: + raise PtpNotStartedError("ptp4l not started -- call start() first") + + def start(self): + if self._started: + raise PtpAlreadyRunningError("ptp4l already running") + self._started = True + self._port_state = "INITIALIZING" + self._servo_state = "s0" + self._offset_ns = 999_999.0 + self._priority1 = 128 + self._transition_to("LISTENING") + self._call_log.append("start") + + def stop(self): + self.require_started() + self._started = False + self._port_state = "INITIALIZING" + self._servo_state = "s0" + self._call_log.append("stop") + + def _transition_to(self, new_state: str): + valid = VALID_PORT_TRANSITIONS.get(self._port_state, set()) + if new_state not in valid: + raise PtpStateError( + f"Invalid transition: {self._port_state} -> {new_state}" + ) + self._port_state = new_state + + def simulate_sync_convergence(self): + """Simulate the typical LISTENING -> SLAVE -> servo lock sequence.""" + self.require_started() + if self._port_state == "LISTENING": + self._transition_to("SLAVE") + self._servo_state = "s1" + self._offset_ns = 50_000.0 + self._servo_state = "s2" + self._offset_ns = -23.0 + + def simulate_fault(self): + self.require_started() + self._transition_to("FAULTY") + self._servo_state = "s0" + + def simulate_recovery_from_fault(self): + self.require_started() + if self._port_state != "FAULTY": + raise PtpStateError( + f"Operation requires state FAULTY, current: {self._port_state}" + ) + self._transition_to("LISTENING") + self._transition_to("SLAVE") + self._servo_state = "s1" + + def set_priority1(self, value: int): + self.require_started() + self._priority1 = value + if value < 128 and self._port_state in ("SLAVE", "LISTENING", "PASSIVE"): + if self._port_state != "MASTER": + self._transition_to("MASTER") + self._call_log.append(f"set_priority1({value})") + + +@pytest.fixture +def stateful_ptp4l(): + return StatefulPtp4l() + + +@pytest.fixture +def stateful_client(stateful_ptp4l): + """Create a MockGptp driver backed by StatefulPtp4l and serve it. + + The MockGptp @export methods remain intact and delegate to + the stateful backend, so gRPC routing works correctly. + """ + driver = MockGptp(backend=stateful_ptp4l) + with serve(driver) as client: + yield client, stateful_ptp4l diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py new file mode 100644 index 000000000..fff61a0be --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py @@ -0,0 +1,591 @@ +from __future__ import annotations + +import asyncio +import logging +import re +import subprocess +import tempfile +import time +from collections.abc import AsyncGenerator +from dataclasses import field +from typing import Optional + +from pydantic import ConfigDict, validate_call +from pydantic.dataclasses import dataclass + +from .common import ( + GptpOffset, + GptpParentInfo, + GptpPortStats, + GptpStatus, + GptpSyncEvent, + PortState, + ServoState, +) +from jumpstarter.driver import Driver, export + +logger = logging.getLogger(__name__) + +_VALID_PROFILES = {"gptp", "default"} +_VALID_TRANSPORTS = {"L2", "UDPv4", "UDPv6"} +_VALID_ROLES = {"master", "slave", "auto"} + +_OFFSET_RE = re.compile( + r"ptp4l\[[\d.]+\]:\s+(?:master\s+)?offset\s+(-?\d+)\s+(\w+)\s+freq\s+([+-]?\d+)\s+path\s+delay\s+(-?\d+)" +) +_PORT_STATE_RE = re.compile( + r"ptp4l\[[\d.]+\]:\s+port\s+\d+(?:\s*\([^)]*\))?:\s+(\w+)\s+to\s+(\w+)\s+on\s+(\w+)" +) + + +class ParsedLogLine: + """Result of parsing a single ptp4l log line.""" + + def __init__(self): + self.offset_ns: Optional[float] = None + self.freq_ppb: Optional[float] = None + self.path_delay_ns: Optional[float] = None + self.servo_state: Optional[str] = None + self.port_state: Optional[str] = None + self.event: Optional[str] = None + + +def parse_ptp4l_log_line(line: str) -> Optional[ParsedLogLine]: + """Parse a single ptp4l log line into structured data.""" + m = _OFFSET_RE.search(line) + if m: + result = ParsedLogLine() + result.offset_ns = float(m.group(1)) + result.servo_state = m.group(2) + result.freq_ppb = float(m.group(3)) + result.path_delay_ns = float(m.group(4)) + return result + + m = _PORT_STATE_RE.search(line) + if m: + result = ParsedLogLine() + result.port_state = m.group(2) + result.event = m.group(3) + return result + + return None + + +def _generate_ptp4l_config( + interface: str, + domain: int, + profile: str, + transport: str, + role: str, +) -> str: + """Generate ptp4l configuration file content.""" + lines = ["[global]"] + lines.append(f"domainNumber\t\t{domain}") + lines.append(f"network_transport\t{transport}") + + if profile == "gptp": + lines.append("transportSpecific\t0x1") + lines.append("time_stamping\t\thardware") + lines.append("follow_up_info\t\t1") + lines.append("gmCapable\t\t1") + + if role == "slave": + lines.append("slaveOnly\t\t1") + elif role == "master": + lines.append("priority1\t\t0") + lines.append("priority2\t\t0") + + lines.append(f"\n[{interface}]") + return "\n".join(lines) + "\n" + + +@dataclass(kw_only=True, config=ConfigDict(arbitrary_types_allowed=True)) +class Gptp(Driver): + """gPTP/PTP driver managing linuxptp (ptp4l/phc2sys) for time synchronization. + + Provides lifecycle management, status monitoring, and configuration of + IEEE 802.1AS (gPTP) or IEEE 1588 (PTPv2) time synchronization between + the exporter host and a target device. + """ + + interface: str + domain: int = 0 + profile: str = "gptp" + transport: str = "L2" + role: str = "auto" + sync_system_clock: bool = True + ptp4l_extra_args: list[str] = field(default_factory=list) + + _ptp4l_proc: Optional[asyncio.subprocess.Process] = field( + init=False, default=None, repr=False + ) + _phc2sys_proc: Optional[asyncio.subprocess.Process] = field( + init=False, default=None, repr=False + ) + _config_file: Optional[tempfile.NamedTemporaryFile] = field( + init=False, default=None, repr=False + ) + _port_state: str = field(init=False, default="INITIALIZING") + _servo_state: str = field(init=False, default="s0") + _last_offset_ns: float = field(init=False, default=0.0) + _last_path_delay_ns: float = field(init=False, default=0.0) + _last_freq_ppb: float = field(init=False, default=0.0) + _priority1: int = field(init=False, default=128) + _stats: dict[str, int] = field(init=False, default_factory=dict) + _reader_task: Optional[asyncio.Task] = field( + init=False, default=None, repr=False + ) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + if self.profile not in _VALID_PROFILES: + raise ValueError( + f"Invalid profile: {self.profile!r}. Must be one of {_VALID_PROFILES}" + ) + if self.transport not in _VALID_TRANSPORTS: + raise ValueError( + f"Invalid transport: {self.transport!r}. Must be one of {_VALID_TRANSPORTS}" + ) + if self.role not in _VALID_ROLES: + raise ValueError( + f"Invalid role: {self.role!r}. Must be one of {_VALID_ROLES}" + ) + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_gptp.client.GptpClient" + + def _supports_hw_timestamping(self) -> bool: + try: + result = subprocess.run( + ["ethtool", "-T", self.interface], + capture_output=True, + text=True, + ) + output = result.stdout + return "hardware-transmit" in output and "hardware-receive" in output + except FileNotFoundError: + return False + + def _require_started(self) -> None: + if self._ptp4l_proc is None: + raise RuntimeError("ptp4l not started -- call start() first") + + async def _read_ptp4l_output(self) -> None: + """Background task: read ptp4l stdout and update internal state.""" + proc = self._ptp4l_proc + if proc is None or proc.stdout is None: + return + while True: + raw = await proc.stdout.readline() + if not raw: + break + line = raw.decode("utf-8", errors="replace").strip() + if not line: + continue + self.logger.debug("ptp4l: %s", line) + parsed = parse_ptp4l_log_line(line) + if parsed is None: + continue + if parsed.offset_ns is not None: + self._last_offset_ns = parsed.offset_ns + self._last_freq_ppb = parsed.freq_ppb or 0.0 + self._last_path_delay_ns = parsed.path_delay_ns or 0.0 + if parsed.servo_state: + self._servo_state = parsed.servo_state + self._stats["sync_count"] = self._stats.get("sync_count", 0) + 1 + if parsed.port_state is not None: + self._port_state = parsed.port_state + + @export + async def start(self) -> None: + """Start PTP synchronization by spawning ptp4l (and optionally phc2sys).""" + if self._ptp4l_proc is not None: + raise RuntimeError("ptp4l already running") + + config_content = _generate_ptp4l_config( + interface=self.interface, + domain=self.domain, + profile=self.profile, + transport=self.transport, + role=self.role, + ) + + self._config_file = tempfile.NamedTemporaryFile( + mode="w", suffix=".cfg", prefix="ptp4l_", delete=False + ) + self._config_file.write(config_content) + self._config_file.flush() + + hw_ts = self._supports_hw_timestamping() + ts_flag = "-H" if hw_ts else "-S" + if not hw_ts: + self.logger.warning( + "Hardware timestamping not available on %s, falling back to software timestamping", + self.interface, + ) + + cmd = [ + "ptp4l", + "-f", self._config_file.name, + "-i", self.interface, + ts_flag, + "-m", + *self.ptp4l_extra_args, + ] + self.logger.info("Starting ptp4l: %s", " ".join(cmd)) + self._ptp4l_proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + self._port_state = "INITIALIZING" + self._servo_state = "s0" + self._priority1 = 128 + self._stats = {} + self._reader_task = asyncio.get_event_loop().create_task( + self._read_ptp4l_output() + ) + + await asyncio.sleep(0.5) + if self._ptp4l_proc.returncode is not None: + raise RuntimeError( + f"ptp4l exited immediately with code {self._ptp4l_proc.returncode}" + ) + + if self.sync_system_clock and hw_ts: + phc2sys_cmd = ["phc2sys", "-a", "-rr", "-m"] + self.logger.info("Starting phc2sys: %s", " ".join(phc2sys_cmd)) + self._phc2sys_proc = await asyncio.create_subprocess_exec( + *phc2sys_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + @export + async def stop(self) -> None: + """Stop PTP synchronization.""" + self._require_started() + + if self._reader_task is not None: + self._reader_task.cancel() + try: + await self._reader_task + except asyncio.CancelledError: + pass + self._reader_task = None + + if self._phc2sys_proc is not None: + self._phc2sys_proc.terminate() + try: + await asyncio.wait_for(self._phc2sys_proc.wait(), timeout=5.0) + except asyncio.TimeoutError: + self._phc2sys_proc.kill() + self._phc2sys_proc = None + + if self._ptp4l_proc is not None: + self._ptp4l_proc.terminate() + try: + await asyncio.wait_for(self._ptp4l_proc.wait(), timeout=5.0) + except asyncio.TimeoutError: + self._ptp4l_proc.kill() + self._ptp4l_proc = None + + if self._config_file is not None: + import os + try: + os.unlink(self._config_file.name) + except OSError: + pass + self._config_file = None + + self._port_state = "INITIALIZING" + self._servo_state = "s0" + + @export + @validate_call(validate_return=True) + def status(self) -> GptpStatus: + """Query the current PTP synchronization status. + + :returns: Current synchronization status + :rtype: GptpStatus + :raises RuntimeError: If ptp4l is not started + """ + self._require_started() + return GptpStatus( + port_state=PortState(self._port_state), + offset_ns=self._last_offset_ns, + mean_delay_ns=self._last_path_delay_ns, + servo_state=ServoState(self._servo_state), + ) + + @export + @validate_call(validate_return=True) + def get_offset(self) -> GptpOffset: + """Get the current clock offset from master. + + :returns: Offset measurement + :rtype: GptpOffset + :raises RuntimeError: If ptp4l is not started + """ + self._require_started() + return GptpOffset( + offset_from_master_ns=self._last_offset_ns, + mean_path_delay_ns=self._last_path_delay_ns, + freq_ppb=self._last_freq_ppb, + timestamp=time.time(), + ) + + @export + @validate_call(validate_return=True) + def get_port_stats(self) -> GptpPortStats: + """Get PTP port statistics. + + :returns: Port statistics counters + :rtype: GptpPortStats + :raises RuntimeError: If ptp4l is not started + """ + self._require_started() + return GptpPortStats( + sync_count=self._stats.get("sync_count", 0), + followup_count=self._stats.get("followup_count", 0), + pdelay_req_count=self._stats.get("pdelay_req_count", 0), + pdelay_resp_count=self._stats.get("pdelay_resp_count", 0), + announce_count=self._stats.get("announce_count", 0), + ) + + @export + @validate_call(validate_return=True) + def get_clock_identity(self) -> str: + """Get this clock's identity string. + + :returns: Clock identity + :rtype: str + :raises RuntimeError: If ptp4l is not started + """ + self._require_started() + return "" + + @export + @validate_call(validate_return=True) + def get_parent_info(self) -> GptpParentInfo: + """Get information about the parent/grandmaster clock. + + :returns: Parent and grandmaster clock information + :rtype: GptpParentInfo + :raises RuntimeError: If ptp4l is not started + """ + self._require_started() + return GptpParentInfo() + + @export + @validate_call(validate_return=True) + def set_priority1(self, priority: int) -> None: + """Set clock priority1 to influence BMCA master election. + + Lower values make this clock more likely to become grandmaster. + + :param priority: Priority1 value (0-255) + :raises RuntimeError: If ptp4l is not started + """ + self._require_started() + self._priority1 = priority + self.logger.info("Set priority1 to %d", priority) + + @export + @validate_call(validate_return=True) + def is_synchronized(self) -> bool: + """Check whether PTP is synchronized (servo locked in SLAVE state). + + :returns: True if synchronized + :rtype: bool + :raises RuntimeError: If ptp4l is not started + """ + self._require_started() + return self._port_state == "SLAVE" and self._servo_state == "s2" + + @export + async def read(self) -> AsyncGenerator[GptpSyncEvent, None]: + """Stream periodic sync status updates. + + Yields a GptpSyncEvent approximately once per second with current + offset, delay, and state information. + """ + self._require_started() + prev_state = self._port_state + for _ in range(100): + event_type = "sync" + if self._port_state != prev_state: + event_type = "state_change" + prev_state = self._port_state + if self._port_state == "FAULTY": + event_type = "fault" + + yield GptpSyncEvent( + event_type=event_type, + port_state=PortState(self._port_state) if self._port_state in PortState.__members__ else None, + servo_state=ServoState(self._servo_state) if self._servo_state in ("s0", "s1", "s2") else None, + offset_ns=self._last_offset_ns, + path_delay_ns=self._last_path_delay_ns, + freq_ppb=self._last_freq_ppb, + timestamp=time.time(), + ) + await asyncio.sleep(1.0) + + +class MockGptpBackend: + """Default backend for MockGptp. Can be replaced with StatefulPtp4l for stateful testing.""" + + def __init__(self): + self._started = False + self._port_state = "INITIALIZING" + self._servo_state = "s0" + self._offset_ns = 0.0 + self._priority1 = 128 + + def require_started(self): + if not self._started: + raise RuntimeError("ptp4l not started -- call start() first") + + def start(self): + if self._started: + raise RuntimeError("ptp4l already running") + self._started = True + self._port_state = "SLAVE" + self._servo_state = "s2" + self._offset_ns = -23.0 + self._priority1 = 128 + + def stop(self): + self.require_started() + self._started = False + self._port_state = "INITIALIZING" + self._servo_state = "s0" + self._offset_ns = 0.0 + + def set_priority1(self, priority: int): + self.require_started() + self._priority1 = priority + if priority < 128 and self._port_state in ("SLAVE", "LISTENING", "PASSIVE"): + self._port_state = "MASTER" + + +@dataclass(kw_only=True, config=ConfigDict(arbitrary_types_allowed=True)) +class MockGptp(Driver): + """Mock gPTP driver for testing without real PTP hardware. + + Simulates PTP synchronization behavior: after start(), immediately enters + SLAVE state with a small simulated offset. + + Accepts an optional ``backend`` to replace the default mock behavior, + enabling stateful testing with ``StatefulPtp4l``. + """ + + backend: Optional[MockGptpBackend] = field(default=None, repr=False) + + _internal_backend: MockGptpBackend = field(init=False, repr=False) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + self._internal_backend = self.backend or MockGptpBackend() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_gptp.client.GptpClient" + + @export + async def start(self) -> None: + """Start mock PTP synchronization.""" + self._internal_backend.start() + self.logger.info("MockGptp started") + + @export + async def stop(self) -> None: + """Stop mock PTP synchronization.""" + self._internal_backend.stop() + self.logger.info("MockGptp stopped") + + @export + @validate_call(validate_return=True) + def status(self) -> GptpStatus: + """Query the current PTP synchronization status.""" + b = self._internal_backend + b.require_started() + return GptpStatus( + port_state=PortState(b._port_state), + offset_ns=b._offset_ns, + mean_delay_ns=567.0, + servo_state=ServoState(b._servo_state), + ) + + @export + @validate_call(validate_return=True) + def get_offset(self) -> GptpOffset: + """Get the current clock offset from master.""" + b = self._internal_backend + b.require_started() + return GptpOffset( + offset_from_master_ns=b._offset_ns, + mean_path_delay_ns=567.0, + freq_ppb=1234.0, + timestamp=time.time(), + ) + + @export + @validate_call(validate_return=True) + def get_port_stats(self) -> GptpPortStats: + """Get PTP port statistics.""" + self._internal_backend.require_started() + return GptpPortStats(sync_count=42) + + @export + @validate_call(validate_return=True) + def get_clock_identity(self) -> str: + """Get this clock's identity string.""" + self._internal_backend.require_started() + return "aa:bb:cc:ff:fe:dd:ee:ff" + + @export + @validate_call(validate_return=True) + def get_parent_info(self) -> GptpParentInfo: + """Get information about the parent/grandmaster clock.""" + self._internal_backend.require_started() + return GptpParentInfo( + grandmaster_identity="11:22:33:ff:fe:44:55:66", + grandmaster_priority1=128, + ) + + @export + @validate_call(validate_return=True) + def set_priority1(self, priority: int) -> None: + """Set clock priority1.""" + self._internal_backend.set_priority1(priority) + + @export + @validate_call(validate_return=True) + def is_synchronized(self) -> bool: + """Check whether PTP is synchronized.""" + b = self._internal_backend + b.require_started() + return b._port_state == "SLAVE" and b._servo_state == "s2" + + @export + async def read(self) -> AsyncGenerator[GptpSyncEvent, None]: + """Stream simulated sync events.""" + b = self._internal_backend + b.require_started() + for _ in range(100): + yield GptpSyncEvent( + event_type="sync", + port_state=PortState(b._port_state) if b._port_state in PortState.__members__ else None, + servo_state=ServoState(b._servo_state) if b._servo_state in ("s0", "s1", "s2") else None, + offset_ns=b._offset_ns, + path_delay_ns=567.0, + freq_ppb=1234.0, + timestamp=time.time(), + ) + await asyncio.sleep(0.1) diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py new file mode 100644 index 000000000..fc4934b5c --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py @@ -0,0 +1,661 @@ +from __future__ import annotations + +import os +import platform +from unittest.mock import MagicMock, patch + +import pytest +from pydantic import ValidationError + +from .common import ( + GptpOffset, + GptpParentInfo, + GptpPortStats, + GptpStatus, + GptpSyncEvent, + PortState, +) +from .conftest import PtpStateError +from .driver import Gptp, MockGptp, _generate_ptp4l_config, parse_ptp4l_log_line +from jumpstarter.client.core import DriverError +from jumpstarter.common.utils import serve + +# ============================================================================= +# Level 1: Unit Tests (No system dependencies, always run) +# ============================================================================= + + +class TestPtp4lLogParsing: + """1a. Parse ptp4l log lines into structured data.""" + + def test_parse_offset_line(self): + line = "ptp4l[1234.567]: master offset -23 s2 freq +1234 path delay 567" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.offset_ns == -23 + assert result.freq_ppb == 1234 + assert result.path_delay_ns == 567 + assert result.servo_state == "s2" + + def test_parse_port_state_change(self): + line = "ptp4l[1234.567]: port 1: LISTENING to SLAVE on MASTER_CLOCK_SELECTED" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.port_state == "SLAVE" + assert result.event == "MASTER_CLOCK_SELECTED" + + def test_parse_init_line(self): + line = "ptp4l[0.000]: port 1: INITIALIZING to LISTENING on INIT_COMPLETE" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.port_state == "LISTENING" + + def test_parse_unrecognized_line(self): + line = "ptp4l[0.000]: some unrecognized message" + result = parse_ptp4l_log_line(line) + assert result is None + + def test_parse_fault_line(self): + line = "ptp4l[5.678]: port 1: SLAVE to FAULTY on FAULT_DETECTED" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.port_state == "FAULTY" + + def test_parse_master_state(self): + line = "ptp4l[2.000]: port 1: LISTENING to MASTER on ANNOUNCE_RECEIPT_TIMEOUT_EXPIRES" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.port_state == "MASTER" + + def test_parse_large_offset(self): + line = "ptp4l[10.000]: master offset 999999999 s0 freq -50000 path delay 12345" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.offset_ns == 999999999 + assert result.servo_state == "s0" + assert result.freq_ppb == -50000 + + def test_parse_negative_freq(self): + line = "ptp4l[3.000]: master offset 42 s1 freq -9999 path delay 100" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.freq_ppb == -9999 + + def test_parse_offset_without_master_prefix(self): + line = "ptp4l[1.000]: offset -100 s2 freq +500 path delay 200" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.offset_ns == -100 + + def test_parse_port_with_interface_name(self): + line = "ptp4l[1.000]: port 1 (eth0): INITIALIZING to LISTENING on INIT_COMPLETE" + result = parse_ptp4l_log_line(line) + assert result is not None + assert result.port_state == "LISTENING" + + +class TestPtp4lConfigGeneration: + """1b. Generate ptp4l configuration from driver fields.""" + + def test_generate_gptp_config(self): + config = _generate_ptp4l_config("eth0", 0, "gptp", "L2", "auto") + assert "domainNumber\t\t0" in config + assert "network_transport\tL2" in config + assert "transportSpecific\t0x1" in config + + def test_generate_master_config(self): + config = _generate_ptp4l_config("eth0", 0, "gptp", "L2", "master") + assert "priority1\t\t0" in config + assert "priority2\t\t0" in config + + def test_generate_slave_config(self): + config = _generate_ptp4l_config("eth0", 0, "default", "UDPv4", "slave") + assert "network_transport\tUDPv4" in config + assert "slaveOnly\t\t1" in config + + def test_generate_ieee1588_config(self): + config = _generate_ptp4l_config("eth0", 0, "default", "UDPv4", "auto") + assert "transportSpecific" not in config + + def test_generate_config_custom_domain(self): + config = _generate_ptp4l_config("eth0", 42, "gptp", "L2", "auto") + assert "domainNumber\t\t42" in config + + def test_generate_config_udpv6(self): + config = _generate_ptp4l_config("eth0", 0, "default", "UDPv6", "auto") + assert "network_transport\tUDPv6" in config + + def test_generate_config_has_interface_section(self): + config = _generate_ptp4l_config("enp3s0", 0, "gptp", "L2", "auto") + assert "[enp3s0]" in config + + +class TestHwTimestampingDetection: + """1c. Detect hardware timestamping support.""" + + @patch("jumpstarter_driver_gptp.driver.subprocess.run") + def test_detect_hw_timestamping(self, mock_run): + import subprocess + mock_run.return_value = subprocess.CompletedProcess( + args=[], returncode=0, + stdout="Capabilities:\n hardware-transmit\n hardware-receive\n hardware-raw-clock\n", + ) + driver = Gptp.__new__(Gptp) + driver.interface = "eth0" + assert driver._supports_hw_timestamping() is True + + @patch("jumpstarter_driver_gptp.driver.subprocess.run") + def test_detect_sw_only_timestamping(self, mock_run): + import subprocess + mock_run.return_value = subprocess.CompletedProcess( + args=[], returncode=0, + stdout="Capabilities:\n software-transmit\n software-receive\n", + ) + driver = Gptp.__new__(Gptp) + driver.interface = "eth0" + assert driver._supports_hw_timestamping() is False + + @patch("jumpstarter_driver_gptp.driver.subprocess.run") + def test_detect_timestamping_ethtool_missing(self, mock_run): + mock_run.side_effect = FileNotFoundError("ethtool not found") + driver = Gptp.__new__(Gptp) + driver.interface = "eth0" + assert driver._supports_hw_timestamping() is False + + +class TestPydanticModels: + """1d. Pydantic model validation.""" + + def test_gptp_status_model(self): + status = GptpStatus( + port_state="SLAVE", + clock_class=248, + clock_accuracy=0x21, + offset_ns=-23, + mean_delay_ns=567, + gm_identity="aa:bb:cc:ff:fe:dd:ee:ff", + ) + assert status.port_state == PortState.SLAVE + assert status.offset_ns == -23 + + def test_gptp_status_from_enum(self): + status = GptpStatus(port_state=PortState.MASTER) + assert status.port_state == PortState.MASTER + + def test_gptp_status_invalid_port_state(self): + with pytest.raises(ValueError): + GptpStatus(port_state="INVALID_STATE") + + def test_gptp_offset_model(self): + offset = GptpOffset( + offset_from_master_ns=-42, + mean_path_delay_ns=123, + timestamp=1234567890.0, + ) + assert offset.offset_from_master_ns == -42 + + def test_gptp_sync_event(self): + event = GptpSyncEvent( + event_type="sync", + offset_ns=100.0, + port_state=PortState.SLAVE, + ) + assert event.event_type == "sync" + + def test_gptp_sync_event_invalid_type(self): + with pytest.raises(ValidationError): + GptpSyncEvent(event_type="invalid") + + def test_gptp_port_stats(self): + stats = GptpPortStats(sync_count=10, followup_count=10) + assert stats.sync_count == 10 + + def test_gptp_parent_info(self): + info = GptpParentInfo( + grandmaster_identity="11:22:33:ff:fe:44:55:66", + grandmaster_priority1=0, + ) + assert info.grandmaster_priority1 == 0 + + +class TestDriverConfigValidation: + """1e. Driver configuration validation.""" + + @patch("jumpstarter_driver_gptp.driver.subprocess.run") + def test_gptp_valid_config(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + driver = Gptp(interface="eth0") + assert driver.interface == "eth0" + assert driver.domain == 0 + assert driver.profile == "gptp" + + def test_gptp_invalid_profile(self): + with pytest.raises(ValueError, match="profile"): + Gptp(interface="eth0", profile="invalid_profile") + + def test_gptp_invalid_transport(self): + with pytest.raises(ValueError, match="transport"): + Gptp(interface="eth0", transport="SCTP") + + def test_gptp_invalid_role(self): + with pytest.raises(ValueError, match="role"): + Gptp(interface="eth0", role="observer") + + +# ============================================================================= +# Level 2: E2E Tests with MockGptp (No system dependencies, always run) +# ============================================================================= + + +class TestMockGptpLifecycle: + """2a. MockGptp simulated driver tests.""" + + def test_mock_gptp_lifecycle(self): + with serve(MockGptp()) as client: + client.start() + status = client.status() + assert status.port_state == PortState.SLAVE + assert abs(status.offset_ns) < 1000 + assert client.is_synchronized() is True + client.stop() + + def test_mock_gptp_streaming(self): + with serve(MockGptp()) as client: + client.start() + events = [] + for event in client.monitor(): + events.append(event) + if len(events) >= 3: + break + assert len(events) == 3 + assert all(e.event_type == "sync" for e in events) + client.stop() + + def test_mock_gptp_get_offset(self): + with serve(MockGptp()) as client: + client.start() + offset = client.get_offset() + assert isinstance(offset.offset_from_master_ns, float) + assert isinstance(offset.mean_path_delay_ns, float) + assert offset.timestamp > 0 + client.stop() + + def test_mock_gptp_get_port_stats(self): + with serve(MockGptp()) as client: + client.start() + stats = client.get_port_stats() + assert stats.sync_count == 42 + client.stop() + + def test_mock_gptp_get_clock_identity(self): + with serve(MockGptp()) as client: + client.start() + identity = client.get_clock_identity() + assert "ff:fe" in identity + client.stop() + + def test_mock_gptp_get_parent_info(self): + with serve(MockGptp()) as client: + client.start() + parent = client.get_parent_info() + assert parent.grandmaster_identity != "" + assert parent.grandmaster_priority1 == 128 + client.stop() + + def test_mock_gptp_set_priority_forces_master(self): + with serve(MockGptp()) as client: + client.start() + assert client.status().port_state == PortState.SLAVE + client.set_priority1(0) + assert client.status().port_state == PortState.MASTER + client.stop() + + +class TestMockGptpErrorPaths: + """2c. Error path tests.""" + + def test_status_before_start(self): + with serve(MockGptp()) as client: + with pytest.raises(DriverError, match="not started"): + client.status() + + def test_double_start(self): + with serve(MockGptp()) as client: + client.start() + with pytest.raises(DriverError, match="already running"): + client.start() + + def test_stop_before_start(self): + with serve(MockGptp()) as client: + with pytest.raises(DriverError, match="not started"): + client.stop() + + def test_get_offset_before_start(self): + with serve(MockGptp()) as client: + with pytest.raises(DriverError, match="not started"): + client.get_offset() + + def test_is_synchronized_before_start(self): + with serve(MockGptp()) as client: + with pytest.raises(DriverError, match="not started"): + client.is_synchronized() + + def test_set_priority_before_start(self): + with serve(MockGptp()) as client: + with pytest.raises(DriverError, match="not started"): + client.set_priority1(0) + + +class TestClientCli: + """2d. Client CLI tests.""" + + def test_cli_interface(self): + with serve(MockGptp()) as client: + cli = client.cli() + assert hasattr(cli, "commands") + expected = {"start", "stop", "status", "offset", "monitor", "set-priority"} + assert expected.issubset(set(cli.commands.keys())) + + def test_cli_status_command(self): + with serve(MockGptp()) as client: + client.start() + cli = client.cli() + assert cli.commands["status"].name == "status" + client.stop() + + +# ============================================================================= +# Level 2.5: Stateful Tests (No system dependencies, always run) +# ============================================================================= + + +class TestStatefulPortStateTransitions: + """2.5a. PTP port state machine transitions.""" + + def test_stateful_normal_sync_lifecycle(self, stateful_client): + client, ptp = stateful_client + client.start() + assert ptp._port_state == "LISTENING" + + ptp.simulate_sync_convergence() + status = client.status() + assert status.port_state == PortState.SLAVE + assert ptp._servo_state == "s2" + assert client.is_synchronized() is True + + client.stop() + assert ptp._started is False + + def test_stateful_init_to_master(self, stateful_client): + client, ptp = stateful_client + client.start() + ptp._transition_to("MASTER") + status = client.status() + assert status.port_state == PortState.MASTER + + def test_stateful_invalid_transition_rejected(self, stateful_ptp4l): + ptp = stateful_ptp4l + ptp.start() + with pytest.raises(PtpStateError, match="Invalid transition"): + ptp._transition_to("UNCALIBRATED") + + def test_stateful_full_state_cycle(self, stateful_client): + """Walk through: start -> LISTENING -> SLAVE -> FAULTY -> recovery -> SLAVE -> stop""" + client, ptp = stateful_client + client.start() + assert ptp._port_state == "LISTENING" + + ptp.simulate_sync_convergence() + assert ptp._port_state == "SLAVE" + assert ptp._servo_state == "s2" + + ptp.simulate_fault() + assert ptp._port_state == "FAULTY" + assert ptp._servo_state == "s0" + assert client.is_synchronized() is False + + ptp.simulate_recovery_from_fault() + assert ptp._port_state == "SLAVE" + assert ptp._servo_state == "s1" + + client.stop() + + +class TestStatefulOperationOrdering: + """2.5b. Operation ordering enforcement.""" + + def test_stateful_operations_before_start_raise(self, stateful_client): + client, ptp = stateful_client + with pytest.raises(DriverError): + client.status() + with pytest.raises(DriverError): + client.get_offset() + with pytest.raises(DriverError): + client.is_synchronized() + + def test_stateful_double_start_raises(self, stateful_client): + client, ptp = stateful_client + client.start() + with pytest.raises(DriverError): + client.start() + + def test_stateful_stop_before_start_raises(self, stateful_client): + client, ptp = stateful_client + with pytest.raises(DriverError): + client.stop() + + def test_stateful_set_priority_before_start_raises(self, stateful_client): + client, ptp = stateful_client + with pytest.raises(DriverError): + client.set_priority1(0) + + +class TestStatefulPriorityBmca: + """2.5c. Priority / BMCA role changes.""" + + def test_stateful_priority_forces_master(self, stateful_client): + client, ptp = stateful_client + client.start() + ptp.simulate_sync_convergence() + assert ptp._port_state == "SLAVE" + + client.set_priority1(0) + assert ptp._port_state == "MASTER" + assert ptp._priority1 == 0 + + def test_stateful_priority_keeps_slave(self, stateful_client): + client, ptp = stateful_client + client.start() + ptp.simulate_sync_convergence() + client.set_priority1(255) + assert ptp._port_state == "SLAVE" + assert ptp._priority1 == 255 + + +class TestStatefulFaultRecovery: + """2.5d. Fault recovery and resilience.""" + + def test_stateful_fault_clears_sync(self, stateful_client): + client, ptp = stateful_client + client.start() + ptp.simulate_sync_convergence() + assert client.is_synchronized() is True + + ptp.simulate_fault() + assert client.is_synchronized() is False + status = client.status() + assert status.port_state == PortState.FAULTY + + def test_stateful_recovery_restores_sync_capability(self, stateful_client): + client, ptp = stateful_client + client.start() + ptp.simulate_sync_convergence() + ptp.simulate_fault() + ptp.simulate_recovery_from_fault() + + assert ptp._port_state == "SLAVE" + assert ptp._servo_state == "s1" + assert client.is_synchronized() is False + + def test_stateful_multiple_fault_recovery_cycles(self, stateful_client): + client, ptp = stateful_client + client.start() + for _ in range(3): + ptp.simulate_sync_convergence() + assert ptp._port_state == "SLAVE" + ptp.simulate_fault() + assert ptp._port_state == "FAULTY" + ptp.simulate_recovery_from_fault() + assert ptp._port_state == "SLAVE" + + +class TestStatefulRestartReset: + """2.5e. Restart (Stop + Start) state reset.""" + + def test_stateful_restart_resets_state(self, stateful_client): + client, ptp = stateful_client + client.start() + ptp.simulate_sync_convergence() + assert ptp._servo_state == "s2" + assert ptp._port_state == "SLAVE" + + client.stop() + client.start() + assert ptp._port_state == "LISTENING" + assert ptp._servo_state == "s0" + assert client.is_synchronized() is False + + def test_stateful_restart_clears_priority(self, stateful_client): + client, ptp = stateful_client + client.start() + client.set_priority1(0) + client.stop() + client.start() + assert ptp._priority1 == 128 + + +class TestStatefulCallLog: + """2.5f. Call log / audit trail.""" + + def test_stateful_call_log_records_operations(self, stateful_client): + client, ptp = stateful_client + client.start() + client.set_priority1(50) + client.stop() + assert ptp._call_log == ["start", "set_priority1(50)", "stop"] + + def test_stateful_full_workflow_log(self, stateful_client): + client, ptp = stateful_client + client.start() + ptp.simulate_sync_convergence() + _ = client.status() + _ = client.get_offset() + client.set_priority1(0) + client.stop() + assert "start" in ptp._call_log + assert "set_priority1(0)" in ptp._call_log + assert "stop" in ptp._call_log + + +# ============================================================================= +# Level 3-5: Integration Tests (Env-gated) +# ============================================================================= + +_RUN_INTEGRATION = ( + os.environ.get("GPTP_INTEGRATION_TESTS", "0") == "1" + and platform.system() == "Linux" +) + +_RUN_HW_TESTS = os.environ.get("GPTP_HW_TESTS", "0") == "1" + + +@pytest.mark.skipif(not _RUN_INTEGRATION, reason="GPTP_INTEGRATION_TESTS not set or not Linux") +class TestSoftwareTimestampingIntegration: + """Level 3: Real ptp4l with software timestamping on veth pairs.""" + + @pytest.fixture + def veth_pair(self): + import subprocess as sp + cmds = [ + "ip netns add ns-ptp-master", + "ip netns add ns-ptp-slave", + "ip link add veth-m type veth peer name veth-s", + "ip link set veth-m netns ns-ptp-master", + "ip link set veth-s netns ns-ptp-slave", + "ip netns exec ns-ptp-master ip addr add 10.99.0.1/24 dev veth-m", + "ip netns exec ns-ptp-slave ip addr add 10.99.0.2/24 dev veth-s", + "ip netns exec ns-ptp-master ip link set veth-m up", + "ip netns exec ns-ptp-slave ip link set veth-s up", + ] + for cmd in cmds: + sp.run(cmd.split(), check=True) + yield ("ns-ptp-master", "veth-m", "ns-ptp-slave", "veth-s") + sp.run("ip netns del ns-ptp-master".split(), check=False) + sp.run("ip netns del ns-ptp-slave".split(), check=False) + + @pytest.fixture + def ptp_master(self, veth_pair): + import subprocess as sp + import time + ns, iface, _, _ = veth_pair + proc = sp.Popen( + ["ip", "netns", "exec", ns, "ptp4l", "-i", iface, "-S", "-m", + "--masterOnly=1", "--domainNumber=0"], + stdout=sp.PIPE, stderr=sp.STDOUT, + ) + time.sleep(2) + yield proc + proc.terminate() + proc.wait(timeout=5) + + def test_gptp_real_sync_software_timestamping(self, veth_pair, ptp_master): + import time + _, _, slave_ns, slave_iface = veth_pair + driver = Gptp( + interface=slave_iface, domain=0, profile="default", + transport="UDPv4", role="slave", sync_system_clock=False, + ) + with serve(driver) as client: + client.start() + time.sleep(10) + status = client.status() + assert status.port_state == PortState.SLAVE + offset = client.get_offset() + assert abs(offset.offset_from_master_ns) < 10_000_000 + assert client.is_synchronized() is True + client.stop() + + +@pytest.mark.skipif(not _RUN_HW_TESTS, reason="GPTP_HW_TESTS not set") +class TestHardwareTimestampingIntegration: + """Level 4: Real ptp4l with hardware timestamping.""" + + def test_gptp_hw_timestamping_sub_microsecond(self): + import time + iface = os.environ.get("GPTP_TEST_INTERFACE", "eth0") + driver = Gptp( + interface=iface, domain=0, profile="gptp", + transport="L2", role="slave", sync_system_clock=True, + ) + with serve(driver) as client: + client.start() + time.sleep(30) + offset = client.get_offset() + assert abs(offset.offset_from_master_ns) < 1000 + parent = client.get_parent_info() + assert parent.grandmaster_identity is not None + client.stop() + + def test_gptp_hw_master_role(self): + import time + iface = os.environ.get("GPTP_TEST_INTERFACE", "eth0") + driver = Gptp( + interface=iface, domain=0, profile="gptp", + transport="L2", role="master", sync_system_clock=False, + ) + with serve(driver) as client: + client.start() + time.sleep(10) + status = client.status() + assert status.port_state == PortState.MASTER + client.stop() diff --git a/python/packages/jumpstarter-driver-gptp/pyproject.toml b/python/packages/jumpstarter-driver-gptp/pyproject.toml new file mode 100644 index 000000000..4a1daef63 --- /dev/null +++ b/python/packages/jumpstarter-driver-gptp/pyproject.toml @@ -0,0 +1,47 @@ +[project] +name = "jumpstarter-driver-gptp" +dynamic = ["version", "urls"] +description = "gPTP/PTP time synchronization driver for Jumpstarter, wrapping linuxptp (ptp4l/phc2sys)" +readme = "README.md" +license = "Apache-2.0" +authors = [ + { name = "Vinicius Zein", email = "vtzein@gmail.com" } +] +requires-python = ">=3.11" +dependencies = [ + "anyio>=4.10.0", + "jumpstarter", + "click", +] + +[project.entry-points."jumpstarter.drivers"] +Gptp = "jumpstarter_driver_gptp.driver:Gptp" +MockGptp = "jumpstarter_driver_gptp.driver:MockGptp" + +[tool.hatch.version] +source = "vcs" +raw-options = { 'root' = '../../../'} + +[tool.hatch.metadata.hooks.vcs.urls] +Homepage = "https://jumpstarter.dev" +source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip" + +[tool.pytest.ini_options] +addopts = "--cov --cov-report=html --cov-report=xml" +log_cli = true +log_cli_level = "INFO" +testpaths = ["jumpstarter_driver_gptp"] +asyncio_mode = "auto" + +[build-system] +requires = ["hatchling", "hatch-vcs", "hatch-pin-jumpstarter"] +build-backend = "hatchling.build" + +[tool.hatch.build.hooks.pin_jumpstarter] +name = "pin_jumpstarter" + +[dependency-groups] +dev = [ + "pytest-cov>=6.0.0", + "pytest>=8.3.3", +] diff --git a/python/pyproject.toml b/python/pyproject.toml index dbecb9b3b..f46d9429a 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -46,6 +46,7 @@ jumpstarter-driver-ustreamer = { workspace = true } jumpstarter-driver-yepkit = { workspace = true } jumpstarter-driver-vnc = { workspace = true } jumpstarter-driver-xcp = { workspace = true } +jumpstarter-driver-gptp = { workspace = true } jumpstarter-imagehash = { workspace = true } jumpstarter-kubernetes = { workspace = true } jumpstarter-mcp = { workspace = true } diff --git a/python/uv.lock b/python/uv.lock index 840197663..7ae17a9ad 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -28,6 +28,7 @@ members = [ "jumpstarter-driver-esp32", "jumpstarter-driver-flashers", "jumpstarter-driver-gpiod", + "jumpstarter-driver-gptp", "jumpstarter-driver-http", "jumpstarter-driver-http-power", "jumpstarter-driver-iscsi", @@ -2580,6 +2581,34 @@ dev = [ { name = "pytest-cov", specifier = ">=5.0.0" }, ] +[[package]] +name = "jumpstarter-driver-gptp" +source = { editable = "packages/jumpstarter-driver-gptp" } +dependencies = [ + { name = "anyio" }, + { name = "click" }, + { name = "jumpstarter" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-cov" }, +] + +[package.metadata] +requires-dist = [ + { name = "anyio", specifier = ">=4.10.0" }, + { name = "click" }, + { name = "jumpstarter", editable = "packages/jumpstarter" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=8.3.3" }, + { name = "pytest-cov", specifier = ">=6.0.0" }, +] + [[package]] name = "jumpstarter-driver-http" source = { editable = "packages/jumpstarter-driver-http" } From 8f58d23deb2e70fd1d1884e3f523115b552fe894 Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Wed, 8 Apr 2026 23:10:10 -0400 Subject: [PATCH 2/4] fix: address review findings for gPTP driver Security: - Validate interface name with regex (reject injection attacks) - Add denylist for ptp4l_extra_args (-f, --config, -i, --uds_address, etc.) - Set temp config file permissions to 0o600 via os.fchmod Async correctness: - Convert _supports_hw_timestamping to async with asyncio.create_subprocess_exec and 10s timeout (was blocking subprocess.run in async context) - Replace deprecated asyncio.get_event_loop().create_task with asyncio.create_task - Add done_callback to reader task for unhandled exception logging Process lifecycle: - Wrap start() in try/except with _cleanup() on any failure path - Add phc2sys immediate-exit check (matching ptp4l pattern) - Fix teardown order: terminate ptp4l first, then cancel reader, then phc2sys - Add await proc.wait() after kill() to prevent zombie processes - Invalidate session on ptp4l EOF (_ptp4l_proc = None, reset state) - Check returncode in _require_started() to detect exited processes phc2sys: - Replace -a (all PHCs) with -s -c CLOCK_REALTIME -w (scoped) - Use stdout=DEVNULL instead of PIPE (prevent pipe buffer deadlock) - Add start_new_session=True to both ptp4l and phc2sys API changes: - get_clock_identity, get_parent_info, set_priority1 now raise NotImplementedError on real driver (require UDS integration) - MockGptp still implements these for testing - Remove MockGptp from entry points (not a production driver) - read() streams indefinitely (while True) instead of range(100) Client: - Narrow wait_for_sync except to RuntimeError only Tests: - Update HW timestamping tests for async _supports_hw_timestamping - Add interface name validation tests (injection, too-long, valid names) - Add extra_args denylist tests - Add config generation priority1 test - Fix veth_pair fixture: keep both interfaces in root namespace - Use pytest-asyncio with asyncio_mode=auto (project convention) Documentation: - Add comprehensive docstrings to all public APIs (~80% coverage) - Add text language tag to state-machine code fence (MD040) - Remove $ prompts from CLI examples (MD014) - Document NotImplementedError for stub methods in README - Fix source_archive URL in pyproject.toml Made-with: Cursor --- .../jumpstarter-driver-gptp/README.md | 35 +- .../jumpstarter_driver_gptp/client.py | 107 ++-- .../jumpstarter_driver_gptp/common.py | 67 ++- .../jumpstarter_driver_gptp/conftest.py | 59 +- .../jumpstarter_driver_gptp/driver.py | 526 +++++++++++++----- .../jumpstarter_driver_gptp/driver_test.py | 168 ++++-- .../jumpstarter-driver-gptp/pyproject.toml | 4 +- python/uv.lock | 56 ++ 8 files changed, 766 insertions(+), 256 deletions(-) diff --git a/python/packages/jumpstarter-driver-gptp/README.md b/python/packages/jumpstarter-driver-gptp/README.md index 99014d86b..b5f37fb90 100644 --- a/python/packages/jumpstarter-driver-gptp/README.md +++ b/python/packages/jumpstarter-driver-gptp/README.md @@ -123,7 +123,7 @@ export: PTP ports transition through these states: -``` +```text INITIALIZING → LISTENING → SLAVE (synchronized to master) → MASTER (elected as grandmaster) → PASSIVE (backup, not active) @@ -145,10 +145,16 @@ The clock servo tracks synchronization quality: ```{eval-rst} .. autoclass:: jumpstarter_driver_gptp.client.GptpClient() :members: start, stop, status, get_offset, get_port_stats, - get_clock_identity, get_parent_info, set_priority1, is_synchronized, wait_for_sync, monitor ``` +```{note} +`get_clock_identity()`, `get_parent_info()`, and `set_priority1()` are +available on the ``MockGptp`` driver for testing. On the real ``Gptp`` driver +they raise ``NotImplementedError`` until ptp4l UDS management socket +integration is added. +``` + ## Examples ### Basic lifecycle @@ -179,19 +185,6 @@ with serve(Gptp(interface="eth0")) as gptp: gptp.stop() ``` -### Force master role - -```python -with serve(Gptp(interface="eth0", role="auto")) as gptp: - gptp.start() - gptp.wait_for_sync() - - # Override BMCA: become grandmaster - gptp.set_priority1(0) - status = gptp.status() - assert status.port_state.value == "MASTER" -``` - ### Using MockGptp in tests ```python @@ -211,12 +204,12 @@ def test_my_application(): When used inside `jmp shell`, the driver provides these commands: ```console -$ j gptp start # Start PTP synchronization -$ j gptp stop # Stop PTP synchronization -$ j gptp status # Show sync status -$ j gptp offset # Show current clock offset -$ j gptp monitor -n 20 # Monitor 20 sync events -$ j gptp set-priority 0 # Force grandmaster role +j gptp start # Start PTP synchronization +j gptp stop # Stop PTP synchronization +j gptp status # Show sync status +j gptp offset # Show current clock offset +j gptp monitor -n 20 # Monitor 20 sync events +j gptp set-priority 0 # Force grandmaster role ``` ## Hardware Requirements diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py index 5a8458216..c56ea9150 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py @@ -30,64 +30,76 @@ def start(self) -> None: Spawns ptp4l (and optionally phc2sys) as managed subprocesses. - :raises RuntimeError: If ptp4l is already running + Raises: + RuntimeError: If ptp4l is already running. """ self.call("start") def stop(self) -> None: """Stop PTP synchronization. - Terminates ptp4l and phc2sys processes. + Terminates ptp4l and phc2sys processes and cleans up temp files. - :raises RuntimeError: If ptp4l is not started + Raises: + RuntimeError: If ptp4l is not started. """ self.call("stop") def status(self) -> GptpStatus: """Query the current PTP synchronization status. - Returns the port state, clock class, current offset from master, - mean path delay, and servo state. + Returns: + Current synchronization status including port state, + offset, delay, and servo state. - :returns: Current synchronization status - :rtype: GptpStatus - :raises RuntimeError: If ptp4l is not started + Raises: + RuntimeError: If ptp4l is not started. """ return GptpStatus.model_validate(self.call("status")) def get_offset(self) -> GptpOffset: """Get the current clock offset from master. - :returns: Offset measurement including path delay and frequency - :rtype: GptpOffset - :raises RuntimeError: If ptp4l is not started + Returns: + Offset measurement including path delay and frequency. + + Raises: + RuntimeError: If ptp4l is not started. """ return GptpOffset.model_validate(self.call("get_offset")) def get_port_stats(self) -> GptpPortStats: """Get PTP port statistics. - :returns: Port statistics counters - :rtype: GptpPortStats - :raises RuntimeError: If ptp4l is not started + Returns: + Port statistics counters. + + Raises: + RuntimeError: If ptp4l is not started. """ return GptpPortStats.model_validate(self.call("get_port_stats")) def get_clock_identity(self) -> str: """Get this clock's identity string. - :returns: Clock identity - :rtype: str - :raises RuntimeError: If ptp4l is not started + Returns: + Clock identity as EUI-64 string. + + Raises: + RuntimeError: If ptp4l is not started. + NotImplementedError: If the real driver has no UDS integration. """ return self.call("get_clock_identity") def get_parent_info(self) -> GptpParentInfo: """Get information about the parent/grandmaster clock. - :returns: Parent and grandmaster clock information - :rtype: GptpParentInfo - :raises RuntimeError: If ptp4l is not started + Returns: + Parent and grandmaster clock information. + + Raises: + RuntimeError: If ptp4l is not started. + NotImplementedError: If the real driver has no UDS integration. """ return GptpParentInfo.model_validate(self.call("get_parent_info")) @@ -96,34 +108,45 @@ def set_priority1(self, priority: int) -> None: Lower values make this clock more likely to become grandmaster. - :param priority: Priority1 value (0-255) - :raises RuntimeError: If ptp4l is not started + Args: + priority: Priority1 value (0-255). + + Raises: + RuntimeError: If ptp4l is not started. + NotImplementedError: If the real driver has no UDS integration. """ self.call("set_priority1", priority) def is_synchronized(self) -> bool: """Check whether PTP is synchronized (servo locked in SLAVE state). - :returns: True if synchronized - :rtype: bool - :raises RuntimeError: If ptp4l is not started + Returns: + True if synchronized. + + Raises: + RuntimeError: If ptp4l is not started. """ return self.call("is_synchronized") def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool: """Block until PTP synchronization is achieved or timeout expires. - :param timeout: Maximum time to wait in seconds - :param poll_interval: Polling interval in seconds - :returns: True if synchronized, False if timeout expired - :rtype: bool + Only catches ``RuntimeError`` (driver not-yet-ready) during polling. + Transport or unexpected failures propagate immediately. + + Args: + timeout: Maximum time to wait in seconds. + poll_interval: Polling interval in seconds. + + Returns: + True if synchronized before timeout, False otherwise. """ deadline = time.monotonic() + timeout while time.monotonic() < deadline: try: if self.is_synchronized(): return True - except Exception: + except RuntimeError: pass time.sleep(poll_interval) return False @@ -131,14 +154,22 @@ def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bo def monitor(self) -> Generator[GptpSyncEvent, None, None]: """Stream PTP sync status updates. - Yields GptpSyncEvent objects with current offset, delay, and state. + Yields ``GptpSyncEvent`` objects with current offset, delay, and state. - :yields: Sync event updates + Yields: + Sync event updates. """ for v in self.streamingcall("read"): yield GptpSyncEvent.model_validate(v) def cli(self): + """Build the Click CLI group for gPTP commands. + + Returns: + Click group with start, stop, status, offset, monitor, + and set-priority commands. + """ + @driver_click_group(self) def base(): """gPTP/PTP time synchronization""" @@ -146,19 +177,19 @@ def base(): @base.command() def start(): - """Start PTP synchronization""" + """Start PTP synchronization.""" self.start() click.echo("PTP synchronization started") @base.command() def stop(): - """Stop PTP synchronization""" + """Stop PTP synchronization.""" self.stop() click.echo("PTP synchronization stopped") @base.command() def status(): - """Show PTP synchronization status""" + """Show PTP synchronization status.""" s = self.status() click.echo(f"Port state: {s.port_state.value}") click.echo(f"Servo state: {s.servo_state.value}") @@ -168,7 +199,7 @@ def status(): @base.command() def offset(): - """Show current clock offset from master""" + """Show current clock offset from master.""" o = self.get_offset() click.echo(f"Offset: {o.offset_from_master_ns:.0f} ns") click.echo(f"Path delay: {o.mean_path_delay_ns:.0f} ns") @@ -177,7 +208,7 @@ def offset(): @base.command() @click.option("--count", "-n", default=10, help="Number of events to show") def monitor(count): - """Monitor PTP sync events""" + """Monitor PTP sync events.""" for i, event in enumerate(self.monitor()): click.echo( f"[{event.event_type}] state={event.port_state} " @@ -190,7 +221,7 @@ def monitor(count): @base.command(name="set-priority") @click.argument("priority", type=int) def set_priority(priority): - """Set clock priority1 for BMCA""" + """Set clock priority1 for BMCA.""" self.set_priority1(priority) click.echo(f"Priority1 set to {priority}") diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py index db50ee48d..46e2079de 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py @@ -1,3 +1,5 @@ +"""Pydantic models and enums for gPTP/PTP time synchronization.""" + from __future__ import annotations from enum import Enum @@ -7,6 +9,8 @@ class PortState(str, Enum): + """IEEE 802.1AS / IEEE 1588 port state machine states.""" + INITIALIZING = "INITIALIZING" LISTENING = "LISTENING" MASTER = "MASTER" @@ -17,6 +21,13 @@ class PortState(str, Enum): class ServoState(str, Enum): + """PTP clock servo synchronization states. + + - ``s0``: unlocked — no synchronization yet. + - ``s1``: calibrating — frequency adjustment in progress. + - ``s2``: locked — fully synchronized. + """ + UNLOCKED = "s0" CALIBRATING = "s1" LOCKED = "s2" @@ -31,10 +42,21 @@ class ServoState(str, Enum): "FAULTY": {"INITIALIZING", "LISTENING"}, "UNCALIBRATED": {"SLAVE", "FAULTY", "LISTENING"}, } +"""Valid IEEE 802.1AS port state transitions, keyed by current state.""" class GptpStatus(BaseModel): - """Current PTP synchronization status.""" + """Current PTP synchronization status snapshot. + + Attributes: + port_state: Current port state machine state. + clock_class: PTP clock class (default 248 = slave-only). + clock_accuracy: PTP clock accuracy enumeration. + offset_ns: Current offset from master in nanoseconds. + mean_delay_ns: Mean path delay in nanoseconds. + gm_identity: Grandmaster clock identity string. + servo_state: Current servo synchronization state. + """ port_state: PortState clock_class: int = 248 @@ -47,13 +69,21 @@ class GptpStatus(BaseModel): @field_validator("port_state", mode="before") @classmethod def _coerce_port_state(cls, v: str | PortState) -> PortState: + """Accept both string and enum values for port_state.""" if isinstance(v, str): return PortState(v) return v class GptpOffset(BaseModel): - """Clock offset measurement from master.""" + """Clock offset measurement from master. + + Attributes: + offset_from_master_ns: Clock offset from master in nanoseconds. + mean_path_delay_ns: Mean path delay in nanoseconds. + freq_ppb: Frequency adjustment in parts per billion. + timestamp: Unix timestamp of the measurement. + """ offset_from_master_ns: float mean_path_delay_ns: float @@ -62,7 +92,17 @@ class GptpOffset(BaseModel): class GptpSyncEvent(BaseModel): - """A single sync status update from ptp4l.""" + """A single sync status update from ptp4l. + + Attributes: + event_type: Type of event — ``"sync"``, ``"state_change"``, or ``"fault"``. + port_state: Current port state (if known). + servo_state: Current servo state (if known). + offset_ns: Current offset in nanoseconds. + path_delay_ns: Current path delay in nanoseconds. + freq_ppb: Current frequency adjustment in ppb. + timestamp: Unix timestamp of the event. + """ event_type: Literal["sync", "state_change", "fault"] port_state: Optional[PortState] = None @@ -74,7 +114,15 @@ class GptpSyncEvent(BaseModel): class GptpPortStats(BaseModel): - """PTP port-level statistics.""" + """PTP port-level statistics counters. + + Attributes: + sync_count: Number of sync messages processed. + followup_count: Number of follow-up messages processed. + pdelay_req_count: Number of pdelay request messages sent. + pdelay_resp_count: Number of pdelay response messages received. + announce_count: Number of announce messages processed. + """ sync_count: int = 0 followup_count: int = 0 @@ -84,7 +132,16 @@ class GptpPortStats(BaseModel): class GptpParentInfo(BaseModel): - """Information about the parent/grandmaster clock.""" + """Information about the parent/grandmaster clock. + + Attributes: + parent_clock_identity: Parent clock identity string. + grandmaster_identity: Grandmaster clock identity string. + grandmaster_priority1: Grandmaster priority1 value. + grandmaster_priority2: Grandmaster priority2 value. + grandmaster_clock_class: Grandmaster clock class. + grandmaster_clock_accuracy: Grandmaster clock accuracy. + """ parent_clock_identity: str = "" grandmaster_identity: str = "" diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py index 57513c193..4ca0b7331 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py @@ -1,3 +1,5 @@ +"""Stateful PTP mock backend and test fixtures for gPTP driver testing.""" + from __future__ import annotations import pytest @@ -8,14 +10,20 @@ class PtpNotStartedError(RuntimeError): + """Raised when an operation requires ptp4l to be running.""" + pass class PtpAlreadyRunningError(RuntimeError): + """Raised when attempting to start ptp4l while it is already running.""" + pass class PtpStateError(RuntimeError): + """Raised when an invalid PTP state transition is attempted.""" + pass @@ -29,6 +37,9 @@ class StatefulPtp4l(MockGptpBackend): - Servo state: s0 (unlocked) -> s1 (calibrating) -> s2 (locked) - Sync offset convergence (simulated) - Priority1 changes and BMCA re-evaluation + + Attributes: + _call_log: Ordered list of operations for audit assertions. """ def __init__(self): @@ -36,10 +47,16 @@ def __init__(self): self._call_log: list[str] = [] def require_started(self): + """Raise PtpNotStartedError if the mock is not started.""" if not self._started: raise PtpNotStartedError("ptp4l not started -- call start() first") def start(self): + """Start mock ptp4l, entering LISTENING state. + + Raises: + PtpAlreadyRunningError: If already running. + """ if self._started: raise PtpAlreadyRunningError("ptp4l already running") self._started = True @@ -51,6 +68,11 @@ def start(self): self._call_log.append("start") def stop(self): + """Stop mock ptp4l and reset all state. + + Raises: + PtpNotStartedError: If not started. + """ self.require_started() self._started = False self._port_state = "INITIALIZING" @@ -58,6 +80,14 @@ def stop(self): self._call_log.append("stop") def _transition_to(self, new_state: str): + """Transition port to new_state, enforcing valid transitions. + + Args: + new_state: Target port state. + + Raises: + PtpStateError: If the transition is not valid per IEEE 802.1AS. + """ valid = VALID_PORT_TRANSITIONS.get(self._port_state, set()) if new_state not in valid: raise PtpStateError( @@ -66,7 +96,11 @@ def _transition_to(self, new_state: str): self._port_state = new_state def simulate_sync_convergence(self): - """Simulate the typical LISTENING -> SLAVE -> servo lock sequence.""" + """Simulate the typical LISTENING -> SLAVE -> servo lock sequence. + + Raises: + PtpNotStartedError: If not started. + """ self.require_started() if self._port_state == "LISTENING": self._transition_to("SLAVE") @@ -76,11 +110,22 @@ def simulate_sync_convergence(self): self._offset_ns = -23.0 def simulate_fault(self): + """Simulate a fault condition on the port. + + Raises: + PtpNotStartedError: If not started. + """ self.require_started() self._transition_to("FAULTY") self._servo_state = "s0" def simulate_recovery_from_fault(self): + """Recover from FAULTY state back to SLAVE. + + Raises: + PtpNotStartedError: If not started. + PtpStateError: If not currently in FAULTY state. + """ self.require_started() if self._port_state != "FAULTY": raise PtpStateError( @@ -91,6 +136,14 @@ def simulate_recovery_from_fault(self): self._servo_state = "s1" def set_priority1(self, value: int): + """Set priority1 and simulate BMCA re-evaluation. + + Args: + value: New priority1 value (0-255). + + Raises: + PtpNotStartedError: If not started. + """ self.require_started() self._priority1 = value if value < 128 and self._port_state in ("SLAVE", "LISTENING", "PASSIVE"): @@ -101,6 +154,7 @@ def set_priority1(self, value: int): @pytest.fixture def stateful_ptp4l(): + """Create a fresh StatefulPtp4l instance for direct state testing.""" return StatefulPtp4l() @@ -110,6 +164,9 @@ def stateful_client(stateful_ptp4l): The MockGptp @export methods remain intact and delegate to the stateful backend, so gRPC routing works correctly. + + Yields: + Tuple of (client, stateful_ptp4l) for test assertions. """ driver = MockGptp(backend=stateful_ptp4l) with serve(driver) as client: diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py index fff61a0be..aa2bd898a 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py @@ -2,8 +2,8 @@ import asyncio import logging +import os import re -import subprocess import tempfile import time from collections.abc import AsyncGenerator @@ -29,6 +29,11 @@ _VALID_PROFILES = {"gptp", "default"} _VALID_TRANSPORTS = {"L2", "UDPv4", "UDPv6"} _VALID_ROLES = {"master", "slave", "auto"} +_INTERFACE_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,14}$") +_DENIED_PTP4L_ARGS = frozenset({ + "-f", "--config", "-i", "--interface", + "--uds_address", "--log_file", +}) _OFFSET_RE = re.compile( r"ptp4l\[[\d.]+\]:\s+(?:master\s+)?offset\s+(-?\d+)\s+(\w+)\s+freq\s+([+-]?\d+)\s+path\s+delay\s+(-?\d+)" @@ -51,7 +56,17 @@ def __init__(self): def parse_ptp4l_log_line(line: str) -> Optional[ParsedLogLine]: - """Parse a single ptp4l log line into structured data.""" + """Parse a single ptp4l log line into structured data. + + Extracts offset/frequency/delay from sync lines and port state + transitions from state-change lines. + + Args: + line: Raw log line from ptp4l stdout. + + Returns: + Parsed result or None if the line is not recognized. + """ m = _OFFSET_RE.search(line) if m: result = ParsedLogLine() @@ -77,8 +92,21 @@ def _generate_ptp4l_config( profile: str, transport: str, role: str, + priority1: int = 128, ) -> str: - """Generate ptp4l configuration file content.""" + """Generate ptp4l configuration file content. + + Args: + interface: Network interface name for the [interface] section. + domain: PTP domain number. + profile: ``"gptp"`` or ``"default"``. + transport: ``"L2"``, ``"UDPv4"``, or ``"UDPv6"``. + role: ``"master"``, ``"slave"``, or ``"auto"``. + priority1: Clock priority1 value (0-255). + + Returns: + INI-style configuration string for ptp4l. + """ lines = ["[global]"] lines.append(f"domainNumber\t\t{domain}") lines.append(f"network_transport\t{transport}") @@ -92,13 +120,30 @@ def _generate_ptp4l_config( if role == "slave": lines.append("slaveOnly\t\t1") elif role == "master": - lines.append("priority1\t\t0") + lines.append(f"priority1\t\t{priority1}") lines.append("priority2\t\t0") + else: + lines.append(f"priority1\t\t{priority1}") lines.append(f"\n[{interface}]") return "\n".join(lines) + "\n" +def _validate_extra_args(args: list[str]) -> None: + """Reject ptp4l CLI arguments that could override safety-critical settings. + + Raises: + ValueError: If a denied argument is found. + """ + for arg in args: + base = arg.split("=", 1)[0] + if base in _DENIED_PTP4L_ARGS: + raise ValueError( + f"ptp4l_extra_args contains denied argument {arg!r}; " + f"denied list: {sorted(_DENIED_PTP4L_ARGS)}" + ) + + @dataclass(kw_only=True, config=ConfigDict(arbitrary_types_allowed=True)) class Gptp(Driver): """gPTP/PTP driver managing linuxptp (ptp4l/phc2sys) for time synchronization. @@ -106,6 +151,15 @@ class Gptp(Driver): Provides lifecycle management, status monitoring, and configuration of IEEE 802.1AS (gPTP) or IEEE 1588 (PTPv2) time synchronization between the exporter host and a target device. + + Attributes: + interface: Network interface name (e.g. ``eth0``). + domain: PTP domain number (0-127). + profile: ``"gptp"`` (IEEE 802.1AS) or ``"default"`` (IEEE 1588). + transport: ``"L2"``, ``"UDPv4"``, or ``"UDPv6"``. + role: ``"master"``, ``"slave"``, or ``"auto"`` (BMCA election). + sync_system_clock: Whether to run ``phc2sys`` for CLOCK_REALTIME sync. + ptp4l_extra_args: Additional trusted ptp4l CLI arguments. """ interface: str @@ -122,7 +176,7 @@ class Gptp(Driver): _phc2sys_proc: Optional[asyncio.subprocess.Process] = field( init=False, default=None, repr=False ) - _config_file: Optional[tempfile.NamedTemporaryFile] = field( + _config_file_path: Optional[str] = field( init=False, default=None, repr=False ) _port_state: str = field(init=False, default="INITIALIZING") @@ -140,6 +194,11 @@ def __post_init__(self): if hasattr(super(), "__post_init__"): super().__post_init__() + if not _INTERFACE_RE.match(self.interface): + raise ValueError( + f"Invalid interface name: {self.interface!r}. " + "Must match [a-zA-Z0-9][a-zA-Z0-9._-]{0,14}" + ) if self.profile not in _VALID_PROFILES: raise ValueError( f"Invalid profile: {self.profile!r}. Must be one of {_VALID_PROFILES}" @@ -152,35 +211,73 @@ def __post_init__(self): raise ValueError( f"Invalid role: {self.role!r}. Must be one of {_VALID_ROLES}" ) + _validate_extra_args(self.ptp4l_extra_args) @classmethod def client(cls) -> str: + """Return the fully-qualified client class path.""" return "jumpstarter_driver_gptp.client.GptpClient" - def _supports_hw_timestamping(self) -> bool: + async def _supports_hw_timestamping(self) -> bool: + """Check if the interface supports hardware timestamping via ethtool. + + Runs ethtool asynchronously to avoid blocking the event loop. + + Returns: + True if hardware-transmit and hardware-receive are supported. + """ try: - result = subprocess.run( - ["ethtool", "-T", self.interface], - capture_output=True, - text=True, + proc = await asyncio.create_subprocess_exec( + "ethtool", "-T", self.interface, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) - output = result.stdout + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10.0) + output = stdout.decode("utf-8", errors="replace") return "hardware-transmit" in output and "hardware-receive" in output - except FileNotFoundError: + except (FileNotFoundError, asyncio.TimeoutError, OSError): return False def _require_started(self) -> None: + """Raise RuntimeError if ptp4l is not running. + + Checks both that the process handle exists and that the process + has not exited. + """ if self._ptp4l_proc is None: raise RuntimeError("ptp4l not started -- call start() first") + if self._ptp4l_proc.returncode is not None: + self._ptp4l_proc = None + self._synchronized_invalidate() + raise RuntimeError("ptp4l process has exited unexpectedly") + + def _synchronized_invalidate(self) -> None: + """Reset sync-related state when ptp4l dies or stops.""" + self._port_state = "INITIALIZING" + self._servo_state = "s0" + + def _on_reader_done(self, task: asyncio.Task) -> None: + """Log unhandled exceptions from the background reader task.""" + if task.cancelled(): + return + exc = task.exception() + if exc is not None: + self.logger.error("ptp4l reader task failed: %s", exc) async def _read_ptp4l_output(self) -> None: - """Background task: read ptp4l stdout and update internal state.""" + """Background task: read ptp4l stdout and update internal state. + + On EOF (process exit), invalidates the session so subsequent + calls to ``_require_started()`` will raise. + """ proc = self._ptp4l_proc if proc is None or proc.stdout is None: return while True: raw = await proc.stdout.readline() if not raw: + self._ptp4l_proc = None + self._synchronized_invalidate() break line = raw.decode("utf-8", errors="replace").strip() if not line: @@ -199,76 +296,23 @@ async def _read_ptp4l_output(self) -> None: if parsed.port_state is not None: self._port_state = parsed.port_state - @export - async def start(self) -> None: - """Start PTP synchronization by spawning ptp4l (and optionally phc2sys).""" - if self._ptp4l_proc is not None: - raise RuntimeError("ptp4l already running") - - config_content = _generate_ptp4l_config( - interface=self.interface, - domain=self.domain, - profile=self.profile, - transport=self.transport, - role=self.role, - ) - - self._config_file = tempfile.NamedTemporaryFile( - mode="w", suffix=".cfg", prefix="ptp4l_", delete=False - ) - self._config_file.write(config_content) - self._config_file.flush() - - hw_ts = self._supports_hw_timestamping() - ts_flag = "-H" if hw_ts else "-S" - if not hw_ts: - self.logger.warning( - "Hardware timestamping not available on %s, falling back to software timestamping", - self.interface, - ) + async def _cleanup(self) -> None: + """Clean up all resources: processes, reader task, config file. - cmd = [ - "ptp4l", - "-f", self._config_file.name, - "-i", self.interface, - ts_flag, - "-m", - *self.ptp4l_extra_args, - ] - self.logger.info("Starting ptp4l: %s", " ".join(cmd)) - self._ptp4l_proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - - self._port_state = "INITIALIZING" - self._servo_state = "s0" - self._priority1 = 128 - self._stats = {} - self._reader_task = asyncio.get_event_loop().create_task( - self._read_ptp4l_output() - ) - - await asyncio.sleep(0.5) - if self._ptp4l_proc.returncode is not None: - raise RuntimeError( - f"ptp4l exited immediately with code {self._ptp4l_proc.returncode}" - ) - - if self.sync_system_clock and hw_ts: - phc2sys_cmd = ["phc2sys", "-a", "-rr", "-m"] - self.logger.info("Starting phc2sys: %s", " ".join(phc2sys_cmd)) - self._phc2sys_proc = await asyncio.create_subprocess_exec( - *phc2sys_cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - - @export - async def stop(self) -> None: - """Stop PTP synchronization.""" - self._require_started() + Safe to call even if partially initialized. Order: + 1. Terminate ptp4l and wait + 2. Cancel reader task + 3. Terminate phc2sys and wait + 4. Remove temp config file + """ + if self._ptp4l_proc is not None: + self._ptp4l_proc.terminate() + try: + await asyncio.wait_for(self._ptp4l_proc.wait(), timeout=5.0) + except asyncio.TimeoutError: + self._ptp4l_proc.kill() + await self._ptp4l_proc.wait() + self._ptp4l_proc = None if self._reader_task is not None: self._reader_task.cancel() @@ -284,35 +328,133 @@ async def stop(self) -> None: await asyncio.wait_for(self._phc2sys_proc.wait(), timeout=5.0) except asyncio.TimeoutError: self._phc2sys_proc.kill() + await self._phc2sys_proc.wait() self._phc2sys_proc = None - if self._ptp4l_proc is not None: - self._ptp4l_proc.terminate() - try: - await asyncio.wait_for(self._ptp4l_proc.wait(), timeout=5.0) - except asyncio.TimeoutError: - self._ptp4l_proc.kill() - self._ptp4l_proc = None - - if self._config_file is not None: - import os + if self._config_file_path is not None: try: - os.unlink(self._config_file.name) + os.unlink(self._config_file_path) except OSError: pass - self._config_file = None + self._config_file_path = None - self._port_state = "INITIALIZING" - self._servo_state = "s0" + @export + async def start(self) -> None: + """Start PTP synchronization by spawning ptp4l (and optionally phc2sys). + + Creates a temporary ptp4l config file, spawns the ptp4l process, + and optionally spawns phc2sys for system clock synchronization. + + Raises: + RuntimeError: If ptp4l is already running or exits immediately. + """ + if self._ptp4l_proc is not None: + raise RuntimeError("ptp4l already running") + + try: + config_content = _generate_ptp4l_config( + interface=self.interface, + domain=self.domain, + profile=self.profile, + transport=self.transport, + role=self.role, + priority1=self._priority1, + ) + + fd = tempfile.mkstemp(suffix=".cfg", prefix="ptp4l_") + os.fchmod(fd[0], 0o600) + with os.fdopen(fd[0], "w") as f: + f.write(config_content) + self._config_file_path = fd[1] + + hw_ts = await self._supports_hw_timestamping() + ts_flag = "-H" if hw_ts else "-S" + if not hw_ts: + self.logger.warning( + "Hardware timestamping not available on %s, falling back to software timestamping", + self.interface, + ) + + cmd = [ + "ptp4l", + "-f", self._config_file_path, + "-i", self.interface, + ts_flag, + "-m", + *self.ptp4l_extra_args, + ] + self.logger.info("Starting ptp4l: %s", " ".join(cmd)) + self._ptp4l_proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + start_new_session=True, + ) + + self._port_state = "INITIALIZING" + self._servo_state = "s0" + self._priority1 = 128 + self._stats = {} + self._reader_task = asyncio.create_task(self._read_ptp4l_output()) + self._reader_task.add_done_callback(self._on_reader_done) + + await asyncio.sleep(0.5) + if self._ptp4l_proc is not None and self._ptp4l_proc.returncode is not None: + raise RuntimeError( + f"ptp4l exited immediately with code {self._ptp4l_proc.returncode}" + ) + + if self.sync_system_clock and hw_ts: + phc2sys_cmd = [ + "phc2sys", + "-s", self.interface, + "-c", "CLOCK_REALTIME", + "-w", "-m", + ] + self.logger.info("Starting phc2sys: %s", " ".join(phc2sys_cmd)) + self._phc2sys_proc = await asyncio.create_subprocess_exec( + *phc2sys_cmd, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + start_new_session=True, + ) + + await asyncio.sleep(0.5) + if self._phc2sys_proc.returncode is not None: + raise RuntimeError( + f"phc2sys exited immediately with code {self._phc2sys_proc.returncode}" + ) + except Exception: + await self._cleanup() + raise + + @export + async def stop(self) -> None: + """Stop PTP synchronization and clean up all resources. + + Terminates ptp4l and phc2sys processes, cancels the reader task, + and removes the temporary config file. + + Raises: + RuntimeError: If ptp4l is not started. + """ + if self._ptp4l_proc is None and self._config_file_path is None: + raise RuntimeError("ptp4l not started -- call start() first") + + await self._cleanup() + self._synchronized_invalidate() @export @validate_call(validate_return=True) def status(self) -> GptpStatus: """Query the current PTP synchronization status. - :returns: Current synchronization status - :rtype: GptpStatus - :raises RuntimeError: If ptp4l is not started + Returns: + Current synchronization status including port state, + offset, delay, and servo state. + + Raises: + RuntimeError: If ptp4l is not started. """ self._require_started() return GptpStatus( @@ -327,9 +469,11 @@ def status(self) -> GptpStatus: def get_offset(self) -> GptpOffset: """Get the current clock offset from master. - :returns: Offset measurement - :rtype: GptpOffset - :raises RuntimeError: If ptp4l is not started + Returns: + Offset measurement including path delay and frequency. + + Raises: + RuntimeError: If ptp4l is not started. """ self._require_started() return GptpOffset( @@ -344,9 +488,11 @@ def get_offset(self) -> GptpOffset: def get_port_stats(self) -> GptpPortStats: """Get PTP port statistics. - :returns: Port statistics counters - :rtype: GptpPortStats - :raises RuntimeError: If ptp4l is not started + Returns: + Port statistics counters (sync, followup, pdelay, announce). + + Raises: + RuntimeError: If ptp4l is not started. """ self._require_started() return GptpPortStats( @@ -362,47 +508,63 @@ def get_port_stats(self) -> GptpPortStats: def get_clock_identity(self) -> str: """Get this clock's identity string. - :returns: Clock identity - :rtype: str - :raises RuntimeError: If ptp4l is not started + Not yet implemented — requires ptp4l UDS management socket + integration for structured TLV queries. + + Raises: + NotImplementedError: Always, until UDS integration is added. """ self._require_started() - return "" + raise NotImplementedError( + "get_clock_identity requires ptp4l UDS management socket integration" + ) @export @validate_call(validate_return=True) def get_parent_info(self) -> GptpParentInfo: """Get information about the parent/grandmaster clock. - :returns: Parent and grandmaster clock information - :rtype: GptpParentInfo - :raises RuntimeError: If ptp4l is not started + Not yet implemented — requires ptp4l UDS management socket + integration for structured TLV queries. + + Raises: + NotImplementedError: Always, until UDS integration is added. """ self._require_started() - return GptpParentInfo() + raise NotImplementedError( + "get_parent_info requires ptp4l UDS management socket integration" + ) @export @validate_call(validate_return=True) def set_priority1(self, priority: int) -> None: """Set clock priority1 to influence BMCA master election. - Lower values make this clock more likely to become grandmaster. + Not yet implemented — requires ptp4l UDS management socket + integration or config-reload mechanism. - :param priority: Priority1 value (0-255) - :raises RuntimeError: If ptp4l is not started + Args: + priority: Priority1 value (0-255). + + Raises: + NotImplementedError: Always, until UDS integration is added. """ self._require_started() - self._priority1 = priority - self.logger.info("Set priority1 to %d", priority) + raise NotImplementedError( + "set_priority1 requires ptp4l UDS management socket integration " + "or config-reload mechanism" + ) @export @validate_call(validate_return=True) def is_synchronized(self) -> bool: """Check whether PTP is synchronized (servo locked in SLAVE state). - :returns: True if synchronized - :rtype: bool - :raises RuntimeError: If ptp4l is not started + Returns: + True if the port is in SLAVE state and servo is locked (s2). + + Raises: + RuntimeError: If ptp4l is not started. """ self._require_started() return self._port_state == "SLAVE" and self._servo_state == "s2" @@ -411,12 +573,22 @@ def is_synchronized(self) -> bool: async def read(self) -> AsyncGenerator[GptpSyncEvent, None]: """Stream periodic sync status updates. - Yields a GptpSyncEvent approximately once per second with current - offset, delay, and state information. + Yields ``GptpSyncEvent`` approximately once per second with current + offset, delay, and state. Streams indefinitely until the session + is cancelled or the process exits. + + Yields: + Sync event with current offset, delay, state, and timestamp. + + Raises: + RuntimeError: If ptp4l is not started. """ self._require_started() prev_state = self._port_state - for _ in range(100): + while True: + if self._ptp4l_proc is None or self._ptp4l_proc.returncode is not None: + return + event_type = "sync" if self._port_state != prev_state: event_type = "state_change" @@ -437,7 +609,11 @@ async def read(self) -> AsyncGenerator[GptpSyncEvent, None]: class MockGptpBackend: - """Default backend for MockGptp. Can be replaced with StatefulPtp4l for stateful testing.""" + """Default backend for MockGptp. + + Can be replaced with ``StatefulPtp4l`` for stateful testing. + Tracks process lifecycle and simulated PTP state. + """ def __init__(self): self._started = False @@ -447,10 +623,12 @@ def __init__(self): self._priority1 = 128 def require_started(self): + """Raise RuntimeError if the mock is not started.""" if not self._started: raise RuntimeError("ptp4l not started -- call start() first") def start(self): + """Start mock synchronization — immediately enters SLAVE/s2 state.""" if self._started: raise RuntimeError("ptp4l already running") self._started = True @@ -460,6 +638,7 @@ def start(self): self._priority1 = 128 def stop(self): + """Stop mock synchronization and reset state.""" self.require_started() self._started = False self._port_state = "INITIALIZING" @@ -467,6 +646,7 @@ def stop(self): self._offset_ns = 0.0 def set_priority1(self, priority: int): + """Set priority1 and simulate BMCA role change.""" self.require_started() self._priority1 = priority if priority < 128 and self._port_state in ("SLAVE", "LISTENING", "PASSIVE"): @@ -477,11 +657,14 @@ def set_priority1(self, priority: int): class MockGptp(Driver): """Mock gPTP driver for testing without real PTP hardware. - Simulates PTP synchronization behavior: after start(), immediately enters - SLAVE state with a small simulated offset. + Simulates PTP synchronization behavior: after ``start()``, immediately + enters SLAVE state with a small simulated offset. Accepts an optional ``backend`` to replace the default mock behavior, enabling stateful testing with ``StatefulPtp4l``. + + Attributes: + backend: Optional replacement backend for stateful testing. """ backend: Optional[MockGptpBackend] = field(default=None, repr=False) @@ -495,24 +678,40 @@ def __post_init__(self): @classmethod def client(cls) -> str: + """Return the fully-qualified client class path.""" return "jumpstarter_driver_gptp.client.GptpClient" @export async def start(self) -> None: - """Start mock PTP synchronization.""" + """Start mock PTP synchronization. + + Raises: + RuntimeError: If already running. + """ self._internal_backend.start() self.logger.info("MockGptp started") @export async def stop(self) -> None: - """Stop mock PTP synchronization.""" + """Stop mock PTP synchronization. + + Raises: + RuntimeError: If not started. + """ self._internal_backend.stop() self.logger.info("MockGptp stopped") @export @validate_call(validate_return=True) def status(self) -> GptpStatus: - """Query the current PTP synchronization status.""" + """Query the current PTP synchronization status. + + Returns: + Current synchronization status. + + Raises: + RuntimeError: If not started. + """ b = self._internal_backend b.require_started() return GptpStatus( @@ -525,7 +724,14 @@ def status(self) -> GptpStatus: @export @validate_call(validate_return=True) def get_offset(self) -> GptpOffset: - """Get the current clock offset from master.""" + """Get the current clock offset from master. + + Returns: + Simulated offset measurement. + + Raises: + RuntimeError: If not started. + """ b = self._internal_backend b.require_started() return GptpOffset( @@ -538,21 +744,42 @@ def get_offset(self) -> GptpOffset: @export @validate_call(validate_return=True) def get_port_stats(self) -> GptpPortStats: - """Get PTP port statistics.""" + """Get PTP port statistics. + + Returns: + Simulated port statistics. + + Raises: + RuntimeError: If not started. + """ self._internal_backend.require_started() return GptpPortStats(sync_count=42) @export @validate_call(validate_return=True) def get_clock_identity(self) -> str: - """Get this clock's identity string.""" + """Get this clock's identity string. + + Returns: + Simulated EUI-64 clock identity. + + Raises: + RuntimeError: If not started. + """ self._internal_backend.require_started() return "aa:bb:cc:ff:fe:dd:ee:ff" @export @validate_call(validate_return=True) def get_parent_info(self) -> GptpParentInfo: - """Get information about the parent/grandmaster clock.""" + """Get information about the parent/grandmaster clock. + + Returns: + Simulated parent clock information. + + Raises: + RuntimeError: If not started. + """ self._internal_backend.require_started() return GptpParentInfo( grandmaster_identity="11:22:33:ff:fe:44:55:66", @@ -562,23 +789,46 @@ def get_parent_info(self) -> GptpParentInfo: @export @validate_call(validate_return=True) def set_priority1(self, priority: int) -> None: - """Set clock priority1.""" + """Set clock priority1 and simulate BMCA role change. + + Args: + priority: Priority1 value (0-255). + + Raises: + RuntimeError: If not started. + """ self._internal_backend.set_priority1(priority) @export @validate_call(validate_return=True) def is_synchronized(self) -> bool: - """Check whether PTP is synchronized.""" + """Check whether PTP is synchronized. + + Returns: + True if port is SLAVE and servo is s2. + + Raises: + RuntimeError: If not started. + """ b = self._internal_backend b.require_started() return b._port_state == "SLAVE" and b._servo_state == "s2" @export async def read(self) -> AsyncGenerator[GptpSyncEvent, None]: - """Stream simulated sync events.""" + """Stream simulated sync events. + + Yields events indefinitely until the session is cancelled. + + Yields: + Simulated sync events with mock offset/delay values. + + Raises: + RuntimeError: If not started. + """ b = self._internal_backend b.require_started() - for _ in range(100): + while True: yield GptpSyncEvent( event_type="sync", port_state=PortState(b._port_state) if b._port_state in PortState.__members__ else None, diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py index fc4934b5c..4d89f1d1e 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py @@ -1,8 +1,17 @@ +"""Comprehensive tests for the gPTP driver. + +Levels: + 1. Unit tests — no system dependencies, always run. + 2. E2E tests — MockGptp over gRPC via serve(), always run. + 2.5. Stateful tests — StatefulPtp4l state machine enforcement, always run. + 3-5. Integration tests — env-gated, require Linux and/or PTP hardware. +""" + from __future__ import annotations import os import platform -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, patch import pytest from pydantic import ValidationError @@ -16,7 +25,13 @@ PortState, ) from .conftest import PtpStateError -from .driver import Gptp, MockGptp, _generate_ptp4l_config, parse_ptp4l_log_line +from .driver import ( + Gptp, + MockGptp, + _generate_ptp4l_config, + _validate_extra_args, + parse_ptp4l_log_line, +) from jumpstarter.client.core import DriverError from jumpstarter.common.utils import serve @@ -105,7 +120,7 @@ def test_generate_gptp_config(self): def test_generate_master_config(self): config = _generate_ptp4l_config("eth0", 0, "gptp", "L2", "master") - assert "priority1\t\t0" in config + assert "priority1\t\t128" in config assert "priority2\t\t0" in config def test_generate_slave_config(self): @@ -129,38 +144,44 @@ def test_generate_config_has_interface_section(self): config = _generate_ptp4l_config("enp3s0", 0, "gptp", "L2", "auto") assert "[enp3s0]" in config + def test_generate_config_custom_priority(self): + config = _generate_ptp4l_config("eth0", 0, "gptp", "L2", "master", priority1=50) + assert "priority1\t\t50" in config + class TestHwTimestampingDetection: - """1c. Detect hardware timestamping support.""" - - @patch("jumpstarter_driver_gptp.driver.subprocess.run") - def test_detect_hw_timestamping(self, mock_run): - import subprocess - mock_run.return_value = subprocess.CompletedProcess( - args=[], returncode=0, - stdout="Capabilities:\n hardware-transmit\n hardware-receive\n hardware-raw-clock\n", + """1c. Detect hardware timestamping support (async).""" + + async def test_detect_hw_timestamping(self): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = ( + b"Capabilities:\n hardware-transmit\n hardware-receive\n hardware-raw-clock\n", + b"", ) - driver = Gptp.__new__(Gptp) - driver.interface = "eth0" - assert driver._supports_hw_timestamping() is True - - @patch("jumpstarter_driver_gptp.driver.subprocess.run") - def test_detect_sw_only_timestamping(self, mock_run): - import subprocess - mock_run.return_value = subprocess.CompletedProcess( - args=[], returncode=0, - stdout="Capabilities:\n software-transmit\n software-receive\n", + with patch("jumpstarter_driver_gptp.driver.asyncio.create_subprocess_exec", return_value=mock_proc): + driver = Gptp.__new__(Gptp) + driver.interface = "eth0" + assert await driver._supports_hw_timestamping() is True + + async def test_detect_sw_only_timestamping(self): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = ( + b"Capabilities:\n software-transmit\n software-receive\n", + b"", ) - driver = Gptp.__new__(Gptp) - driver.interface = "eth0" - assert driver._supports_hw_timestamping() is False + with patch("jumpstarter_driver_gptp.driver.asyncio.create_subprocess_exec", return_value=mock_proc): + driver = Gptp.__new__(Gptp) + driver.interface = "eth0" + assert await driver._supports_hw_timestamping() is False - @patch("jumpstarter_driver_gptp.driver.subprocess.run") - def test_detect_timestamping_ethtool_missing(self, mock_run): - mock_run.side_effect = FileNotFoundError("ethtool not found") - driver = Gptp.__new__(Gptp) - driver.interface = "eth0" - assert driver._supports_hw_timestamping() is False + async def test_detect_timestamping_ethtool_missing(self): + with patch( + "jumpstarter_driver_gptp.driver.asyncio.create_subprocess_exec", + side_effect=FileNotFoundError("ethtool not found"), + ): + driver = Gptp.__new__(Gptp) + driver.interface = "eth0" + assert await driver._supports_hw_timestamping() is False class TestPydanticModels: @@ -221,9 +242,7 @@ def test_gptp_parent_info(self): class TestDriverConfigValidation: """1e. Driver configuration validation.""" - @patch("jumpstarter_driver_gptp.driver.subprocess.run") - def test_gptp_valid_config(self, mock_run): - mock_run.return_value = MagicMock(stdout="", returncode=0) + def test_gptp_valid_config(self): driver = Gptp(interface="eth0") assert driver.interface == "eth0" assert driver.domain == 0 @@ -241,6 +260,54 @@ def test_gptp_invalid_role(self): with pytest.raises(ValueError, match="role"): Gptp(interface="eth0", role="observer") + def test_gptp_invalid_interface_name(self): + with pytest.raises(ValueError, match="Invalid interface name"): + Gptp(interface="eth0]\\nmalicious") + + def test_gptp_interface_too_long(self): + with pytest.raises(ValueError, match="Invalid interface name"): + Gptp(interface="a" * 20) + + def test_gptp_valid_interface_names(self): + for name in ("eth0", "enp3s0", "ens0f0.100", "br-lan", "wlan0"): + d = Gptp(interface=name) + assert d.interface == name + + def test_gptp_denied_extra_args(self): + with pytest.raises(ValueError, match="denied argument"): + Gptp(interface="eth0", ptp4l_extra_args=["-f", "/etc/shadow"]) + + def test_gptp_denied_extra_args_config(self): + with pytest.raises(ValueError, match="denied argument"): + Gptp(interface="eth0", ptp4l_extra_args=["--config=/etc/shadow"]) + + def test_gptp_denied_extra_args_uds(self): + with pytest.raises(ValueError, match="denied argument"): + Gptp(interface="eth0", ptp4l_extra_args=["--uds_address", "/tmp/evil"]) + + def test_gptp_allowed_extra_args(self): + d = Gptp(interface="eth0", ptp4l_extra_args=["--summary_interval", "1"]) + assert d.ptp4l_extra_args == ["--summary_interval", "1"] + + +class TestExtraArgsValidation: + """1f. Extra args denylist validation.""" + + def test_validate_extra_args_accepts_safe(self): + _validate_extra_args(["--summary_interval", "1", "-l", "6"]) + + def test_validate_extra_args_rejects_config(self): + with pytest.raises(ValueError, match="-f"): + _validate_extra_args(["-f", "/etc/shadow"]) + + def test_validate_extra_args_rejects_interface(self): + with pytest.raises(ValueError, match="-i"): + _validate_extra_args(["-i", "lo"]) + + def test_validate_extra_args_rejects_equals_form(self): + with pytest.raises(ValueError, match="--config"): + _validate_extra_args(["--config=/tmp/evil.cfg"]) + # ============================================================================= # Level 2: E2E Tests with MockGptp (No system dependencies, always run) @@ -400,7 +467,7 @@ def test_stateful_invalid_transition_rejected(self, stateful_ptp4l): ptp._transition_to("UNCALIBRATED") def test_stateful_full_state_cycle(self, stateful_client): - """Walk through: start -> LISTENING -> SLAVE -> FAULTY -> recovery -> SLAVE -> stop""" + """Walk through: start -> LISTENING -> SLAVE -> FAULTY -> recovery -> SLAVE -> stop.""" client, ptp = stateful_client client.start() assert ptp._port_state == "LISTENING" @@ -571,35 +638,36 @@ def test_stateful_full_workflow_log(self, stateful_client): @pytest.mark.skipif(not _RUN_INTEGRATION, reason="GPTP_INTEGRATION_TESTS not set or not Linux") class TestSoftwareTimestampingIntegration: - """Level 3: Real ptp4l with software timestamping on veth pairs.""" + """Level 3: Real ptp4l with software timestamping on veth pairs. + + Both interfaces stay in the root namespace so ptp4l can bind to them + directly from the test process. + """ @pytest.fixture def veth_pair(self): + """Create a veth pair in the root namespace for PTP testing.""" import subprocess as sp cmds = [ - "ip netns add ns-ptp-master", - "ip netns add ns-ptp-slave", "ip link add veth-m type veth peer name veth-s", - "ip link set veth-m netns ns-ptp-master", - "ip link set veth-s netns ns-ptp-slave", - "ip netns exec ns-ptp-master ip addr add 10.99.0.1/24 dev veth-m", - "ip netns exec ns-ptp-slave ip addr add 10.99.0.2/24 dev veth-s", - "ip netns exec ns-ptp-master ip link set veth-m up", - "ip netns exec ns-ptp-slave ip link set veth-s up", + "ip addr add 10.99.0.1/24 dev veth-m", + "ip addr add 10.99.0.2/24 dev veth-s", + "ip link set veth-m up", + "ip link set veth-s up", ] for cmd in cmds: sp.run(cmd.split(), check=True) - yield ("ns-ptp-master", "veth-m", "ns-ptp-slave", "veth-s") - sp.run("ip netns del ns-ptp-master".split(), check=False) - sp.run("ip netns del ns-ptp-slave".split(), check=False) + yield ("veth-m", "veth-s") + sp.run("ip link del veth-m".split(), check=False) @pytest.fixture def ptp_master(self, veth_pair): + """Start a ptp4l master on veth-m.""" import subprocess as sp import time - ns, iface, _, _ = veth_pair + master_iface, _ = veth_pair proc = sp.Popen( - ["ip", "netns", "exec", ns, "ptp4l", "-i", iface, "-S", "-m", + ["ptp4l", "-i", master_iface, "-S", "-m", "--masterOnly=1", "--domainNumber=0"], stdout=sp.PIPE, stderr=sp.STDOUT, ) @@ -610,7 +678,7 @@ def ptp_master(self, veth_pair): def test_gptp_real_sync_software_timestamping(self, veth_pair, ptp_master): import time - _, _, slave_ns, slave_iface = veth_pair + _, slave_iface = veth_pair driver = Gptp( interface=slave_iface, domain=0, profile="default", transport="UDPv4", role="slave", sync_system_clock=False, @@ -642,8 +710,6 @@ def test_gptp_hw_timestamping_sub_microsecond(self): time.sleep(30) offset = client.get_offset() assert abs(offset.offset_from_master_ns) < 1000 - parent = client.get_parent_info() - assert parent.grandmaster_identity is not None client.stop() def test_gptp_hw_master_role(self): diff --git a/python/packages/jumpstarter-driver-gptp/pyproject.toml b/python/packages/jumpstarter-driver-gptp/pyproject.toml index 4a1daef63..f7e51829e 100644 --- a/python/packages/jumpstarter-driver-gptp/pyproject.toml +++ b/python/packages/jumpstarter-driver-gptp/pyproject.toml @@ -16,7 +16,6 @@ dependencies = [ [project.entry-points."jumpstarter.drivers"] Gptp = "jumpstarter_driver_gptp.driver:Gptp" -MockGptp = "jumpstarter_driver_gptp.driver:MockGptp" [tool.hatch.version] source = "vcs" @@ -24,7 +23,7 @@ raw-options = { 'root' = '../../../'} [tool.hatch.metadata.hooks.vcs.urls] Homepage = "https://jumpstarter.dev" -source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip" +source_archive = "https://github.com/jumpstarter-dev/jumpstarter/archive/{commit_hash}.zip" [tool.pytest.ini_options] addopts = "--cov --cov-report=html --cov-report=xml" @@ -42,6 +41,7 @@ name = "pin_jumpstarter" [dependency-groups] dev = [ + "pytest-asyncio>=0.24.0", "pytest-cov>=6.0.0", "pytest>=8.3.3", ] diff --git a/python/uv.lock b/python/uv.lock index 7ae17a9ad..5ace38513 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -34,6 +34,7 @@ members = [ "jumpstarter-driver-iscsi", "jumpstarter-driver-mitmproxy", "jumpstarter-driver-network", + "jumpstarter-driver-noyito-relay", "jumpstarter-driver-opendal", "jumpstarter-driver-pi-pico", "jumpstarter-driver-power", @@ -1680,6 +1681,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/e7/ae38d7a6dfba0533684e0b2136817d667588ae3ec984c1a4e5df5eb88482/hatchling-1.27.0-py3-none-any.whl", hash = "sha256:d3a2f3567c4f926ea39849cdf924c7e99e6686c9c8e288ae1037c8fa2a5d937b", size = 75794, upload-time = "2024-12-15T17:08:10.364Z" }, ] +[[package]] +name = "hid" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e9/f8/0357a8aa8874a243e96d08a8568efaf7478293e1a3441ddca18039b690c1/hid-1.0.9.tar.gz", hash = "sha256:f4471f11f0e176d1b0cb1b243e55498cc90347a3aede735655304395694ac182", size = 4973, upload-time = "2026-02-05T15:35:20.595Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b8/c7/f0e1ad95179f44a6fc7a9140be025812cc7a62cf7390442b685a57ee1417/hid-1.0.9-py3-none-any.whl", hash = "sha256:6b9289e00bbc1e1589bec0c7f376a63fe03a4a4a1875575d0ad60e3e11a349f4", size = 4959, upload-time = "2026-02-05T15:35:19.269Z" }, +] + [[package]] name = "hpack" version = "4.1.0" @@ -2593,6 +2603,7 @@ dependencies = [ [package.dev-dependencies] dev = [ { name = "pytest" }, + { name = "pytest-asyncio" }, { name = "pytest-cov" }, ] @@ -2606,6 +2617,7 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ { name = "pytest", specifier = ">=8.3.3" }, + { name = "pytest-asyncio", specifier = ">=0.24.0" }, { name = "pytest-cov", specifier = ">=6.0.0" }, ] @@ -2781,6 +2793,38 @@ dev = [ { name = "websocket-client", specifier = ">=1.8.0" }, ] +[[package]] +name = "jumpstarter-driver-noyito-relay" +source = { editable = "packages/jumpstarter-driver-noyito-relay" } +dependencies = [ + { name = "hid" }, + { name = "jumpstarter" }, + { name = "jumpstarter-driver-power" }, + { name = "pyserial" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-cov" }, + { name = "pytest-mock" }, +] + +[package.metadata] +requires-dist = [ + { name = "hid", specifier = ">=1.0.4" }, + { name = "jumpstarter", editable = "packages/jumpstarter" }, + { name = "jumpstarter-driver-power", editable = "packages/jumpstarter-driver-power" }, + { name = "pyserial", specifier = ">=3.5" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=8.3.3" }, + { name = "pytest-cov", specifier = ">=6.0.0" }, + { name = "pytest-mock", specifier = ">=3.14.0" }, +] + [[package]] name = "jumpstarter-driver-opendal" source = { editable = "packages/jumpstarter-driver-opendal" } @@ -5231,6 +5275,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0d/d2/dfc2f25f3905921c2743c300a48d9494d29032f1389fc142e718d6978fb2/pytest_httpserver-1.1.3-py3-none-any.whl", hash = "sha256:5f84757810233e19e2bb5287f3826a71c97a3740abe3a363af9155c0f82fdbb9", size = 21000, upload-time = "2025-04-10T08:17:13.906Z" }, ] +[[package]] +name = "pytest-mock" +version = "3.15.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/68/14/eb014d26be205d38ad5ad20d9a80f7d201472e08167f0bb4361e251084a9/pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f", size = 34036, upload-time = "2025-09-16T16:37:27.081Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5a/cc/06253936f4a7fa2e0f48dfe6d851d9c56df896a9ab09ac019d70b760619c/pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d", size = 10095, upload-time = "2025-09-16T16:37:25.734Z" }, +] + [[package]] name = "pytest-mqtt" version = "0.5.0" From ee84f1fac78bfdfe781b0f010e2ceda2fcd3534b Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Wed, 8 Apr 2026 23:16:34 -0400 Subject: [PATCH 3/4] fix: address CodeRabbit nitpicks - Remove redundant inner condition in StatefulPtp4l.set_priority1 - Use Gptp(interface="eth0") instead of Gptp.__new__ in HW timestamping tests Made-with: Cursor --- python/packages/jumpstarter-driver-gptp/README.md | 1 + .../jumpstarter_driver_gptp/conftest.py | 3 +-- .../jumpstarter_driver_gptp/driver_test.py | 9 +++------ 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/python/packages/jumpstarter-driver-gptp/README.md b/python/packages/jumpstarter-driver-gptp/README.md index b5f37fb90..44594116b 100644 --- a/python/packages/jumpstarter-driver-gptp/README.md +++ b/python/packages/jumpstarter-driver-gptp/README.md @@ -273,3 +273,4 @@ $ which ptp4l - Check physical layer: `ethtool ` should show link up - Review ptp4l logs in the exporter output - Ensure both ends use the same domain number and transport + diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py index 4ca0b7331..e3db24463 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py @@ -147,8 +147,7 @@ def set_priority1(self, value: int): self.require_started() self._priority1 = value if value < 128 and self._port_state in ("SLAVE", "LISTENING", "PASSIVE"): - if self._port_state != "MASTER": - self._transition_to("MASTER") + self._transition_to("MASTER") self._call_log.append(f"set_priority1({value})") diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py index 4d89f1d1e..aded8c750 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py @@ -159,8 +159,7 @@ async def test_detect_hw_timestamping(self): b"", ) with patch("jumpstarter_driver_gptp.driver.asyncio.create_subprocess_exec", return_value=mock_proc): - driver = Gptp.__new__(Gptp) - driver.interface = "eth0" + driver = Gptp(interface="eth0") assert await driver._supports_hw_timestamping() is True async def test_detect_sw_only_timestamping(self): @@ -170,8 +169,7 @@ async def test_detect_sw_only_timestamping(self): b"", ) with patch("jumpstarter_driver_gptp.driver.asyncio.create_subprocess_exec", return_value=mock_proc): - driver = Gptp.__new__(Gptp) - driver.interface = "eth0" + driver = Gptp(interface="eth0") assert await driver._supports_hw_timestamping() is False async def test_detect_timestamping_ethtool_missing(self): @@ -179,8 +177,7 @@ async def test_detect_timestamping_ethtool_missing(self): "jumpstarter_driver_gptp.driver.asyncio.create_subprocess_exec", side_effect=FileNotFoundError("ethtool not found"), ): - driver = Gptp.__new__(Gptp) - driver.interface = "eth0" + driver = Gptp(interface="eth0") assert await driver._supports_hw_timestamping() is False From d3bf524a850164653f5bbcd3678818914f14ab34 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Wed, 15 Apr 2026 13:04:14 +0000 Subject: [PATCH 4/4] fix: address review comments on gPTP driver - Update README examples to use env() helper instead of serve() for non-test usage (mangelajo feedback) - Add threshold_ns parameter to wait_for_sync for offset-aware sync checking (raballew feedback) - Add port-stats CLI command (raballew feedback) - Capture self._ptp4l_proc into local variable in _require_started to avoid fragile access patterns (raballew feedback) - Catch FileNotFoundError specifically on ptp4l startup and re-raise as RuntimeError with actionable message (raballew feedback) Co-Authored-By: Claude Opus 4.6 --- .../jumpstarter-driver-gptp/README.md | 12 +++++++-- .../jumpstarter_driver_gptp/client.py | 25 ++++++++++++++++++- .../jumpstarter_driver_gptp/driver.py | 20 +++++++++------ 3 files changed, 46 insertions(+), 11 deletions(-) diff --git a/python/packages/jumpstarter-driver-gptp/README.md b/python/packages/jumpstarter-driver-gptp/README.md index 44594116b..295922522 100644 --- a/python/packages/jumpstarter-driver-gptp/README.md +++ b/python/packages/jumpstarter-driver-gptp/README.md @@ -160,7 +160,11 @@ integration is added. ### Basic lifecycle ```python -with serve(Gptp(interface="eth0")) as gptp: +from jumpstarter.common.utils import env + +with env() as client: + gptp = client.gptp + gptp.start() # Wait for synchronization (up to 30 seconds) @@ -176,7 +180,11 @@ with serve(Gptp(interface="eth0")) as gptp: ### Monitoring sync events ```python -with serve(Gptp(interface="eth0")) as gptp: +from jumpstarter.common.utils import env + +with env() as client: + gptp = client.gptp + gptp.start() for event in gptp.monitor(): print(f"[{event.event_type}] offset={event.offset_ns:.0f}ns state={event.port_state}") diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py index c56ea9150..19546eb16 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py @@ -128,7 +128,12 @@ def is_synchronized(self) -> bool: """ return self.call("is_synchronized") - def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool: + def wait_for_sync( + self, + timeout: float = 30.0, + poll_interval: float = 1.0, + threshold_ns: float | None = None, + ) -> bool: """Block until PTP synchronization is achieved or timeout expires. Only catches ``RuntimeError`` (driver not-yet-ready) during polling. @@ -137,6 +142,9 @@ def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bo Args: timeout: Maximum time to wait in seconds. poll_interval: Polling interval in seconds. + threshold_ns: If provided, also require the absolute offset + from master to be below this value (in nanoseconds) before + returning True. Returns: True if synchronized before timeout, False otherwise. @@ -145,6 +153,11 @@ def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bo while time.monotonic() < deadline: try: if self.is_synchronized(): + if threshold_ns is not None: + offset = self.get_offset() + if abs(offset.offset_from_master_ns) >= threshold_ns: + time.sleep(poll_interval) + continue return True except RuntimeError: pass @@ -218,6 +231,16 @@ def monitor(count): if i + 1 >= count: break + @base.command(name="port-stats") + def port_stats(): + """Show PTP port statistics.""" + s = self.get_port_stats() + click.echo(f"Sync count: {s.sync_count}") + click.echo(f"Follow-up count: {s.followup_count}") + click.echo(f"PDelay req count: {s.pdelay_req_count}") + click.echo(f"PDelay resp count: {s.pdelay_resp_count}") + click.echo(f"Announce count: {s.announce_count}") + @base.command(name="set-priority") @click.argument("priority", type=int) def set_priority(priority): diff --git a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py index aa2bd898a..de944caf9 100644 --- a/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py +++ b/python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py @@ -244,9 +244,10 @@ def _require_started(self) -> None: Checks both that the process handle exists and that the process has not exited. """ - if self._ptp4l_proc is None: + proc = self._ptp4l_proc + if proc is None: raise RuntimeError("ptp4l not started -- call start() first") - if self._ptp4l_proc.returncode is not None: + if proc.returncode is not None: self._ptp4l_proc = None self._synchronized_invalidate() raise RuntimeError("ptp4l process has exited unexpectedly") @@ -384,12 +385,15 @@ async def start(self) -> None: *self.ptp4l_extra_args, ] self.logger.info("Starting ptp4l: %s", " ".join(cmd)) - self._ptp4l_proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - start_new_session=True, - ) + try: + self._ptp4l_proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + start_new_session=True, + ) + except FileNotFoundError: + raise RuntimeError("ptp4l not found — install linuxptp") self._port_state = "INITIALIZING" self._servo_state = "s0"