Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 111 additions & 38 deletions dronecan_gui_tool/widgets/node_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@

REQUEST_PRIORITY = 30

PARAM_FETCH_PIPELINE_DEPTH = 5
PARAM_FETCH_MAX_RETRIES = 5
PARAM_FETCH_RETRY_DELAY = 0.1


class FieldValueWidget(QLineEdit):
def __init__(self, parent, initial_value=None):
Expand Down Expand Up @@ -578,7 +582,14 @@ def __init__(self, parent, node, target_node_id):

self._node = node
self._target_node_id = target_node_id
self._retries = 0

# Pipelined fetch state
self._fetch_in_progress = False
self._fetch_generation = 0
self._next_index_to_send = 0
self._end_index = None
self._pending = {} # {index: retry_count}
self._received = {} # {index: response} buffered awaiting in-order flush

self._read_all_button = make_icon_button('fa6s.arrows-rotate', 'Fetch all config parameters from the node', self,
text='Fetch All', on_clicked=self._do_reload)
Expand Down Expand Up @@ -659,54 +670,116 @@ def update_callback(value, is_melody=False):
win = ConfigParamEditWindow(self, self._node, self._target_node_id, self._params[index], update_callback)
win.show()

def _on_fetch_response(self, index, e):
def _send_param_request(self, index, retry_count, generation):
'''Send a single GetSet request for `index`. Returns True on success.'''
if generation != self._fetch_generation:
return False # fetch was aborted or restarted
self._pending[index] = retry_count
try:
self._node.request(dronecan.uavcan.protocol.param.GetSet.Request(index=index),
self._target_node_id,
partial(self._on_fetch_response, index, generation),
priority=REQUEST_PRIORITY)
return True
except Exception as ex:
logger.error('Param fetch send error', exc_info=True)
self.window().show_message('Could not send param get request for index %d: %r', index, ex)
self._pending.pop(index, None)
self._abort_fetch()
return False

def _send_next_param_request(self):
'''Send the next not-yet-requested index, if any remain.'''
if self._end_index is not None and self._next_index_to_send >= self._end_index:
return False
index = self._next_index_to_send
self._next_index_to_send += 1
return self._send_param_request(index, 0, self._fetch_generation)

def _flush_received(self):
'''Move contiguous responses from the receive buffer into the table.'''
next_idx = len(self._params)
while next_idx in self._received:
resp = self._received.pop(next_idx)
self._params.append(resp)
self._table.setRowCount(self._table.rowCount() + 1)
self._table.set_row(self._table.rowCount() - 1, (next_idx, resp))
next_idx += 1

def _try_finish_fetch(self):
if not self._fetch_in_progress:
return
if self._end_index is not None and not self._pending:
self._flush_received()
self._fetch_in_progress = False
self.window().show_message('%d params fetched successfully', len(self._params))

def _abort_fetch(self):
if not self._fetch_in_progress:
return
self._fetch_in_progress = False
# bump generation so any in-flight retries/responses are ignored
self._fetch_generation += 1

def _on_fetch_response(self, index, generation, e):
if generation != self._fetch_generation:
return # stale response from a prior/aborted fetch

if e is None:
if self._retries < 5:
self._retries += 1
self.window().show_message('Re-requesting index %d', index)
self._node.defer(0.1, lambda: self._node.request(dronecan.uavcan.protocol.param.GetSet.Request(index=index),
self._target_node_id,
partial(self._on_fetch_response, index),
priority=REQUEST_PRIORITY))
# Timeout. If we've already discovered the end and this index is past it,
# just drop it silently — the node has no param at this index.
if self._end_index is not None and index >= self._end_index:
self._pending.pop(index, None)
self._try_finish_fetch()
return
retry_count = self._pending.get(index, 0) + 1
if retry_count <= PARAM_FETCH_MAX_RETRIES:
self._pending[index] = retry_count
self.window().show_message('Re-requesting index %d (retry %d/%d)',
index, retry_count, PARAM_FETCH_MAX_RETRIES)
self._node.defer(PARAM_FETCH_RETRY_DELAY,
partial(self._send_param_request, index, retry_count, generation))
else:
self.window().show_message('Param fetch failed: request timed out')
self.window().show_message('Param fetch failed at index %d: timed out after %d retries',
index, PARAM_FETCH_MAX_RETRIES)
self._pending.pop(index, None)
self._abort_fetch()
return

# reset retries when we get a response
self._retries = 0
self._pending.pop(index, None)

if len(e.response.name) == 0:
self.window().show_message('%d params fetched successfully', index)
# End-of-params marker. Record the smallest empty index seen.
if self._end_index is None or index < self._end_index:
self._end_index = index
self._try_finish_fetch()
return

self._params.append(e.response)
self._table.setRowCount(self._table.rowCount() + 1)
self._table.set_row(self._table.rowCount() - 1, (index, e.response))
self._received[index] = e.response
self._flush_received()

try:
index += 1
self.window().show_message('Requesting index %d', index)
self._node.defer(0.1, lambda: self._node.request(dronecan.uavcan.protocol.param.GetSet.Request(index=index),
self._target_node_id,
partial(self._on_fetch_response, index),
priority=REQUEST_PRIORITY))
except Exception as ex:
logger.error('Param fetch error', exc_info=True)
self.window().show_message('Could not send param get request: %r', ex)
# Refill the pipeline slot freed by this response
self._send_next_param_request()
self.window().show_message('Fetching params... %d received, %d in flight',
len(self._params), len(self._pending))
self._try_finish_fetch()

def _do_reload(self):
try:
index = 0
self._node.request(dronecan.uavcan.protocol.param.GetSet.Request(index=index),
self._target_node_id,
partial(self._on_fetch_response, index),
priority=REQUEST_PRIORITY)
except Exception as ex:
show_error('Node error', 'Could not send param get request', ex, self)
else:
self.window().show_message('Param fetch request sent')
self._table.setRowCount(0)
self._params = []
if self._fetch_in_progress:
self.window().show_message('Param fetch already in progress')
return
self._fetch_generation += 1
self._fetch_in_progress = True
self._next_index_to_send = 0
self._end_index = None
self._pending = {}
self._received = {}
self._table.setRowCount(0)
self._params = []
self.window().show_message('Param fetch started (pipeline depth %d)', PARAM_FETCH_PIPELINE_DEPTH)
for _ in range(PARAM_FETCH_PIPELINE_DEPTH):
if not self._send_next_param_request():
break

def param_as_string(self, value, is_melody=False):
value_type = dronecan.get_active_union_field(value)
Expand Down
Loading