From cd2ac1d9c1cc48ea2487d47e5c2402beb221c04c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 Apr 2026 17:13:35 +0900 Subject: [PATCH 01/13] io: Handle DTLS protocol Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_io.h | 4 +++- src/flb_io.c | 38 +++++++++++++++++++++++++++---------- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/include/fluent-bit/flb_io.h b/include/fluent-bit/flb_io.h index 9a6afd1e0cd..45f14f5cfb4 100644 --- a/include/fluent-bit/flb_io.h +++ b/include/fluent-bit/flb_io.h @@ -35,9 +35,11 @@ #define FLB_IO_OPT_TLS 4 /* use TCP and optional TLS */ #define FLB_IO_ASYNC 8 /* use async mode (depends on event loop) */ #define FLB_IO_TCP_KA 16 /* use async mode (depends on event loop) */ +#define FLB_IO_UDP 32 /* use plain UDP */ +#define FLB_IO_DTLS 64 /* use DTLS over UDP */ /* Other features */ -#define FLB_IO_IPV6 32 /* network I/O uses IPv6 */ +#define FLB_IO_IPV6 128 /* network I/O uses IPv6 */ struct flb_connection; diff --git a/src/flb_io.c b/src/flb_io.c index 38fbb3eed4b..c1a57c0cb34 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -125,17 +125,26 @@ int flb_io_net_connect(struct flb_connection *connection, async = FLB_FALSE; } - /* Perform TCP connection */ - fd = flb_net_tcp_connect(connection->upstream->tcp_host, - connection->upstream->tcp_port, - connection->stream->net.source_address, - connection->stream->net.connect_timeout, - async, coro, connection); + if (connection->stream->transport == FLB_TRANSPORT_UDP) { + fd = flb_net_udp_connect(connection->upstream->tcp_host, + connection->upstream->tcp_port, + connection->stream->net.source_address); + } + else { + /* Perform TCP connection */ + fd = flb_net_tcp_connect(connection->upstream->tcp_host, + connection->upstream->tcp_port, + connection->stream->net.source_address, + connection->stream->net.connect_timeout, + async, coro, connection); + } + if (fd == -1) { return -1; } - if (connection->upstream->proxied_host) { + if (connection->stream->transport == FLB_TRANSPORT_TCP && + connection->upstream->proxied_host) { ret = flb_http_client_proxy_connect(connection); if (ret == -1) { @@ -156,7 +165,8 @@ int flb_io_net_connect(struct flb_connection *connection, } /* set TCP keepalive and it's options */ - if (connection->net->tcp_keepalive) { + if (connection->stream->transport == FLB_TRANSPORT_TCP && + connection->net->tcp_keepalive) { ret = flb_net_socket_tcp_keepalive(connection->fd, connection->net); @@ -694,7 +704,7 @@ int flb_io_net_write(struct flb_connection *connection, const void *data, } } #ifdef FLB_HAVE_TLS - else if (flags & FLB_IO_TLS) { + else if (flags & (FLB_IO_TLS | FLB_IO_DTLS)) { if (flags & FLB_IO_ASYNC) { ret = flb_tls_net_write_async(coro, connection->tls_session, data, len, out_len); } @@ -702,6 +712,10 @@ int flb_io_net_write(struct flb_connection *connection, const void *data, ret = flb_tls_net_write(connection->tls_session, data, len, out_len); } } + else { + flb_error("[io] TLS session set on connection #%i but transport flags are invalid (%i)", + connection->fd, flags); + } #endif if (ret > 0) { @@ -742,7 +756,7 @@ ssize_t flb_io_net_read(struct flb_connection *connection, void *buf, size_t len } } #ifdef FLB_HAVE_TLS - else if (flags & FLB_IO_TLS) { + else if (flags & (FLB_IO_TLS | FLB_IO_DTLS)) { if (flags & FLB_IO_ASYNC) { ret = flb_tls_net_read_async(coro, connection->tls_session, buf, len); } @@ -750,6 +764,10 @@ ssize_t flb_io_net_read(struct flb_connection *connection, void *buf, size_t len ret = flb_tls_net_read(connection->tls_session, buf, len); } } + else { + flb_error("[io] TLS session set on connection #%i but transport flags are invalid (%i)", + connection->fd, flags); + } #endif if (ret > 0) { From b7b6cba9b86ba3efbab21a17c8f60467fac3d359 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 Apr 2026 17:13:59 +0900 Subject: [PATCH 02/13] stream: Distinguish DTLS protocol Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_stream.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/include/fluent-bit/flb_stream.h b/include/fluent-bit/flb_stream.h index d1b858a9090..ab55ea70f7f 100644 --- a/include/fluent-bit/flb_stream.h +++ b/include/fluent-bit/flb_stream.h @@ -136,7 +136,8 @@ static inline int flb_stream_is_keepalive(struct flb_stream *stream) static inline int flb_stream_is_secure(struct flb_stream *stream) { - return flb_stream_get_flag_status(stream, FLB_IO_TLS); + return flb_stream_get_flag_status(stream, FLB_IO_TLS) || + flb_stream_get_flag_status(stream, FLB_IO_DTLS); } static inline int flb_stream_is_thread_safe(struct flb_stream *stream) @@ -205,4 +206,4 @@ static inline int flb_stream_release_lock(struct flb_stream *stream) return result; } -#endif \ No newline at end of file +#endif From 84f01692fe229c67ecd7b2398c1cc6fd6b8f97b6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 Apr 2026 17:14:19 +0900 Subject: [PATCH 03/13] upstream: Handle DTLS protocol Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_upstream.h | 2 ++ src/flb_upstream.c | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/include/fluent-bit/flb_upstream.h b/include/fluent-bit/flb_upstream.h index 544f97fee90..44d585c5bd9 100644 --- a/include/fluent-bit/flb_upstream.h +++ b/include/fluent-bit/flb_upstream.h @@ -41,6 +41,8 @@ * --- flb_io.h --- * #define FLB_IO_TCP 1 * #define FLB_IO_TLS 2 + * #define FLB_IO_UDP 32 + * #define FLB_IO_DTLS 64 * #define FLB_IO_ASYNC 8 * #define FLB_IO_TCP_KA 16 * --- diff --git a/src/flb_upstream.c b/src/flb_upstream.c index 89a0fae281f..62decd65ec1 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -292,6 +292,7 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, struct flb_tls *tls) { int ret; + int transport; char *proxy_protocol = NULL; char *proxy_host = NULL; char *proxy_port = NULL; @@ -307,16 +308,22 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, u->base.dynamically_allocated = FLB_TRUE; + transport = FLB_TRANSPORT_TCP; + if ((flags & FLB_IO_UDP) || (flags & FLB_IO_DTLS)) { + transport = FLB_TRANSPORT_UDP; + } + flb_stream_setup(&u->base, FLB_UPSTREAM, - FLB_TRANSPORT_TCP, + transport, flags, tls, config, NULL); /* Set upstream to the http_proxy if it is specified. */ - if (flb_upstream_needs_proxy(host, config->http_proxy, config->no_proxy) == FLB_TRUE) { + if (transport == FLB_TRANSPORT_TCP && + flb_upstream_needs_proxy(host, config->http_proxy, config->no_proxy) == FLB_TRUE) { flb_debug("[upstream] config->http_proxy: %s", config->http_proxy); ret = flb_utils_proxy_url_split(config->http_proxy, &proxy_protocol, &proxy_username, &proxy_password, From 949b936688dba828f46e024fc9b1fdfe3db42abc Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 15 Apr 2026 19:17:18 +0900 Subject: [PATCH 04/13] tls: flb_tls: Handle DTLS protocol Signed-off-by: Hiroshi Hatake --- include/fluent-bit/tls/flb_tls.h | 2 ++ src/tls/openssl.c | 21 ++++++++++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/include/fluent-bit/tls/flb_tls.h b/include/fluent-bit/tls/flb_tls.h index 018231218e2..bea34d65e1a 100644 --- a/include/fluent-bit/tls/flb_tls.h +++ b/include/fluent-bit/tls/flb_tls.h @@ -46,6 +46,8 @@ #define FLB_TLS_CLIENT_MODE 0 #define FLB_TLS_SERVER_MODE 1 +#define FLB_TLS_CLIENT_MODE_DGRAM 2 +#define FLB_TLS_SERVER_MODE_DGRAM 3 struct flb_tls; struct flb_connection; diff --git a/src/tls/openssl.c b/src/tls/openssl.c index cb8c2749561..3dfff25b1ba 100644 --- a/src/tls/openssl.c +++ b/src/tls/openssl.c @@ -820,7 +820,8 @@ static void *tls_context_create(int verify, * * https://www.openssl.org/docs/man1.0.2/man3/SSLv23_method.html */ - if (mode == FLB_TLS_SERVER_MODE) { + if (mode == FLB_TLS_SERVER_MODE || + mode == FLB_TLS_SERVER_MODE_DGRAM) { ssl_ctx = SSL_CTX_new(SSLv23_server_method()); } else { @@ -831,6 +832,12 @@ static void *tls_context_create(int verify, if (mode == FLB_TLS_SERVER_MODE) { ssl_ctx = SSL_CTX_new(TLS_server_method()); } + else if (mode == FLB_TLS_SERVER_MODE_DGRAM) { + ssl_ctx = SSL_CTX_new(DTLS_server_method()); + } + else if (mode == FLB_TLS_CLIENT_MODE_DGRAM) { + ssl_ctx = SSL_CTX_new(DTLS_client_method()); + } else { ssl_ctx = SSL_CTX_new(TLS_client_method()); } @@ -1556,7 +1563,8 @@ static int tls_net_handshake(struct flb_tls *tls, pthread_mutex_lock(&ctx->mutex); if (!session->continuation_flag) { - if (tls->mode == FLB_TLS_CLIENT_MODE) { + if (tls->mode == FLB_TLS_CLIENT_MODE || + tls->mode == FLB_TLS_CLIENT_MODE_DGRAM) { SSL_set_connect_state(session->ssl); if (ctx->alpn != NULL) { @@ -1571,7 +1579,8 @@ static int tls_net_handshake(struct flb_tls *tls, } } } - else if (tls->mode == FLB_TLS_SERVER_MODE) { + else if (tls->mode == FLB_TLS_SERVER_MODE || + tls->mode == FLB_TLS_SERVER_MODE_DGRAM) { SSL_set_accept_state(session->ssl); } else { @@ -1605,10 +1614,12 @@ static int tls_net_handshake(struct flb_tls *tls, ERR_clear_error(); - if (tls->mode == FLB_TLS_CLIENT_MODE) { + if (tls->mode == FLB_TLS_CLIENT_MODE || + tls->mode == FLB_TLS_CLIENT_MODE_DGRAM) { ret = SSL_connect(session->ssl); } - else if (tls->mode == FLB_TLS_SERVER_MODE) { + else if (tls->mode == FLB_TLS_SERVER_MODE || + tls->mode == FLB_TLS_SERVER_MODE_DGRAM) { ret = SSL_accept(session->ssl); } From f42738eb5577ea7318a3c8f84e9aadaca975360a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 15 Apr 2026 19:50:49 +0900 Subject: [PATCH 05/13] connection: Handle DTLS connections Signed-off-by: Hiroshi Hatake --- src/flb_connection.c | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/flb_connection.c b/src/flb_connection.c index a3ed402651b..e28a5686b47 100644 --- a/src/flb_connection.c +++ b/src/flb_connection.c @@ -97,11 +97,20 @@ static void compose_user_friendly_remote_host(struct flb_connection *connection) connection->remote_port); } else if (connection_type == FLB_TRANSPORT_UDP) { - snprintf(connection->user_friendly_remote_host, - sizeof(connection->user_friendly_remote_host), - "udp://%s:%u", - connection->remote_host, - connection->remote_port); + if (flb_stream_get_flag_status(connection->stream, FLB_IO_DTLS)) { + snprintf(connection->user_friendly_remote_host, + sizeof(connection->user_friendly_remote_host), + "dtls://%s:%u", + connection->remote_host, + connection->remote_port); + } + else { + snprintf(connection->user_friendly_remote_host, + sizeof(connection->user_friendly_remote_host), + "udp://%s:%u", + connection->remote_host, + connection->remote_port); + } } else if (connection_type == FLB_TRANSPORT_UNIX_STREAM) { snprintf(connection->user_friendly_remote_host, @@ -254,4 +263,4 @@ void flb_connection_unset_io_timeout(struct flb_connection *connection) assert(connection != NULL); connection->ts_io_timeout = -1; -} \ No newline at end of file +} From c194c549a550fd8b663dad66a164f576007cc6f0 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 15 Apr 2026 19:17:54 +0900 Subject: [PATCH 06/13] out_syslog: Handle Dgram on DTLS Signed-off-by: Hiroshi Hatake --- plugins/out_syslog/syslog.c | 85 ++++++++++++++++++++++++++++---- plugins/out_syslog/syslog_conf.c | 14 +++++- plugins/out_syslog/syslog_conf.h | 1 + 3 files changed, 88 insertions(+), 12 deletions(-) diff --git a/plugins/out_syslog/syslog.c b/plugins/out_syslog/syslog.c index b191fe778b9..cd9d7da2d38 100644 --- a/plugins/out_syslog/syslog.c +++ b/plugins/out_syslog/syslog.c @@ -25,6 +25,7 @@ #include #include #include +#include #include "syslog_conf.h" @@ -101,6 +102,56 @@ static struct { { NULL, 0,-1 }, }; +#ifdef FLB_HAVE_TLS +static int syslog_configure_dtls_context(struct flb_output_instance *ins) +{ + int ret; + + if (ins->tls != NULL) { + flb_tls_destroy(ins->tls); + ins->tls = NULL; + } + + ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE_DGRAM, + ins->tls_verify, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (ins->tls == NULL) { + return -1; + } + + if (ins->tls_verify_hostname == FLB_TRUE) { + ret = flb_tls_set_verify_hostname(ins->tls, ins->tls_verify_hostname); + if (ret == -1) { + return -1; + } + } + + if (ins->tls_min_version != NULL || ins->tls_max_version != NULL) { + ret = flb_tls_set_minmax_proto(ins->tls, + ins->tls_min_version, + ins->tls_max_version); + if (ret != 0) { + return -1; + } + } + + if (ins->tls_ciphers != NULL) { + ret = flb_tls_set_ciphers(ins->tls, ins->tls_ciphers); + if (ret != 0) { + return -1; + } + } + + return 0; +} +#endif + /* '"', '\' ']' */ static char rfc5424_sp_value[256] = { 0, 0, 0 , 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 , 0 , 0, 0, @@ -894,6 +945,7 @@ static void cb_syslog_flush(struct flb_event_chunk *event_chunk, static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { + int ret; int io_flags; struct flb_syslog *ctx = NULL; @@ -926,9 +978,24 @@ static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *co } } else { +#ifdef FLB_HAVE_TLS + if (ctx->parsed_mode == FLB_SYSLOG_DTLS) { + ret = syslog_configure_dtls_context(ins); + if (ret != 0) { + flb_plg_error(ins, "could not initialize DTLS context"); + flb_syslog_config_destroy(ctx); + return -1; + } + } +#endif - /* use TLS ? */ - if (ins->use_tls == FLB_TRUE) { + if (ctx->parsed_mode == FLB_SYSLOG_UDP) { + io_flags = FLB_IO_UDP; + } + else if (ctx->parsed_mode == FLB_SYSLOG_DTLS) { + io_flags = FLB_IO_DTLS; + } + else if (ins->use_tls == FLB_TRUE) { io_flags = FLB_IO_TLS; } else { @@ -940,17 +1007,16 @@ static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *co } ctx->u = flb_upstream_create(config, ins->host.name, ins->host.port, - io_flags, ins->tls); + io_flags, ins->tls); if (!(ctx->u)) { flb_syslog_config_destroy(ctx); return -1; } flb_output_upstream_set(ctx->u, ins); - } - - /* Set the plugin context */ - flb_output_set_context(ins, ctx); + /* Set the plugin context */ + flb_output_set_context(ins, ctx); + } flb_plg_info(ctx->ins, "setup done for %s:%i (TLS=%s)", ins->host.name, ins->host.port, ins->use_tls ? "on" : "off"); @@ -1046,9 +1112,8 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "mode", "udp", 0, FLB_TRUE, offsetof(struct flb_syslog, mode), - "Set the desired transport type, the available options are tcp and udp. If you need to " - "use a TLS secure channel, choose 'tcp' mode here and enable the 'tls' option separately. " - "DTLS over udp is not supported by this plugin." + "Set the desired transport type, the available options are udp, tcp, tls and dtls. " + "Use tls=on together with mode=dtls." }, { diff --git a/plugins/out_syslog/syslog_conf.c b/plugins/out_syslog/syslog_conf.c index 5523c1f0dca..e7384e32d44 100644 --- a/plugins/out_syslog/syslog_conf.c +++ b/plugins/out_syslog/syslog_conf.c @@ -102,6 +102,9 @@ struct flb_syslog *flb_syslog_config_create(struct flb_output_instance *ins, else if (!strcasecmp(tmp, "udp")) { ctx->parsed_mode = FLB_SYSLOG_UDP; } + else if (!strcasecmp(tmp, "dtls")) { + ctx->parsed_mode = FLB_SYSLOG_DTLS; + } else { flb_plg_error(ctx->ins, "unknown syslog mode %s", tmp); flb_syslog_config_destroy(ctx); @@ -111,8 +114,15 @@ struct flb_syslog *flb_syslog_config_create(struct flb_output_instance *ins, if (ctx->parsed_mode == FLB_SYSLOG_UDP && ins->use_tls == FLB_TRUE) { flb_plg_error(ctx->ins, - "invalid configuration: mode=udp with tls=on is unsupported " - "(DTLS is not implemented)"); + "invalid configuration: mode=udp with tls=on is unsupported; " + "use mode=dtls for secure datagram transport"); + flb_syslog_config_destroy(ctx); + return NULL; + } + + if (ctx->parsed_mode == FLB_SYSLOG_DTLS && ins->use_tls == FLB_FALSE) { + flb_plg_error(ctx->ins, + "invalid configuration: mode=dtls requires tls=on"); flb_syslog_config_destroy(ctx); return NULL; } diff --git a/plugins/out_syslog/syslog_conf.h b/plugins/out_syslog/syslog_conf.h index 0e8d3911d96..80c8d2ea383 100644 --- a/plugins/out_syslog/syslog_conf.h +++ b/plugins/out_syslog/syslog_conf.h @@ -28,6 +28,7 @@ #define FLB_SYSLOG_UDP 0 #define FLB_SYSLOG_TCP 1 #define FLB_SYSLOG_TLS 2 +#define FLB_SYSLOG_DTLS 3 #define FLB_SYSLOG_RFC3164 0 #define FLB_SYSLOG_RFC5424 1 From 719fef2e7d9ef608679981745f4842ea09dd5a66 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 Apr 2026 14:53:41 +0900 Subject: [PATCH 07/13] tests: integeration: Add out_syslog cases for UDP, TCP, and DTLS Signed-off-by: Hiroshi Hatake --- .../out_syslog/config/out_syslog_dtls.yaml | 22 ++ .../out_syslog/config/out_syslog_tcp.yaml | 20 ++ .../out_syslog/config/out_syslog_udp.yaml | 20 ++ .../out_syslog/tests/test_out_syslog_001.py | 267 ++++++++++++++++++ 4 files changed, 329 insertions(+) create mode 100644 tests/integration/scenarios/out_syslog/config/out_syslog_dtls.yaml create mode 100644 tests/integration/scenarios/out_syslog/config/out_syslog_tcp.yaml create mode 100644 tests/integration/scenarios/out_syslog/config/out_syslog_udp.yaml create mode 100644 tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py diff --git a/tests/integration/scenarios/out_syslog/config/out_syslog_dtls.yaml b/tests/integration/scenarios/out_syslog/config/out_syslog_dtls.yaml new file mode 100644 index 00000000000..5b33a7fcb63 --- /dev/null +++ b/tests/integration/scenarios/out_syslog/config/out_syslog_dtls.yaml @@ -0,0 +1,22 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_syslog + dummy: '{"message":"hello from out_syslog"}' + rate: 1 + + outputs: + - name: syslog + match: out_syslog + host: 127.0.0.1 + port: ${SYSLOG_RECEIVER_PORT} + mode: dtls + tls: on + tls.verify: off + syslog_message_key: message diff --git a/tests/integration/scenarios/out_syslog/config/out_syslog_tcp.yaml b/tests/integration/scenarios/out_syslog/config/out_syslog_tcp.yaml new file mode 100644 index 00000000000..835703061f5 --- /dev/null +++ b/tests/integration/scenarios/out_syslog/config/out_syslog_tcp.yaml @@ -0,0 +1,20 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_syslog + dummy: '{"message":"hello from out_syslog"}' + rate: 1 + + outputs: + - name: syslog + match: out_syslog + host: 127.0.0.1 + port: ${SYSLOG_RECEIVER_PORT} + mode: tcp + syslog_message_key: message diff --git a/tests/integration/scenarios/out_syslog/config/out_syslog_udp.yaml b/tests/integration/scenarios/out_syslog/config/out_syslog_udp.yaml new file mode 100644 index 00000000000..008103b052f --- /dev/null +++ b/tests/integration/scenarios/out_syslog/config/out_syslog_udp.yaml @@ -0,0 +1,20 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_syslog + dummy: '{"message":"hello from out_syslog"}' + rate: 1 + + outputs: + - name: syslog + match: out_syslog + host: 127.0.0.1 + port: ${SYSLOG_RECEIVER_PORT} + mode: udp + syslog_message_key: message diff --git a/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py b/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py new file mode 100644 index 00000000000..21b21a6fcfb --- /dev/null +++ b/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py @@ -0,0 +1,267 @@ +import os +import shutil +import socket +import subprocess +import threading +import time + +import pytest + +from utils.test_service import FluentBitTestService + + +class UdpReceiver: + def __init__(self, host, port): + self.host = host + self.port = port + self.message = None + self.error = None + self._ready = threading.Event() + self._done = threading.Event() + self._thread = threading.Thread(target=self._run, daemon=True) + + def _run(self): + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server: + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind((self.host, self.port)) + server.settimeout(120) + self._ready.set() + + data, _ = server.recvfrom(4096) + self.message = data + self._done.set() + except Exception as exc: + self.error = exc + self._ready.set() + self._done.set() + + def start(self): + self._thread.start() + + def wait_ready(self, timeout=5): + if not self._ready.wait(timeout): + raise TimeoutError("Timed out waiting for UDP receiver readiness") + + def wait_message(self, timeout=10): + if not self._done.wait(timeout): + raise TimeoutError("Timed out waiting for UDP syslog payload") + + if self.error is not None: + raise self.error + + return self.message + + +class TcpReceiver: + def __init__(self, host, port): + self.host = host + self.port = port + self.message = None + self.error = None + self._ready = threading.Event() + self._done = threading.Event() + self._thread = threading.Thread(target=self._run, daemon=True) + + def _run(self): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind((self.host, self.port)) + server.listen(1) + server.settimeout(120) + self._ready.set() + conn, _ = server.accept() + + with conn: + conn.settimeout(20) + chunks = [] + + while True: + chunk = conn.recv(4096) + if not chunk: + break + chunks.append(chunk) + if b"\n" in chunk: + break + + self.message = b"".join(chunks) + self._done.set() + except Exception as exc: + self.error = exc + self._ready.set() + self._done.set() + + def start(self): + self._thread.start() + + def wait_ready(self, timeout=5): + if not self._ready.wait(timeout): + raise TimeoutError("Timed out waiting for TCP receiver readiness") + + def wait_message(self, timeout=10): + if not self._done.wait(timeout): + raise TimeoutError("Timed out waiting for TCP syslog payload") + + if self.error is not None: + raise self.error + + return self.message + + +class DtlsReceiver: + def __init__(self, port, cert_file, key_file): + self.port = port + self.cert_file = cert_file + self.key_file = key_file + self.process = None + + def start(self): + self.process = subprocess.Popen( + [ + "openssl", + "s_server", + "-dtls", + "-accept", + str(self.port), + "-cert", + self.cert_file, + "-key", + self.key_file, + "-naccept", + "1", + "-ign_eof", + ], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + time.sleep(0.5) + if self.process.poll() is not None: + output = self._read_output(timeout=2) + raise RuntimeError(f"DTLS receiver failed to start: {output}") + + def wait_ready(self, timeout=5): + deadline = time.time() + timeout + while time.time() < deadline: + if self.process.poll() is not None: + output = self._read_output(timeout=2) + raise RuntimeError(f"DTLS receiver terminated early: {output}") + time.sleep(0.1) + + def _read_output(self, timeout=2): + stdout, stderr = self.process.communicate(timeout=timeout) + return (stdout + stderr).decode("utf-8", errors="replace") + + def wait_message(self, timeout=30): + try: + output = self._read_output(timeout=timeout) + except subprocess.TimeoutExpired as exc: + raise TimeoutError("Timed out waiting for DTLS handshake") from exc + + return output + + def stop(self): + if self.process is None: + return + + if self.process.poll() is None: + self.process.terminate() + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait(timeout=5) + + +class Service: + def __init__(self, config_file, receiver_type): + self.config_file = os.path.abspath(os.path.join(os.path.dirname(__file__), "../config", config_file)) + self.receiver_type = receiver_type + self.receiver = None + + cert_dir = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../in_splunk/certificate") + ) + self.tls_crt_file = os.path.join(cert_dir, "certificate.pem") + self.tls_key_file = os.path.join(cert_dir, "private_key.pem") + + self.service = FluentBitTestService( + self.config_file, + pre_start=self._start_receiver, + post_stop=self._stop_receiver, + ) + + def _start_receiver(self, service): + self.receiver_port = service.allocate_port_env("SYSLOG_RECEIVER_PORT") + + if self.receiver_type == "udp": + self.receiver = UdpReceiver("127.0.0.1", self.receiver_port) + elif self.receiver_type == "tcp": + self.receiver = TcpReceiver("127.0.0.1", self.receiver_port) + elif self.receiver_type == "dtls": + self.receiver = DtlsReceiver(self.receiver_port, self.tls_crt_file, self.tls_key_file) + else: + raise ValueError(f"Unknown receiver type: {self.receiver_type}") + + self.receiver.start() + self.receiver.wait_ready(timeout=5) + + def _stop_receiver(self, _service): + if self.receiver_type == "dtls" and self.receiver is not None: + self.receiver.stop() + + def start(self): + self.service.start() + + def stop(self): + self.service.stop() + + +def _assert_syslog_payload(payload): + text = payload.decode("utf-8", errors="replace") + assert "hello from out_syslog" in text + assert text.startswith("<") + + +def _assert_dtls_payload(output): + assert "ACCEPT" in output + assert "DONE" in output + + +def test_out_syslog_udp(): + service = Service("out_syslog_udp.yaml", "udp") + service.start() + + try: + payload = service.receiver.wait_message(timeout=20) + finally: + service.stop() + + _assert_syslog_payload(payload) + + +def test_out_syslog_tcp(): + service = Service("out_syslog_tcp.yaml", "tcp") + service.start() + + try: + payload = service.receiver.wait_message(timeout=15) + finally: + service.stop() + + _assert_syslog_payload(payload) + + +@pytest.mark.skipif(not shutil.which("openssl"), reason="openssl is required for DTLS test") +def test_out_syslog_dtls(): + service = Service("out_syslog_dtls.yaml", "dtls") + service.start() + + try: + output = service.receiver.wait_message(timeout=30) + finally: + service.stop() + + _assert_dtls_payload(output) From 5cf6a809491086cb879700d358aff97ff106eddf Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 Apr 2026 18:05:15 +0900 Subject: [PATCH 08/13] out_syslog: Address review comments Signed-off-by: Hiroshi Hatake --- plugins/out_syslog/syslog.c | 13 ++++++++++--- plugins/out_syslog/syslog_conf.c | 6 +++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/plugins/out_syslog/syslog.c b/plugins/out_syslog/syslog.c index cd9d7da2d38..e01a020e12a 100644 --- a/plugins/out_syslog/syslog.c +++ b/plugins/out_syslog/syslog.c @@ -987,6 +987,12 @@ static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *co return -1; } } +#else + if (ctx->parsed_mode == FLB_SYSLOG_DTLS) { + flb_plg_error(ins, "could not initialize DTLS context"); + flb_syslog_config_destroy(ctx); + return -1; + } #endif if (ctx->parsed_mode == FLB_SYSLOG_UDP) { @@ -1013,10 +1019,11 @@ static int cb_syslog_init(struct flb_output_instance *ins, struct flb_config *co return -1; } flb_output_upstream_set(ctx->u, ins); - - /* Set the plugin context */ - flb_output_set_context(ins, ctx); } + + /* Set the plugin context for all modes, including UDP. */ + flb_output_set_context(ins, ctx); + flb_plg_info(ctx->ins, "setup done for %s:%i (TLS=%s)", ins->host.name, ins->host.port, ins->use_tls ? "on" : "off"); diff --git a/plugins/out_syslog/syslog_conf.c b/plugins/out_syslog/syslog_conf.c index e7384e32d44..04ab53583f1 100644 --- a/plugins/out_syslog/syslog_conf.c +++ b/plugins/out_syslog/syslog_conf.c @@ -87,9 +87,6 @@ struct flb_syslog *flb_syslog_config_create(struct flb_output_instance *ins, return NULL; } - /* Set context */ - flb_output_set_context(ins, ctx); - /* Config Mode */ tmp = flb_output_get_property("mode", ins); if (tmp) { @@ -171,6 +168,9 @@ struct flb_syslog *flb_syslog_config_create(struct flb_output_instance *ins, } } + /* Set context after validation succeeds */ + flb_output_set_context(ins, ctx); + return ctx; } From 55c9a76ab7d8ca88c8fa458964881e1b9058b38a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 Apr 2026 18:05:51 +0900 Subject: [PATCH 09/13] io: Store fd on UDP Signed-off-by: Hiroshi Hatake --- src/flb_io.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/flb_io.c b/src/flb_io.c index c1a57c0cb34..1662d9acd98 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -129,6 +129,11 @@ int flb_io_net_connect(struct flb_connection *connection, fd = flb_net_udp_connect(connection->upstream->tcp_host, connection->upstream->tcp_port, connection->stream->net.source_address); + + if (fd >= 0) { + connection->fd = fd; + connection->event.fd = fd; + } } else { /* Perform TCP connection */ From 504b4b2c40d1d3d0a0eb6ce7271d8846e352a18e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 Apr 2026 18:10:09 +0900 Subject: [PATCH 10/13] tests: integration: Resolve openssl fullpath before running Signed-off-by: Hiroshi Hatake --- .../scenarios/out_syslog/tests/test_out_syslog_001.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py b/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py index 21b21a6fcfb..c478557ca62 100644 --- a/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py +++ b/tests/integration/scenarios/out_syslog/tests/test_out_syslog_001.py @@ -117,9 +117,13 @@ def __init__(self, port, cert_file, key_file): self.process = None def start(self): - self.process = subprocess.Popen( + openssl = shutil.which("openssl") + if openssl is None: + raise RuntimeError("openssl is required for DTLS test") + + self.process = subprocess.Popen( # noqa: S603 - controlled test command [ - "openssl", + openssl, "s_server", "-dtls", "-accept", From 8fde6704bc0d84fadadaa26deb1ab082442e24e6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 22 Apr 2026 12:49:42 +0900 Subject: [PATCH 11/13] out_syslog: Preserve platform DTLS options in DTLS Signed-off-by: Hiroshi Hatake --- plugins/out_syslog/syslog.c | 72 +++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 19 deletions(-) diff --git a/plugins/out_syslog/syslog.c b/plugins/out_syslog/syslog.c index e01a020e12a..c59d061ed33 100644 --- a/plugins/out_syslog/syslog.c +++ b/plugins/out_syslog/syslog.c @@ -103,28 +103,10 @@ static struct { }; #ifdef FLB_HAVE_TLS -static int syslog_configure_dtls_context(struct flb_output_instance *ins) +static int syslog_configure_tls_options(struct flb_output_instance *ins) { int ret; - if (ins->tls != NULL) { - flb_tls_destroy(ins->tls); - ins->tls = NULL; - } - - ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE_DGRAM, - ins->tls_verify, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - if (ins->tls == NULL) { - return -1; - } - if (ins->tls_verify_hostname == FLB_TRUE) { ret = flb_tls_set_verify_hostname(ins->tls, ins->tls_verify_hostname); if (ret == -1) { @@ -148,8 +130,60 @@ static int syslog_configure_dtls_context(struct flb_output_instance *ins) } } +#if defined(FLB_SYSTEM_WINDOWS) + if (ins->tls_win_use_enterprise_certstore) { + ret = flb_tls_set_use_enterprise_store(ins->tls, + ins->tls_win_use_enterprise_certstore); + if (ret == -1) { + return -1; + } + } + + if (ins->tls_win_thumbprints) { + ret = flb_tls_set_client_thumbprints(ins->tls, ins->tls_win_thumbprints); + if (ret == -1) { + return -1; + } + } + + if (ins->tls_win_certstore_name) { + ret = flb_tls_set_certstore_name(ins->tls, ins->tls_win_certstore_name); + if (ret == -1) { + return -1; + } + + ret = flb_tls_load_system_certificates(ins->tls); + if (ret == -1) { + return -1; + } + } +#endif + return 0; } + +static int syslog_configure_dtls_context(struct flb_output_instance *ins) +{ + if (ins->tls != NULL) { + flb_tls_destroy(ins->tls); + ins->tls = NULL; + } + + ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE_DGRAM, + ins->tls_verify, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (ins->tls == NULL) { + return -1; + } + + return syslog_configure_tls_options(ins); +} #endif /* '"', '\' ']' */ From 18dc607fed252a1877943cf91c1a17e51f48edb8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 22 Apr 2026 12:50:19 +0900 Subject: [PATCH 12/13] io: DTLS async/nonblocking parity Signed-off-by: Hiroshi Hatake --- src/flb_io.c | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/flb_io.c b/src/flb_io.c index 1662d9acd98..be5b282374f 100644 --- a/src/flb_io.c +++ b/src/flb_io.c @@ -107,6 +107,7 @@ int flb_io_net_connect(struct flb_connection *connection, struct flb_coro *coro) { int ret; + int socket_ret; int async = FLB_FALSE; flb_sockfd_t fd = -1; int flags = flb_connection_get_flags(connection); @@ -131,6 +132,24 @@ int flb_io_net_connect(struct flb_connection *connection, connection->stream->net.source_address); if (fd >= 0) { + if (async == FLB_TRUE) { + socket_ret = flb_net_socket_nonblocking(fd); + if (socket_ret == -1) { + flb_socket_close(fd); + connection->fd = -1; + connection->event.fd = -1; + return -1; + } + } + + socket_ret = flb_net_socket_set_rcvtimeout(fd, connection->net->io_timeout); + if (socket_ret == -1) { + flb_socket_close(fd); + connection->fd = -1; + connection->event.fd = -1; + return -1; + } + connection->fd = fd; connection->event.fd = fd; } From 732e4e5045b970c3bb9c4e9a7170567d079398c7 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 24 Apr 2026 18:11:11 +0900 Subject: [PATCH 13/13] tls: openssl: Handle relatively old OpenSSL versions Signed-off-by: Hiroshi Hatake --- src/tls/openssl.c | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/tls/openssl.c b/src/tls/openssl.c index 3dfff25b1ba..0e939172fa2 100644 --- a/src/tls/openssl.c +++ b/src/tls/openssl.c @@ -820,8 +820,22 @@ static void *tls_context_create(int verify, * * https://www.openssl.org/docs/man1.0.2/man3/SSLv23_method.html */ - if (mode == FLB_TLS_SERVER_MODE || - mode == FLB_TLS_SERVER_MODE_DGRAM) { + if (mode == FLB_TLS_SERVER_MODE_DGRAM || + mode == FLB_TLS_CLIENT_MODE_DGRAM) { +#ifndef OPENSSL_NO_DTLS + if (mode == FLB_TLS_SERVER_MODE_DGRAM) { + ssl_ctx = SSL_CTX_new(DTLSv1_server_method()); + } + else { + ssl_ctx = SSL_CTX_new(DTLSv1_client_method()); + } +#else + flb_error("[openssl] DTLS mode requested but this OpenSSL build " + "does not provide DTLS support"); + return NULL; +#endif + } + else if (mode == FLB_TLS_SERVER_MODE) { ssl_ctx = SSL_CTX_new(SSLv23_server_method()); } else {