diff --git a/daemon/call.c b/daemon/call.c index 78fd2498f0..457b799a60 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -41,6 +41,88 @@ #include "xt_RTPENGINE.h" +static socket_t timeout_warn_sock; + +bool timeout_warn_init(void) { + if (!open_v46_socket(&timeout_warn_sock, SOCK_DGRAM)) { + ilog(LOG_ERR, "Failed to open/connect timeout warning socket: %s", strerror(errno)); + return false; + } + return true; +} + +static GString *timeout_warn_json_print(call_t *call, struct packet_stream *ps) +{ + + struct call_monologue *ml = ps->media->monologue; + GList *l = rtpe_config.ng_listen_ep.head; + endpoint_t *e = l->data; + + GString *buf = g_string_new(""); + GString *ip_buf = g_string_new(""); + GString *log_intf_buf = g_string_new(""); + + struct local_intf *local_intf = get_interface_address(ml->logical_intf, ml->logical_intf->preferred_family); + if(local_intf && local_intf->spec){ + sockaddr_print_gstring(log_intf_buf, &(local_intf->spec->local_address.addr)); + } + + sockaddr_print_gstring(ip_buf, &(e->address)); + + + g_string_append_printf(buf, "{" + "\"type\":\"%s\"," + "\"rtpe_ctrl_ip\":\"%s\"," + "\"callid\":\""STR_FORMAT"\"," + "\"log_intf\":\"%s\"," + "\"src_tag\":\""STR_FORMAT"\",", + PS_ISSET(ps, RTCP)? "RTCP":"RTP", + ip_buf->str, + STR_FMT(&call->callid), + log_intf_buf->str, + STR_FMT(&ml->tag)); + + g_string_append_printf(buf, "\"dst_tags\":["); + + // TODO check "to tag" not repeated ? + bool first = true; + for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) { + ml = l->data->sink->media->monologue; + if(first){ + g_string_append_printf(buf, + "\""STR_FORMAT"\"", STR_FMT(&ml->tag)); + first = false; + }else{ + g_string_append_printf(buf, + ",\""STR_FORMAT"\"", STR_FMT(&ml->tag)); + } + } + + for (__auto_type l = ps->rtcp_sinks.head; l; l = l->next) { + ml = l->data->sink->media->monologue; + if(first){ + g_string_append_printf(buf, + "\"" STR_FORMAT "\"", STR_FMT(&ml->tag)); + first = false; + }else{ + g_string_append_printf(buf, + ", \"" STR_FORMAT "\"", STR_FMT(&ml->tag)); + + } + } + g_string_append_printf(buf, "], "); + + g_string_append_printf(buf, + "\"type\":\"timeout_warn\",\"timestamp\":%lu,\"source_ip\":\"%s\"}", + (unsigned long) rtpe_now / 1000000, + sockaddr_print_buf(&ps->endpoint.address)); + + g_string_free_str(log_intf_buf); + g_string_free_str(ip_buf); + + return buf; +} + struct iterator_helper { uint64_t count; GSList *del_timeout; @@ -139,6 +221,9 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { rwlock_lock_r(&c->master_lock); log_info_call(c); + int64_t warn_timeout = atomic_get_na(&rtpe_config.warn_timeout_us); + int64_t warn_backoff = atomic_get_na(&rtpe_config.warn_backoff_us); + // final timeout applicable to all calls (own and foreign) int64_t final_timeout = atomic_get_na(&rtpe_config.final_timeout_us); if (final_timeout && rtpe_now >= (c->created + final_timeout)) { @@ -221,6 +306,30 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { CALL_CLEAR(sfd->call, FOREIGN_MEDIA); no_sfd: + // check for timeout warning + bool deleted_streams = ps->call->deleted_us; + bool warning_before_establsh = (!c->established)||((rtpe_now - c->established) < warn_timeout); + bool timed_out = (rtpe_now - packet_stream_last_packet(ps)) > warn_timeout; + bool backoff = (rtpe_now - ps->last_warn_time) < warn_backoff; + + bool issue_warning = warn_timeout && !deleted_streams && !warning_before_establsh && timed_out && !backoff; + if(issue_warning){ + GString *buf = timeout_warn_json_print(c, ps); + if (socket_sendto(&timeout_warn_sock, buf->str, buf->len, &rtpe_config.timeout_warn_ep) < 0){ + ilog(LOG_ERR, "Error sending timeout warning event info to UDP destination %s: %s", + endpoint_print_buf(&rtpe_config.timeout_warn_ep), + strerror(errno)); + } + g_string_free(buf, TRUE); + // TODO put last_warn_time on call? + ps->last_warn_time = rtpe_now; + for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) { + l->data->sink->last_warn_time = rtpe_now; + } + for (__auto_type l = ps->rtcp_sinks.head; l; l = l->next) { + l->data->sink->last_warn_time = rtpe_now; + } + } if (good) continue; @@ -1006,6 +1115,7 @@ struct packet_stream *__packet_stream_new(call_t *call) { mutex_init(&stream->lock); stream->call = call; atomic64_set_na(&stream->last_packet_us, rtpe_now); + stream->last_warn_time = 0; stream->rtp_stats = rtp_stats_ht_new(); recording_init_stream(stream); stream->send_timer = send_timer_new(stream); @@ -4640,6 +4750,7 @@ static call_t *call_create(const str *callid) { call_memory_arena_set(c); c->callid = call_str_cpy(callid); c->created = rtpe_now; + c->established = 0; c->dtls_cert = dtls_cert(); c->tos = rtpe_config.default_tos; c->poller = rtpe_get_poller(); @@ -5425,6 +5536,11 @@ static int call_get_dialogue(struct call_monologue *monologues[2], call_t *call, * derived from the viabranch. */ __monologue_tag(ft, fromtag); + if(!call->established){ + call->established = rtpe_now; + } + + dialogue_unconfirm(ft, "dialogue signalling event"); dialogue_unconfirm(tt, "dialogue signalling event"); diff --git a/daemon/cli.c b/daemon/cli.c index 73fec4b650..f09f9ab021 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -67,6 +67,8 @@ static void cli_incoming_set_maxcpu(str *instr, struct cli_writer *cw, const cli static void cli_incoming_set_maxload(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_set_maxbw(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_set_timeout(str *instr, struct cli_writer *cw, const cli_handler_t *); +static void cli_incoming_set_warntimeout(str *instr, struct cli_writer *cw, const cli_handler_t *); +static void cli_incoming_set_warnbackoff(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_set_silenttimeout(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_set_offertimeout(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_set_finaltimeout(str *instr, struct cli_writer *cw, const cli_handler_t *); @@ -94,6 +96,8 @@ static void cli_incoming_list_totals(str *instr, struct cli_writer *cw, const cl static void cli_incoming_list_counters(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_list_sessions(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_list_timeout(str *instr, struct cli_writer *cw, const cli_handler_t *); +static void cli_incoming_list_warntimeout(str *instr, struct cli_writer *cw, const cli_handler_t *); +static void cli_incoming_list_warnbackoff(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_list_silenttimeout(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_list_offertimeout(str *instr, struct cli_writer *cw, const cli_handler_t *); static void cli_incoming_list_finaltimeout(str *instr, struct cli_writer *cw, const cli_handler_t *); @@ -158,6 +162,8 @@ HANDLER_START(cli_set_handlers) HANDLER_CMD("maxload", cli_incoming_set_maxload, "", "set maxmimum load average allowed") HANDLER_CMD("maxbw", cli_incoming_set_maxbw, "", "set maxmimum bandwidth usage average allowed") HANDLER_CMD("timeout", cli_incoming_set_timeout, "", "set the --timeout parameter") + HANDLER_CMD("warntimeout", cli_incoming_set_warntimeout, "", "set the --warn-timeout parameter") + HANDLER_CMD("warnbackoff", cli_incoming_set_warnbackoff, "", "set the --warn-backoff parameter") HANDLER_CMD("silenttimeout", cli_incoming_set_silenttimeout, "", "set the --silent-timeout parameter") HANDLER_CMD("offertimeout", cli_incoming_set_offertimeout, "", "set the --offer-timeout parameter") HANDLER_CMD("finaltimeout", cli_incoming_set_finaltimeout, "", "set the --final-timeout parameter") @@ -183,6 +189,8 @@ HANDLER_START(cli_list_handlers) HANDLER_CMD("maxbw", cli_incoming_list_maxbw , NULL, "print maxmimum bandwidth usage average allowed") HANDLER_CMD("timeout", cli_incoming_list_timeout, NULL, "print timeout parameter") HANDLER_CMD("silenttimeout", cli_incoming_list_silenttimeout, NULL, "print silent-timeout parameter") + HANDLER_CMD("warntimeout", cli_incoming_list_warntimeout, NULL, "print warn-timeout parameter") + HANDLER_CMD("warnbackoff", cli_incoming_list_warnbackoff, NULL, "print warn-backoff parameter") HANDLER_CMD("offertimeout", cli_incoming_list_offertimeout, NULL, "print offer-timeout parameter") HANDLER_CMD("finaltimeout", cli_incoming_list_finaltimeout, NULL, "print final-timeout parameter") HANDLER_CMD("loglevels", cli_incoming_list_loglevels, NULL, "list available log levels") @@ -515,6 +523,8 @@ RTPE_CONFIG_ENDPOINT_QUEUE_PARAMS X(load_limit, "max-load") \ X(bw_limit, "max-bw") \ X(timeout_us, "timeout") \ + X(warn_timeout_us, "warn-timeout") \ + X(warn_backoff_us, "warn-backoff") \ X(silent_timeout_us, "silent-timeout") \ X(final_timeout_us, "final-timeout") \ X(control_tos, "control-tos") \ @@ -653,6 +663,12 @@ static void cli_incoming_list_maxopenfiles(str *instr, struct cli_writer *cw, co static void cli_incoming_list_timeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) { cw->cw_printf(cw, "TIMEOUT=%" PRId64 "\n", rtpe_config.timeout_us / 1000000L); } +static void cli_incoming_list_warntimeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) { + cw->cw_printf(cw, "WARN_TIMEOUT=%" PRId64 "\n", rtpe_config.warn_timeout_us / 1000000L); +} +static void cli_incoming_list_warnbackoff(str *instr, struct cli_writer *cw, const cli_handler_t *handler) { + cw->cw_printf(cw, "WARN_BACKOFF=%" PRId64 "\n", rtpe_config.warn_backoff_us / 1000000L); +} static void cli_incoming_list_silenttimeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) { cw->cw_printf(cw, "SILENT_TIMEOUT=%" PRId64 "\n", rtpe_config.silent_timeout_us / 1000000L); } @@ -1058,6 +1074,12 @@ static void cli_incoming_set_gentimeout_us(str *instr, struct cli_writer *cw, in static void cli_incoming_set_timeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) { cli_incoming_set_gentimeout_us(instr, cw, &rtpe_config.timeout_us); } +static void cli_incoming_set_warntimeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) { + cli_incoming_set_gentimeout_us(instr, cw, &rtpe_config.warn_timeout_us); +} +static void cli_incoming_set_warnbackoff(str *instr, struct cli_writer *cw, const cli_handler_t *handler) { + cli_incoming_set_gentimeout_us(instr, cw, &rtpe_config.warn_backoff_us); +} static void cli_incoming_set_silenttimeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) { cli_incoming_set_gentimeout_us(instr, cw, &rtpe_config.silent_timeout_us); } diff --git a/daemon/main.c b/daemon/main.c index 8d198cfded..1e8e8d2d61 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -649,6 +649,7 @@ static void options(int *argc, char ***argv, charp_ht templates) { bool codecs = false; double max_load = 0; double max_cpu = 0; + g_autoptr(char) timeout_warn_ep = NULL; g_autoptr(char) dtmf_udp_ep = NULL; g_autoptr(char) endpoint_learning = NULL; g_autoptr(char) dtls_sig = NULL; @@ -677,6 +678,8 @@ static void options(int *argc, char ***argv, charp_ht templates) { g_autoptr(char) transcode_config = NULL; int silent_timeout = 0; int timeout = 0; + int warn_timeout = 0; + int warn_backoff = 0; int final_timeout = 0; int offer_timeout = 0; int delete_delay = 30; @@ -719,6 +722,8 @@ static void options(int *argc, char ***argv, charp_ht templates) { { "control-tos",0 , 0, G_OPTION_ARG_INT, &rtpe_config.control_tos, "Default TOS value to set on control-ng", "INT" }, { "control-pmtu", 0,0, G_OPTION_ARG_STRING, &control_pmtu, "Path MTU discovery behaviour on UDP control sockets", "want|dont" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, + { "warn-timeout", 0, 0, G_OPTION_ARG_INT, &warn_timeout, "RTP warning timeout", "SECS" }, + { "warn-backoff", 0, 0, G_OPTION_ARG_INT, &warn_backoff, " Backoff time between consecutive RTP timeout warnings for a packet stream", "SECS" }, { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout, "RTP timeout for muted", "SECS" }, { "final-timeout",'a',0,G_OPTION_ARG_INT, &final_timeout, "Call timeout", "SECS" }, { "offer-timeout",0,0, G_OPTION_ARG_INT, &offer_timeout, "Timeout for incomplete one-sided calls", "SECS" }, @@ -747,6 +752,7 @@ static void options(int *argc, char ***argv, charp_ht templates) { { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &rtpe_config.b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"}, { "log-facility-rtcp",0, 0, G_OPTION_ARG_STRING, &log_facility_rtcp_s, "Syslog facility to use for logging RTCP", "daemon|local0|...|local7"}, + { "timeout-warn-dest", 0,0, G_OPTION_ARG_STRING, &timeout_warn_ep, "Destination address for RTP timeout warning via UDP", "IP46|HOSTNAME:PORT" }, #ifdef WITH_TRANSCODING { "log-facility-dtmf",0, 0, G_OPTION_ARG_STRING, &log_facility_dtmf_s, "Syslog facility to use for logging DTMF", "daemon|local0|...|local7"}, { "dtmf-log-dest", 0,0, G_OPTION_ARG_STRING, &dtmf_udp_ep, "Destination address for DTMF logging via UDP", "IP46|HOSTNAME:PORT" }, @@ -1103,6 +1109,14 @@ static void options(int *argc, char ***argv, charp_ht templates) { if (rtpe_config.timeout_us <= 0) rtpe_config.timeout_us = 60 * 1000000LL; + rtpe_config.warn_timeout_us = warn_timeout * 1000000LL; + if (rtpe_config.warn_timeout_us < 0) + rtpe_config.warn_timeout_us = 0; + + rtpe_config.warn_backoff_us = warn_backoff * 1000000LL; + if (rtpe_config.warn_backoff_us < 0) + rtpe_config.warn_backoff_us = 5 * 1000000LL; + rtpe_config.silent_timeout_us = silent_timeout * 1000000LL; if (rtpe_config.silent_timeout_us <= 0) rtpe_config.silent_timeout_us = 3600 * 1000000LL; @@ -1201,6 +1215,11 @@ static void options(int *argc, char ***argv, charp_ht templates) { if (debug_srtp) rtpe_config.common.log_levels[log_level_index_srtp] = LOG_DEBUG; + if (timeout_warn_ep) { + if (!endpoint_parse_any_getaddrinfo_full(&rtpe_config.timeout_warn_ep, timeout_warn_ep)) + die("Invalid IP or port '%s' (--timeout-warn-dest)", timeout_warn_ep); + } + if (dtmf_udp_ep) { if (!endpoint_parse_any_getaddrinfo_full(&rtpe_config.dtmf_udp_ep, dtmf_udp_ep)) die("Invalid IP or port '%s' (--dtmf-log-dest)", dtmf_udp_ep); @@ -1606,6 +1625,8 @@ static void init_everything(charp_ht templates) { #endif codeclib_init(0); media_player_init(); + if (!timeout_warn_init()) + die("timeout_warn init failed, see log"); if (!dtmf_init()) die("DTMF init failed, see log"); jitter_buffer_init(); diff --git a/docs/rtpengine.md b/docs/rtpengine.md index 0c0741a757..32f4bdbc5d 100644 --- a/docs/rtpengine.md +++ b/docs/rtpengine.md @@ -272,6 +272,24 @@ at the command line. See the __\-\-config-file__ option below for details. is removed from __rtpengine__'s internal state table. Defaults to 60 seconds. +- __\-\-timeout-warn-dest=__*IP46*:*PORT* + + Configures a target address for logging detected RTP timeout warning. + This sends the JSON payload to the + given address as UDP packets. + +- __\-\-warn-timeout=__*SECS* + + Takes the number of seconds as argument after which a warning will be issued + for a packet stream if no media traffic has been received. + Defaults is disabled(0). + +- __\-\-warn-backoff=__*SECS* + + Takes the number of seconds as backoff time between + consecutive RTP timeout warnings for a packet stream. + Defaults is 5 seconds. + - __-s__, __\-\-silent-timeout=__*SECS* Ditto as the __\-\-timeout__ option, but applies to muted or inactive media diff --git a/include/call.h b/include/call.h index 666cecf1ce..7b3b3d0341 100644 --- a/include/call.h +++ b/include/call.h @@ -438,6 +438,7 @@ struct packet_stream { struct send_timer *send_timer; /* RO */ struct jitter_buffer *jb; /* RO */ int64_t kernel_time_us; + int64_t last_warn_time; struct stream_stats *stats_in; struct stream_stats *stats_out; @@ -780,6 +781,7 @@ struct call { str callid; str_q callid_aliases; int64_t created; + int64_t established; int64_t destroyed; int64_t last_signal_us; int64_t deleted_us; @@ -910,6 +912,8 @@ bool __init_stream(struct packet_stream *ps); const rtp_payload_type *__rtp_stats_codec(struct call_media *m); +bool timeout_warn_init(void); + #include "str.h" #include "rtp.h" diff --git a/include/main.h b/include/main.h index ac4a825b30..fdc8880d6b 100644 --- a/include/main.h +++ b/include/main.h @@ -93,6 +93,8 @@ enum endpoint_learning { #define RTPE_CONFIG_INT64_PARAMS \ X(bw_limit) \ + X(warn_timeout_us) \ + X(warn_backoff_us) \ X(silent_timeout_us) \ X(timeout_us) \ X(final_timeout_us) \ @@ -175,6 +177,7 @@ enum endpoint_learning { X(redis_write_ep) \ X(redis_subscribe_ep) \ X(homer_ep) \ + X(timeout_warn_ep) \ X(dtmf_udp_ep) #define RTPE_CONFIG_ENDPOINT_QUEUE_PARAMS \