diff --git a/plugins/mqtt/azure-iot.json b/plugins/mqtt/azure-iot.json index 14fd6ecd2..cdc373e73 100644 --- a/plugins/mqtt/azure-iot.json +++ b/plugins/mqtt/azure-iot.json @@ -52,6 +52,65 @@ ] } }, + "offline-cache": { + "name": "Offline Data Caching", + "name_zh": "离线缓存", + "description": "Offline caching switch. Cache MQTT messages when offline, and sync cached messages when back online.", + "description_zh": "离线缓存开关。连接断开时缓存 MQTT 消息,连接重建时同步缓存的 MQTT 消息到服务器。", + "attribute": "optional", + "type": "bool", + "default": false, + "valid": {} + }, + "cache-mem-size": { + "name": "Cache Memory Size (MB)", + "name_zh": "缓存内存大小(MB)", + "description": "Max in-memory cache size in megabytes when MQTT connection exception occurs. Should be smaller than cache disk size.", + "description_zh": "当 MQTT 连接异常时,最大的内存缓存大小(单位:MB)。应该小于缓存磁盘大小。", + "type": "int", + "attribute": "required", + "condition": { + "field": "offline-cache", + "value": true + }, + "valid": { + "min": 1, + "max": 1024 + } + }, + "cache-disk-size": { + "name": "Cache Disk Size (MB)", + "name_zh": "缓存磁盘大小(MB)", + "description": "Max in-disk cache size in megabytes when MQTT connection exception occurs. Should be larger than cache memory size. If nonzero, cache memory size should also be nonzero.", + "description_zh": "当 MQTT 连接异常时,最大的磁盘缓存大小(单位:MB)。应该大于缓存内存大小。如果不为 0,缓存内存大小也应该不为 0。", + "type": "int", + "attribute": "required", + "condition": { + "field": "offline-cache", + "value": true + }, + "valid": { + "min": 1, + "max": 10240 + } + }, + "cache-sync-interval": { + "name": "Cache Sync Interval (MS)", + "name_zh": "缓存消息重传间隔(MS)", + "description": "Interval in milliseconds at which cached messages are synced to the broker once the connection is restored.", + "description_zh": "连接恢复后缓存消息向服务端重传的时间间隔(单位:毫秒)。", + "type": "int", + "attribute": "required", + "condition": { + "field": "offline-cache", + "value": true + }, + "default": 100, + "valid": { + "min": 10, + "max": 120000 + } + }, "host": { "name": "IoT Hub Hostname", "name_zh": "IoT 中心域名", diff --git a/plugins/mqtt/azure_iot_plugin.c b/plugins/mqtt/azure_iot_plugin.c index 0c7c5a43f..014f43a83 100644 --- a/plugins/mqtt/azure_iot_plugin.c +++ b/plugins/mqtt/azure_iot_plugin.c @@ -28,6 +28,81 @@ #define AUTH_SAS 0 #define AUTH_CERT 1 +#define MB 1000000 + +static int parse_cache_params(neu_plugin_t *plugin, const char *setting, + mqtt_config_t *config) +{ + int ret = 0; + char * err_param = NULL; + neu_json_elem_t offline_cache = { .name = "offline-cache", + .t = NEU_JSON_BOOL }; + neu_json_elem_t cache_mem_size = { .name = "cache-mem-size", + .t = NEU_JSON_INT }; + neu_json_elem_t cache_disk_size = { .name = "cache-disk-size", + .t = NEU_JSON_INT }; + neu_json_elem_t cache_sync_interval = { .name = "cache-sync-interval", + .t = NEU_JSON_INT }; + + // offline-cache flag, optional (backward compatible: missing -> disabled) + ret = neu_parse_param(setting, NULL, 1, &offline_cache); + if (0 != ret || !offline_cache.v.val_bool) { + config->cache = false; + config->cache_mem_size = 0; + config->cache_disk_size = 0; + config->cache_sync_interval = NEU_MQTT_CACHE_SYNC_INTERVAL_DEFAULT; + return 0; + } + + // cache enabled, mem and disk size are required + ret = neu_parse_param(setting, &err_param, 2, &cache_mem_size, + &cache_disk_size); + if (0 != ret) { + plog_error(plugin, "parsing cache size fail, key: `%s`", err_param); + free(err_param); + return -1; + } + + if (cache_mem_size.v.val_int < 1 || cache_mem_size.v.val_int > 1024) { + plog_error(plugin, "setting invalid cache memory size: %" PRIi64, + cache_mem_size.v.val_int); + return -1; + } + + if (cache_disk_size.v.val_int < 1 || cache_disk_size.v.val_int > 10240) { + plog_error(plugin, "setting invalid cache disk size: %" PRIi64, + cache_disk_size.v.val_int); + return -1; + } + + if (cache_mem_size.v.val_int > cache_disk_size.v.val_int) { + plog_error(plugin, + "setting cache memory size %" PRIi64 + " larger than cache disk size %" PRIi64, + cache_mem_size.v.val_int, cache_disk_size.v.val_int); + return -1; + } + + // cache-sync-interval, optional + ret = neu_parse_param(setting, NULL, 1, &cache_sync_interval); + if (0 == ret) { + if (cache_sync_interval.v.val_int < NEU_MQTT_CACHE_SYNC_INTERVAL_MIN || + NEU_MQTT_CACHE_SYNC_INTERVAL_MAX < cache_sync_interval.v.val_int) { + plog_error(plugin, "setting invalid cache sync interval: %" PRIi64, + cache_sync_interval.v.val_int); + return -1; + } + } else { + cache_sync_interval.v.val_int = NEU_MQTT_CACHE_SYNC_INTERVAL_DEFAULT; + } + + config->cache = true; + config->cache_mem_size = cache_mem_size.v.val_int * MB; + config->cache_disk_size = cache_disk_size.v.val_int * MB; + config->cache_sync_interval = cache_sync_interval.v.val_int; + return 0; +} + static int parse_ssl_params(neu_plugin_t *plugin, const char *setting, neu_json_elem_t *ca, neu_json_elem_t *cert, neu_json_elem_t *key) @@ -145,6 +220,11 @@ static int azure_parse_config(neu_plugin_t *plugin, const char *setting, } } + // offline cache + if (0 != parse_cache_params(plugin, setting, config)) { + goto error; + } + // write request topic if (0 > neu_asprintf(&write_req_topic, "devices/%s/messages/devicebound/#", client_id.v.val_str)) { @@ -165,26 +245,22 @@ static int azure_parse_config(neu_plugin_t *plugin, const char *setting, goto error; } - config->version = NEU_MQTT_VERSION_V311; - config->client_id = client_id.v.val_str; - config->qos = qos.v.val_int; - config->format = format.v.val_int; - config->write_req_topic = write_req_topic; - config->write_resp_topic = write_resp_topic; - config->cache = false; - config->cache_mem_size = 0; - config->cache_disk_size = 0; - config->cache_sync_interval = NEU_MQTT_CACHE_SYNC_INTERVAL_DEFAULT; - config->host = host.v.val_str; - config->port = 8883; - config->username = username; - config->password = password.v.val_str; - config->ssl = true; - config->ca = ca.v.val_str; - config->cert = cert.v.val_str; - config->key = key.v.val_str; - config->upload_drv_state = false; - config->enable_topic = true; + config->version = NEU_MQTT_VERSION_V311; + config->client_id = client_id.v.val_str; + config->qos = qos.v.val_int; + config->format = format.v.val_int; + config->write_req_topic = write_req_topic; + config->write_resp_topic = write_resp_topic; + config->host = host.v.val_str; + config->port = 8883; + config->username = username; + config->password = password.v.val_str; + config->ssl = true; + config->ca = ca.v.val_str; + config->cert = cert.v.val_str; + config->key = key.v.val_str; + config->upload_drv_state = false; + config->enable_topic = true; plog_notice(plugin, "config MQTT version : %d", config->version); plog_notice(plugin, "config client-id : %s", config->client_id);