From c8bee21af7b37644482b7d14adaaf98e62f93346 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Mon, 18 May 2026 16:37:07 +1000 Subject: [PATCH] node_properties: pipeline param fetch with per-index retries Fetch up to 5 GetSet requests in flight at once to reduce wall-clock time on lossy/high-latency links, with up to 5 retries per index so a single dropped frame doesn't abort the whole fetch. Co-Authored-By: Claude Opus 4.7 (1M context) --- dronecan_gui_tool/widgets/node_properties.py | 149 ++++++++++++++----- 1 file changed, 111 insertions(+), 38 deletions(-) diff --git a/dronecan_gui_tool/widgets/node_properties.py b/dronecan_gui_tool/widgets/node_properties.py index d3e09e3..8a38862 100644 --- a/dronecan_gui_tool/widgets/node_properties.py +++ b/dronecan_gui_tool/widgets/node_properties.py @@ -26,6 +26,10 @@ REQUEST_PRIORITY = 30 +PARAM_FETCH_PIPELINE_DEPTH = 5 +PARAM_FETCH_MAX_RETRIES = 5 +PARAM_FETCH_RETRY_DELAY = 0.1 + class FieldValueWidget(QLineEdit): def __init__(self, parent, initial_value=None): @@ -578,7 +582,14 @@ def __init__(self, parent, node, target_node_id): self._node = node self._target_node_id = target_node_id - self._retries = 0 + + # Pipelined fetch state + self._fetch_in_progress = False + self._fetch_generation = 0 + self._next_index_to_send = 0 + self._end_index = None + self._pending = {} # {index: retry_count} + self._received = {} # {index: response} buffered awaiting in-order flush self._read_all_button = make_icon_button('fa6s.arrows-rotate', 'Fetch all config parameters from the node', self, text='Fetch All', on_clicked=self._do_reload) @@ -659,54 +670,116 @@ def update_callback(value, is_melody=False): win = ConfigParamEditWindow(self, self._node, self._target_node_id, self._params[index], update_callback) win.show() - def _on_fetch_response(self, index, e): + def _send_param_request(self, index, retry_count, generation): + '''Send a single GetSet request for `index`. Returns True on success.''' + if generation != self._fetch_generation: + return False # fetch was aborted or restarted + self._pending[index] = retry_count + try: + self._node.request(dronecan.uavcan.protocol.param.GetSet.Request(index=index), + self._target_node_id, + partial(self._on_fetch_response, index, generation), + priority=REQUEST_PRIORITY) + return True + except Exception as ex: + logger.error('Param fetch send error', exc_info=True) + self.window().show_message('Could not send param get request for index %d: %r', index, ex) + self._pending.pop(index, None) + self._abort_fetch() + return False + + def _send_next_param_request(self): + '''Send the next not-yet-requested index, if any remain.''' + if self._end_index is not None and self._next_index_to_send >= self._end_index: + return False + index = self._next_index_to_send + self._next_index_to_send += 1 + return self._send_param_request(index, 0, self._fetch_generation) + + def _flush_received(self): + '''Move contiguous responses from the receive buffer into the table.''' + next_idx = len(self._params) + while next_idx in self._received: + resp = self._received.pop(next_idx) + self._params.append(resp) + self._table.setRowCount(self._table.rowCount() + 1) + self._table.set_row(self._table.rowCount() - 1, (next_idx, resp)) + next_idx += 1 + + def _try_finish_fetch(self): + if not self._fetch_in_progress: + return + if self._end_index is not None and not self._pending: + self._flush_received() + self._fetch_in_progress = False + self.window().show_message('%d params fetched successfully', len(self._params)) + + def _abort_fetch(self): + if not self._fetch_in_progress: + return + self._fetch_in_progress = False + # bump generation so any in-flight retries/responses are ignored + self._fetch_generation += 1 + + def _on_fetch_response(self, index, generation, e): + if generation != self._fetch_generation: + return # stale response from a prior/aborted fetch + if e is None: - if self._retries < 5: - self._retries += 1 - self.window().show_message('Re-requesting index %d', index) - self._node.defer(0.1, lambda: self._node.request(dronecan.uavcan.protocol.param.GetSet.Request(index=index), - self._target_node_id, - partial(self._on_fetch_response, index), - priority=REQUEST_PRIORITY)) + # Timeout. If we've already discovered the end and this index is past it, + # just drop it silently — the node has no param at this index. + if self._end_index is not None and index >= self._end_index: + self._pending.pop(index, None) + self._try_finish_fetch() + return + retry_count = self._pending.get(index, 0) + 1 + if retry_count <= PARAM_FETCH_MAX_RETRIES: + self._pending[index] = retry_count + self.window().show_message('Re-requesting index %d (retry %d/%d)', + index, retry_count, PARAM_FETCH_MAX_RETRIES) + self._node.defer(PARAM_FETCH_RETRY_DELAY, + partial(self._send_param_request, index, retry_count, generation)) else: - self.window().show_message('Param fetch failed: request timed out') + self.window().show_message('Param fetch failed at index %d: timed out after %d retries', + index, PARAM_FETCH_MAX_RETRIES) + self._pending.pop(index, None) + self._abort_fetch() return - # reset retries when we get a response - self._retries = 0 + self._pending.pop(index, None) if len(e.response.name) == 0: - self.window().show_message('%d params fetched successfully', index) + # End-of-params marker. Record the smallest empty index seen. + if self._end_index is None or index < self._end_index: + self._end_index = index + self._try_finish_fetch() return - self._params.append(e.response) - self._table.setRowCount(self._table.rowCount() + 1) - self._table.set_row(self._table.rowCount() - 1, (index, e.response)) + self._received[index] = e.response + self._flush_received() - try: - index += 1 - self.window().show_message('Requesting index %d', index) - self._node.defer(0.1, lambda: self._node.request(dronecan.uavcan.protocol.param.GetSet.Request(index=index), - self._target_node_id, - partial(self._on_fetch_response, index), - priority=REQUEST_PRIORITY)) - except Exception as ex: - logger.error('Param fetch error', exc_info=True) - self.window().show_message('Could not send param get request: %r', ex) + # Refill the pipeline slot freed by this response + self._send_next_param_request() + self.window().show_message('Fetching params... %d received, %d in flight', + len(self._params), len(self._pending)) + self._try_finish_fetch() def _do_reload(self): - try: - index = 0 - self._node.request(dronecan.uavcan.protocol.param.GetSet.Request(index=index), - self._target_node_id, - partial(self._on_fetch_response, index), - priority=REQUEST_PRIORITY) - except Exception as ex: - show_error('Node error', 'Could not send param get request', ex, self) - else: - self.window().show_message('Param fetch request sent') - self._table.setRowCount(0) - self._params = [] + if self._fetch_in_progress: + self.window().show_message('Param fetch already in progress') + return + self._fetch_generation += 1 + self._fetch_in_progress = True + self._next_index_to_send = 0 + self._end_index = None + self._pending = {} + self._received = {} + self._table.setRowCount(0) + self._params = [] + self.window().show_message('Param fetch started (pipeline depth %d)', PARAM_FETCH_PIPELINE_DEPTH) + for _ in range(PARAM_FETCH_PIPELINE_DEPTH): + if not self._send_next_param_request(): + break def param_as_string(self, value, is_melody=False): value_type = dronecan.get_active_union_field(value)