From 16cc5d2a53efec2b7880039d8e189370242b13a8 Mon Sep 17 00:00:00 2001 From: lloydzhou Date: Wed, 4 Feb 2026 14:09:57 +0800 Subject: [PATCH] Add multiple output formats to nchan_stub_status directive Extend the nchan_stub_status directive to support four output formats: - plain (default): original text format - json: formatted JSON with nested structure - html: HTML table with styling - prometheus: Prometheus metrics format --- config | 1 + src/nchan_commands.rb | 8 +- src/nchan_config_commands.c | 2 +- src/nchan_module.c | 70 ++----- src/nchan_setup.c | 27 ++- src/nchan_types.h | 2 + src/util/nchan_stub_status_formats.c | 263 +++++++++++++++++++++++++++ src/util/nchan_stub_status_formats.h | 23 +++ 8 files changed, 335 insertions(+), 61 deletions(-) create mode 100644 src/util/nchan_stub_status_formats.c create mode 100644 src/util/nchan_stub_status_formats.h diff --git a/config b/config index afc42d31..caaa99aa 100644 --- a/config +++ b/config @@ -133,6 +133,7 @@ _NCHAN_UTIL_SRCS=" \ $_nchan_util_dir/nchan_accumulator.c \ $_nchan_util_dir/nchan_timequeue.c \ $_nchan_util_dir/hdr_histogram.c \ + $_nchan_util_dir/nchan_stub_status_formats.c \ " _NCHAN_STORE_SRCS="\ diff --git a/src/nchan_commands.rb b/src/nchan_commands.rb index 9ea17e5f..c656bf28 100644 --- a/src/nchan_commands.rb +++ b/src/nchan_commands.rb @@ -1074,11 +1074,13 @@ nchan_stub_status [:loc], :nchan_stub_status_directive, :loc_conf, - args: 0, - + args: 0..1, + group: "meta", tags: ['introspection'], - info: "Similar to Nginx's stub_status directive, requests to an `nchan_stub_status` location get a response with some vital Nchan statistics. This data does not account for information from other Nchan instances, and monitors only local connections, published messages, etc.", + value: ["plain", "json", "html", "prometheus"], + default: "plain", + info: "Similar to Nginx's stub_status directive, requests to an `nchan_stub_status` location get a response with some vital Nchan statistics. This data does not account for information from other Nchan instances, and monitors only local connections, published messages, etc. The optional format argument can be 'plain' (default), 'json', 'html', or 'prometheus'.", uri: "#nchan_stub_status" nchan_channel_event_string [:srv, :loc, :if], diff --git a/src/nchan_config_commands.c b/src/nchan_config_commands.c index 793c5494..733d5036 100644 --- a/src/nchan_config_commands.c +++ b/src/nchan_config_commands.c @@ -912,7 +912,7 @@ static ngx_command_t nchan_commands[] = { NULL } , { ngx_string("nchan_stub_status"), - NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS, + NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS|NGX_CONF_TAKE1, nchan_stub_status_directive, NGX_HTTP_LOC_CONF_OFFSET, 0, diff --git a/src/nchan_module.c b/src/nchan_module.c index da8de53e..6c399e15 100644 --- a/src/nchan_module.c +++ b/src/nchan_module.c @@ -31,6 +31,7 @@ //#include //for debugging #endif #include +#include #include ngx_int_t nchan_worker_processes; @@ -236,66 +237,23 @@ static ngx_int_t nchan_http_publisher_handler(ngx_http_request_t * r, void (*bod } ngx_int_t nchan_stub_status_handler(ngx_http_request_t *r) { - ngx_buf_t *b; - ngx_chain_t out; - nchan_stats_global_t global; nchan_stats_worker_t worker; - nchan_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_nchan_module); - - float shmem_used, shmem_max; - - char *buf_fmt = "total published messages: %ui\n" - "stored messages: %ui\n" - "shared memory used: %fK\n" - "shared memory limit: %fK\n" - "channels: %ui\n" - "subscribers: %ui\n" - "redis pending commands: %ui\n" - "redis connected servers: %ui\n" - "redis unhealthy upstreams: %ui\n" - "total redis commands sent: %ui\n" - "total interprocess alerts received: %ui\n" - "interprocess alerts in transit: %ui\n" - "interprocess queued alerts: %ui\n" - "total interprocess send delay: %ui\n" - "total interprocess receive delay: %ui\n" - "nchan version: %s\n"; - - if ((b = ngx_pcalloc(r->pool, sizeof(*b) + 800)) == NULL) { - nchan_log_request_error(r, "Failed to allocate response buffer for nchan_stub_status."); - return NGX_HTTP_INTERNAL_SERVER_ERROR; - } - - shmem_used = (float )((float )nchan_get_used_shmem() / 1024.0); - shmem_max = (float )((float )mcf->shm_size / 1024.0); - - if(nchan_stats_get_all(&worker, &global) != NGX_OK) { - nchan_log_request_error(r, "Failed to get stub status stats."); - return NGX_HTTP_INTERNAL_SERVER_ERROR; - } - - b->start = (u_char *)&b[1]; - b->pos = b->start; - - b->end = ngx_snprintf(b->start, 800, buf_fmt, global.total_published_messages, worker.messages, shmem_used, shmem_max, worker.channels, worker.subscribers, worker.redis_pending_commands, worker.redis_connected_servers, worker.redis_unhealthy_upstreams, global.total_redis_commands_sent, global.total_ipc_alerts_received, global.total_ipc_alerts_sent - global.total_ipc_alerts_received, worker.ipc_queue_size, global.total_ipc_send_delay, global.total_ipc_receive_delay, NCHAN_VERSION); - b->last = b->end; + nchan_loc_conf_t *lcf = ngx_http_get_module_loc_conf(r, ngx_nchan_module); - b->memory = 1; - b->last_buf = 1; - - r->headers_out.status = NGX_HTTP_OK; - r->headers_out.content_type.len = sizeof("text/plain") - 1; - r->headers_out.content_type.data = (u_char *) "text/plain"; - - r->headers_out.content_length_n = b->end - b->start; - ngx_http_send_header(r); - - out.buf = b; - out.next = NULL; - - return ngx_http_output_filter(r, &out); + // Dispatch to format-specific handler based on stub_status_format + switch(lcf->stub_status_format) { + case 1: // json + return nchan_stub_status_json(r, &worker, &global, mcf); + case 2: // html + return nchan_stub_status_html(r, &worker, &global, mcf); + case 3: // prometheus + return nchan_stub_status_prometheus(r, &worker, &global, mcf); + case 0: // plain (default) + default: + return nchan_stub_status_plain(r, &worker, &global, mcf); + } } static ngx_int_t redis_stats_callback(ngx_int_t rc, void *d, void *pd) { diff --git a/src/nchan_setup.c b/src/nchan_setup.c index 8014cec5..1ede1aad 100644 --- a/src/nchan_setup.c +++ b/src/nchan_setup.c @@ -317,6 +317,9 @@ static void *nchan_create_loc_conf(ngx_conf_t *cf) { lcf->benchmark.subscribers_per_channel = NGX_CONF_UNSET; lcf->benchmark.subscriber_distribution = NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET; lcf->benchmark.publisher_distribution = NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET; + + lcf->stub_status_format = NGX_CONF_UNSET; + return lcf; } @@ -614,7 +617,9 @@ static char * nchan_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) { ngx_conf_merge_value(conf->benchmark.subscribers_per_channel, prev->benchmark.subscribers_per_channel, 100); MERGE_UNSET_CONF(conf->benchmark.subscriber_distribution, prev->benchmark.subscriber_distribution, NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET, NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_RANDOM); MERGE_UNSET_CONF(conf->benchmark.publisher_distribution, prev->benchmark.publisher_distribution, NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET, NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_RANDOM); - + + ngx_conf_merge_value(conf->stub_status_format, prev->stub_status_format, 0); + return NGX_CONF_OK; } @@ -1427,6 +1432,26 @@ static char *nchan_stub_status_directive(ngx_conf_t *cf, ngx_command_t *cmd, voi nchan_loc_conf_t *lcf = conf; nchan_stub_status_enabled = 1; lcf->request_handler = &nchan_stub_status_handler; + + // Parse format argument if provided + if (cf->args->nelts > 1) { + ngx_str_t *val = &((ngx_str_t *) cf->args->elts)[1]; + if (ngx_strncmp(val->data, "plain", val->len) == 0) { + lcf->stub_status_format = 0; + } else if (ngx_strncmp(val->data, "json", val->len) == 0) { + lcf->stub_status_format = 1; + } else if (ngx_strncmp(val->data, "html", val->len) == 0) { + lcf->stub_status_format = 2; + } else if (ngx_strncmp(val->data, "prometheus", val->len) == 0) { + lcf->stub_status_format = 3; + } else { + ngx_conf_log_error(NGX_LOG_ERR, cf, 0, + "invalid nchan_stub_status format: %V (valid: plain, json, html, prometheus)", val); + return NGX_CONF_ERROR; + } + } else { + lcf->stub_status_format = 0; // default: plain + } return NGX_CONF_OK; } diff --git a/src/nchan_types.h b/src/nchan_types.h index 778e15ab..b1d97f18 100644 --- a/src/nchan_types.h +++ b/src/nchan_types.h @@ -437,6 +437,8 @@ struct nchan_loc_conf_s { //nchan_loc_conf_t nchan_benchmark_conf_t benchmark; ngx_int_t (*request_handler)(ngx_http_request_t *r); + + ngx_int_t stub_status_format; // 0=plain, 1=json, 2=html, 3=prometheus };// nchan_loc_conf_t; typedef struct nchan_llist_timed_s { diff --git a/src/util/nchan_stub_status_formats.c b/src/util/nchan_stub_status_formats.c new file mode 100644 index 00000000..defbbb8a --- /dev/null +++ b/src/util/nchan_stub_status_formats.c @@ -0,0 +1,263 @@ +#include "nchan_stub_status_formats.h" +#include + +// Common function to get stats data +static ngx_int_t get_stats_data(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf, float *shmem_used, float *shmem_max) { + + if(nchan_stats_get_all(worker, global) != NGX_OK) { + nchan_log_request_error(r, "Failed to get stub status stats."); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + *shmem_used = (float)((float)nchan_get_used_shmem() / 1024.0); + *shmem_max = (float)((float)mcf->shm_size / 1024.0); + + return NGX_OK; +} + +// Plain text format (original implementation) +ngx_int_t nchan_stub_status_plain(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf) { + + ngx_buf_t *b; + ngx_chain_t out; + float shmem_used, shmem_max; + + char *buf_fmt = "total published messages: %ui\n" + "stored messages: %ui\n" + "shared memory used: %fK\n" + "shared memory limit: %fK\n" + "channels: %ui\n" + "subscribers: %ui\n" + "redis pending commands: %ui\n" + "redis connected servers: %ui\n" + "redis unhealthy upstreams: %ui\n" + "total redis commands sent: %ui\n" + "total interprocess alerts received: %ui\n" + "interprocess alerts in transit: %ui\n" + "interprocess queued alerts: %ui\n" + "total interprocess send delay: %ui\n" + "total interprocess receive delay: %ui\n" + "nchan version: %s\n"; + + if(get_stats_data(r, worker, global, mcf, &shmem_used, &shmem_max) != NGX_OK) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + if ((b = ngx_pcalloc(r->pool, sizeof(*b) + 800)) == NULL) { + nchan_log_request_error(r, "Failed to allocate response buffer for nchan_stub_status."); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + b->start = (u_char *)&b[1]; + b->pos = b->start; + + b->end = ngx_snprintf(b->start, 800, buf_fmt, global->total_published_messages, worker->messages, shmem_used, shmem_max, worker->channels, worker->subscribers, worker->redis_pending_commands, worker->redis_connected_servers, worker->redis_unhealthy_upstreams, global->total_redis_commands_sent, global->total_ipc_alerts_received, global->total_ipc_alerts_sent - global->total_ipc_alerts_received, worker->ipc_queue_size, global->total_ipc_send_delay, global->total_ipc_receive_delay, NCHAN_VERSION); + b->last = b->end; + + b->memory = 1; + b->last_buf = 1; + + r->headers_out.status = NGX_HTTP_OK; + r->headers_out.content_type.len = sizeof("text/plain") - 1; + r->headers_out.content_type.data = (u_char *) "text/plain"; + + r->headers_out.content_length_n = b->end - b->start; + ngx_http_send_header(r); + + out.buf = b; + out.next = NULL; + + return ngx_http_output_filter(r, &out); +} + +// JSON format +ngx_int_t nchan_stub_status_json(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf) { + + ngx_buf_t *b; + ngx_chain_t out; + float shmem_used, shmem_max; + + char *buf_fmt = "{\n" + " \"total_published_messages\": %ui,\n" + " \"stored_messages\": %ui,\n" + " \"shared_memory\": {\n" + " \"used_kb\": %f,\n" + " \"limit_kb\": %f\n" + " },\n" + " \"channels\": %ui,\n" + " \"subscribers\": %ui,\n" + " \"redis\": {\n" + " \"pending_commands\": %ui,\n" + " \"connected_servers\": %ui,\n" + " \"unhealthy_upstreams\": %ui,\n" + " \"total_commands_sent\": %ui\n" + " },\n" + " \"interprocess\": {\n" + " \"alerts\": {\n" + " \"sent\": %ui,\n" + " \"received\": %ui,\n" + " \"in_transit\": %ui\n" + " },\n" + " \"queued_alerts\": %ui,\n" + " \"send_delay\": %ui,\n" + " \"receive_delay\": %ui\n" + " },\n" + " \"version\": \"%s\"\n" + "}"; + + if(get_stats_data(r, worker, global, mcf, &shmem_used, &shmem_max) != NGX_OK) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + if ((b = ngx_pcalloc(r->pool, sizeof(*b) + 800)) == NULL) { + nchan_log_request_error(r, "Failed to allocate response buffer for nchan_stub_status JSON."); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + b->start = (u_char *)&b[1]; + b->pos = b->start; + + b->end = ngx_snprintf(b->start, 800, buf_fmt, global->total_published_messages, worker->messages, shmem_used, shmem_max, worker->channels, worker->subscribers, worker->redis_pending_commands, worker->redis_connected_servers, worker->redis_unhealthy_upstreams, global->total_redis_commands_sent, global->total_ipc_alerts_sent, global->total_ipc_alerts_received, global->total_ipc_alerts_sent - global->total_ipc_alerts_received, worker->ipc_queue_size, global->total_ipc_send_delay, global->total_ipc_receive_delay, NCHAN_VERSION); + b->last = b->end; + + b->memory = 1; + b->last_buf = 1; + + r->headers_out.status = NGX_HTTP_OK; + r->headers_out.content_type.len = sizeof("application/json") - 1; + r->headers_out.content_type.data = (u_char *) "application/json"; + + r->headers_out.content_length_n = b->end - b->start; + ngx_http_send_header(r); + + out.buf = b; + out.next = NULL; + + return ngx_http_output_filter(r, &out); +} + +// HTML table format +ngx_int_t nchan_stub_status_html(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf) { + + ngx_buf_t *b; + ngx_chain_t out; + float shmem_used, shmem_max; + + char *buf_fmt = "Nchan Status" + "" + "

Nchan Status

" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "
MetricValue
Total Published Messages%ui
Stored Messages%ui
Shared Memory Used%fK
Shared Memory Limit%fK
Channels%ui
Subscribers%ui
Redis Pending Commands%ui
Redis Connected Servers%ui
Redis Unhealthy Upstreams%ui
Total Redis Commands Sent%ui
Interprocess Alerts Sent%ui
Interprocess Alerts Received%ui
Interprocess Alerts In Transit%ui
Interprocess Queued Alerts%ui
Total Interprocess Send Delay%ui
Total Interprocess Receive Delay%ui
Nchan Version%s
"; + + if(get_stats_data(r, worker, global, mcf, &shmem_used, &shmem_max) != NGX_OK) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + if ((b = ngx_pcalloc(r->pool, sizeof(*b) + 2048)) == NULL) { + nchan_log_request_error(r, "Failed to allocate response buffer for nchan_stub_status HTML."); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + b->start = (u_char *)&b[1]; + b->pos = b->start; + + b->end = ngx_snprintf(b->start, 2048, buf_fmt, global->total_published_messages, worker->messages, shmem_used, shmem_max, worker->channels, worker->subscribers, worker->redis_pending_commands, worker->redis_connected_servers, worker->redis_unhealthy_upstreams, global->total_redis_commands_sent, global->total_ipc_alerts_sent, global->total_ipc_alerts_received, global->total_ipc_alerts_sent - global->total_ipc_alerts_received, worker->ipc_queue_size, global->total_ipc_send_delay, global->total_ipc_receive_delay, NCHAN_VERSION); + b->last = b->end; + + b->memory = 1; + b->last_buf = 1; + + r->headers_out.status = NGX_HTTP_OK; + r->headers_out.content_type.len = sizeof("text/html") - 1; + r->headers_out.content_type.data = (u_char *) "text/html"; + + r->headers_out.content_length_n = b->end - b->start; + ngx_http_send_header(r); + + out.buf = b; + out.next = NULL; + + return ngx_http_output_filter(r, &out); +} + +// Prometheus format +ngx_int_t nchan_stub_status_prometheus(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf) { + + ngx_buf_t *b; + ngx_chain_t out; + float shmem_used, shmem_max; + + char *buf_fmt = "nchan_published_messages_total %ui\n" + "nchan_stored_messages %ui\n" + "nchan_shared_memory_used_kb %f\n" + "nchan_shared_memory_limit_kb %f\n" + "nchan_channels %ui\n" + "nchan_subscribers %ui\n" + "nchan_redis_pending_commands %ui\n" + "nchan_redis_connected_servers %ui\n" + "nchan_redis_unhealthy_upstreams %ui\n" + "nchan_redis_commands_sent_total %ui\n" + "nchan_ipc_alerts_sent_total %ui\n" + "nchan_ipc_alerts_received_total %ui\n" + "nchan_ipc_alerts_in_transit %ui\n" + "nchan_ipc_queued_alerts %ui\n" + "nchan_ipc_send_delay_total %ui\n" + "nchan_ipc_receive_delay_total %ui\n"; + + if(get_stats_data(r, worker, global, mcf, &shmem_used, &shmem_max) != NGX_OK) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + if ((b = ngx_pcalloc(r->pool, sizeof(*b) + 800)) == NULL) { + nchan_log_request_error(r, "Failed to allocate response buffer for nchan_stub_status Prometheus."); + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + b->start = (u_char *)&b[1]; + b->pos = b->start; + + b->end = ngx_snprintf(b->start, 800, buf_fmt, global->total_published_messages, worker->messages, shmem_used, shmem_max, worker->channels, worker->subscribers, worker->redis_pending_commands, worker->redis_connected_servers, worker->redis_unhealthy_upstreams, global->total_redis_commands_sent, global->total_ipc_alerts_sent, global->total_ipc_alerts_received, global->total_ipc_alerts_sent - global->total_ipc_alerts_received, worker->ipc_queue_size, global->total_ipc_send_delay, global->total_ipc_receive_delay); + b->last = b->end; + + b->memory = 1; + b->last_buf = 1; + + r->headers_out.status = NGX_HTTP_OK; + r->headers_out.content_type.len = sizeof("text/plain") - 1; + r->headers_out.content_type.data = (u_char *) "text/plain"; + + r->headers_out.content_length_n = b->end - b->start; + ngx_http_send_header(r); + + out.buf = b; + out.next = NULL; + + return ngx_http_output_filter(r, &out); +} diff --git a/src/util/nchan_stub_status_formats.h b/src/util/nchan_stub_status_formats.h new file mode 100644 index 00000000..79a15fc7 --- /dev/null +++ b/src/util/nchan_stub_status_formats.h @@ -0,0 +1,23 @@ +#ifndef NCHAN_STUB_STATUS_FORMATS_H +#define NCHAN_STUB_STATUS_FORMATS_H + +#include +#include + +ngx_int_t nchan_stub_status_plain(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf); + +ngx_int_t nchan_stub_status_json(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf); + +ngx_int_t nchan_stub_status_html(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf); + +ngx_int_t nchan_stub_status_prometheus(ngx_http_request_t *r, + nchan_stats_worker_t *worker, nchan_stats_global_t *global, + nchan_main_conf_t *mcf); + +#endif //NCHAN_STUB_STATUS_FORMATS_H