Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ _NCHAN_UTIL_SRCS=" \
$_nchan_util_dir/nchan_accumulator.c \
$_nchan_util_dir/nchan_timequeue.c \
$_nchan_util_dir/hdr_histogram.c \
$_nchan_util_dir/nchan_stub_status_formats.c \
"

_NCHAN_STORE_SRCS="\
Expand Down
8 changes: 5 additions & 3 deletions src/nchan_commands.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1074,11 +1074,13 @@
nchan_stub_status [:loc],
:nchan_stub_status_directive,
:loc_conf,
args: 0,
args: 0..1,

group: "meta",
tags: ['introspection'],
info: "Similar to Nginx's stub_status directive, requests to an `nchan_stub_status` location get a response with some vital Nchan statistics. This data does not account for information from other Nchan instances, and monitors only local connections, published messages, etc.",
value: ["plain", "json", "html", "prometheus"],
default: "plain",
info: "Similar to Nginx's stub_status directive, requests to an `nchan_stub_status` location get a response with some vital Nchan statistics. This data does not account for information from other Nchan instances, and monitors only local connections, published messages, etc. The optional format argument can be 'plain' (default), 'json', 'html', or 'prometheus'.",
uri: "#nchan_stub_status"

nchan_channel_event_string [:srv, :loc, :if],
Expand Down
2 changes: 1 addition & 1 deletion src/nchan_config_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ static ngx_command_t nchan_commands[] = {
NULL } ,

{ ngx_string("nchan_stub_status"),
NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS|NGX_CONF_TAKE1,
nchan_stub_status_directive,
NGX_HTTP_LOC_CONF_OFFSET,
0,
Expand Down
70 changes: 14 additions & 56 deletions src/nchan_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//#include <store/memory/store-private.h> //for debugging
#endif
#include <util/nchan_output.h>
#include <util/nchan_stub_status_formats.h>
#include <nchan_websocket_publisher.h>

ngx_int_t nchan_worker_processes;
Expand Down Expand Up @@ -236,66 +237,23 @@ static ngx_int_t nchan_http_publisher_handler(ngx_http_request_t * r, void (*bod
}

ngx_int_t nchan_stub_status_handler(ngx_http_request_t *r) {
ngx_buf_t *b;
ngx_chain_t out;

nchan_stats_global_t global;
nchan_stats_worker_t worker;

nchan_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_nchan_module);

float shmem_used, shmem_max;

char *buf_fmt = "total published messages: %ui\n"
"stored messages: %ui\n"
"shared memory used: %fK\n"
"shared memory limit: %fK\n"
"channels: %ui\n"
"subscribers: %ui\n"
"redis pending commands: %ui\n"
"redis connected servers: %ui\n"
"redis unhealthy upstreams: %ui\n"
"total redis commands sent: %ui\n"
"total interprocess alerts received: %ui\n"
"interprocess alerts in transit: %ui\n"
"interprocess queued alerts: %ui\n"
"total interprocess send delay: %ui\n"
"total interprocess receive delay: %ui\n"
"nchan version: %s\n";

if ((b = ngx_pcalloc(r->pool, sizeof(*b) + 800)) == NULL) {
nchan_log_request_error(r, "Failed to allocate response buffer for nchan_stub_status.");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

shmem_used = (float )((float )nchan_get_used_shmem() / 1024.0);
shmem_max = (float )((float )mcf->shm_size / 1024.0);

if(nchan_stats_get_all(&worker, &global) != NGX_OK) {
nchan_log_request_error(r, "Failed to get stub status stats.");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

b->start = (u_char *)&b[1];
b->pos = b->start;

b->end = ngx_snprintf(b->start, 800, buf_fmt, global.total_published_messages, worker.messages, shmem_used, shmem_max, worker.channels, worker.subscribers, worker.redis_pending_commands, worker.redis_connected_servers, worker.redis_unhealthy_upstreams, global.total_redis_commands_sent, global.total_ipc_alerts_received, global.total_ipc_alerts_sent - global.total_ipc_alerts_received, worker.ipc_queue_size, global.total_ipc_send_delay, global.total_ipc_receive_delay, NCHAN_VERSION);
b->last = b->end;
nchan_loc_conf_t *lcf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);

b->memory = 1;
b->last_buf = 1;

r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_type.len = sizeof("text/plain") - 1;
r->headers_out.content_type.data = (u_char *) "text/plain";

r->headers_out.content_length_n = b->end - b->start;
ngx_http_send_header(r);

out.buf = b;
out.next = NULL;

return ngx_http_output_filter(r, &out);
// Dispatch to format-specific handler based on stub_status_format
switch(lcf->stub_status_format) {
case 1: // json
return nchan_stub_status_json(r, &worker, &global, mcf);
case 2: // html
return nchan_stub_status_html(r, &worker, &global, mcf);
case 3: // prometheus
return nchan_stub_status_prometheus(r, &worker, &global, mcf);
case 0: // plain (default)
default:
return nchan_stub_status_plain(r, &worker, &global, mcf);
}
}

static ngx_int_t redis_stats_callback(ngx_int_t rc, void *d, void *pd) {
Expand Down
27 changes: 26 additions & 1 deletion src/nchan_setup.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ static void *nchan_create_loc_conf(ngx_conf_t *cf) {
lcf->benchmark.subscribers_per_channel = NGX_CONF_UNSET;
lcf->benchmark.subscriber_distribution = NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET;
lcf->benchmark.publisher_distribution = NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET;

lcf->stub_status_format = NGX_CONF_UNSET;

return lcf;
}

Expand Down Expand Up @@ -614,7 +617,9 @@ static char * nchan_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) {
ngx_conf_merge_value(conf->benchmark.subscribers_per_channel, prev->benchmark.subscribers_per_channel, 100);
MERGE_UNSET_CONF(conf->benchmark.subscriber_distribution, prev->benchmark.subscriber_distribution, NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET, NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_RANDOM);
MERGE_UNSET_CONF(conf->benchmark.publisher_distribution, prev->benchmark.publisher_distribution, NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET, NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_RANDOM);


ngx_conf_merge_value(conf->stub_status_format, prev->stub_status_format, 0);

return NGX_CONF_OK;
}

Expand Down Expand Up @@ -1427,6 +1432,26 @@ static char *nchan_stub_status_directive(ngx_conf_t *cf, ngx_command_t *cmd, voi
nchan_loc_conf_t *lcf = conf;
nchan_stub_status_enabled = 1;
lcf->request_handler = &nchan_stub_status_handler;

// Parse format argument if provided
if (cf->args->nelts > 1) {
ngx_str_t *val = &((ngx_str_t *) cf->args->elts)[1];
if (ngx_strncmp(val->data, "plain", val->len) == 0) {
lcf->stub_status_format = 0;
} else if (ngx_strncmp(val->data, "json", val->len) == 0) {
lcf->stub_status_format = 1;
} else if (ngx_strncmp(val->data, "html", val->len) == 0) {
lcf->stub_status_format = 2;
} else if (ngx_strncmp(val->data, "prometheus", val->len) == 0) {
lcf->stub_status_format = 3;
} else {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0,
"invalid nchan_stub_status format: %V (valid: plain, json, html, prometheus)", val);
return NGX_CONF_ERROR;
}
} else {
lcf->stub_status_format = 0; // default: plain
}
return NGX_CONF_OK;
}

Expand Down
2 changes: 2 additions & 0 deletions src/nchan_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ struct nchan_loc_conf_s { //nchan_loc_conf_t
nchan_benchmark_conf_t benchmark;

ngx_int_t (*request_handler)(ngx_http_request_t *r);

ngx_int_t stub_status_format; // 0=plain, 1=json, 2=html, 3=prometheus
};// nchan_loc_conf_t;

typedef struct nchan_llist_timed_s {
Expand Down
Loading