This repository was archived by the owner on Jan 23, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 18
Rework gpio driver with adafruit-blinka #227
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,4 +11,5 @@ can.md | |
| pyserial.md | ||
| sdwire.md | ||
| ustreamer.md | ||
| raspberrypi.md | ||
| ``` | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| # Raspberry Pi drivers | ||
|
|
||
|
NickCao marked this conversation as resolved.
|
||
| Raspberry Pi drivers are a set of drivers for the various peripherals on Pi and similar single board computers. | ||
|
|
||
| ## Driver configuration | ||
| ```yaml | ||
| export: | ||
| my_serial: | ||
| type: "jumpstarter_driver_raspberrypi.driver.DigitalIO" | ||
| config: | ||
| pin: "D3" | ||
| ``` | ||
|
|
||
| ### Config parameters | ||
|
|
||
| | Parameter | Description | Type | Required | Default | | ||
| |-----------|-------------|------|----------|---------| | ||
| | pin | Name of the GPIO pin to connect to, in [Adafruit Blinka format](https://docs.circuitpython.org/projects/blinka/en/latest/index.html#usage-example) | str | yes | | | ||
|
|
||
|
NickCao marked this conversation as resolved.
|
||
| ## DigitalIOClient API | ||
| ```{eval-rst} | ||
| .. autoclass:: jumpstarter_driver_raspberrypi.client.DigitalIOClient | ||
| :members: | ||
| ``` | ||
|
|
||
| ## Examples | ||
| Switch pin to push pull output and set output to high | ||
| ```{testcode} | ||
| digitalioclient.switch_to_output(value=False, drive_mode=digitalio.DriveMode.PUSH_PULL) # default to low | ||
| digitalioclient.value = True | ||
| ``` | ||
|
Comment on lines
+28
to
+31
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add import statements to the example. The example uses undefined symbols ( +from jumpstarter.client import Client
+from digitalio import DriveMode
+
+client = Client()
+digitalioclient = client.get_driver("my_gpio")
+
-digitalioclient.switch_to_output(value=False, drive_mode=digitalio.DriveMode.PUSH_PULL) # default to low
+digitalioclient.switch_to_output(value=False, drive_mode=DriveMode.PUSH_PULL) # default to low
digitalioclient.value = True |
||
|
|
||
| Switch pin to input with pull up and read value | ||
| ```{testcode} | ||
| digitalioclient.switch_to_input(pull=digitalio.Pull.UP) | ||
| print(digitalioclient.value) | ||
| ``` | ||
|
Comment on lines
+34
to
+37
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add import statements and fix undefined symbols. Similar to the previous example, this one needs proper imports and initialization. +from jumpstarter.client import Client
+from digitalio import Pull
+
+client = Client()
+digitalioclient = client.get_driver("my_gpio")
+
-digitalioclient.switch_to_input(pull=digitalio.Pull.UP)
+digitalioclient.switch_to_input(pull=Pull.UP)
print(digitalioclient.value) |
||
56 changes: 45 additions & 11 deletions
56
packages/jumpstarter-driver-raspberrypi/jumpstarter_driver_raspberrypi/client.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,21 +1,55 @@ | ||
| from dataclasses import dataclass | ||
|
|
||
| from digitalio import DriveMode, Pull | ||
|
|
||
| from jumpstarter.client import DriverClient | ||
|
|
||
|
|
||
| @dataclass(kw_only=True) | ||
| class DigitalOutputClient(DriverClient): | ||
| def off(self): | ||
| self.call("off") | ||
| class DigitalIOClient(DriverClient): | ||
| """DigitalIO (Digital GPIO) client class | ||
|
|
||
| def on(self): | ||
| self.call("on") | ||
| Client methods for the DigitalIO driver. | ||
| """ | ||
|
|
||
| def switch_to_output(self, value: bool = False, drive_mode: DriveMode = DriveMode.PUSH_PULL) -> None: | ||
| """ | ||
| Switch pin to output mode with given default value and drive mode | ||
| """ | ||
|
|
||
| @dataclass(kw_only=True) | ||
| class DigitalInputClient(DriverClient): | ||
| def wait_for_active(self, timeout: float | None = None): | ||
| self.call("wait_for_active", timeout) | ||
| match drive_mode: | ||
| case DriveMode.PUSH_PULL: | ||
| drive_mode = 0 | ||
| case DriveMode.OPEN_DRAIN: | ||
| drive_mode = 1 | ||
| case _: | ||
| raise ValueError("unrecognized drive_mode") | ||
| self.call("switch_to_output", value, drive_mode) | ||
|
|
||
| def switch_to_input(self, pull: Pull | None = None) -> None: | ||
| """ | ||
| Switch pin to input mode with given pull up/down mode | ||
| """ | ||
|
|
||
| match pull: | ||
| case None: | ||
| pull = 0 | ||
| case Pull.UP: | ||
| pull = 1 | ||
| case Pull.DOWN: | ||
| pull = 2 | ||
| case _: | ||
| raise ValueError("unrecognized pull") | ||
| self.call("switch_to_input", pull) | ||
|
|
||
| @property | ||
| def value(self) -> bool: | ||
| """ | ||
| Current value of the pin | ||
| """ | ||
|
|
||
| return self.call("get_value") | ||
|
|
||
| def wait_for_inactive(self, timeout: float | None = None): | ||
| self.call("wait_for_inactive", timeout) | ||
| @value.setter | ||
| def value(self, value: bool) -> None: | ||
| self.call("set_value", value) |
125 changes: 95 additions & 30 deletions
125
packages/jumpstarter-driver-raspberrypi/jumpstarter_driver_raspberrypi/driver.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,63 +1,128 @@ | ||
| from collections.abc import AsyncGenerator | ||
| from dataclasses import dataclass, field | ||
| from time import sleep | ||
|
|
||
| from gpiozero import DigitalInputDevice, DigitalOutputDevice, InputDevice | ||
| import board | ||
| from digitalio import DigitalInOut, DriveMode, Pull | ||
| from jumpstarter_driver_power.driver import PowerInterface, PowerReading | ||
|
|
||
| from jumpstarter.driver import Driver, export | ||
|
|
||
|
|
||
| @dataclass(kw_only=True) | ||
| class DigitalOutput(Driver): | ||
| pin: int | str | ||
| device: InputDevice = field(init=False) # Start as input | ||
| class DigitalIO(Driver): | ||
| pin: str | ||
| device: DigitalInOut = field(init=False) | ||
|
|
||
| @classmethod | ||
| def client(cls) -> str: | ||
| return "jumpstarter_driver_raspberrypi.client.DigitalOutputClient" | ||
| return "jumpstarter_driver_raspberrypi.client.DigitalIOClient" | ||
|
|
||
| def __post_init__(self): | ||
| if hasattr(super(), "__post_init__"): | ||
| super().__post_init__() | ||
| # Initialize as InputDevice first | ||
| self.device = InputDevice(pin=self.pin) | ||
| # Defaults to input with no pull | ||
| try: | ||
| self.device = DigitalInOut(pin=getattr(board, self.pin)) | ||
| except AttributeError as err: | ||
| raise ValueError(f"Invalid pin name: {self.pin}") from err | ||
|
|
||
| def close(self): | ||
| if hasattr(self, "device"): | ||
| self.device.close() | ||
| super().close() | ||
| self.device.deinit() | ||
|
|
||
| @export | ||
| def off(self): | ||
| if not isinstance(self.device, DigitalOutputDevice): | ||
| self.device.close() | ||
| self.device = DigitalOutputDevice(pin=self.pin, initial_value=None) | ||
| self.device.off() | ||
| def switch_to_output(self, value: bool = False, drive_mode: int = 0) -> None: | ||
| match drive_mode: | ||
| case 0: | ||
| drive_mode = DriveMode.PUSH_PULL | ||
| case 1: | ||
| drive_mode = DriveMode.OPEN_DRAIN | ||
| case _: | ||
| raise ValueError("unrecognized drive_mode") | ||
|
|
||
| self.device.switch_to_output(value, drive_mode) | ||
|
|
||
| @export | ||
| def on(self): | ||
| if not isinstance(self.device, DigitalOutputDevice): | ||
| self.device.close() | ||
| self.device = DigitalOutputDevice(pin=self.pin, initial_value=None) | ||
| self.device.on() | ||
| def switch_to_input(self, pull: int = 0) -> None: | ||
| match pull: | ||
| case 0: | ||
| pull = None | ||
| case 1: | ||
| pull = Pull.UP | ||
| case 2: | ||
| pull = Pull.DOWN | ||
| case _: | ||
| raise ValueError("unrecognized pull") | ||
|
|
||
| self.device.switch_to_input(pull) | ||
|
|
||
| @export | ||
| def set_value(self, value: bool) -> None: | ||
| self.device.value = value | ||
|
|
||
| @export | ||
| def get_value(self) -> bool: | ||
| return self.device.value | ||
|
|
||
|
|
||
| @dataclass(kw_only=True) | ||
| class DigitalInput(Driver): | ||
| pin: int | str | ||
| device: DigitalInputDevice = field(init=False) | ||
| class DigitalPowerSwitch(PowerInterface, DigitalIO): | ||
| value: bool = False | ||
| drive_mode: str = "PUSH_PULL" | ||
|
|
||
| @classmethod | ||
| def client(cls) -> str: | ||
| return "jumpstarter_driver_raspberrypi.client.DigitalInputClient" | ||
| def __post_init__(self): | ||
| if hasattr(super(), "__post_init__"): | ||
| super().__post_init__() | ||
|
|
||
| try: | ||
| self.device.switch_to_output(value=self.value, drive_mode=getattr(DriveMode, self.drive_mode)) | ||
| except AttributeError as err: | ||
| raise ValueError(f"Invalid drive mode: {self.drive_mode}") from err | ||
|
|
||
| @export | ||
| def on(self) -> None: | ||
| self.device.value = True | ||
|
|
||
| @export | ||
| def off(self) -> None: | ||
| self.device.value = False | ||
|
|
||
| @export | ||
| def read(self) -> AsyncGenerator[PowerReading, None]: | ||
| raise NotImplementedError | ||
|
|
||
|
|
||
| @dataclass(kw_only=True) | ||
| class DigitalPowerButton(PowerInterface, DigitalIO): | ||
| value: bool = False | ||
| drive_mode: str = "OPEN_DRAIN" | ||
| on_press_seconds: int = 1 | ||
| off_press_seconds: int = 5 | ||
|
|
||
| def __post_init__(self): | ||
| if hasattr(super(), "__post_init__"): | ||
| super().__post_init__() | ||
| self.device = DigitalInputDevice(pin=self.pin) | ||
|
|
||
| try: | ||
| self.device.switch_to_output(value=self.value, drive_mode=getattr(DriveMode, self.drive_mode)) | ||
| except AttributeError as err: | ||
| raise ValueError(f"Invalid drive mode: {self.drive_mode}") from err | ||
|
|
||
| def press(self, seconds: int) -> None: | ||
| self.device.value = self.value | ||
| self.device.value = not self.value | ||
| sleep(seconds) | ||
| self.device.value = self.value | ||
|
|
||
| @export | ||
| def on(self) -> None: | ||
| self.press(self.on_press_seconds) | ||
|
|
||
| @export | ||
| def wait_for_active(self, timeout: float | None = None): | ||
| self.device.wait_for_active(timeout) | ||
| def off(self) -> None: | ||
| self.press(self.off_press_seconds) | ||
|
|
||
| @export | ||
| def wait_for_inactive(self, timeout: float | None = None): | ||
| self.device.wait_for_inactive(timeout) | ||
| def read(self) -> AsyncGenerator[PowerReading, None]: | ||
| raise NotImplementedError |
61 changes: 31 additions & 30 deletions
61
packages/jumpstarter-driver-raspberrypi/jumpstarter_driver_raspberrypi/driver_test.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,46 +1,47 @@ | ||
| from concurrent.futures import ThreadPoolExecutor | ||
|
|
||
| from gpiozero import Device | ||
| from gpiozero.pins.mock import MockFactory | ||
|
|
||
| from jumpstarter_driver_raspberrypi.driver import DigitalInput, DigitalOutput | ||
|
|
||
| from jumpstarter.common.utils import serve | ||
|
|
||
| Device.pin_factory = MockFactory() | ||
|
|
||
| def test_drivers_gpio_digital_input(monkeypatch): | ||
| monkeypatch.setenv("BLINKA_OS_AGNOSTIC", "1") | ||
|
|
||
| def test_drivers_gpio_digital_output(): | ||
| pin_factory = MockFactory() | ||
| Device.pin_factory = pin_factory | ||
| pin_number = 1 | ||
| mock_pin = pin_factory.pin(pin_number) | ||
| from digitalio import Pull | ||
|
|
||
| instance = DigitalOutput(pin=pin_number) | ||
| from jumpstarter_driver_raspberrypi.driver import DigitalIO | ||
|
|
||
| assert not mock_pin.state | ||
| with serve(DigitalIO(pin="Dx_INPUT_TOGGLE")) as client: | ||
| client.switch_to_input(pull=Pull.UP) | ||
| assert client.value | ||
| assert not client.value | ||
| assert client.value | ||
|
|
||
| with serve(instance) as client: | ||
| client.off() | ||
| assert not mock_pin.state | ||
|
|
||
| client.on() | ||
| assert mock_pin.state | ||
| def test_drivers_gpio_digital_output(monkeypatch): | ||
| monkeypatch.setenv("BLINKA_OS_AGNOSTIC", "1") | ||
|
|
||
| client.off() | ||
| assert not mock_pin.state | ||
| from digitalio import DriveMode | ||
|
|
||
| from jumpstarter_driver_raspberrypi.driver import DigitalIO | ||
|
|
||
| mock_pin.assert_states([False, True, False]) | ||
| with serve(DigitalIO(pin="Dx_OUTPUT")) as client: | ||
| client.switch_to_output(value=True, drive_mode=DriveMode.PUSH_PULL) | ||
| client.value = True | ||
| assert client.value | ||
| client.value = False | ||
| # Dx_OUTPUT is always True | ||
| assert client.value | ||
|
|
||
|
|
||
| def test_drivers_gpio_digital_input(): | ||
| instance = DigitalInput(pin=4) | ||
| def test_drivers_gpio_power(monkeypatch): | ||
| monkeypatch.setenv("BLINKA_OS_AGNOSTIC", "1") | ||
|
|
||
| with serve(instance) as client: | ||
| with ThreadPoolExecutor() as pool: | ||
| pool.submit(client.wait_for_active) | ||
| instance.device.pin.drive_high() | ||
| from jumpstarter_driver_raspberrypi.driver import DigitalPowerButton, DigitalPowerSwitch | ||
|
|
||
| with ThreadPoolExecutor() as pool: | ||
| pool.submit(client.wait_for_inactive) | ||
| instance.device.pin.drive_low() | ||
| with serve(DigitalPowerSwitch(pin="Dx_OUTPUT", drive_mode="PUSH_PULL")) as client: | ||
| client.off() | ||
| client.on() | ||
|
|
||
| with serve(DigitalPowerButton(pin="Dx_OUTPUT", drive_mode="PUSH_PULL", off_press_seconds=1)) as client: | ||
| client.off() | ||
| client.on() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.