Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions daemon/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,88 @@

#include "xt_RTPENGINE.h"

static socket_t timeout_warn_sock;

bool timeout_warn_init(void) {
if (!open_v46_socket(&timeout_warn_sock, SOCK_DGRAM)) {
ilog(LOG_ERR, "Failed to open/connect timeout warning socket: %s", strerror(errno));
return false;
}
return true;
}

static GString *timeout_warn_json_print(call_t *call, struct packet_stream *ps)
{

struct call_monologue *ml = ps->media->monologue;
GList *l = rtpe_config.ng_listen_ep.head;
endpoint_t *e = l->data;

GString *buf = g_string_new("");
GString *ip_buf = g_string_new("");
GString *log_intf_buf = g_string_new("");

struct local_intf *local_intf = get_interface_address(ml->logical_intf, ml->logical_intf->preferred_family);
if(local_intf && local_intf->spec){
sockaddr_print_gstring(log_intf_buf, &(local_intf->spec->local_address.addr));
}

sockaddr_print_gstring(ip_buf, &(e->address));


g_string_append_printf(buf, "{"
"\"type\":\"%s\","
"\"rtpe_ctrl_ip\":\"%s\","
"\"callid\":\""STR_FORMAT"\","
"\"log_intf\":\"%s\","
"\"src_tag\":\""STR_FORMAT"\",",
PS_ISSET(ps, RTCP)? "RTCP":"RTP",
ip_buf->str,
STR_FMT(&call->callid),
log_intf_buf->str,
STR_FMT(&ml->tag));

g_string_append_printf(buf, "\"dst_tags\":[");

// TODO check "to tag" not repeated ?
bool first = true;
for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) {
ml = l->data->sink->media->monologue;
if(first){
g_string_append_printf(buf,
"\""STR_FORMAT"\"", STR_FMT(&ml->tag));
first = false;
}else{
g_string_append_printf(buf,
",\""STR_FORMAT"\"", STR_FMT(&ml->tag));
}
}

for (__auto_type l = ps->rtcp_sinks.head; l; l = l->next) {
ml = l->data->sink->media->monologue;
if(first){
g_string_append_printf(buf,
"\"" STR_FORMAT "\"", STR_FMT(&ml->tag));
first = false;
}else{
g_string_append_printf(buf,
", \"" STR_FORMAT "\"", STR_FMT(&ml->tag));

}
}
g_string_append_printf(buf, "], ");

g_string_append_printf(buf,
"\"type\":\"timeout_warn\",\"timestamp\":%lu,\"source_ip\":\"%s\"}",
(unsigned long) rtpe_now / 1000000,
sockaddr_print_buf(&ps->endpoint.address));

g_string_free_str(log_intf_buf);
g_string_free_str(ip_buf);

return buf;
}

struct iterator_helper {
uint64_t count;
GSList *del_timeout;
Expand Down Expand Up @@ -139,6 +221,9 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
rwlock_lock_r(&c->master_lock);
log_info_call(c);

int64_t warn_timeout = atomic_get_na(&rtpe_config.warn_timeout_us);
int64_t warn_backoff = atomic_get_na(&rtpe_config.warn_backoff_us);

// final timeout applicable to all calls (own and foreign)
int64_t final_timeout = atomic_get_na(&rtpe_config.final_timeout_us);
if (final_timeout && rtpe_now >= (c->created + final_timeout)) {
Expand Down Expand Up @@ -221,6 +306,30 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
CALL_CLEAR(sfd->call, FOREIGN_MEDIA);

no_sfd:
// check for timeout warning
bool deleted_streams = ps->call->deleted_us;
bool warning_before_establsh = (!c->established)||((rtpe_now - c->established) < warn_timeout);
bool timed_out = (rtpe_now - packet_stream_last_packet(ps)) > warn_timeout;
bool backoff = (rtpe_now - ps->last_warn_time) < warn_backoff;

bool issue_warning = warn_timeout && !deleted_streams && !warning_before_establsh && timed_out && !backoff;
if(issue_warning){
GString *buf = timeout_warn_json_print(c, ps);
if (socket_sendto(&timeout_warn_sock, buf->str, buf->len, &rtpe_config.timeout_warn_ep) < 0){
ilog(LOG_ERR, "Error sending timeout warning event info to UDP destination %s: %s",
endpoint_print_buf(&rtpe_config.timeout_warn_ep),
strerror(errno));
}
g_string_free(buf, TRUE);
// TODO put last_warn_time on call?
ps->last_warn_time = rtpe_now;
for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) {
l->data->sink->last_warn_time = rtpe_now;
}
for (__auto_type l = ps->rtcp_sinks.head; l; l = l->next) {
l->data->sink->last_warn_time = rtpe_now;
}
}
if (good)
continue;

Expand Down Expand Up @@ -1006,6 +1115,7 @@ struct packet_stream *__packet_stream_new(call_t *call) {
mutex_init(&stream->lock);
stream->call = call;
atomic64_set_na(&stream->last_packet_us, rtpe_now);
stream->last_warn_time = 0;
stream->rtp_stats = rtp_stats_ht_new();
recording_init_stream(stream);
stream->send_timer = send_timer_new(stream);
Expand Down Expand Up @@ -4640,6 +4750,7 @@ static call_t *call_create(const str *callid) {
call_memory_arena_set(c);
c->callid = call_str_cpy(callid);
c->created = rtpe_now;
c->established = 0;
c->dtls_cert = dtls_cert();
c->tos = rtpe_config.default_tos;
c->poller = rtpe_get_poller();
Expand Down Expand Up @@ -5425,6 +5536,11 @@ static int call_get_dialogue(struct call_monologue *monologues[2], call_t *call,
* derived from the viabranch. */
__monologue_tag(ft, fromtag);

if(!call->established){
call->established = rtpe_now;
}


dialogue_unconfirm(ft, "dialogue signalling event");
dialogue_unconfirm(tt, "dialogue signalling event");

Expand Down
22 changes: 22 additions & 0 deletions daemon/cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ static void cli_incoming_set_maxcpu(str *instr, struct cli_writer *cw, const cli
static void cli_incoming_set_maxload(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_set_maxbw(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_set_timeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_set_warntimeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_set_warnbackoff(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_set_silenttimeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_set_offertimeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_set_finaltimeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
Expand Down Expand Up @@ -94,6 +96,8 @@ static void cli_incoming_list_totals(str *instr, struct cli_writer *cw, const cl
static void cli_incoming_list_counters(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_list_sessions(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_list_timeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_list_warntimeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_list_warnbackoff(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_list_silenttimeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_list_offertimeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
static void cli_incoming_list_finaltimeout(str *instr, struct cli_writer *cw, const cli_handler_t *);
Expand Down Expand Up @@ -158,6 +162,8 @@ HANDLER_START(cli_set_handlers)
HANDLER_CMD("maxload", cli_incoming_set_maxload, "<float>", "set maxmimum load average allowed")
HANDLER_CMD("maxbw", cli_incoming_set_maxbw, "<uint>", "set maxmimum bandwidth usage average allowed")
HANDLER_CMD("timeout", cli_incoming_set_timeout, "<uint>", "set the --timeout parameter")
HANDLER_CMD("warntimeout", cli_incoming_set_warntimeout, "<uint>", "set the --warn-timeout parameter")
HANDLER_CMD("warnbackoff", cli_incoming_set_warnbackoff, "<uint>", "set the --warn-backoff parameter")
HANDLER_CMD("silenttimeout", cli_incoming_set_silenttimeout, "<uint>", "set the --silent-timeout parameter")
HANDLER_CMD("offertimeout", cli_incoming_set_offertimeout, "<uint>", "set the --offer-timeout parameter")
HANDLER_CMD("finaltimeout", cli_incoming_set_finaltimeout, "<uint>", "set the --final-timeout parameter")
Expand All @@ -183,6 +189,8 @@ HANDLER_START(cli_list_handlers)
HANDLER_CMD("maxbw", cli_incoming_list_maxbw , NULL, "print maxmimum bandwidth usage average allowed")
HANDLER_CMD("timeout", cli_incoming_list_timeout, NULL, "print timeout parameter")
HANDLER_CMD("silenttimeout", cli_incoming_list_silenttimeout, NULL, "print silent-timeout parameter")
HANDLER_CMD("warntimeout", cli_incoming_list_warntimeout, NULL, "print warn-timeout parameter")
HANDLER_CMD("warnbackoff", cli_incoming_list_warnbackoff, NULL, "print warn-backoff parameter")
HANDLER_CMD("offertimeout", cli_incoming_list_offertimeout, NULL, "print offer-timeout parameter")
HANDLER_CMD("finaltimeout", cli_incoming_list_finaltimeout, NULL, "print final-timeout parameter")
HANDLER_CMD("loglevels", cli_incoming_list_loglevels, NULL, "list available log levels")
Expand Down Expand Up @@ -515,6 +523,8 @@ RTPE_CONFIG_ENDPOINT_QUEUE_PARAMS
X(load_limit, "max-load") \
X(bw_limit, "max-bw") \
X(timeout_us, "timeout") \
X(warn_timeout_us, "warn-timeout") \
X(warn_backoff_us, "warn-backoff") \
X(silent_timeout_us, "silent-timeout") \
X(final_timeout_us, "final-timeout") \
X(control_tos, "control-tos") \
Expand Down Expand Up @@ -653,6 +663,12 @@ static void cli_incoming_list_maxopenfiles(str *instr, struct cli_writer *cw, co
static void cli_incoming_list_timeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) {
cw->cw_printf(cw, "TIMEOUT=%" PRId64 "\n", rtpe_config.timeout_us / 1000000L);
}
static void cli_incoming_list_warntimeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) {
cw->cw_printf(cw, "WARN_TIMEOUT=%" PRId64 "\n", rtpe_config.warn_timeout_us / 1000000L);
}
static void cli_incoming_list_warnbackoff(str *instr, struct cli_writer *cw, const cli_handler_t *handler) {
cw->cw_printf(cw, "WARN_BACKOFF=%" PRId64 "\n", rtpe_config.warn_backoff_us / 1000000L);
}
static void cli_incoming_list_silenttimeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) {
cw->cw_printf(cw, "SILENT_TIMEOUT=%" PRId64 "\n", rtpe_config.silent_timeout_us / 1000000L);
}
Expand Down Expand Up @@ -1058,6 +1074,12 @@ static void cli_incoming_set_gentimeout_us(str *instr, struct cli_writer *cw, in
static void cli_incoming_set_timeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) {
cli_incoming_set_gentimeout_us(instr, cw, &rtpe_config.timeout_us);
}
static void cli_incoming_set_warntimeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) {
cli_incoming_set_gentimeout_us(instr, cw, &rtpe_config.warn_timeout_us);
}
static void cli_incoming_set_warnbackoff(str *instr, struct cli_writer *cw, const cli_handler_t *handler) {
cli_incoming_set_gentimeout_us(instr, cw, &rtpe_config.warn_backoff_us);
}
static void cli_incoming_set_silenttimeout(str *instr, struct cli_writer *cw, const cli_handler_t *handler) {
cli_incoming_set_gentimeout_us(instr, cw, &rtpe_config.silent_timeout_us);
}
Expand Down
21 changes: 21 additions & 0 deletions daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ static void options(int *argc, char ***argv, charp_ht templates) {
bool codecs = false;
double max_load = 0;
double max_cpu = 0;
g_autoptr(char) timeout_warn_ep = NULL;
g_autoptr(char) dtmf_udp_ep = NULL;
g_autoptr(char) endpoint_learning = NULL;
g_autoptr(char) dtls_sig = NULL;
Expand Down Expand Up @@ -677,6 +678,8 @@ static void options(int *argc, char ***argv, charp_ht templates) {
g_autoptr(char) transcode_config = NULL;
int silent_timeout = 0;
int timeout = 0;
int warn_timeout = 0;
int warn_backoff = 0;
int final_timeout = 0;
int offer_timeout = 0;
int delete_delay = 30;
Expand Down Expand Up @@ -719,6 +722,8 @@ static void options(int *argc, char ***argv, charp_ht templates) {
{ "control-tos",0 , 0, G_OPTION_ARG_INT, &rtpe_config.control_tos, "Default TOS value to set on control-ng", "INT" },
{ "control-pmtu", 0,0, G_OPTION_ARG_STRING, &control_pmtu, "Path MTU discovery behaviour on UDP control sockets", "want|dont" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" },
{ "warn-timeout", 0, 0, G_OPTION_ARG_INT, &warn_timeout, "RTP warning timeout", "SECS" },
{ "warn-backoff", 0, 0, G_OPTION_ARG_INT, &warn_backoff, " Backoff time between consecutive RTP timeout warnings for a packet stream", "SECS" },
{ "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout, "RTP timeout for muted", "SECS" },
{ "final-timeout",'a',0,G_OPTION_ARG_INT, &final_timeout, "Call timeout", "SECS" },
{ "offer-timeout",0,0, G_OPTION_ARG_INT, &offer_timeout, "Timeout for incomplete one-sided calls", "SECS" },
Expand Down Expand Up @@ -747,6 +752,7 @@ static void options(int *argc, char ***argv, charp_ht templates) {
{ "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &rtpe_config.b2b_url, "XMLRPC URL of B2B UA" , "STRING" },
{ "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"},
{ "log-facility-rtcp",0, 0, G_OPTION_ARG_STRING, &log_facility_rtcp_s, "Syslog facility to use for logging RTCP", "daemon|local0|...|local7"},
{ "timeout-warn-dest", 0,0, G_OPTION_ARG_STRING, &timeout_warn_ep, "Destination address for RTP timeout warning via UDP", "IP46|HOSTNAME:PORT" },
#ifdef WITH_TRANSCODING
{ "log-facility-dtmf",0, 0, G_OPTION_ARG_STRING, &log_facility_dtmf_s, "Syslog facility to use for logging DTMF", "daemon|local0|...|local7"},
{ "dtmf-log-dest", 0,0, G_OPTION_ARG_STRING, &dtmf_udp_ep, "Destination address for DTMF logging via UDP", "IP46|HOSTNAME:PORT" },
Expand Down Expand Up @@ -1103,6 +1109,14 @@ static void options(int *argc, char ***argv, charp_ht templates) {
if (rtpe_config.timeout_us <= 0)
rtpe_config.timeout_us = 60 * 1000000LL;

rtpe_config.warn_timeout_us = warn_timeout * 1000000LL;
if (rtpe_config.warn_timeout_us < 0)
rtpe_config.warn_timeout_us = 0;

rtpe_config.warn_backoff_us = warn_backoff * 1000000LL;
if (rtpe_config.warn_backoff_us < 0)
rtpe_config.warn_backoff_us = 5 * 1000000LL;

rtpe_config.silent_timeout_us = silent_timeout * 1000000LL;
if (rtpe_config.silent_timeout_us <= 0)
rtpe_config.silent_timeout_us = 3600 * 1000000LL;
Expand Down Expand Up @@ -1201,6 +1215,11 @@ static void options(int *argc, char ***argv, charp_ht templates) {
if (debug_srtp)
rtpe_config.common.log_levels[log_level_index_srtp] = LOG_DEBUG;

if (timeout_warn_ep) {
if (!endpoint_parse_any_getaddrinfo_full(&rtpe_config.timeout_warn_ep, timeout_warn_ep))
die("Invalid IP or port '%s' (--timeout-warn-dest)", timeout_warn_ep);
}

if (dtmf_udp_ep) {
if (!endpoint_parse_any_getaddrinfo_full(&rtpe_config.dtmf_udp_ep, dtmf_udp_ep))
die("Invalid IP or port '%s' (--dtmf-log-dest)", dtmf_udp_ep);
Expand Down Expand Up @@ -1606,6 +1625,8 @@ static void init_everything(charp_ht templates) {
#endif
codeclib_init(0);
media_player_init();
if (!timeout_warn_init())
die("timeout_warn init failed, see log");
if (!dtmf_init())
die("DTMF init failed, see log");
jitter_buffer_init();
Expand Down
18 changes: 18 additions & 0 deletions docs/rtpengine.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,24 @@ at the command line. See the __\-\-config-file__ option below for details.
is removed from __rtpengine__'s internal state table.
Defaults to 60 seconds.

- __\-\-timeout-warn-dest=__*IP46*:*PORT*

Configures a target address for logging detected RTP timeout warning.
This sends the JSON payload to the
given address as UDP packets.

- __\-\-warn-timeout=__*SECS*

Takes the number of seconds as argument after which a warning will be issued
for a packet stream if no media traffic has been received.
Defaults is disabled(0).

- __\-\-warn-backoff=__*SECS*

Takes the number of seconds as backoff time between
consecutive RTP timeout warnings for a packet stream.
Defaults is 5 seconds.

- __-s__, __\-\-silent-timeout=__*SECS*

Ditto as the __\-\-timeout__ option, but applies to muted or inactive media
Expand Down
4 changes: 4 additions & 0 deletions include/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ struct packet_stream {
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
int64_t kernel_time_us;
int64_t last_warn_time;

struct stream_stats *stats_in;
struct stream_stats *stats_out;
Expand Down Expand Up @@ -780,6 +781,7 @@ struct call {
str callid;
str_q callid_aliases;
int64_t created;
int64_t established;
int64_t destroyed;
int64_t last_signal_us;
int64_t deleted_us;
Expand Down Expand Up @@ -910,6 +912,8 @@ bool __init_stream(struct packet_stream *ps);

const rtp_payload_type *__rtp_stats_codec(struct call_media *m);

bool timeout_warn_init(void);

#include "str.h"
#include "rtp.h"

Expand Down
3 changes: 3 additions & 0 deletions include/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ enum endpoint_learning {

#define RTPE_CONFIG_INT64_PARAMS \
X(bw_limit) \
X(warn_timeout_us) \
X(warn_backoff_us) \
X(silent_timeout_us) \
X(timeout_us) \
X(final_timeout_us) \
Expand Down Expand Up @@ -175,6 +177,7 @@ enum endpoint_learning {
X(redis_write_ep) \
X(redis_subscribe_ep) \
X(homer_ep) \
X(timeout_warn_ep) \
X(dtmf_udp_ep)

#define RTPE_CONFIG_ENDPOINT_QUEUE_PARAMS \
Expand Down