diff --git a/include/fluent-bit/flb_io.h b/include/fluent-bit/flb_io.h index 9a6afd1e0cd..45f14f5cfb4 100644 --- a/include/fluent-bit/flb_io.h +++ b/include/fluent-bit/flb_io.h @@ -35,9 +35,11 @@ #define FLB_IO_OPT_TLS 4 /* use TCP and optional TLS */ #define FLB_IO_ASYNC 8 /* use async mode (depends on event loop) */ #define FLB_IO_TCP_KA 16 /* use async mode (depends on event loop) */ +#define FLB_IO_UDP 32 /* use plain UDP */ +#define FLB_IO_DTLS 64 /* use DTLS over UDP */ /* Other features */ -#define FLB_IO_IPV6 32 /* network I/O uses IPv6 */ +#define FLB_IO_IPV6 128 /* network I/O uses IPv6 */ struct flb_connection; diff --git a/include/fluent-bit/flb_stream.h b/include/fluent-bit/flb_stream.h index d1b858a9090..ab55ea70f7f 100644 --- a/include/fluent-bit/flb_stream.h +++ b/include/fluent-bit/flb_stream.h @@ -136,7 +136,8 @@ static inline int flb_stream_is_keepalive(struct flb_stream *stream) static inline int flb_stream_is_secure(struct flb_stream *stream) { - return flb_stream_get_flag_status(stream, FLB_IO_TLS); + return flb_stream_get_flag_status(stream, FLB_IO_TLS) || + flb_stream_get_flag_status(stream, FLB_IO_DTLS); } static inline int flb_stream_is_thread_safe(struct flb_stream *stream) @@ -205,4 +206,4 @@ static inline int flb_stream_release_lock(struct flb_stream *stream) return result; } -#endif \ No newline at end of file +#endif diff --git a/include/fluent-bit/flb_upstream.h b/include/fluent-bit/flb_upstream.h index 544f97fee90..44d585c5bd9 100644 --- a/include/fluent-bit/flb_upstream.h +++ b/include/fluent-bit/flb_upstream.h @@ -41,6 +41,8 @@ * --- flb_io.h --- * #define FLB_IO_TCP 1 * #define FLB_IO_TLS 2 + * #define FLB_IO_UDP 32 + * #define FLB_IO_DTLS 64 * #define FLB_IO_ASYNC 8 * #define FLB_IO_TCP_KA 16 * --- diff --git a/include/fluent-bit/tls/flb_tls.h b/include/fluent-bit/tls/flb_tls.h index 018231218e2..bea34d65e1a 100644 --- a/include/fluent-bit/tls/flb_tls.h +++ b/include/fluent-bit/tls/flb_tls.h @@ -46,6 +46,8 @@ #define FLB_TLS_CLIENT_MODE 0 #define FLB_TLS_SERVER_MODE 1 +#define FLB_TLS_CLIENT_MODE_DGRAM 2 +#define FLB_TLS_SERVER_MODE_DGRAM 3 struct flb_tls; struct flb_connection; diff --git a/plugins/out_syslog/syslog.c b/plugins/out_syslog/syslog.c index b191fe778b9..c59d061ed33 100644 --- a/plugins/out_syslog/syslog.c +++ b/plugins/out_syslog/syslog.c @@ -25,6 +25,7 @@ #include #include #include +#include #include "syslog_conf.h" @@ -101,6 +102,90 @@ static struct { { NULL, 0,-1 }, }; +#ifdef FLB_HAVE_TLS +static int syslog_configure_tls_options(struct flb_output_instance *ins) +{ + int ret; + + if (ins->tls_verify_hostname == FLB_TRUE) { + ret = flb_tls_set_verify_hostname(ins->tls, ins->tls_verify_hostname); + if (ret == -1) { + return -1; + } + } + + if (ins->tls_min_version != NULL || ins->tls_max_version != NULL) { + ret = flb_tls_set_minmax_proto(ins->tls, + ins->tls_min_version, + ins->tls_max_version); + if (ret != 0) { + return -1; + } + } + + if (ins->tls_ciphers != NULL) { + ret = flb_tls_set_ciphers(ins->tls, ins->tls_ciphers); + if (ret != 0) { + return -1; + } + } + +#if defined(FLB_SYSTEM_WINDOWS) + if (ins->tls_win_use_enterprise_certstore) { + ret = flb_tls_set_use_enterprise_store(ins->tls, + ins->tls_win_use_enterprise_certstore); + if (ret == -1) { + return -1; + } + } + + if (ins->tls_win_thumbprints) { + ret = flb_tls_set_client_thumbprints(ins->tls, ins->tls_win_thumbprints); + if (ret == -1) { + return -1; + } + } + + if (ins->tls_win_certstore_name) { + ret = flb_tls_set_certstore_name(ins->tls, ins->tls_win_certstore_name); + if (ret == -1) { + return -1; + } + + ret = flb_tls_load_system_certificates(ins->tls); + if (ret == -1) { + return -1; + } + } +#endif + + return 0; +} + +static int syslog_configure_dtls_context(struct flb_output_instance *ins) +{ + if (ins->tls != NULL) { + flb_tls_destroy(ins->tls); + ins->tls = NULL; + } + + ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE_DGRAM, + ins->tls_verify, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (ins->tls == NULL) { + return -1; + } + + return syslog_configure_tls_options(ins); +} +#endif + /* '"', '\' ']' */ static char rfc5424_sp_value[256] = { 0, 0, 0 , 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 , 0 , 0, 0, @@ -894,6 +979,7 @@ static void cb_syslog_flush(struct flb_event_chunk *event_chunk, static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { + int ret; int io_flags; struct flb_syslog *ctx = NULL; @@ -926,9 +1012,30 @@ static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *co } } else { +#ifdef FLB_HAVE_TLS + if (ctx->parsed_mode == FLB_SYSLOG_DTLS) { + ret = syslog_configure_dtls_context(ins); + if (ret != 0) { + flb_plg_error(ins, "could not initialize DTLS context"); + flb_syslog_config_destroy(ctx); + return -1; + } + } +#else + if (ctx->parsed_mode == FLB_SYSLOG_DTLS) { + flb_plg_error(ins, "could not initialize DTLS context"); + flb_syslog_config_destroy(ctx); + return -1; + } +#endif - /* use TLS ? */ - if (ins->use_tls == FLB_TRUE) { + if (ctx->parsed_mode == FLB_SYSLOG_UDP) { + io_flags = FLB_IO_UDP; + } + else if (ctx->parsed_mode == FLB_SYSLOG_DTLS) { + io_flags = FLB_IO_DTLS; + } + else if (ins->use_tls == FLB_TRUE) { io_flags = FLB_IO_TLS; } else { @@ -940,7 +1047,7 @@ static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *co } ctx->u = flb_upstream_create(config, ins->host.name, ins->host.port, - io_flags, ins->tls); + io_flags, ins->tls); if (!(ctx->u)) { flb_syslog_config_destroy(ctx); return -1; @@ -948,7 +1055,7 @@ static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *co flb_output_upstream_set(ctx->u, ins); } - /* Set the plugin context */ + /* Set the plugin context for all modes, including UDP. */ flb_output_set_context(ins, ctx); flb_plg_info(ctx->ins, "setup done for %s:%i (TLS=%s)", @@ -1046,9 +1153,8 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "mode", "udp", 0, FLB_TRUE, offsetof(struct flb_syslog, mode), - "Set the desired transport type, the available options are tcp and udp. If you need to " - "use a TLS secure channel, choose 'tcp' mode here and enable the 'tls' option separately. " - "DTLS over udp is not supported by this plugin." + "Set the desired transport type, the available options are udp, tcp, tls and dtls. " + "Use tls=on together with mode=dtls." }, { diff --git a/plugins/out_syslog/syslog_conf.c b/plugins/out_syslog/syslog_conf.c index 5523c1f0dca..04ab53583f1 100644 --- a/plugins/out_syslog/syslog_conf.c +++ b/plugins/out_syslog/syslog_conf.c @@ -87,9 +87,6 @@ struct flb_syslog *flb_syslog_config_create(struct flb_output_instance *ins, return NULL; } - /* Set context */ - flb_output_set_context(ins, ctx); - /* Config Mode */ tmp = flb_output_get_property("mode", ins); if (tmp) { @@ -102,6 +99,9 @@ struct flb_syslog *flb_syslog_config_create(struct flb_output_instance *ins, else if (!strcasecmp(tmp, "udp")) { ctx->parsed_mode = FLB_SYSLOG_UDP; } + else if (!strcasecmp(tmp, "dtls")) { + ctx->parsed_mode = FLB_SYSLOG_DTLS; + } else { flb_plg_error(ctx->ins, "unknown syslog mode %s", tmp); flb_syslog_config_destroy(ctx); @@ -111,8 +111,15 @@ struct flb_syslog *flb_syslog_config_create(struct flb_output_instance *ins, if (ctx->parsed_mode == FLB_SYSLOG_UDP && ins->use_tls == FLB_TRUE) { flb_plg_error(ctx->ins, - "invalid configuration: mode=udp with tls=on is unsupported " - "(DTLS is not implemented)"); + "invalid configuration: mode=udp with tls=on is unsupported; " + "use mode=dtls for secure datagram transport"); + flb_syslog_config_destroy(ctx); + return NULL; + } + + if (ctx->parsed_mode == FLB_SYSLOG_DTLS && ins->use_tls == FLB_FALSE) { + flb_plg_error(ctx->ins, + "invalid configuration: mode=dtls requires tls=on"); flb_syslog_config_destroy(ctx); return NULL; } @@ -161,6 +168,9 @@ struct flb_syslog *flb_syslog_config_create(struct flb_output_instance *ins, } } + /* Set context after validation succeeds */ + flb_output_set_context(ins, ctx); + return ctx; } diff --git a/plugins/out_syslog/syslog_conf.h b/plugins/out_syslog/syslog_conf.h index 0e8d3911d96..80c8d2ea383 100644 --- a/plugins/out_syslog/syslog_conf.h +++ b/plugins/out_syslog/syslog_conf.h @@ -28,6 +28,7 @@ #define FLB_SYSLOG_UDP 0 #define FLB_SYSLOG_TCP 1 #define FLB_SYSLOG_TLS 2 +#define FLB_SYSLOG_DTLS 3 #define FLB_SYSLOG_RFC3164 0 #define FLB_SYSLOG_RFC5424 1 diff --git a/src/flb_connection.c b/src/flb_connection.c index a3ed402651b..e28a5686b47 100644 --- a/src/flb_connection.c +++ b/src/flb_connection.c @@ -97,11 +97,20 @@ static void compose_user_friendly_remote_host(struct flb_connection *connection) connection->remote_port); } else if (connection_type == FLB_TRANSPORT_UDP) { - snprintf(connection->user_friendly_remote_host, - sizeof(connection->user_friendly_remote_host), - "udp://%s:%u", - connection->remote_host, - connection->remote_port); + if (flb_stream_get_flag_status(connection->stream, FLB_IO_DTLS)) { + snprintf(connection->user_friendly_remote_host, + sizeof(connection->user_friendly_remote_host), + "dtls://%s:%u", + connection->remote_host, + connection->remote_port); + } + else { + snprintf(connection->user_friendly_remote_host, + sizeof(connection->user_friendly_remote_host), + "udp://%s:%u", + connection->remote_host, + connection->remote_port); + } } else if (connection_type == FLB_TRANSPORT_UNIX_STREAM) { snprintf(connection->user_friendly_remote_host, @@ -254,4 +263,4 @@ void flb_connection_unset_io_timeout(struct flb_connection *connection) assert(connection != NULL); connection->ts_io_timeout = -1; -} \ No newline at end of file +} diff --git a/src/flb_io.c b/src/flb_io.c index 38fbb3eed4b..be5b282374f 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -107,6 +107,7 @@ int flb_io_net_connect(struct flb_connection *connection, struct flb_coro *coro) { int ret; + int socket_ret; int async = FLB_FALSE; flb_sockfd_t fd = -1; int flags = flb_connection_get_flags(connection); @@ -125,17 +126,49 @@ int flb_io_net_connect(struct flb_connection *connection, async = FLB_FALSE; } - /* Perform TCP connection */ - fd = flb_net_tcp_connect(connection->upstream->tcp_host, - connection->upstream->tcp_port, - connection->stream->net.source_address, - connection->stream->net.connect_timeout, - async, coro, connection); + if (connection->stream->transport == FLB_TRANSPORT_UDP) { + fd = flb_net_udp_connect(connection->upstream->tcp_host, + connection->upstream->tcp_port, + connection->stream->net.source_address); + + if (fd >= 0) { + if (async == FLB_TRUE) { + socket_ret = flb_net_socket_nonblocking(fd); + if (socket_ret == -1) { + flb_socket_close(fd); + connection->fd = -1; + connection->event.fd = -1; + return -1; + } + } + + socket_ret = flb_net_socket_set_rcvtimeout(fd, connection->net->io_timeout); + if (socket_ret == -1) { + flb_socket_close(fd); + connection->fd = -1; + connection->event.fd = -1; + return -1; + } + + connection->fd = fd; + connection->event.fd = fd; + } + } + else { + /* Perform TCP connection */ + fd = flb_net_tcp_connect(connection->upstream->tcp_host, + connection->upstream->tcp_port, + connection->stream->net.source_address, + connection->stream->net.connect_timeout, + async, coro, connection); + } + if (fd == -1) { return -1; } - if (connection->upstream->proxied_host) { + if (connection->stream->transport == FLB_TRANSPORT_TCP && + connection->upstream->proxied_host) { ret = flb_http_client_proxy_connect(connection); if (ret == -1) { @@ -156,7 +189,8 @@ int flb_io_net_connect(struct flb_connection *connection, } /* set TCP keepalive and it's options */ - if (connection->net->tcp_keepalive) { + if (connection->stream->transport == FLB_TRANSPORT_TCP && + connection->net->tcp_keepalive) { ret = flb_net_socket_tcp_keepalive(connection->fd, connection->net); @@ -694,7 +728,7 @@ int flb_io_net_write(struct flb_connection *connection, const void *data, } } #ifdef FLB_HAVE_TLS - else if (flags & FLB_IO_TLS) { + else if (flags & (FLB_IO_TLS | FLB_IO_DTLS)) { if (flags & FLB_IO_ASYNC) { ret = flb_tls_net_write_async(coro, connection->tls_session, data, len, out_len); } @@ -702,6 +736,10 @@ int flb_io_net_write(struct flb_connection *connection, const void *data, ret = flb_tls_net_write(connection->tls_session, data, len, out_len); } } + else { + flb_error("[io] TLS session set on connection #%i but transport flags are invalid (%i)", + connection->fd, flags); + } #endif if (ret > 0) { @@ -742,7 +780,7 @@ ssize_t flb_io_net_read(struct flb_connection *connection, void *buf, size_t len } } #ifdef FLB_HAVE_TLS - else if (flags & FLB_IO_TLS) { + else if (flags & (FLB_IO_TLS | FLB_IO_DTLS)) { if (flags & FLB_IO_ASYNC) { ret = flb_tls_net_read_async(coro, connection->tls_session, buf, len); } @@ -750,6 +788,10 @@ ssize_t flb_io_net_read(struct flb_connection *connection, void *buf, size_t len ret = flb_tls_net_read(connection->tls_session, buf, len); } } + else { + flb_error("[io] TLS session set on connection #%i but transport flags are invalid (%i)", + connection->fd, flags); + } #endif if (ret > 0) { diff --git a/src/flb_upstream.c b/src/flb_upstream.c index 89a0fae281f..62decd65ec1 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -292,6 +292,7 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, struct flb_tls *tls) { int ret; + int transport; char *proxy_protocol = NULL; char *proxy_host = NULL; char *proxy_port = NULL; @@ -307,16 +308,22 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, u->base.dynamically_allocated = FLB_TRUE; + transport = FLB_TRANSPORT_TCP; + if ((flags & FLB_IO_UDP) || (flags & FLB_IO_DTLS)) { + transport = FLB_TRANSPORT_UDP; + } + flb_stream_setup(&u->base, FLB_UPSTREAM, - FLB_TRANSPORT_TCP, + transport, flags, tls, config, NULL); /* Set upstream to the http_proxy if it is specified. */ - if (flb_upstream_needs_proxy(host, config->http_proxy, config->no_proxy) == FLB_TRUE) { + if (transport == FLB_TRANSPORT_TCP && + flb_upstream_needs_proxy(host, config->http_proxy, config->no_proxy) == FLB_TRUE) { flb_debug("[upstream] config->http_proxy: %s", config->http_proxy); ret = flb_utils_proxy_url_split(config->http_proxy, &proxy_protocol, &proxy_username, &proxy_password, diff --git a/src/tls/openssl.c b/src/tls/openssl.c index cb8c2749561..0e939172fa2 100644 --- a/src/tls/openssl.c +++ b/src/tls/openssl.c @@ -820,7 +820,22 @@ static void *tls_context_create(int verify, * * https://www.openssl.org/docs/man1.0.2/man3/SSLv23_method.html */ - if (mode == FLB_TLS_SERVER_MODE) { + if (mode == FLB_TLS_SERVER_MODE_DGRAM || + mode == FLB_TLS_CLIENT_MODE_DGRAM) { +#ifndef OPENSSL_NO_DTLS + if (mode == FLB_TLS_SERVER_MODE_DGRAM) { + ssl_ctx = SSL_CTX_new(DTLSv1_server_method()); + } + else { + ssl_ctx = SSL_CTX_new(DTLSv1_client_method()); + } +#else + flb_error("[openssl] DTLS mode requested but this OpenSSL build " + "does not provide DTLS support"); + return NULL; +#endif + } + else if (mode == FLB_TLS_SERVER_MODE) { ssl_ctx = SSL_CTX_new(SSLv23_server_method()); } else { @@ -831,6 +846,12 @@ static void *tls_context_create(int verify, if (mode == FLB_TLS_SERVER_MODE) { ssl_ctx = SSL_CTX_new(TLS_server_method()); } + else if (mode == FLB_TLS_SERVER_MODE_DGRAM) { + ssl_ctx = SSL_CTX_new(DTLS_server_method()); + } + else if (mode == FLB_TLS_CLIENT_MODE_DGRAM) { + ssl_ctx = SSL_CTX_new(DTLS_client_method()); + } else { ssl_ctx = SSL_CTX_new(TLS_client_method()); } @@ -1556,7 +1577,8 @@ static int tls_net_handshake(struct flb_tls *tls, pthread_mutex_lock(&ctx->mutex); if (!session->continuation_flag) { - if (tls->mode == FLB_TLS_CLIENT_MODE) { + if (tls->mode == FLB_TLS_CLIENT_MODE || + tls->mode == FLB_TLS_CLIENT_MODE_DGRAM) { SSL_set_connect_state(session->ssl); if (ctx->alpn != NULL) { @@ -1571,7 +1593,8 @@ static int tls_net_handshake(struct flb_tls *tls, } } } - else if (tls->mode == FLB_TLS_SERVER_MODE) { + else if (tls->mode == FLB_TLS_SERVER_MODE || + tls->mode == FLB_TLS_SERVER_MODE_DGRAM) { SSL_set_accept_state(session->ssl); } else { @@ -1605,10 +1628,12 @@ static int tls_net_handshake(struct flb_tls *tls, ERR_clear_error(); - if (tls->mode == FLB_TLS_CLIENT_MODE) { + if (tls->mode == FLB_TLS_CLIENT_MODE || + tls->mode == FLB_TLS_CLIENT_MODE_DGRAM) { ret = SSL_connect(session->ssl); } - else if (tls->mode == FLB_TLS_SERVER_MODE) { + else if (tls->mode == FLB_TLS_SERVER_MODE || + tls->mode == FLB_TLS_SERVER_MODE_DGRAM) { ret = SSL_accept(session->ssl); } diff --git a/tests/integration/scenarios/out_syslog/config/out_syslog_dtls.yaml b/tests/integration/scenarios/out_syslog/config/out_syslog_dtls.yaml new file mode 100644 index 00000000000..5b33a7fcb63 --- /dev/null +++ b/tests/integration/scenarios/out_syslog/config/out_syslog_dtls.yaml @@ -0,0 +1,22 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_syslog + dummy: '{"message":"hello from out_syslog"}' + rate: 1 + + outputs: + - name: syslog + match: out_syslog + host: 127.0.0.1 + port: ${SYSLOG_RECEIVER_PORT} + mode: dtls + tls: on + tls.verify: off + syslog_message_key: message diff --git a/tests/integration/scenarios/out_syslog/config/out_syslog_tcp.yaml b/tests/integration/scenarios/out_syslog/config/out_syslog_tcp.yaml new file mode 100644 index 00000000000..835703061f5 --- /dev/null +++ b/tests/integration/scenarios/out_syslog/config/out_syslog_tcp.yaml @@ -0,0 +1,20 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_syslog + dummy: '{"message":"hello from out_syslog"}' + rate: 1 + + outputs: + - name: syslog + match: out_syslog + host: 127.0.0.1 + port: ${SYSLOG_RECEIVER_PORT} + mode: tcp + syslog_message_key: message diff --git a/tests/integration/scenarios/out_syslog/config/out_syslog_udp.yaml b/tests/integration/scenarios/out_syslog/config/out_syslog_udp.yaml new file mode 100644 index 00000000000..008103b052f --- /dev/null +++ b/tests/integration/scenarios/out_syslog/config/out_syslog_udp.yaml @@ -0,0 +1,20 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_syslog + dummy: '{"message":"hello from out_syslog"}' + rate: 1 + + outputs: + - name: syslog + match: out_syslog + host: 127.0.0.1 + port: ${SYSLOG_RECEIVER_PORT} + mode: udp + syslog_message_key: message diff --git a/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py b/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py new file mode 100644 index 00000000000..c478557ca62 --- /dev/null +++ b/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py @@ -0,0 +1,271 @@ +import os +import shutil +import socket +import subprocess +import threading +import time + +import pytest + +from utils.test_service import FluentBitTestService + + +class UdpReceiver: + def __init__(self, host, port): + self.host = host + self.port = port + self.message = None + self.error = None + self._ready = threading.Event() + self._done = threading.Event() + self._thread = threading.Thread(target=self._run, daemon=True) + + def _run(self): + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server: + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind((self.host, self.port)) + server.settimeout(120) + self._ready.set() + + data, _ = server.recvfrom(4096) + self.message = data + self._done.set() + except Exception as exc: + self.error = exc + self._ready.set() + self._done.set() + + def start(self): + self._thread.start() + + def wait_ready(self, timeout=5): + if not self._ready.wait(timeout): + raise TimeoutError("Timed out waiting for UDP receiver readiness") + + def wait_message(self, timeout=10): + if not self._done.wait(timeout): + raise TimeoutError("Timed out waiting for UDP syslog payload") + + if self.error is not None: + raise self.error + + return self.message + + +class TcpReceiver: + def __init__(self, host, port): + self.host = host + self.port = port + self.message = None + self.error = None + self._ready = threading.Event() + self._done = threading.Event() + self._thread = threading.Thread(target=self._run, daemon=True) + + def _run(self): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind((self.host, self.port)) + server.listen(1) + server.settimeout(120) + self._ready.set() + conn, _ = server.accept() + + with conn: + conn.settimeout(20) + chunks = [] + + while True: + chunk = conn.recv(4096) + if not chunk: + break + chunks.append(chunk) + if b"\n" in chunk: + break + + self.message = b"".join(chunks) + self._done.set() + except Exception as exc: + self.error = exc + self._ready.set() + self._done.set() + + def start(self): + self._thread.start() + + def wait_ready(self, timeout=5): + if not self._ready.wait(timeout): + raise TimeoutError("Timed out waiting for TCP receiver readiness") + + def wait_message(self, timeout=10): + if not self._done.wait(timeout): + raise TimeoutError("Timed out waiting for TCP syslog payload") + + if self.error is not None: + raise self.error + + return self.message + + +class DtlsReceiver: + def __init__(self, port, cert_file, key_file): + self.port = port + self.cert_file = cert_file + self.key_file = key_file + self.process = None + + def start(self): + openssl = shutil.which("openssl") + if openssl is None: + raise RuntimeError("openssl is required for DTLS test") + + self.process = subprocess.Popen( # noqa: S603 - controlled test command + [ + openssl, + "s_server", + "-dtls", + "-accept", + str(self.port), + "-cert", + self.cert_file, + "-key", + self.key_file, + "-naccept", + "1", + "-ign_eof", + ], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + time.sleep(0.5) + if self.process.poll() is not None: + output = self._read_output(timeout=2) + raise RuntimeError(f"DTLS receiver failed to start: {output}") + + def wait_ready(self, timeout=5): + deadline = time.time() + timeout + while time.time() < deadline: + if self.process.poll() is not None: + output = self._read_output(timeout=2) + raise RuntimeError(f"DTLS receiver terminated early: {output}") + time.sleep(0.1) + + def _read_output(self, timeout=2): + stdout, stderr = self.process.communicate(timeout=timeout) + return (stdout + stderr).decode("utf-8", errors="replace") + + def wait_message(self, timeout=30): + try: + output = self._read_output(timeout=timeout) + except subprocess.TimeoutExpired as exc: + raise TimeoutError("Timed out waiting for DTLS handshake") from exc + + return output + + def stop(self): + if self.process is None: + return + + if self.process.poll() is None: + self.process.terminate() + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait(timeout=5) + + +class Service: + def __init__(self, config_file, receiver_type): + self.config_file = os.path.abspath(os.path.join(os.path.dirname(__file__), "../config", config_file)) + self.receiver_type = receiver_type + self.receiver = None + + cert_dir = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../in_splunk/certificate") + ) + self.tls_crt_file = os.path.join(cert_dir, "certificate.pem") + self.tls_key_file = os.path.join(cert_dir, "private_key.pem") + + self.service = FluentBitTestService( + self.config_file, + pre_start=self._start_receiver, + post_stop=self._stop_receiver, + ) + + def _start_receiver(self, service): + self.receiver_port = service.allocate_port_env("SYSLOG_RECEIVER_PORT") + + if self.receiver_type == "udp": + self.receiver = UdpReceiver("127.0.0.1", self.receiver_port) + elif self.receiver_type == "tcp": + self.receiver = TcpReceiver("127.0.0.1", self.receiver_port) + elif self.receiver_type == "dtls": + self.receiver = DtlsReceiver(self.receiver_port, self.tls_crt_file, self.tls_key_file) + else: + raise ValueError(f"Unknown receiver type: {self.receiver_type}") + + self.receiver.start() + self.receiver.wait_ready(timeout=5) + + def _stop_receiver(self, _service): + if self.receiver_type == "dtls" and self.receiver is not None: + self.receiver.stop() + + def start(self): + self.service.start() + + def stop(self): + self.service.stop() + + +def _assert_syslog_payload(payload): + text = payload.decode("utf-8", errors="replace") + assert "hello from out_syslog" in text + assert text.startswith("<") + + +def _assert_dtls_payload(output): + assert "ACCEPT" in output + assert "DONE" in output + + +def test_out_syslog_udp(): + service = Service("out_syslog_udp.yaml", "udp") + service.start() + + try: + payload = service.receiver.wait_message(timeout=20) + finally: + service.stop() + + _assert_syslog_payload(payload) + + +def test_out_syslog_tcp(): + service = Service("out_syslog_tcp.yaml", "tcp") + service.start() + + try: + payload = service.receiver.wait_message(timeout=15) + finally: + service.stop() + + _assert_syslog_payload(payload) + + +@pytest.mark.skipif(not shutil.which("openssl"), reason="openssl is required for DTLS test") +def test_out_syslog_dtls(): + service = Service("out_syslog_dtls.yaml", "dtls") + service.start() + + try: + output = service.receiver.wait_message(timeout=30) + finally: + service.stop() + + _assert_dtls_payload(output)