From 28a24002b7694ab6e72decaaee5c08f1e7860777 Mon Sep 17 00:00:00 2001 From: Florian Sager Date: Sat, 31 Jan 2026 15:01:40 +0100 Subject: [PATCH 1/3] MESHCORE_SIMULATOR patch --- src/Dispatcher.cpp | 16 +++++++++++++--- src/helpers/BaseChatMesh.cpp | 2 +- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 9d7a11131d..46e5c16752 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -4,9 +4,13 @@ #include #endif +#ifdef MESHCORE_SIMULATOR + #include "sim_context.h" +#endif + #include -namespace mesh { + namespace mesh { #define MAX_RX_DELAY_MILLIS 32000 // 32 seconds #define MIN_TX_BUDGET_RESERVE_MS 100 // min budget (ms) required before allowing next TX @@ -383,7 +387,13 @@ bool Dispatcher::millisHasNowPassed(unsigned long timestamp) const { } unsigned long Dispatcher::futureMillis(int millis_from_now) const { - return _ms->getMillis() + millis_from_now; + unsigned long wake_time = _ms->getMillis() + millis_from_now; + #ifdef MESHCORE_SIMULATOR // Register wake time with simulator for accurate scheduling + if (auto *ctx = SIM_CTX()) { + ctx->wake_registry.registerWakeTime(wake_time); + } + #endif + + return wake_time; } - } \ No newline at end of file diff --git a/src/helpers/BaseChatMesh.cpp b/src/helpers/BaseChatMesh.cpp index e2b116b456..57780eb0cc 100644 --- a/src/helpers/BaseChatMesh.cpp +++ b/src/helpers/BaseChatMesh.cpp @@ -784,7 +784,7 @@ void BaseChatMesh::resetPathTo(ContactInfo& recipient) { recipient.out_path_len = OUT_PATH_UNKNOWN; } -static ContactInfo* table; // pass via global :-( +static thread_local ContactInfo *table; // pass via global (thread_local for simulation) static int cmp_adv_timestamp(const void *a, const void *b) { int a_idx = *((int *)a); From 5a040a739e9042b6ffa36456c61ac518e8c21f28 Mon Sep 17 00:00:00 2001 From: Florian Sager Date: Mon, 1 Jun 2026 23:28:57 +0200 Subject: [PATCH 2/3] Implemented repeated sending for error correction --- docs/cli_commands.md | 12 ++ examples/companion_radio/DataStore.cpp | 2 + examples/companion_radio/MyMesh.cpp | 6 + examples/companion_radio/MyMesh.h | 1 + examples/companion_radio/NodePrefs.h | 1 + examples/simple_repeater/MyMesh.cpp | 1 + examples/simple_repeater/MyMesh.h | 3 + examples/simple_room_server/MyMesh.cpp | 1 + examples/simple_room_server/MyMesh.h | 3 + examples/simple_sensor/SensorMesh.cpp | 1 + examples/simple_sensor/SensorMesh.h | 1 + src/Dispatcher.cpp | 173 +++++++++++++++--------- src/Dispatcher.h | 23 ++++ src/Mesh.cpp | 56 +++++++- src/Mesh.h | 4 +- src/MeshCore.h | 2 +- src/Packet.cpp | 31 +++-- src/Packet.h | 9 +- src/helpers/CommonCLI.cpp | 18 ++- src/helpers/CommonCLI.h | 1 + src/helpers/SimpleMeshTables.h | 56 ++++++-- src/helpers/StaticPoolPacketManager.cpp | 2 + 22 files changed, 311 insertions(+), 96 deletions(-) diff --git a/docs/cli_commands.md b/docs/cli_commands.md index f63e6879e1..15bb832906 100644 --- a/docs/cli_commands.md +++ b/docs/cli_commands.md @@ -466,6 +466,18 @@ This document provides an overview of CLI commands that can be sent to MeshCore --- +#### View or change the maximum direct-route resend attempts +**Usage:** +- `get max.resend` +- `set max.resend ` + +**Parameters:** +- `value`: Maximum number of resend attempts for direct-routed packets (0–5). `0` disables resending entirely. + +**Default:** `2` + +--- + #### View or change the retransmit delay factor for flood traffic **Usage:** - `get txdelay` diff --git a/examples/companion_radio/DataStore.cpp b/examples/companion_radio/DataStore.cpp index c7988bb344..ef837b9eaa 100644 --- a/examples/companion_radio/DataStore.cpp +++ b/examples/companion_radio/DataStore.cpp @@ -233,6 +233,7 @@ void DataStore::loadPrefsInt(const char *filename, NodePrefs& _prefs, double& no file.read((uint8_t *)&_prefs.rx_boosted_gain, sizeof(_prefs.rx_boosted_gain)); // 89 file.read((uint8_t *)_prefs.default_scope_name, sizeof(_prefs.default_scope_name)); // 90 file.read((uint8_t *)_prefs.default_scope_key, sizeof(_prefs.default_scope_key)); // 121 + file.read((uint8_t *)&_prefs.max_resend_attempts, sizeof(_prefs.max_resend_attempts)); // 137 file.close(); } @@ -273,6 +274,7 @@ void DataStore::savePrefs(const NodePrefs& _prefs, double node_lat, double node_ file.write((uint8_t *)&_prefs.rx_boosted_gain, sizeof(_prefs.rx_boosted_gain)); // 89 file.write((uint8_t *)_prefs.default_scope_name, sizeof(_prefs.default_scope_name)); // 90 file.write((uint8_t *)_prefs.default_scope_key, sizeof(_prefs.default_scope_key)); // 121 + file.write((uint8_t *)&_prefs.max_resend_attempts, sizeof(_prefs.max_resend_attempts)); // 137 file.close(); } diff --git a/examples/companion_radio/MyMesh.cpp b/examples/companion_radio/MyMesh.cpp index f180421912..15c5898038 100644 --- a/examples/companion_radio/MyMesh.cpp +++ b/examples/companion_radio/MyMesh.cpp @@ -271,6 +271,7 @@ uint32_t MyMesh::getRetransmitDelay(const mesh::Packet *packet) { uint32_t t = (_radio->getEstAirtimeFor(packet->getPathByteLen() + packet->payload_len + 2) * 0.5f); return getRNG()->nextInt(0, 5*t + 1); } + uint32_t MyMesh::getDirectRetransmitDelay(const mesh::Packet *packet) { uint32_t t = (_radio->getEstAirtimeFor(packet->getPathByteLen() + packet->payload_len + 2) * 0.2f); return getRNG()->nextInt(0, 5*t + 1); @@ -878,6 +879,7 @@ MyMesh::MyMesh(mesh::Radio &radio, mesh::RNG &rng, mesh::RTCClock &rtc, SimpleMe _prefs.tx_power_dbm = LORA_TX_POWER; _prefs.gps_enabled = 0; // GPS disabled by default _prefs.gps_interval = 0; // No automatic GPS updates by default + _prefs.max_resend_attempts = 2; //_prefs.rx_delay_base = 10.0f; enable once new algo fixed #if defined(USE_SX1262) || defined(USE_SX1268) #ifdef SX126X_RX_BOOSTED_GAIN @@ -935,6 +937,7 @@ void MyMesh::begin(bool has_display) { _prefs.tx_power_dbm = constrain(_prefs.tx_power_dbm, -9, MAX_LORA_TX_POWER); _prefs.gps_enabled = constrain(_prefs.gps_enabled, 0, 1); // Ensure boolean 0 or 1 _prefs.gps_interval = constrain(_prefs.gps_interval, 0, 86400); // Max 24 hours + _prefs.max_resend_attempts = constrain(_prefs.max_resend_attempts, 0, 5); #ifdef BLE_PIN_CODE // 123456 by default if (_prefs.ble_pin == 0) { @@ -1437,6 +1440,9 @@ void MyMesh::handleCmdFrame(size_t len) { _prefs.advert_loc_policy = cmd_frame[3]; if (len >= 5) { _prefs.multi_acks = cmd_frame[4]; + if (len >= 6) { + _prefs.max_resend_attempts = constrain(cmd_frame[5], 0, 5); + } } } } diff --git a/examples/companion_radio/MyMesh.h b/examples/companion_radio/MyMesh.h index f189a2c5e5..d7c727253b 100644 --- a/examples/companion_radio/MyMesh.h +++ b/examples/companion_radio/MyMesh.h @@ -109,6 +109,7 @@ class MyMesh : public BaseChatMesh, public DataStoreHost { uint32_t getRetransmitDelay(const mesh::Packet *packet) override; uint32_t getDirectRetransmitDelay(const mesh::Packet *packet) override; uint8_t getExtraAckTransmitCount() const override; + uint8_t getMaxResendAttempts() const override { return _prefs.max_resend_attempts; } bool filterRecvFloodPacket(mesh::Packet* packet) override; bool allowPacketForward(const mesh::Packet* packet) override; diff --git a/examples/companion_radio/NodePrefs.h b/examples/companion_radio/NodePrefs.h index 48c381ceaf..6acf5554e1 100644 --- a/examples/companion_radio/NodePrefs.h +++ b/examples/companion_radio/NodePrefs.h @@ -34,4 +34,5 @@ struct NodePrefs { // persisted to file uint8_t autoadd_max_hops; // 0 = no limit, 1 = direct (0 hops), N = up to N-1 hops (max 64) char default_scope_name[31]; uint8_t default_scope_key[16]; + uint8_t max_resend_attempts; // 0 = disabled, 1-5, default 2 }; \ No newline at end of file diff --git a/examples/simple_repeater/MyMesh.cpp b/examples/simple_repeater/MyMesh.cpp index 1f68c6f2a0..25f20b23bf 100644 --- a/examples/simple_repeater/MyMesh.cpp +++ b/examples/simple_repeater/MyMesh.cpp @@ -874,6 +874,7 @@ MyMesh::MyMesh(mesh::MainBoard &board, mesh::Radio &radio, mesh::MillisecondCloc _prefs.rx_delay_base = 0.0f; // turn off by default, was 10.0; _prefs.tx_delay_factor = 0.5f; // was 0.25f _prefs.direct_tx_delay_factor = 0.3f; // was 0.2 + _prefs.max_resend_attempts = 2; StrHelper::strncpy(_prefs.node_name, ADVERT_NAME, sizeof(_prefs.node_name)); _prefs.node_lat = ADVERT_LAT; _prefs.node_lon = ADVERT_LON; diff --git a/examples/simple_repeater/MyMesh.h b/examples/simple_repeater/MyMesh.h index 8ed0317e69..6d210e5d22 100644 --- a/examples/simple_repeater/MyMesh.h +++ b/examples/simple_repeater/MyMesh.h @@ -156,6 +156,9 @@ class MyMesh : public mesh::Mesh, public CommonCLICallbacks { uint8_t getExtraAckTransmitCount() const override { return _prefs.multi_acks; } + uint8_t getMaxResendAttempts() const override { + return _prefs.max_resend_attempts; + } #if ENV_INCLUDE_GPS == 1 void applyGpsPrefs() { diff --git a/examples/simple_room_server/MyMesh.cpp b/examples/simple_room_server/MyMesh.cpp index 2fb80be24c..00230065b4 100644 --- a/examples/simple_room_server/MyMesh.cpp +++ b/examples/simple_room_server/MyMesh.cpp @@ -630,6 +630,7 @@ MyMesh::MyMesh(mesh::MainBoard &board, mesh::Radio &radio, mesh::MillisecondCloc _prefs.rx_delay_base = 0.0f; // off by default, was 10.0 _prefs.tx_delay_factor = 0.5f; // was 0.25f; _prefs.direct_tx_delay_factor = 0.2f; // was zero + _prefs.max_resend_attempts = 2; StrHelper::strncpy(_prefs.node_name, ADVERT_NAME, sizeof(_prefs.node_name)); _prefs.node_lat = ADVERT_LAT; _prefs.node_lon = ADVERT_LON; diff --git a/examples/simple_room_server/MyMesh.h b/examples/simple_room_server/MyMesh.h index 1b35ae95a1..8cd8246c21 100644 --- a/examples/simple_room_server/MyMesh.h +++ b/examples/simple_room_server/MyMesh.h @@ -150,6 +150,9 @@ class MyMesh : public mesh::Mesh, public CommonCLICallbacks { uint8_t getExtraAckTransmitCount() const override { return _prefs.multi_acks; } + uint8_t getMaxResendAttempts() const override { + return _prefs.max_resend_attempts; + } bool filterRecvFloodPacket(mesh::Packet* pkt) override; diff --git a/examples/simple_sensor/SensorMesh.cpp b/examples/simple_sensor/SensorMesh.cpp index 879fcbf026..64e1c72b07 100644 --- a/examples/simple_sensor/SensorMesh.cpp +++ b/examples/simple_sensor/SensorMesh.cpp @@ -712,6 +712,7 @@ SensorMesh::SensorMesh(mesh::MainBoard& board, mesh::Radio& radio, mesh::Millise _prefs.rx_delay_base = 0.0f; // turn off by default, was 10.0; _prefs.tx_delay_factor = 0.5f; // was 0.25f _prefs.direct_tx_delay_factor = 0.2f; // was zero + _prefs.max_resend_attempts = 2; StrHelper::strncpy(_prefs.node_name, ADVERT_NAME, sizeof(_prefs.node_name)); _prefs.node_lat = ADVERT_LAT; _prefs.node_lon = ADVERT_LON; diff --git a/examples/simple_sensor/SensorMesh.h b/examples/simple_sensor/SensorMesh.h index 424b16c175..0ce151b549 100644 --- a/examples/simple_sensor/SensorMesh.h +++ b/examples/simple_sensor/SensorMesh.h @@ -121,6 +121,7 @@ class SensorMesh : public mesh::Mesh, public CommonCLICallbacks { uint32_t getDirectRetransmitDelay(const mesh::Packet* packet) override; int getInterferenceThreshold() const override; int getAGCResetInterval() const override; + uint8_t getMaxResendAttempts() const override { return _prefs.max_resend_attempts; } void onAnonDataRecv(mesh::Packet* packet, const uint8_t* secret, const mesh::Identity& sender, uint8_t* data, size_t len) override; int searchPeersByHash(const uint8_t* hash) override; void getPeerSharedSecret(uint8_t* dest_secret, int peer_idx) override; diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 46e5c16752..9195fc12d6 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -10,7 +10,7 @@ #include - namespace mesh { +namespace mesh { #define MAX_RX_DELAY_MILLIS 32000 // 32 seconds #define MIN_TX_BUDGET_RESERVE_MS 100 // min budget (ms) required before allowing next TX @@ -68,10 +68,6 @@ uint32_t Dispatcher::getCADFailMaxDuration() const { } void Dispatcher::loop() { - if (millisHasNowPassed(next_floor_calib_time)) { - _radio->triggerNoiseFloorCalibrate(getInterferenceThreshold()); - next_floor_calib_time = futureMillis(NOISE_FLOOR_CALIB_INTERVAL); - } _radio->loop(); // check for radio 'stuck' in mode other than Rx @@ -115,10 +111,13 @@ void Dispatcher::loop() { } else { n_sent_direct++; } - releasePacket(outbound); // return to pool + // allow for possible retransmission for reliability + if (!resendPacket(outbound)) { + releasePacket(outbound); // return to pool + } outbound = NULL; } else if (millisHasNowPassed(outbound_expiry)) { - MESH_DEBUG_PRINTLN("%s Dispatcher::loop(): WARNING: outbound packed send timed out!", getLogDateTime()); + MESH_DEBUG_PRINTLN("%s Dispatcher::loop(): WARNING: outbound packet %s send timed out!", getLogDateTime(), outbound->getHashHex()); _radio->onSendFinished(); logTxFail(outbound, 2 + outbound->getPathByteLen() + outbound->payload_len); @@ -147,6 +146,14 @@ void Dispatcher::loop() { } checkRecv(); checkSend(); + + // Do noise floor calibration LAST, when no critical operations are pending + if (millisHasNowPassed(next_floor_calib_time)) { + if (!_radio->isReceiving() && _mgr->getOutboundCount(_ms->getMillis()) == 0) { + _radio->triggerNoiseFloorCalibrate(getInterferenceThreshold()); + next_floor_calib_time = futureMillis(NOISE_FLOOR_CALIB_INTERVAL); + } + } } bool Dispatcher::tryParsePacket(Packet* pkt, const uint8_t* raw, int len) { @@ -192,76 +199,82 @@ bool Dispatcher::tryParsePacket(Packet* pkt, const uint8_t* raw, int len) { } void Dispatcher::checkRecv() { - Packet* pkt; - float score; - uint32_t air_time; - { - uint8_t raw[MAX_TRANS_UNIT+1]; + while (true) { + + Packet *pkt = nullptr; + float score = 0.0f; + uint32_t air_time = 0; + + uint8_t raw[MAX_TRANS_UNIT + 1]; int len = _radio->recvRaw(raw, MAX_TRANS_UNIT); - if (len > 0) { - logRxRaw(_radio->getLastSNR(), _radio->getLastRSSI(), raw, len); + if (len <= 0) { + break; + } - pkt = _mgr->allocNew(); - if (pkt == NULL) { - MESH_DEBUG_PRINTLN("%s Dispatcher::checkRecv(): WARNING: received data, no unused packets available!", getLogDateTime()); - } else { - if (tryParsePacket(pkt, raw, len)) { - pkt->_snr = _radio->getLastSNR() * 4.0f; - score = _radio->packetScore(_radio->getLastSNR(), len); - air_time = _radio->getEstAirtimeFor(len); - rx_air_time += air_time; - } else { - _mgr->free(pkt); // put back into pool - pkt = NULL; - } - } - } else { - pkt = NULL; + logRxRaw(_radio->getLastSNR(), _radio->getLastRSSI(), raw, len); + + pkt = _mgr->allocNew(); + if (pkt == NULL) { + MESH_DEBUG_PRINTLN("%s Dispatcher::checkRecv(): WARNING: received data, no unused packets available!", getLogDateTime()); + continue; } - } - if (pkt) { - #if MESH_PACKET_LOGGING - Serial.print(getLogDateTime()); - Serial.printf(": RX, len=%d (type=%d, route=%s, payload_len=%d) SNR=%d RSSI=%d score=%d time=%d", - pkt->getRawLength(), pkt->getPayloadType(), pkt->isRouteDirect() ? "D" : "F", pkt->payload_len, - (int)pkt->getSNR(), (int)_radio->getLastRSSI(), (int)(score*1000), air_time); - - static uint8_t packet_hash[MAX_HASH_SIZE]; - pkt->calculatePacketHash(packet_hash); - Serial.print(" hash="); - mesh::Utils::printHex(Serial, packet_hash, MAX_HASH_SIZE); - - if (pkt->getPayloadType() == PAYLOAD_TYPE_PATH || pkt->getPayloadType() == PAYLOAD_TYPE_REQ - || pkt->getPayloadType() == PAYLOAD_TYPE_RESPONSE || pkt->getPayloadType() == PAYLOAD_TYPE_TXT_MSG) { - Serial.printf(" [%02X -> %02X]\n", (uint32_t)pkt->payload[1], (uint32_t)pkt->payload[0]); + + if (tryParsePacket(pkt, raw, len)) { + pkt->_snr = _radio->getLastSNR() * 4.0f; + score = _radio->packetScore(_radio->getLastSNR(), len); + air_time = _radio->getEstAirtimeFor(len); + rx_air_time += air_time; } else { - Serial.printf("\n"); + _mgr->free(pkt); // put back into pool + pkt = NULL; } - #endif - logRx(pkt, pkt->getRawLength(), score); // hook for custom logging - if (pkt->isRouteFlood()) { - n_recv_flood++; + if (pkt) { +#if MESH_PACKET_LOGGING + Serial.print(getLogDateTime()); + Serial.printf(": RX, len=%d (type=%d, route=%s, payload_len=%d) SNR=%d RSSI=%d score=%d time=%d", + pkt->getRawLength(), pkt->getPayloadType(), pkt->isRouteDirect() ? "D" : "F", pkt->payload_len, + (int)pkt->getSNR(), (int)_radio->getLastRSSI(), (int)(score*1000), air_time); + + pkt->calculatePacketHash(); + Serial.print(" hash="); + mesh::Utils::printHex(Serial, pkt->hash, MAX_HASH_SIZE); - int _delay = calcRxDelay(score, air_time); - if (_delay < 50) { - MESH_DEBUG_PRINTLN("%s Dispatcher::checkRecv(), score delay below threshold (%d)", getLogDateTime(), _delay); - processRecvPacket(pkt); // is below the score delay threshold, so process immediately + if (pkt->getPayloadType() == PAYLOAD_TYPE_PATH || pkt->getPayloadType() == PAYLOAD_TYPE_REQ + || pkt->getPayloadType() == PAYLOAD_TYPE_RESPONSE || pkt->getPayloadType() == PAYLOAD_TYPE_TXT_MSG) { + Serial.printf(" [%02X -> %02X]\n", (uint32_t)pkt->payload[1], (uint32_t)pkt->payload[0]); } else { - MESH_DEBUG_PRINTLN("%s Dispatcher::checkRecv(), score delay is: %d millis", getLogDateTime(), _delay); - if (_delay > MAX_RX_DELAY_MILLIS) { - _delay = MAX_RX_DELAY_MILLIS; + Serial.printf("\n"); + } +#endif + logRx(pkt, pkt->getRawLength(), score); // hook for custom logging + + if (pkt->isRouteFlood()) { + n_recv_flood++; + + int _delay = calcRxDelay(score, air_time); + if (_delay < 50) { + MESH_DEBUG_PRINTLN("%s Dispatcher::checkRecv(), score delay below threshold (%d)", getLogDateTime(), _delay); + processRecvPacket(pkt); // is below the score delay threshold, so process immediately + } else { + MESH_DEBUG_PRINTLN("%s Dispatcher::checkRecv(), score delay is: %d millis", getLogDateTime(), _delay); + if (_delay > MAX_RX_DELAY_MILLIS) { + _delay = MAX_RX_DELAY_MILLIS; + } + _mgr->queueInbound(pkt, futureMillis(_delay)); // add to delayed inbound queue } - _mgr->queueInbound(pkt, futureMillis(_delay)); // add to delayed inbound queue + } else { + n_recv_direct++; + processRecvPacket(pkt); } - } else { - n_recv_direct++; - processRecvPacket(pkt); } } } void Dispatcher::processRecvPacket(Packet* pkt) { + + MESH_DEBUG_PRINTLN("Dispatcher::processRecvPacket %s", pkt->getHashHex()); + DispatcherAction action = onRecvPacket(pkt); if (action == ACTION_RELEASE) { _mgr->free(pkt); @@ -343,8 +356,8 @@ void Dispatcher::checkSend() { #if MESH_PACKET_LOGGING Serial.print(getLogDateTime()); - Serial.printf(": TX, len=%d (type=%d, route=%s, payload_len=%d)", - len, outbound->getPayloadType(), outbound->isRouteDirect() ? "D" : "F", outbound->payload_len); + Serial.printf(": TX, len=%d (type=%d, route=%s, payload_len=%d, attempt=%d)", len, + outbound->getPayloadType(), outbound->isRouteDirect() ? "D" : "F", outbound->payload_len, outbound->sending_attempts); if (outbound->getPayloadType() == PAYLOAD_TYPE_PATH || outbound->getPayloadType() == PAYLOAD_TYPE_REQ || outbound->getPayloadType() == PAYLOAD_TYPE_RESPONSE || outbound->getPayloadType() == PAYLOAD_TYPE_TXT_MSG) { Serial.printf(" [%02X -> %02X]\n", (uint32_t)outbound->payload[1], (uint32_t)outbound->payload[0]); @@ -388,12 +401,38 @@ bool Dispatcher::millisHasNowPassed(unsigned long timestamp) const { unsigned long Dispatcher::futureMillis(int millis_from_now) const { unsigned long wake_time = _ms->getMillis() + millis_from_now; - #ifdef MESHCORE_SIMULATOR // Register wake time with simulator for accurate scheduling +#ifdef MESHCORE_SIMULATOR // Register wake time with simulator for accurate scheduling if (auto *ctx = SIM_CTX()) { ctx->wake_registry.registerWakeTime(wake_time); } - #endif - +#endif + return wake_time; } + +bool Dispatcher::resendPacket(mesh::Packet *packet) { + + // prepare error correction via potential retransmit: + // re-send only direct routed packets, with remaining path hops whose retransmits can be recognized; + // the final hop will ACK separately, so out-of-scope here + if (packet->isRouteDirect() && packet->path_len > 0 && packet->sending_attempts < getMaxResendAttempts()) { + packet->sending_attempts++; + + MESH_DEBUG_PRINTLN("Dispatcher::resendPacket %s attempt=%d", packet->getHashHex(), + packet->sending_attempts); + + // Schedule re-send after the equivalent post-TX airtime silence has elapsed. + // We compute the silence directly from the packet's estimated airtime × budget factor. + // Adding 100ms jitter on top gives the downstream repeater enough time to forward the + // packet, and for us to hear that forwarding (and cancel this re-send) before we + // actually transmit. This avoids unnecessary retransmissions and collisions. + uint32_t retransmit_delay = getDirectRetransmitDelay(packet); + uint32_t packet_airtime_ms = _radio->getEstAirtimeFor(packet->getPathByteLen() + packet->payload_len + 2); + uint32_t silence_ms = (uint32_t)(packet_airtime_ms * getAirtimeBudgetFactor()); + _mgr->queueOutbound(packet, 1, futureMillis((int)(silence_ms + retransmit_delay + 100))); + return true; + } + + return false; +} } \ No newline at end of file diff --git a/src/Dispatcher.h b/src/Dispatcher.h index dd032f130d..e501d4351c 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -114,7 +114,10 @@ typedef uint32_t DispatcherAction; * and scheduling of outbound Packets. */ class Dispatcher { +protected: Packet* outbound; // current outbound packet + +private: unsigned long outbound_expiry, outbound_start, total_air_time, rx_air_time; unsigned long next_tx_time; unsigned long cad_busy_start; @@ -169,6 +172,11 @@ class Dispatcher { virtual int getAGCResetInterval() const { return 0; } // disabled by default virtual unsigned long getDutyCycleWindowMs() const { return 3600000; } + /** + * \returns maximum number of direct-route resend attempts (0 = disabled, default = 2, max = 5). + */ + virtual uint8_t getMaxResendAttempts() const { return 2; } + public: void begin(); void loop(); @@ -176,6 +184,21 @@ class Dispatcher { Packet* obtainNewPacket(); void releasePacket(Packet* packet); void sendPacket(Packet* packet, uint8_t priority, uint32_t delay_millis=0); + /** + * \brief re-send the given packet (for retransmission) if conditions apply. + * \return true, if packet was re-sent. + */ + bool resendPacket(Packet *packet); + + /** + * \returns number of milliseconds delay to apply to retransmitting the given packet. + */ + virtual uint32_t getRetransmitDelay(const Packet *packet) { return 0; }; + + /** + * \returns number of milliseconds delay to apply to retransmitting the given packet, for DIRECT mode. + */ + virtual uint32_t getDirectRetransmitDelay(const Packet *packet) { return 0; }; unsigned long getTotalAirTime() const { return total_air_time; } unsigned long getReceiveAirTime() const {return rx_air_time; } diff --git a/src/Mesh.cpp b/src/Mesh.cpp index 7252974a92..31b77fc058 100644 --- a/src/Mesh.cpp +++ b/src/Mesh.cpp @@ -72,7 +72,54 @@ DispatcherAction Mesh::onRecvPacket(Packet* pkt) { return ACTION_RELEASE; } - if (pkt->isRouteDirect() && pkt->getPathHashCount() > 0) { + if (pkt->isRouteDirect()) { + const bool is_next_hop = (pkt->getPathHashCount() > 0) && self_id.isHashMatch(pkt->path, pkt->getPathHashSize()); + // Check if this is a retransmit of a packet we recently sent; + // if yes, the next hop successfully forwarded it, so remove our scheduled retransmit from outbound queue. + // This runs for ANY path_len (including 0 = downstream relay forwarding to final destination), + // so that we cancel pending retries as soon as we overhear a relay forwarding our packet. + + // Only do this when the packet is NOT addressed to us as next hop to avoid dropping fresh packets. + if (!is_next_hop) { + pkt->calculatePacketHash(); + const uint8_t *recv_hash = pkt->hash; + + // First check the current outbound packet being prepared/sent + if (outbound && outbound->sending_attempts > 0 && outbound->isRouteDirect()) { + const uint8_t *outbound_hash = outbound->calculatePacketHash(); + if (memcmp(recv_hash, outbound_hash, MAX_HASH_SIZE) == 0) { + MESH_DEBUG_PRINTLN( + "%s Mesh::onRecvPacket(): downstream forwarded current outbound, canceling (attempt=%d)", + getLogDateTime(), outbound->sending_attempts); + releasePacket(outbound); + outbound = NULL; + return ACTION_RELEASE; + } + } + + if (_mgr->getOutboundTotal() > 0) { + for (int i = _mgr->getOutboundTotal() - 1; i >= 0; i--) { + Packet *queued_pkt = _mgr->getOutboundByIdx(i); + if (queued_pkt && queued_pkt->sending_attempts > 0 && queued_pkt->isRouteDirect()) { + const uint8_t *queued_hash = queued_pkt->calculatePacketHash(); + if (memcmp(recv_hash, queued_hash, MAX_HASH_SIZE) == 0) { + MESH_DEBUG_PRINTLN("%s Mesh::onRecvPacket(): downstream forwarded packet detected, canceling " + "retransmit (attempt=%d)", + getLogDateTime(), queued_pkt->sending_attempts); + Packet *removed = _mgr->removeOutboundByIdx(i); + if (removed) _mgr->free(removed); + return ACTION_RELEASE; // don't process further: confirmed successful forwarding + } + } + } + } + + } + + // For path_len=0 direct packets, no further path-based processing applies; + // fall through to general switch-case handling (e.g. ACK delivery via onAckRecv). + if (pkt->path_len < PATH_HASH_SIZE) goto direct_path_done; + // check for 'early received' ACK if (pkt->getPayloadType() == PAYLOAD_TYPE_ACK) { int i = 0; @@ -83,7 +130,7 @@ DispatcherAction Mesh::onRecvPacket(Packet* pkt) { } } - if (self_id.isHashMatch(pkt->path, pkt->getPathHashSize()) && allowPacketForward(pkt)) { + if (is_next_hop && allowPacketForward(pkt)) { if (pkt->getPayloadType() == PAYLOAD_TYPE_MULTIPART) { return forwardMultipartDirect(pkt); } else if (pkt->getPayloadType() == PAYLOAD_TYPE_ACK) { @@ -97,6 +144,8 @@ DispatcherAction Mesh::onRecvPacket(Packet* pkt) { if (!_tables->hasSeen(pkt)) { removeSelfFromPath(pkt); + MESH_DEBUG_PRINTLN("Mesh::onRecvPacket(): prepare to repeat packet %s", pkt->getHashHex()); + uint32_t d = getDirectRetransmitDelay(pkt); return ACTION_RETRANSMIT_DELAYED(0, d); // Routed traffic is HIGHEST priority } @@ -104,6 +153,7 @@ DispatcherAction Mesh::onRecvPacket(Packet* pkt) { return ACTION_RELEASE; // this node is NOT the next hop (OR this packet has already been forwarded), so discard. } +direct_path_done: if (pkt->isRouteFlood() && filterRecvFloodPacket(pkt)) return ACTION_RELEASE; DispatcherAction action = ACTION_RELEASE; @@ -275,7 +325,7 @@ DispatcherAction Mesh::onRecvPacket(Packet* pkt) { } break; } - case PAYLOAD_TYPE_RAW_CUSTOM: { + case PAYLOAD_TYPE_RAW_CUSTOM: { if (pkt->isRouteDirect() && !_tables->hasSeen(pkt)) { onRawDataRecv(pkt); //action = routeRecvPacket(pkt); don't flood route these (yet) diff --git a/src/Mesh.h b/src/Mesh.h index d53d6d25fa..a8ad93ac3a 100644 --- a/src/Mesh.h +++ b/src/Mesh.h @@ -58,12 +58,12 @@ class Mesh : public Dispatcher { /** * \returns number of milliseconds delay to apply to retransmitting the given packet. */ - virtual uint32_t getRetransmitDelay(const Packet* packet); + virtual uint32_t getRetransmitDelay(const Packet* packet) override; /** * \returns number of milliseconds delay to apply to retransmitting the given packet, for DIRECT mode. */ - virtual uint32_t getDirectRetransmitDelay(const Packet* packet); + virtual uint32_t getDirectRetransmitDelay(const Packet* packet) override; /** * \returns number of extra (Direct) ACK transmissions wanted. diff --git a/src/MeshCore.h b/src/MeshCore.h index b4c57faf32..159f5ce87a 100644 --- a/src/MeshCore.h +++ b/src/MeshCore.h @@ -21,7 +21,7 @@ #define MAX_PATH_SIZE 64 #define MAX_TRANS_UNIT 255 -#if MESH_DEBUG && ARDUINO +#if (MESH_DEBUG && ARDUINO) || defined(MESHCORE_SIMULATOR) #include #define MESH_DEBUG_PRINT(F, ...) Serial.printf("DEBUG: " F, ##__VA_ARGS__) #define MESH_DEBUG_PRINTLN(F, ...) Serial.printf("DEBUG: " F "\n", ##__VA_ARGS__) diff --git a/src/Packet.cpp b/src/Packet.cpp index aad3e2f48e..3a12f018cc 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -1,13 +1,19 @@ #include "Packet.h" +#include "Utils.h" #include #include namespace mesh { +static const uint8_t ZERO_HASH[MAX_HASH_SIZE] = { 0 }; + Packet::Packet() { header = 0; path_len = 0; payload_len = 0; + memcpy(hash, ZERO_HASH, MAX_HASH_SIZE); + memset(hash_hex, 0, sizeof(hash_hex)); + sending_attempts = 0; } bool Packet::isValidPathLen(uint8_t path_len) { @@ -38,15 +44,24 @@ int Packet::getRawLength() const { return 2 + getPathByteLen() + payload_len + (hasTransportCodes() ? 4 : 0); } -void Packet::calculatePacketHash(uint8_t* hash) const { - SHA256 sha; - uint8_t t = getPayloadType(); - sha.update(&t, 1); - if (t == PAYLOAD_TYPE_TRACE) { - sha.update(&path_len, sizeof(path_len)); // CAVEAT: TRACE packets can revisit same node on return path +uint8_t *Packet::calculatePacketHash() const { + if (memcmp(this->hash, ZERO_HASH, MAX_HASH_SIZE) == 0) { + SHA256 sha; + uint8_t t = getPayloadType(); + sha.update(&t, 1); + if (t == PAYLOAD_TYPE_TRACE) { + sha.update(&path_len, sizeof(path_len)); // CAVEAT: TRACE packets can revisit same node on return path + } + sha.update(payload, payload_len); + sha.finalize((uint8_t *)this->hash, MAX_HASH_SIZE); } - sha.update(payload, payload_len); - sha.finalize(hash, MAX_HASH_SIZE); + return (uint8_t *)this->hash; +} + +const char* Packet::getHashHex() const { + calculatePacketHash(); + Utils::toHex(hash_hex, hash, MAX_HASH_SIZE); + return hash_hex; } uint8_t Packet::writeTo(uint8_t dest[]) const { diff --git a/src/Packet.h b/src/Packet.h index 0886a06c4e..044ff59b39 100644 --- a/src/Packet.h +++ b/src/Packet.h @@ -49,12 +49,17 @@ class Packet { uint8_t path[MAX_PATH_SIZE]; uint8_t payload[MAX_PACKET_PAYLOAD]; int8_t _snr; + uint8_t hash[MAX_HASH_SIZE]; + mutable char hash_hex[MAX_HASH_SIZE * 2 + 1]; + uint8_t sending_attempts; /** * \brief calculate the hash of payload + type - * \param dest_hash destination to store the hash (must be MAX_HASH_SIZE bytes) + * \return hash pointer */ - void calculatePacketHash(uint8_t* dest_hash) const; + uint8_t *calculatePacketHash() const; + + const char* getHashHex() const; /** * \returns one of ROUTE_ values diff --git a/src/helpers/CommonCLI.cpp b/src/helpers/CommonCLI.cpp index cae8bfd8a4..83b9abe902 100644 --- a/src/helpers/CommonCLI.cpp +++ b/src/helpers/CommonCLI.cpp @@ -89,7 +89,8 @@ void CommonCLI::loadPrefsInt(FILESYSTEM* fs, const char* filename) { file.read((uint8_t *)&_prefs->adc_multiplier, sizeof(_prefs->adc_multiplier)); // 166 file.read((uint8_t *)_prefs->owner_info, sizeof(_prefs->owner_info)); // 170 file.read((uint8_t *)&_prefs->rx_boosted_gain, sizeof(_prefs->rx_boosted_gain)); // 290 - // next: 291 + file.read((uint8_t *)&_prefs->max_resend_attempts, sizeof(_prefs->max_resend_attempts)); // 291 + // next: 292 // sanitise bad pref values _prefs->rx_delay_base = constrain(_prefs->rx_delay_base, 0, 20.0f); @@ -119,6 +120,7 @@ void CommonCLI::loadPrefsInt(FILESYSTEM* fs, const char* filename) { // sanitise settings _prefs->rx_boosted_gain = constrain(_prefs->rx_boosted_gain, 0, 1); // boolean + _prefs->max_resend_attempts = constrain(_prefs->max_resend_attempts, 0, 5); file.close(); } @@ -180,7 +182,8 @@ void CommonCLI::savePrefs(FILESYSTEM* fs) { file.write((uint8_t *)&_prefs->adc_multiplier, sizeof(_prefs->adc_multiplier)); // 166 file.write((uint8_t *)_prefs->owner_info, sizeof(_prefs->owner_info)); // 170 file.write((uint8_t *)&_prefs->rx_boosted_gain, sizeof(_prefs->rx_boosted_gain)); // 290 - // next: 291 + file.write((uint8_t *)&_prefs->max_resend_attempts, sizeof(_prefs->max_resend_attempts)); // 291 + // next: 292 file.close(); } @@ -494,6 +497,15 @@ void CommonCLI::handleSetCmd(uint32_t sender_timestamp, char* command, char* rep _prefs->multi_acks = atoi(&config[11]); savePrefs(); strcpy(reply, "OK"); + } else if (memcmp(config, "max.resend ", 11) == 0) { + int v = atoi(&config[11]); + if (v < 0 || v > 5) { + strcpy(reply, "ERROR: max.resend must be 0-5"); + } else { + _prefs->max_resend_attempts = (uint8_t)v; + savePrefs(); + strcpy(reply, "OK"); + } } else if (memcmp(config, "allow.read.only ", 16) == 0) { _prefs->allow_read_only = memcmp(&config[16], "on", 2) == 0; savePrefs(); @@ -748,6 +760,8 @@ void CommonCLI::handleGetCmd(uint32_t sender_timestamp, char* command, char* rep sprintf(reply, "> %d", ((uint32_t) _prefs->agc_reset_interval) * 4); } else if (memcmp(config, "multi.acks", 10) == 0) { sprintf(reply, "> %d", (uint32_t) _prefs->multi_acks); + } else if (memcmp(config, "max.resend", 10) == 0) { + sprintf(reply, "> %d", (uint32_t) _prefs->max_resend_attempts); } else if (memcmp(config, "allow.read.only", 15) == 0) { sprintf(reply, "> %s", _prefs->allow_read_only ? "on" : "off"); } else if (memcmp(config, "flood.advert.interval", 21) == 0) { diff --git a/src/helpers/CommonCLI.h b/src/helpers/CommonCLI.h index ffdc7c6536..d88b4fc68b 100644 --- a/src/helpers/CommonCLI.h +++ b/src/helpers/CommonCLI.h @@ -61,6 +61,7 @@ struct NodePrefs { // persisted to file uint8_t rx_boosted_gain; // power settings uint8_t path_hash_mode; // which path mode to use when sending uint8_t loop_detect; + uint8_t max_resend_attempts; // 0 = disabled, 1-5, default 2 }; class CommonCLICallbacks { diff --git a/src/helpers/SimpleMeshTables.h b/src/helpers/SimpleMeshTables.h index 0b79cfb422..36716574b7 100644 --- a/src/helpers/SimpleMeshTables.h +++ b/src/helpers/SimpleMeshTables.h @@ -7,16 +7,21 @@ #endif #define MAX_PACKET_HASHES (128+32) +#define MAX_PACKET_ACKS 64 class SimpleMeshTables : public mesh::MeshTables { uint8_t _hashes[MAX_PACKET_HASHES*MAX_HASH_SIZE]; int _next_idx; + uint32_t _acks[MAX_PACKET_ACKS]; + int _next_ack_idx; uint32_t _direct_dups, _flood_dups; public: SimpleMeshTables() { memset(_hashes, 0, sizeof(_hashes)); _next_idx = 0; + memset(_acks, 0, sizeof(_acks)); + _next_ack_idx = 0; _direct_dups = _flood_dups = 0; } @@ -32,12 +37,31 @@ class SimpleMeshTables : public mesh::MeshTables { #endif bool hasSeen(const mesh::Packet* packet) override { - uint8_t hash[MAX_HASH_SIZE]; - packet->calculatePacketHash(hash); + if (packet->getPayloadType() == PAYLOAD_TYPE_ACK) { + uint32_t ack; + memcpy(&ack, packet->payload, 4); + for (int i = 0; i < MAX_PACKET_ACKS; i++) { + if (ack == _acks[i]) { + if (packet->isRouteDirect()) { + _direct_dups++; // keep some stats + } else { + _flood_dups++; + } + MESH_DEBUG_PRINTLN("SimpleMeshTables::hasSeen(): packet %s already seen", packet->getHashHex()); + return true; + } + } + + _acks[_next_ack_idx] = ack; + _next_ack_idx = (_next_ack_idx + 1) % MAX_PACKET_ACKS; // cyclic table + return false; + } + + packet->calculatePacketHash(); const uint8_t* sp = _hashes; for (int i = 0; i < MAX_PACKET_HASHES; i++, sp += MAX_HASH_SIZE) { - if (memcmp(hash, sp, MAX_HASH_SIZE) == 0) { + if (memcmp(packet->hash, sp, MAX_HASH_SIZE) == 0) { if (packet->isRouteDirect()) { _direct_dups++; // keep some stats } else { @@ -47,20 +71,30 @@ class SimpleMeshTables : public mesh::MeshTables { } } - memcpy(&_hashes[_next_idx*MAX_HASH_SIZE], hash, MAX_HASH_SIZE); + memcpy(&_hashes[_next_idx * MAX_HASH_SIZE], packet->hash, MAX_HASH_SIZE); _next_idx = (_next_idx + 1) % MAX_PACKET_HASHES; // cyclic table return false; } void clear(const mesh::Packet* packet) override { - uint8_t hash[MAX_HASH_SIZE]; - packet->calculatePacketHash(hash); + if (packet->getPayloadType() == PAYLOAD_TYPE_ACK) { + uint32_t ack; + memcpy(&ack, packet->payload, 4); + for (int i = 0; i < MAX_PACKET_ACKS; i++) { + if (ack == _acks[i]) { + _acks[i] = 0; + break; + } + } + } else { + packet->calculatePacketHash(); - uint8_t* sp = _hashes; - for (int i = 0; i < MAX_PACKET_HASHES; i++, sp += MAX_HASH_SIZE) { - if (memcmp(hash, sp, MAX_HASH_SIZE) == 0) { - memset(sp, 0, MAX_HASH_SIZE); - break; + uint8_t* sp = _hashes; + for (int i = 0; i < MAX_PACKET_HASHES; i++, sp += MAX_HASH_SIZE) { + if (memcmp(packet->hash, sp, MAX_HASH_SIZE) == 0) { + memset(sp, 0, MAX_HASH_SIZE); + break; + } } } } diff --git a/src/helpers/StaticPoolPacketManager.cpp b/src/helpers/StaticPoolPacketManager.cpp index b8926df0cc..7677ea368c 100644 --- a/src/helpers/StaticPoolPacketManager.cpp +++ b/src/helpers/StaticPoolPacketManager.cpp @@ -80,6 +80,8 @@ mesh::Packet* StaticPoolPacketManager::allocNew() { } void StaticPoolPacketManager::free(mesh::Packet* packet) { + memset(packet->hash, 0, MAX_HASH_SIZE); // clear stale cached hash so calculatePacketHash() recomputes on next use + packet->hash_hex[0] = '\0'; unused.add(packet, 0, 0); } From 5b387eed26f3d2df56eee20181879d21f9ebd06b Mon Sep 17 00:00:00 2001 From: Florian Sager Date: Tue, 2 Jun 2026 18:31:00 +0200 Subject: [PATCH 3/3] Added comments to explain changes related to the repeated-sending feature --- src/Dispatcher.cpp | 25 ++++++++++++++++++++++++- src/helpers/SimpleMeshTables.h | 15 +++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 9195fc12d6..6e7a34eb3a 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -147,7 +147,22 @@ void Dispatcher::loop() { checkRecv(); checkSend(); - // Do noise floor calibration LAST, when no critical operations are pending + // Noise floor calibration is placed here intentionally – at the very end of loop(), + // after checkRecv() and checkSend() have completed. This avoids two classes of problem: + // + // 1. Calibration disrupts radio reception: triggerNoiseFloorCalibrate() temporarily + // takes the radio out of normal RX mode. Running it at the top of loop() (the + // previous location) risked aborting an incoming packet that had not yet been read + // out by _radio->loop() / checkRecv(). + // + // 2. Interference with outbound retransmissions: the repeated-sending feature queues + // direct-routed packets for re-transmission (resendPacket()). Calibrating while + // packets are pending in the outbound queue would disrupt those time-sensitive + // transmissions. The guard '_mgr->getOutboundCount() == 0' ensures calibration + // is deferred until the queue is empty. + // + // Calibration is a low-priority background task and must only run when the radio is + // demonstrably idle. if (millisHasNowPassed(next_floor_calib_time)) { if (!_radio->isReceiving() && _mgr->getOutboundCount(_ms->getMillis()) == 0) { _radio->triggerNoiseFloorCalibrate(getInterferenceThreshold()); @@ -199,6 +214,14 @@ bool Dispatcher::tryParsePacket(Packet* pkt, const uint8_t* raw, int len) { } void Dispatcher::checkRecv() { + // Drain the entire RX buffer in one loop() pass instead of processing only a single + // packet per call. This is critical for the repeated-sending / retransmit-cancellation + // mechanism: when a downstream relay forwards our packet we must detect that forwarding + // echo (via hash comparison in onRecvPacket()) before the scheduled retransmit timer + // fires. With the old single-read design the echo might not be consumed until a later + // loop() iteration, by which time the retransmit had already been sent unnecessarily. + // Draining all pending packets here minimises that race window. + // As a secondary benefit this prevents RX-FIFO overflow under high packet load. while (true) { Packet *pkt = nullptr; diff --git a/src/helpers/SimpleMeshTables.h b/src/helpers/SimpleMeshTables.h index 36716574b7..3f60d684f9 100644 --- a/src/helpers/SimpleMeshTables.h +++ b/src/helpers/SimpleMeshTables.h @@ -37,6 +37,21 @@ class SimpleMeshTables : public mesh::MeshTables { #endif bool hasSeen(const mesh::Packet* packet) override { + // ACK packets receive dedicated deduplication for three reasons: + // + // 1. The first 4 payload bytes (ack_crc) already uniquely identify the ACK – they + // are the CRC of the original packet being acknowledged. A direct uint32_t + // comparison is therefore both correct and far cheaper than computing a full + // SHA256 via calculatePacketHash() (which the general path below would do). + // + // 2. With repeated sending (max_resend_attempts > 0) and multi-ACKs the same ACK + // can arrive multiple times. Storing each occurrence in the shared _hashes[] + // table (160 entries) would evict entries needed for long-lived flood-packet + // deduplication. The dedicated _acks[] table (MAX_PACKET_ACKS entries) keeps + // ACK and flood deduplication budgets separate. + // + // 3. clear() for ACKs is a simple 4-byte linear scan; using the general table + // would require SHA256 + a full memcmp loop over MAX_PACKET_HASHES entries. if (packet->getPayloadType() == PAYLOAD_TYPE_ACK) { uint32_t ack; memcpy(&ack, packet->payload, 4);