From 254ba6181fad724bd6414b977d4164e00441daf0 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Fri, 27 Mar 2026 02:57:04 +0000 Subject: [PATCH 1/2] Thread-local buffers to eliminate per-write mutex contention Each worker thread now writes to its own thread_local buffer instead of a single shared buffer protected by a global mutex. Threads only acquire a lock when draining their own buffer (capacity-based flush) or registering on first use. Multiple worker threads never block each other. Remaining data in a thread-local buffer is flushed by ThreadLocalBuffer::~ThreadLocalBuffer() when the thread exits, so no metrics are silently dropped. --- libs/writer/writer_wrapper/test_writer.cpp | 60 ++++++++++ libs/writer/writer_wrapper/writer.cpp | 112 +++++++++--------- libs/writer/writer_wrapper/writer.h | 36 +++--- .../writer_wrapper/writer_test_helper.h | 3 +- spectator/registry.cpp | 6 +- 5 files changed, 145 insertions(+), 72 deletions(-) diff --git a/libs/writer/writer_wrapper/test_writer.cpp b/libs/writer/writer_wrapper/test_writer.cpp index 68d743a..7ca1b34 100644 --- a/libs/writer/writer_wrapper/test_writer.cpp +++ b/libs/writer/writer_wrapper/test_writer.cpp @@ -127,6 +127,66 @@ TEST_F(WriterWrapperUDSWriterTest, MultithreadedWrite) EXPECT_EQ(actualIncrements, expectedIncrements); } +// Verify that multiple worker threads do not block each other: each thread uses its own +// local buffer, flushing by capacity. Remaining data is drained by ThreadLocalBuffer's +// destructor when each thread exits. +TEST_F(WriterWrapperUDSWriterTest, ThreadLocalBufferNoMutexContention) +{ + Logger::info("Starting thread-local buffer contention test..."); + + const std::string unixUrl = "/tmp/test_uds_socket"; + // Small buffer so capacity-based flush fires frequently + WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, 64); + + constexpr auto numThreads = 8; + constexpr auto incrementsPerThread = 20; + + auto worker = [&](int threadId) + { + std::string name = fmt::format("tl.counter.{}", threadId); + MeterId meterId(name); + Counter counter(meterId); + for (int j = 0; j < incrementsPerThread; j++) + { + counter.Increment(); + } + }; + + std::vector threads; + for (int i = 0; i < numThreads; i++) + { + threads.emplace_back(worker, i); + } + for (auto& t : threads) + { + t.join(); + } + + // Allow time for thread destructor flushes to reach the UDS server + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + auto msgs = get_uds_messages(); + EXPECT_FALSE(msgs.empty()); + + int totalIncrements = 0; + std::regex counter_regex(R"(c:tl\.counter\.\d+:1.000000)"); + for (const auto& msg : msgs) + { + std::stringstream ss(msg); + std::string line; + while (std::getline(ss, line)) + { + if (!line.empty()) + { + EXPECT_TRUE(std::regex_match(line, counter_regex)) + << "Unexpected line: " << line; + totalIncrements++; + } + } + } + EXPECT_EQ(totalIncrements, numThreads * incrementsPerThread); +} + // This is a unique test that attempts to create messages of exactly 10 bytes in size // and writes to a buffer of size 10 bytes from multiple threads. The NDrive team discovered // a deadlock scenario in this specific case where the buffer size matched the message size diff --git a/libs/writer/writer_wrapper/writer.cpp b/libs/writer/writer_wrapper/writer.cpp index aac4887..4d81d33 100644 --- a/libs/writer/writer_wrapper/writer.cpp +++ b/libs/writer/writer_wrapper/writer.cpp @@ -8,28 +8,43 @@ namespace spectator { static constexpr auto NEW_LINE = '\n'; -Writer::~Writer() -{ - auto& instance = GetInstance(); +// Each worker thread owns a shared_ptr to its buffer. +// The registry in Writer holds weak_ptrs so threads can exit freely. +thread_local std::shared_ptr tl_buffer; - if (instance.bufferingEnabled) +// Flush any data remaining in the buffer when the thread exits. +Writer::ThreadLocalBuffer::~ThreadLocalBuffer() +{ + if (data.empty()) + { + return; + } + try { - instance.shutdown.store(true); - instance.cv_receiver.notify_all(); - instance.cv_sender.notify_all(); - if (instance.sendingThread.joinable()) { - instance.sendingThread.join(); + auto& instance = Writer::GetInstance(); + if (instance.m_impl) + { + instance.TryToSend(data); } } + catch (...) + { + // Best-effort: ignore errors during thread teardown + } +} + +Writer::~Writer() +{ + auto& instance = GetInstance(); + instance.shutdown.store(true); this->Close(); } -void Writer::Initialize(WriterType type, const std::string& param, int port, unsigned int bufferSize) +void Writer::Initialize(WriterType type, const std::string& param, int port, + unsigned int bufferSize) { - // Get the singleton instance directly auto& instance = GetInstance(); - // Create the new writer based on type try { switch (type) @@ -51,19 +66,23 @@ void Writer::Initialize(WriterType type, const std::string& param, int port, uns } instance.m_currentType = type; - + instance.shutdown.store(false); + instance.bufferingEnabled = false; + + // Clear thread-local buffer registry from any previous init + { + std::lock_guard regLock(instance.registryMutex); + instance.threadBufferRegistry.clear(); + } + if (bufferSize > 0) { instance.bufferingEnabled = true; instance.bufferSize = bufferSize; - instance.buffer.reserve(bufferSize); - instance.writeImpl = &Writer::BufferedWrite; - // Create a thread with proper binding to the instance method - instance.sendingThread = std::thread(&Writer::ThreadSend, &instance); + instance.writeImpl = &Writer::ThreadLocalBufferedWrite; } else { - // Explicitly set to non-buffered if buffer size is 0 instance.writeImpl = &Writer::NonBufferedWrite; } } @@ -80,52 +99,40 @@ void Writer::TryToSend(const std::string& message) instance.m_impl->Write(message); } -void Writer::ThreadSend() +void Writer::ThreadLocalBufferedWrite(const std::string& message) { auto& instance = GetInstance(); - std::string message{}; - while (instance.shutdown.load() == false) + + // Initialise this thread's buffer on first use and register it + if (!tl_buffer) { - { - std::unique_lock lock(instance.writeMutex); - instance.cv_sender.wait( - lock, [&instance] { return instance.buffer.size() >= instance.bufferSize || instance.shutdown.load(); }); - if (instance.shutdown.load() == true) - { - return; - } - message = std::move(instance.buffer); - instance.buffer = std::string(); - instance.buffer.reserve(instance.bufferSize); - } - instance.cv_receiver.notify_one(); - instance.TryToSend(message); + tl_buffer = std::make_shared(); + std::lock_guard regLock(instance.registryMutex); + instance.threadBufferRegistry.push_back(tl_buffer); } -} -void Writer::BufferedWrite(const std::string& message) -{ - auto& instance = GetInstance(); + std::string toSend; { - std::unique_lock lock(instance.writeMutex); - // TODO: Optimize memory alloc to not exceed allocated size - instance.cv_receiver.wait( - lock, [&instance] { return instance.buffer.size() < instance.bufferSize || instance.shutdown.load(); }); - if (instance.shutdown.load()) + std::lock_guard lock(tl_buffer->mutex); + tl_buffer->data.append(message); + tl_buffer->data.push_back(NEW_LINE); + + // Capacity-based flush: drain when the thread-local buffer is full + if (tl_buffer->data.size() >= instance.bufferSize) { - Logger::info("Write operation aborted due to shutdown signal"); - return; + toSend = std::move(tl_buffer->data); + tl_buffer->data.clear(); } - instance.buffer.append(message); - instance.buffer.push_back(NEW_LINE); } - instance.buffer.size() >= instance.bufferSize ? instance.cv_sender.notify_one() : instance.cv_receiver.notify_one(); + + if (!toSend.empty()) + { + instance.TryToSend(toSend); + } } void Writer::NonBufferedWrite(const std::string& message) { - // Since this is a non-static method, we're already operating on an instance - // and can call the instance method directly this->TryToSend(message + NEW_LINE); } @@ -139,7 +146,6 @@ void Writer::Write(const std::string& message) return; } - // Call the member function using the pointer-to-member syntax (instance.*instance.writeImpl)(message); } @@ -163,4 +169,4 @@ void Writer::Close() } } -} // namespace spectator \ No newline at end of file +} // namespace spectator diff --git a/libs/writer/writer_wrapper/writer.h b/libs/writer/writer_wrapper/writer.h index 82407ca..92b8ae2 100644 --- a/libs/writer/writer_wrapper/writer.h +++ b/libs/writer/writer_wrapper/writer.h @@ -4,13 +4,23 @@ #include #include +#include #include +#include namespace spectator { class Writer final : public Singleton { public: + // Per-thread buffer: each worker thread gets its own; avoids global mutex on writes. + struct ThreadLocalBuffer + { + std::string data; + std::mutex mutex; + ~ThreadLocalBuffer(); // Flushes any remaining data on thread exit + }; + ~Writer() override; private: @@ -31,16 +41,15 @@ class Writer final : public Singleton // Private constructor - enforces singleton pattern Writer() = default; - static void Initialize(WriterType type, const std::string& param = "", int port = 0, unsigned int bufferSize = 0); + static void Initialize(WriterType type, const std::string& param = "", int port = 0, + unsigned int bufferSize = 0); static void Write(const std::string& message); - void BufferedWrite(const std::string& message); + void ThreadLocalBufferedWrite(const std::string& message); void NonBufferedWrite(const std::string& message); - void ThreadSend(); - void TryToSend(const std::string& message); void Close(); @@ -50,22 +59,19 @@ class Writer final : public Singleton static WriterType GetWriterType() { return GetInstance().m_currentType; } std::unique_ptr m_impl; - WriterType m_currentType = WriterType::Memory; // Default type + WriterType m_currentType = WriterType::Memory; bool bufferingEnabled = false; unsigned int bufferSize = 0; - std::string buffer{}; - - // Function pointer for write strategy - member function pointer + + // Function pointer for write strategy using WriteFunction = void (Writer::*)(const std::string&); - WriteFunction writeImpl = &Writer::NonBufferedWrite; // Default to non-buffered + WriteFunction writeImpl = &Writer::NonBufferedWrite; + // Registry of all active thread-local buffers (weak_ptrs to avoid keeping threads alive) + std::mutex registryMutex; + std::vector> threadBufferRegistry; - // multithreading writes - std::mutex writeMutex; - std::thread sendingThread; - std::condition_variable cv_receiver; - std::condition_variable cv_sender; std::atomic shutdown{false}; }; -} // namespace spectator \ No newline at end of file +} // namespace spectator diff --git a/libs/writer/writer_wrapper/writer_test_helper.h b/libs/writer/writer_wrapper/writer_test_helper.h index 972a00b..b43cc81 100644 --- a/libs/writer/writer_wrapper/writer_test_helper.h +++ b/libs/writer/writer_wrapper/writer_test_helper.h @@ -14,7 +14,8 @@ class WriterTestHelper { public: // Initialize the Writer for testing purposes - static void InitializeWriter(WriterType type, const std::string& param = "", int port = 0, unsigned int bufferSize = 0) + static void InitializeWriter(WriterType type, const std::string& param = "", int port = 0, + unsigned int bufferSize = 0) { Writer::Initialize(type, param, port, bufferSize); } diff --git a/spectator/registry.cpp b/spectator/registry.cpp index 81d394e..820c82c 100644 --- a/spectator/registry.cpp +++ b/spectator/registry.cpp @@ -41,19 +41,19 @@ Registry::Registry(const Config& config) : m_config(config) if (config.GetWriterType() == WriterType::Memory) { Logger::info("Registry initializing Memory Writer"); - Writer::Initialize(config.GetWriterType(), "", 0, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), "", 0, this->m_config.GetWriterBufferSize()); } else if (config.GetWriterType() == WriterType::UDP) { auto [ip, port] = ParseUdpAddress(this->m_config.GetWriterLocation()); Logger::info("Registry initializing UDP Writer at {}:{}", ip, port); - Writer::Initialize(config.GetWriterType(), ip, port, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), ip, port, this->m_config.GetWriterBufferSize()); } else if (config.GetWriterType() == WriterType::Unix) { auto socketPath = ParseUnixAddress(this->m_config.GetWriterLocation()); Logger::info("Registry initializing UDS Writer at {}", socketPath); - Writer::Initialize(config.GetWriterType(), socketPath, 0, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), socketPath, 0, this->m_config.GetWriterBufferSize()); } } From 575e1ba3389779c81d2348146557b6800685aa50 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Fri, 27 Mar 2026 03:00:54 +0000 Subject: [PATCH 2/2] Interval flushing to ensure timely metric emission Adds a flushIntervalMs parameter to WriterConfig (and Config) that starts a background timer thread which drains all thread-local buffers on a fixed period. This guarantees metrics are emitted within a bounded timeframe even when write volume is low and the capacity-based flush never triggers. --- libs/config/config.h | 1 + libs/writer/writer_config/writer_config.cpp | 7 ++ libs/writer/writer_config/writer_config.h | 3 + libs/writer/writer_wrapper/test_writer.cpp | 50 ++++++++- libs/writer/writer_wrapper/writer.cpp | 100 ++++++++++++++++-- libs/writer/writer_wrapper/writer.h | 15 ++- .../writer_wrapper/writer_test_helper.h | 4 +- spectator/registry.cpp | 12 ++- 8 files changed, 174 insertions(+), 18 deletions(-) diff --git a/libs/config/config.h b/libs/config/config.h index 382cc27..25a7e93 100644 --- a/libs/config/config.h +++ b/libs/config/config.h @@ -23,6 +23,7 @@ class Config const std::string& GetWriterLocation() const noexcept { return m_writerConfig.GetLocation(); } const WriterType& GetWriterType() const noexcept { return m_writerConfig.GetType(); } const unsigned int GetWriterBufferSize() const noexcept { return m_writerConfig.GetBufferSize(); } + const unsigned int GetFlushIntervalMs() const noexcept { return m_writerConfig.GetFlushIntervalMs(); } private: diff --git a/libs/writer/writer_config/writer_config.cpp b/libs/writer/writer_config/writer_config.cpp index 6f331ab..d503198 100644 --- a/libs/writer/writer_config/writer_config.cpp +++ b/libs/writer/writer_config/writer_config.cpp @@ -56,4 +56,11 @@ WriterConfig::WriterConfig(const std::string& type, const unsigned int bufferSiz Logger::info("WriterConfig buffering enabled with size: {}", m_bufferSize); } +WriterConfig::WriterConfig(const std::string& type, const unsigned int bufferSize, const unsigned int flushIntervalMs) + : WriterConfig(type, bufferSize) // Constructor delegation +{ + m_flushIntervalMs = flushIntervalMs; + Logger::info("WriterConfig flush interval enabled with interval: {}ms", m_flushIntervalMs); +} + } // namespace spectator \ No newline at end of file diff --git a/libs/writer/writer_config/writer_config.h b/libs/writer/writer_config/writer_config.h index 737cfb5..d1f0055 100644 --- a/libs/writer/writer_config/writer_config.h +++ b/libs/writer/writer_config/writer_config.h @@ -12,15 +12,18 @@ class WriterConfig public: explicit WriterConfig(const std::string& type); WriterConfig(const std::string& type, unsigned int bufferSize); + WriterConfig(const std::string& type, unsigned int bufferSize, unsigned int flushIntervalMs); [[nodiscard]] const WriterType& GetType() const noexcept { return m_type; } [[nodiscard]] unsigned int GetBufferSize() const noexcept { return m_bufferSize; } [[nodiscard]] const std::string& GetLocation() const noexcept { return m_location; } + [[nodiscard]] unsigned int GetFlushIntervalMs() const noexcept { return m_flushIntervalMs; } private: WriterType m_type; std::string m_location; unsigned int m_bufferSize = 0; + unsigned int m_flushIntervalMs = 0; }; } // namespace spectator \ No newline at end of file diff --git a/libs/writer/writer_wrapper/test_writer.cpp b/libs/writer/writer_wrapper/test_writer.cpp index 7ca1b34..25c1a01 100644 --- a/libs/writer/writer_wrapper/test_writer.cpp +++ b/libs/writer/writer_wrapper/test_writer.cpp @@ -127,16 +127,56 @@ TEST_F(WriterWrapperUDSWriterTest, MultithreadedWrite) EXPECT_EQ(actualIncrements, expectedIncrements); } +// Verify that metrics written infrequently are still flushed via the interval mechanism +// even when the buffer is not full. +TEST_F(WriterWrapperUDSWriterTest, IntervalFlushWithIdleThread) +{ + Logger::info("Starting interval flush test..."); + + // Use a large buffer so capacity-based flush never triggers, + // but set a short flush interval so metrics get sent in time. + const std::string unixUrl = "/tmp/test_uds_socket"; + constexpr unsigned int largeBuffer = 4096; + constexpr unsigned int flushIntervalMs = 200; + WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, largeBuffer, flushIntervalMs); + + MeterId meterId("interval.test.counter"); + Counter counter(meterId); + counter.Increment(); + + // Wait for at least two flush intervals to pass + std::this_thread::sleep_for(std::chrono::milliseconds(flushIntervalMs * 3)); + + auto msgs = get_uds_messages(); + EXPECT_FALSE(msgs.empty()) << "Expected metrics to be flushed by interval timer"; + + std::regex counter_regex(R"(c:interval\.test\.counter:1.000000)"); + bool found = false; + for (const auto& msg : msgs) + { + std::stringstream ss(msg); + std::string line; + while (std::getline(ss, line)) + { + if (std::regex_match(line, counter_regex)) + { + found = true; + } + } + } + EXPECT_TRUE(found) << "Expected interval.test.counter metric to be received"; +} + // Verify that multiple worker threads do not block each other: each thread uses its own -// local buffer, flushing by capacity. Remaining data is drained by ThreadLocalBuffer's -// destructor when each thread exits. +// local buffer, flushing by capacity. The interval timer catches any tail data that +// does not fill the buffer before the threads exit. TEST_F(WriterWrapperUDSWriterTest, ThreadLocalBufferNoMutexContention) { Logger::info("Starting thread-local buffer contention test..."); const std::string unixUrl = "/tmp/test_uds_socket"; - // Small buffer so capacity-based flush fires frequently - WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, 64); + // Small buffer so capacity-based flush fires frequently; interval timer catches the tail + WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, 64, 100); constexpr auto numThreads = 8; constexpr auto incrementsPerThread = 20; @@ -162,7 +202,7 @@ TEST_F(WriterWrapperUDSWriterTest, ThreadLocalBufferNoMutexContention) t.join(); } - // Allow time for thread destructor flushes to reach the UDS server + // Allow the interval timer to flush any remaining buffered data std::this_thread::sleep_for(std::chrono::milliseconds(300)); auto msgs = get_uds_messages(); diff --git a/libs/writer/writer_wrapper/writer.cpp b/libs/writer/writer_wrapper/writer.cpp index 4d81d33..61563c9 100644 --- a/libs/writer/writer_wrapper/writer.cpp +++ b/libs/writer/writer_wrapper/writer.cpp @@ -36,12 +36,24 @@ Writer::ThreadLocalBuffer::~ThreadLocalBuffer() Writer::~Writer() { auto& instance = GetInstance(); - instance.shutdown.store(true); + + // Signal all background threads to stop + { + std::lock_guard lock(instance.shutdownMutex); + instance.shutdown.store(true); + } + instance.cv_shutdown.notify_all(); + + if (instance.flushTimerThread.joinable()) + { + instance.flushTimerThread.join(); + } + this->Close(); } void Writer::Initialize(WriterType type, const std::string& param, int port, - unsigned int bufferSize) + unsigned int bufferSize, unsigned int flushIntervalMs) { auto& instance = GetInstance(); @@ -66,20 +78,37 @@ void Writer::Initialize(WriterType type, const std::string& param, int port, } instance.m_currentType = type; - instance.shutdown.store(false); - instance.bufferingEnabled = false; - // Clear thread-local buffer registry from any previous init + // Stop any pre-existing timer thread before re-initialising + { + std::lock_guard lock(instance.shutdownMutex); + instance.shutdown.store(true); + } + instance.cv_shutdown.notify_all(); + if (instance.flushTimerThread.joinable()) + { + instance.flushTimerThread.join(); + } + // Clear the thread-local buffer registry from any previous init { std::lock_guard regLock(instance.registryMutex); instance.threadBufferRegistry.clear(); } + instance.shutdown.store(false); + instance.bufferingEnabled = false; - if (bufferSize > 0) + if (bufferSize > 0 || flushIntervalMs > 0) { instance.bufferingEnabled = true; instance.bufferSize = bufferSize; instance.writeImpl = &Writer::ThreadLocalBufferedWrite; + + if (flushIntervalMs > 0) + { + instance.flushInterval = std::chrono::milliseconds(flushIntervalMs); + instance.flushTimerThread = std::thread(&Writer::FlushTimerThread, &instance); + Logger::info("Writer interval flush enabled: {}ms", flushIntervalMs); + } } else { @@ -99,6 +128,63 @@ void Writer::TryToSend(const std::string& message) instance.m_impl->Write(message); } +void Writer::FlushTimerThread() +{ + auto& instance = GetInstance(); + + while (true) + { + // Sleep for the flush interval, or wake early on shutdown + { + std::unique_lock lock(instance.shutdownMutex); + instance.cv_shutdown.wait_for(lock, instance.flushInterval, + [&instance] { return instance.shutdown.load(); }); + } + + if (instance.shutdown.load()) + { + break; + } + + // Collect live buffers (clean up expired weak_ptrs in the same pass) + std::vector> buffersToFlush; + { + std::lock_guard regLock(instance.registryMutex); + for (auto it = instance.threadBufferRegistry.begin(); + it != instance.threadBufferRegistry.end();) + { + if (auto buf = it->lock()) + { + buffersToFlush.push_back(std::move(buf)); + ++it; + } + else + { + it = instance.threadBufferRegistry.erase(it); + } + } + } + + // Drain each thread-local buffer + for (auto& buf : buffersToFlush) + { + std::string toSend; + { + std::lock_guard lock(buf->mutex); + if (!buf->data.empty()) + { + toSend = std::move(buf->data); + buf->data.clear(); + } + } + if (!toSend.empty()) + { + instance.TryToSend(toSend); + } + } + } +} + void Writer::ThreadLocalBufferedWrite(const std::string& message) { auto& instance = GetInstance(); @@ -118,7 +204,7 @@ void Writer::ThreadLocalBufferedWrite(const std::string& message) tl_buffer->data.push_back(NEW_LINE); // Capacity-based flush: drain when the thread-local buffer is full - if (tl_buffer->data.size() >= instance.bufferSize) + if (instance.bufferSize > 0 && tl_buffer->data.size() >= instance.bufferSize) { toSend = std::move(tl_buffer->data); tl_buffer->data.clear(); diff --git a/libs/writer/writer_wrapper/writer.h b/libs/writer/writer_wrapper/writer.h index 92b8ae2..2a64c98 100644 --- a/libs/writer/writer_wrapper/writer.h +++ b/libs/writer/writer_wrapper/writer.h @@ -3,9 +3,12 @@ #include #include +#include +#include #include #include #include +#include #include namespace spectator { @@ -42,7 +45,7 @@ class Writer final : public Singleton Writer() = default; static void Initialize(WriterType type, const std::string& param = "", int port = 0, - unsigned int bufferSize = 0); + unsigned int bufferSize = 0, unsigned int flushIntervalMs = 0); static void Write(const std::string& message); @@ -50,6 +53,9 @@ class Writer final : public Singleton void NonBufferedWrite(const std::string& message); + // Background thread: periodically drains all thread-local buffers + void FlushTimerThread(); + void TryToSend(const std::string& message); void Close(); @@ -71,7 +77,14 @@ class Writer final : public Singleton std::mutex registryMutex; std::vector> threadBufferRegistry; + // Interval flush + std::chrono::milliseconds flushInterval{0}; + std::thread flushTimerThread; + + // Shutdown coordination std::atomic shutdown{false}; + std::mutex shutdownMutex; + std::condition_variable cv_shutdown; }; } // namespace spectator diff --git a/libs/writer/writer_wrapper/writer_test_helper.h b/libs/writer/writer_wrapper/writer_test_helper.h index b43cc81..0559843 100644 --- a/libs/writer/writer_wrapper/writer_test_helper.h +++ b/libs/writer/writer_wrapper/writer_test_helper.h @@ -15,9 +15,9 @@ class WriterTestHelper public: // Initialize the Writer for testing purposes static void InitializeWriter(WriterType type, const std::string& param = "", int port = 0, - unsigned int bufferSize = 0) + unsigned int bufferSize = 0, unsigned int flushIntervalMs = 0) { - Writer::Initialize(type, param, port, bufferSize); + Writer::Initialize(type, param, port, bufferSize, flushIntervalMs); } // Get the Writer's implementation for testing purposes diff --git a/spectator/registry.cpp b/spectator/registry.cpp index 820c82c..d9cf27b 100644 --- a/spectator/registry.cpp +++ b/spectator/registry.cpp @@ -41,19 +41,25 @@ Registry::Registry(const Config& config) : m_config(config) if (config.GetWriterType() == WriterType::Memory) { Logger::info("Registry initializing Memory Writer"); - Writer::Initialize(config.GetWriterType(), "", 0, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), "", 0, + this->m_config.GetWriterBufferSize(), + this->m_config.GetFlushIntervalMs()); } else if (config.GetWriterType() == WriterType::UDP) { auto [ip, port] = ParseUdpAddress(this->m_config.GetWriterLocation()); Logger::info("Registry initializing UDP Writer at {}:{}", ip, port); - Writer::Initialize(config.GetWriterType(), ip, port, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), ip, port, + this->m_config.GetWriterBufferSize(), + this->m_config.GetFlushIntervalMs()); } else if (config.GetWriterType() == WriterType::Unix) { auto socketPath = ParseUnixAddress(this->m_config.GetWriterLocation()); Logger::info("Registry initializing UDS Writer at {}", socketPath); - Writer::Initialize(config.GetWriterType(), socketPath, 0, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), socketPath, 0, + this->m_config.GetWriterBufferSize(), + this->m_config.GetFlushIntervalMs()); } }