Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions plugins/mqtt/azure-iot.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,65 @@
]
}
},
"offline-cache": {
"name": "Offline Data Caching",
"name_zh": "离线缓存",
"description": "Offline caching switch. Cache MQTT messages when offline, and sync cached messages when back online.",
"description_zh": "离线缓存开关。连接断开时缓存 MQTT 消息,连接重建时同步缓存的 MQTT 消息到服务器。",
"attribute": "optional",
"type": "bool",
"default": false,
"valid": {}
},
"cache-mem-size": {
"name": "Cache Memory Size (MB)",
"name_zh": "缓存内存大小(MB)",
"description": "Max in-memory cache size in megabytes when MQTT connection exception occurs. Should be smaller than cache disk size.",
"description_zh": "当 MQTT 连接异常时,最大的内存缓存大小(单位:MB)。应该小于缓存磁盘大小。",
"type": "int",
"attribute": "required",
"condition": {
"field": "offline-cache",
"value": true
},
"valid": {
"min": 1,
"max": 1024
}
},
"cache-disk-size": {
"name": "Cache Disk Size (MB)",
"name_zh": "缓存磁盘大小(MB)",
"description": "Max in-disk cache size in megabytes when MQTT connection exception occurs. Should be larger than cache memory size. If nonzero, cache memory size should also be nonzero.",
"description_zh": "当 MQTT 连接异常时,最大的磁盘缓存大小(单位:MB)。应该大于缓存内存大小。如果不为 0,缓存内存大小也应该不为 0。",
"type": "int",
"attribute": "required",
"condition": {
"field": "offline-cache",
"value": true
},
"valid": {
"min": 1,
"max": 10240
}
},
"cache-sync-interval": {
"name": "Cache Sync Interval (MS)",
"name_zh": "缓存消息重传间隔(MS)",
"description": "Interval in milliseconds at which cached messages are synced to the broker once the connection is restored.",
"description_zh": "连接恢复后缓存消息向服务端重传的时间间隔(单位:毫秒)。",
"type": "int",
"attribute": "required",
"condition": {
"field": "offline-cache",
"value": true
},
"default": 100,
"valid": {
"min": 10,
"max": 120000
}
},
"host": {
"name": "IoT Hub Hostname",
"name_zh": "IoT 中心域名",
Expand Down
116 changes: 96 additions & 20 deletions plugins/mqtt/azure_iot_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,81 @@
#define AUTH_SAS 0
#define AUTH_CERT 1

#define MB 1000000

static int parse_cache_params(neu_plugin_t *plugin, const char *setting,
mqtt_config_t *config)
{
int ret = 0;
char * err_param = NULL;
neu_json_elem_t offline_cache = { .name = "offline-cache",
.t = NEU_JSON_BOOL };
neu_json_elem_t cache_mem_size = { .name = "cache-mem-size",
.t = NEU_JSON_INT };
neu_json_elem_t cache_disk_size = { .name = "cache-disk-size",
.t = NEU_JSON_INT };
neu_json_elem_t cache_sync_interval = { .name = "cache-sync-interval",
.t = NEU_JSON_INT };

// offline-cache flag, optional (backward compatible: missing -> disabled)
ret = neu_parse_param(setting, NULL, 1, &offline_cache);
if (0 != ret || !offline_cache.v.val_bool) {
config->cache = false;
config->cache_mem_size = 0;
config->cache_disk_size = 0;
config->cache_sync_interval = NEU_MQTT_CACHE_SYNC_INTERVAL_DEFAULT;
return 0;
}

// cache enabled, mem and disk size are required
ret = neu_parse_param(setting, &err_param, 2, &cache_mem_size,
&cache_disk_size);
if (0 != ret) {
plog_error(plugin, "parsing cache size fail, key: `%s`", err_param);
free(err_param);
return -1;
}

if (cache_mem_size.v.val_int < 1 || cache_mem_size.v.val_int > 1024) {
plog_error(plugin, "setting invalid cache memory size: %" PRIi64,
cache_mem_size.v.val_int);
return -1;
}

if (cache_disk_size.v.val_int < 1 || cache_disk_size.v.val_int > 10240) {
plog_error(plugin, "setting invalid cache disk size: %" PRIi64,
cache_disk_size.v.val_int);
return -1;
}

if (cache_mem_size.v.val_int > cache_disk_size.v.val_int) {
plog_error(plugin,
"setting cache memory size %" PRIi64
" larger than cache disk size %" PRIi64,
cache_mem_size.v.val_int, cache_disk_size.v.val_int);
return -1;
}

// cache-sync-interval, optional
ret = neu_parse_param(setting, NULL, 1, &cache_sync_interval);
if (0 == ret) {
if (cache_sync_interval.v.val_int < NEU_MQTT_CACHE_SYNC_INTERVAL_MIN ||
NEU_MQTT_CACHE_SYNC_INTERVAL_MAX < cache_sync_interval.v.val_int) {
plog_error(plugin, "setting invalid cache sync interval: %" PRIi64,
cache_sync_interval.v.val_int);
return -1;
}
} else {
cache_sync_interval.v.val_int = NEU_MQTT_CACHE_SYNC_INTERVAL_DEFAULT;
}

config->cache = true;
config->cache_mem_size = cache_mem_size.v.val_int * MB;
config->cache_disk_size = cache_disk_size.v.val_int * MB;
config->cache_sync_interval = cache_sync_interval.v.val_int;
return 0;
}

static int parse_ssl_params(neu_plugin_t *plugin, const char *setting,
neu_json_elem_t *ca, neu_json_elem_t *cert,
neu_json_elem_t *key)
Expand Down Expand Up @@ -145,6 +220,11 @@ static int azure_parse_config(neu_plugin_t *plugin, const char *setting,
}
}

// offline cache
if (0 != parse_cache_params(plugin, setting, config)) {
goto error;
}

// write request topic
if (0 > neu_asprintf(&write_req_topic, "devices/%s/messages/devicebound/#",
client_id.v.val_str)) {
Expand All @@ -165,26 +245,22 @@ static int azure_parse_config(neu_plugin_t *plugin, const char *setting,
goto error;
}

config->version = NEU_MQTT_VERSION_V311;
config->client_id = client_id.v.val_str;
config->qos = qos.v.val_int;
config->format = format.v.val_int;
config->write_req_topic = write_req_topic;
config->write_resp_topic = write_resp_topic;
config->cache = false;
config->cache_mem_size = 0;
config->cache_disk_size = 0;
config->cache_sync_interval = NEU_MQTT_CACHE_SYNC_INTERVAL_DEFAULT;
config->host = host.v.val_str;
config->port = 8883;
config->username = username;
config->password = password.v.val_str;
config->ssl = true;
config->ca = ca.v.val_str;
config->cert = cert.v.val_str;
config->key = key.v.val_str;
config->upload_drv_state = false;
config->enable_topic = true;
config->version = NEU_MQTT_VERSION_V311;
config->client_id = client_id.v.val_str;
config->qos = qos.v.val_int;
config->format = format.v.val_int;
config->write_req_topic = write_req_topic;
config->write_resp_topic = write_resp_topic;
config->host = host.v.val_str;
config->port = 8883;
config->username = username;
config->password = password.v.val_str;
config->ssl = true;
config->ca = ca.v.val_str;
config->cert = cert.v.val_str;
config->key = key.v.val_str;
config->upload_drv_state = false;
config->enable_topic = true;

plog_notice(plugin, "config MQTT version : %d", config->version);
plog_notice(plugin, "config client-id : %s", config->client_id);
Expand Down
Loading