|
| 1 | +from collections.abc import AsyncGenerator |
1 | 2 | from dataclasses import dataclass, field |
| 3 | +from time import sleep |
2 | 4 |
|
3 | | -from gpiozero import DigitalInputDevice, DigitalOutputDevice, InputDevice |
| 5 | +import board |
| 6 | +from digitalio import DigitalInOut, DriveMode, Pull |
4 | 7 | from jumpstarter.driver import Driver, export |
| 8 | +from jumpstarter_driver_power.driver import PowerInterface, PowerReading |
5 | 9 |
|
6 | 10 |
|
7 | 11 | @dataclass(kw_only=True) |
8 | | -class DigitalOutput(Driver): |
9 | | - pin: int | str |
10 | | - device: InputDevice = field(init=False) # Start as input |
| 12 | +class DigitalIO(Driver): |
| 13 | + pin: str |
| 14 | + device: DigitalInOut = field(init=False) |
11 | 15 |
|
12 | 16 | @classmethod |
13 | 17 | def client(cls) -> str: |
14 | | - return "jumpstarter_driver_raspberrypi.client.DigitalOutputClient" |
| 18 | + return "jumpstarter_driver_raspberrypi.client.DigitalIOClient" |
15 | 19 |
|
16 | 20 | def __post_init__(self): |
17 | 21 | super().__post_init__() |
18 | | - # Initialize as InputDevice first |
19 | | - self.device = InputDevice(pin=self.pin) |
| 22 | + # Defaults to input with no pull |
| 23 | + try: |
| 24 | + self.device = DigitalInOut(pin=getattr(board, self.pin)) |
| 25 | + except AttributeError as err: |
| 26 | + raise ValueError(f"Invalid pin name: {self.pin}") from err |
20 | 27 |
|
21 | 28 | def close(self): |
22 | 29 | if hasattr(self, "device"): |
23 | | - self.device.close() |
24 | | - super().close() |
| 30 | + self.device.deinit() |
25 | 31 |
|
26 | 32 | @export |
27 | | - def off(self): |
28 | | - if not isinstance(self.device, DigitalOutputDevice): |
29 | | - self.device.close() |
30 | | - self.device = DigitalOutputDevice(pin=self.pin, initial_value=None) |
31 | | - self.device.off() |
| 33 | + def switch_to_output(self, value: bool = False, drive_mode: int = 0) -> None: |
| 34 | + match drive_mode: |
| 35 | + case 0: |
| 36 | + drive_mode = DriveMode.PUSH_PULL |
| 37 | + case 1: |
| 38 | + drive_mode = DriveMode.OPEN_DRAIN |
| 39 | + case _: |
| 40 | + raise ValueError("unrecognized drive_mode") |
| 41 | + |
| 42 | + self.device.switch_to_output(value, drive_mode) |
| 43 | + |
| 44 | + @export |
| 45 | + def switch_to_input(self, pull: int = 0) -> None: |
| 46 | + match pull: |
| 47 | + case 0: |
| 48 | + pull = None |
| 49 | + case 1: |
| 50 | + pull = Pull.UP |
| 51 | + case 2: |
| 52 | + pull = Pull.DOWN |
| 53 | + case _: |
| 54 | + raise ValueError("unrecognized pull") |
| 55 | + |
| 56 | + self.device.switch_to_input(pull) |
| 57 | + |
| 58 | + @export |
| 59 | + def set_value(self, value: bool) -> None: |
| 60 | + self.device.value = value |
32 | 61 |
|
33 | 62 | @export |
34 | | - def on(self): |
35 | | - if not isinstance(self.device, DigitalOutputDevice): |
36 | | - self.device.close() |
37 | | - self.device = DigitalOutputDevice(pin=self.pin, initial_value=None) |
38 | | - self.device.on() |
| 63 | + def get_value(self) -> bool: |
| 64 | + return self.device.value |
39 | 65 |
|
40 | 66 |
|
41 | 67 | @dataclass(kw_only=True) |
42 | | -class DigitalInput(Driver): |
43 | | - pin: int | str |
44 | | - device: DigitalInputDevice = field(init=False) |
| 68 | +class DigitalPowerSwitch(PowerInterface, DigitalIO): |
| 69 | + value: bool = False |
| 70 | + drive_mode: str = "PUSH_PULL" |
| 71 | + |
| 72 | + def __post_init__(self): |
| 73 | + super().__post_init__() |
| 74 | + |
| 75 | + try: |
| 76 | + self.device.switch_to_output(value=self.value, drive_mode=getattr(DriveMode, self.drive_mode)) |
| 77 | + except AttributeError as err: |
| 78 | + raise ValueError(f"Invalid drive mode: {self.drive_mode}") from err |
| 79 | + |
| 80 | + @export |
| 81 | + def on(self) -> None: |
| 82 | + self.device.value = True |
| 83 | + |
| 84 | + @export |
| 85 | + def off(self) -> None: |
| 86 | + self.device.value = False |
| 87 | + |
| 88 | + @export |
| 89 | + def read(self) -> AsyncGenerator[PowerReading, None]: |
| 90 | + raise NotImplementedError |
45 | 91 |
|
46 | | - @classmethod |
47 | | - def client(cls) -> str: |
48 | | - return "jumpstarter_driver_raspberrypi.client.DigitalInputClient" |
| 92 | + |
| 93 | +@dataclass(kw_only=True) |
| 94 | +class DigitalPowerButton(PowerInterface, DigitalIO): |
| 95 | + value: bool = False |
| 96 | + drive_mode: str = "OPEN_DRAIN" |
| 97 | + on_press_seconds: int = 1 |
| 98 | + off_press_seconds: int = 5 |
49 | 99 |
|
50 | 100 | def __post_init__(self): |
51 | 101 | super().__post_init__() |
52 | | - self.device = DigitalInputDevice(pin=self.pin) |
| 102 | + |
| 103 | + try: |
| 104 | + self.device.switch_to_output(value=self.value, drive_mode=getattr(DriveMode, self.drive_mode)) |
| 105 | + except AttributeError as err: |
| 106 | + raise ValueError(f"Invalid drive mode: {self.drive_mode}") from err |
| 107 | + |
| 108 | + def press(self, seconds: int) -> None: |
| 109 | + self.device.value = self.value |
| 110 | + self.device.value = not self.value |
| 111 | + sleep(seconds) |
| 112 | + self.device.value = self.value |
| 113 | + |
| 114 | + @export |
| 115 | + def on(self) -> None: |
| 116 | + self.press(self.on_press_seconds) |
53 | 117 |
|
54 | 118 | @export |
55 | | - def wait_for_active(self, timeout: float | None = None): |
56 | | - self.device.wait_for_active(timeout) |
| 119 | + def off(self) -> None: |
| 120 | + self.press(self.off_press_seconds) |
57 | 121 |
|
58 | 122 | @export |
59 | | - def wait_for_inactive(self, timeout: float | None = None): |
60 | | - self.device.wait_for_inactive(timeout) |
| 123 | + def read(self) -> AsyncGenerator[PowerReading, None]: |
| 124 | + raise NotImplementedError |
0 commit comments