Conversation
not sure if it works
There was a problem hiding this comment.
Pull Request Overview
This PR sets up the initial project environment by adding a test file and implementing a complete RS485 UART interface module for Raspberry Pi.
- Adds RS485Interface class with GPIO-controlled transceiver direction (DE/RE pins)
- Implements framed messaging protocol with CRC16 checksums and background receive thread
- Includes example usage in main() function
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| test.txt | Test file confirming successful push to repository |
| RS485Interface.py | Complete RS485 UART interface implementation with GPIO control, CRC16 framing, and threaded reception |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| buffer = bytearray() | ||
| while self._running.is_set(): | ||
| try: | ||
| data = self.ser.read(64) |
There was a problem hiding this comment.
Potential race condition: self.ser is accessed without holding self._lock in the background receive thread (_receiver_loop), while the send() method accesses it with the lock held. This could lead to undefined behavior if send and receive operations occur simultaneously. Consider protecting serial port access with the lock consistently, or document why it's safe.
| data = self.ser.read(64) | |
| with self._lock: | |
| data = self.ser.read(64) |
|
|
||
| return body[1:] | ||
|
|
||
| def send(self, payload: bytes) -> None: |
There was a problem hiding this comment.
Missing input validation: The send() method doesn't validate that payload is of type bytes. If a non-bytes object is passed, it will fail later in _build_frame(). Add validation at the API boundary: if not isinstance(payload, bytes): raise TypeError("Payload must be bytes")
| def send(self, payload: bytes) -> None: | |
| def send(self, payload: bytes) -> None: | |
| if not isinstance(payload, bytes): | |
| raise TypeError("Payload must be bytes") |
| self._running.clear() | ||
| if self._rx_thread: | ||
| self._rx_thread.join(1.0) | ||
| if self.ser and self.ser.is_open: |
There was a problem hiding this comment.
The close() method should be idempotent and handle being called multiple times safely. Additionally, it should check if self.ser exists before accessing it, as initialization could fail before the serial port is created. Consider adding: if hasattr(self, 'ser') and self.ser and self.ser.is_open:
| if self.ser and self.ser.is_open: | |
| if hasattr(self, 'ser') and self.ser and self.ser.is_open: |
| rs = RS485Interface( | ||
| port="/dev/ttyAMA0", | ||
| baudrate=115200, | ||
| de_pin=18, | ||
| re_pin=23 | ||
| ) | ||
|
|
||
| try: | ||
| i = 0 | ||
| while True: | ||
| payload = f"Hello {i}".encode() | ||
| print("TX", payload) | ||
| rs.send(payload) | ||
|
|
||
| rx = rs.read_frame(timeout=1.0) | ||
| if rx is not None: | ||
| print("RX", rx) | ||
|
|
||
| i += 1 | ||
| time.sleep(2) | ||
|
|
||
| except KeyboardInterrupt: | ||
| print("Exiting") | ||
| finally: | ||
| rs.close() |
There was a problem hiding this comment.
Inconsistent indentation detected. The main() function body uses 8 spaces (double indentation) instead of the standard 4 spaces used throughout the rest of the file. This should be corrected to use 4-space indentation to match the project's style.
| rs = RS485Interface( | |
| port="/dev/ttyAMA0", | |
| baudrate=115200, | |
| de_pin=18, | |
| re_pin=23 | |
| ) | |
| try: | |
| i = 0 | |
| while True: | |
| payload = f"Hello {i}".encode() | |
| print("TX", payload) | |
| rs.send(payload) | |
| rx = rs.read_frame(timeout=1.0) | |
| if rx is not None: | |
| print("RX", rx) | |
| i += 1 | |
| time.sleep(2) | |
| except KeyboardInterrupt: | |
| print("Exiting") | |
| finally: | |
| rs.close() | |
| rs = RS485Interface( | |
| port="/dev/ttyAMA0", | |
| baudrate=115200, | |
| de_pin=18, | |
| re_pin=23 | |
| ) | |
| try: | |
| i = 0 | |
| while True: | |
| payload = f"Hello {i}".encode() | |
| print("TX", payload) | |
| rs.send(payload) | |
| rx = rs.read_frame(timeout=1.0) | |
| if rx is not None: | |
| print("RX", rx) | |
| i += 1 | |
| time.sleep(2) | |
| except KeyboardInterrupt: | |
| print("Exiting") | |
| finally: | |
| rs.close() |
| @@ -0,0 +1 @@ | |||
| Neil has pushed this to github No newline at end of file | |||
There was a problem hiding this comment.
Spelling error: "Github" should be "GitHub" (with a capital H). The correct capitalization is important for proper noun accuracy.
| Neil has pushed this to github | |
| Neil has pushed this to GitHub |
| continue | ||
|
|
||
| except Exception as e: | ||
| print("RS485 RX thread error", e) |
There was a problem hiding this comment.
The error message lacks sufficient detail. It should include the exception type and message to aid debugging. Consider changing to: print(f"RS485 RX thread error: {type(e).__name__}: {e}", file=sys.stderr)
| print("RS485 RX thread error", e) | |
| print(f"RS485 RX thread error: {type(e).__name__}: {e}", file=sys.stderr) |
| def send(self, payload: bytes) -> None: | ||
| frame = self._build_frame(payload) | ||
| with self._lock: | ||
| self._set_transmit_mode() | ||
| self.ser.write(frame) | ||
| self.ser.flush() | ||
| time.sleep(len(frame) * 10 / self.baudrate) |
There was a problem hiding this comment.
Magic number used without explanation. The value 10 in the sleep calculation len(frame) * 10 / self.baudrate appears to represent bits per byte (1 start + 8 data + 1 stop = 10 bits). This should be defined as a named constant like BITS_PER_BYTE = 10 with a comment explaining the calculation for better maintainability.
| def send(self, payload: bytes) -> None: | |
| frame = self._build_frame(payload) | |
| with self._lock: | |
| self._set_transmit_mode() | |
| self.ser.write(frame) | |
| self.ser.flush() | |
| time.sleep(len(frame) * 10 / self.baudrate) | |
| # Number of bits per UART byte: 1 start bit + 8 data bits + 1 stop bit = 10 bits | |
| BITS_PER_BYTE = 10 | |
| def send(self, payload: bytes) -> None: | |
| frame = self._build_frame(payload) | |
| with self._lock: | |
| self._set_transmit_mode() | |
| self.ser.write(frame) | |
| self.ser.flush() | |
| time.sleep(len(frame) * self.BITS_PER_BYTE / self.baudrate) |
No description provided.