diff --git a/libs/config/config.h b/libs/config/config.h index 382cc27..25a7e93 100644 --- a/libs/config/config.h +++ b/libs/config/config.h @@ -23,6 +23,7 @@ class Config const std::string& GetWriterLocation() const noexcept { return m_writerConfig.GetLocation(); } const WriterType& GetWriterType() const noexcept { return m_writerConfig.GetType(); } const unsigned int GetWriterBufferSize() const noexcept { return m_writerConfig.GetBufferSize(); } + const unsigned int GetFlushIntervalMs() const noexcept { return m_writerConfig.GetFlushIntervalMs(); } private: diff --git a/libs/writer/writer_config/writer_config.cpp b/libs/writer/writer_config/writer_config.cpp index 6f331ab..d503198 100644 --- a/libs/writer/writer_config/writer_config.cpp +++ b/libs/writer/writer_config/writer_config.cpp @@ -56,4 +56,11 @@ WriterConfig::WriterConfig(const std::string& type, const unsigned int bufferSiz Logger::info("WriterConfig buffering enabled with size: {}", m_bufferSize); } +WriterConfig::WriterConfig(const std::string& type, const unsigned int bufferSize, const unsigned int flushIntervalMs) + : WriterConfig(type, bufferSize) // Constructor delegation +{ + m_flushIntervalMs = flushIntervalMs; + Logger::info("WriterConfig flush interval enabled with interval: {}ms", m_flushIntervalMs); +} + } // namespace spectator \ No newline at end of file diff --git a/libs/writer/writer_config/writer_config.h b/libs/writer/writer_config/writer_config.h index 737cfb5..d1f0055 100644 --- a/libs/writer/writer_config/writer_config.h +++ b/libs/writer/writer_config/writer_config.h @@ -12,15 +12,18 @@ class WriterConfig public: explicit WriterConfig(const std::string& type); WriterConfig(const std::string& type, unsigned int bufferSize); + WriterConfig(const std::string& type, unsigned int bufferSize, unsigned int flushIntervalMs); [[nodiscard]] const WriterType& GetType() const noexcept { return m_type; } [[nodiscard]] unsigned int GetBufferSize() const noexcept { return m_bufferSize; } [[nodiscard]] const std::string& GetLocation() const noexcept { return m_location; } + [[nodiscard]] unsigned int GetFlushIntervalMs() const noexcept { return m_flushIntervalMs; } private: WriterType m_type; std::string m_location; unsigned int m_bufferSize = 0; + unsigned int m_flushIntervalMs = 0; }; } // namespace spectator \ No newline at end of file diff --git a/libs/writer/writer_wrapper/test_writer.cpp b/libs/writer/writer_wrapper/test_writer.cpp index 68d743a..25c1a01 100644 --- a/libs/writer/writer_wrapper/test_writer.cpp +++ b/libs/writer/writer_wrapper/test_writer.cpp @@ -127,6 +127,106 @@ TEST_F(WriterWrapperUDSWriterTest, MultithreadedWrite) EXPECT_EQ(actualIncrements, expectedIncrements); } +// Verify that metrics written infrequently are still flushed via the interval mechanism +// even when the buffer is not full. +TEST_F(WriterWrapperUDSWriterTest, IntervalFlushWithIdleThread) +{ + Logger::info("Starting interval flush test..."); + + // Use a large buffer so capacity-based flush never triggers, + // but set a short flush interval so metrics get sent in time. + const std::string unixUrl = "/tmp/test_uds_socket"; + constexpr unsigned int largeBuffer = 4096; + constexpr unsigned int flushIntervalMs = 200; + WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, largeBuffer, flushIntervalMs); + + MeterId meterId("interval.test.counter"); + Counter counter(meterId); + counter.Increment(); + + // Wait for at least two flush intervals to pass + std::this_thread::sleep_for(std::chrono::milliseconds(flushIntervalMs * 3)); + + auto msgs = get_uds_messages(); + EXPECT_FALSE(msgs.empty()) << "Expected metrics to be flushed by interval timer"; + + std::regex counter_regex(R"(c:interval\.test\.counter:1.000000)"); + bool found = false; + for (const auto& msg : msgs) + { + std::stringstream ss(msg); + std::string line; + while (std::getline(ss, line)) + { + if (std::regex_match(line, counter_regex)) + { + found = true; + } + } + } + EXPECT_TRUE(found) << "Expected interval.test.counter metric to be received"; +} + +// Verify that multiple worker threads do not block each other: each thread uses its own +// local buffer, flushing by capacity. The interval timer catches any tail data that +// does not fill the buffer before the threads exit. +TEST_F(WriterWrapperUDSWriterTest, ThreadLocalBufferNoMutexContention) +{ + Logger::info("Starting thread-local buffer contention test..."); + + const std::string unixUrl = "/tmp/test_uds_socket"; + // Small buffer so capacity-based flush fires frequently; interval timer catches the tail + WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, 64, 100); + + constexpr auto numThreads = 8; + constexpr auto incrementsPerThread = 20; + + auto worker = [&](int threadId) + { + std::string name = fmt::format("tl.counter.{}", threadId); + MeterId meterId(name); + Counter counter(meterId); + for (int j = 0; j < incrementsPerThread; j++) + { + counter.Increment(); + } + }; + + std::vector threads; + for (int i = 0; i < numThreads; i++) + { + threads.emplace_back(worker, i); + } + for (auto& t : threads) + { + t.join(); + } + + // Allow the interval timer to flush any remaining buffered data + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + auto msgs = get_uds_messages(); + EXPECT_FALSE(msgs.empty()); + + int totalIncrements = 0; + std::regex counter_regex(R"(c:tl\.counter\.\d+:1.000000)"); + for (const auto& msg : msgs) + { + std::stringstream ss(msg); + std::string line; + while (std::getline(ss, line)) + { + if (!line.empty()) + { + EXPECT_TRUE(std::regex_match(line, counter_regex)) + << "Unexpected line: " << line; + totalIncrements++; + } + } + } + EXPECT_EQ(totalIncrements, numThreads * incrementsPerThread); +} + // This is a unique test that attempts to create messages of exactly 10 bytes in size // and writes to a buffer of size 10 bytes from multiple threads. The NDrive team discovered // a deadlock scenario in this specific case where the buffer size matched the message size diff --git a/libs/writer/writer_wrapper/writer.cpp b/libs/writer/writer_wrapper/writer.cpp index aac4887..61563c9 100644 --- a/libs/writer/writer_wrapper/writer.cpp +++ b/libs/writer/writer_wrapper/writer.cpp @@ -8,28 +8,55 @@ namespace spectator { static constexpr auto NEW_LINE = '\n'; +// Each worker thread owns a shared_ptr to its buffer. +// The registry in Writer holds weak_ptrs so threads can exit freely. +thread_local std::shared_ptr tl_buffer; + +// Flush any data remaining in the buffer when the thread exits. +Writer::ThreadLocalBuffer::~ThreadLocalBuffer() +{ + if (data.empty()) + { + return; + } + try + { + auto& instance = Writer::GetInstance(); + if (instance.m_impl) + { + instance.TryToSend(data); + } + } + catch (...) + { + // Best-effort: ignore errors during thread teardown + } +} + Writer::~Writer() { auto& instance = GetInstance(); - if (instance.bufferingEnabled) + // Signal all background threads to stop { + std::lock_guard lock(instance.shutdownMutex); instance.shutdown.store(true); - instance.cv_receiver.notify_all(); - instance.cv_sender.notify_all(); - if (instance.sendingThread.joinable()) { - instance.sendingThread.join(); - } } + instance.cv_shutdown.notify_all(); + + if (instance.flushTimerThread.joinable()) + { + instance.flushTimerThread.join(); + } + this->Close(); } -void Writer::Initialize(WriterType type, const std::string& param, int port, unsigned int bufferSize) +void Writer::Initialize(WriterType type, const std::string& param, int port, + unsigned int bufferSize, unsigned int flushIntervalMs) { - // Get the singleton instance directly auto& instance = GetInstance(); - // Create the new writer based on type try { switch (type) @@ -51,19 +78,40 @@ void Writer::Initialize(WriterType type, const std::string& param, int port, uns } instance.m_currentType = type; - - if (bufferSize > 0) + + // Stop any pre-existing timer thread before re-initialising + { + std::lock_guard lock(instance.shutdownMutex); + instance.shutdown.store(true); + } + instance.cv_shutdown.notify_all(); + if (instance.flushTimerThread.joinable()) + { + instance.flushTimerThread.join(); + } + // Clear the thread-local buffer registry from any previous init + { + std::lock_guard regLock(instance.registryMutex); + instance.threadBufferRegistry.clear(); + } + instance.shutdown.store(false); + instance.bufferingEnabled = false; + + if (bufferSize > 0 || flushIntervalMs > 0) { instance.bufferingEnabled = true; instance.bufferSize = bufferSize; - instance.buffer.reserve(bufferSize); - instance.writeImpl = &Writer::BufferedWrite; - // Create a thread with proper binding to the instance method - instance.sendingThread = std::thread(&Writer::ThreadSend, &instance); + instance.writeImpl = &Writer::ThreadLocalBufferedWrite; + + if (flushIntervalMs > 0) + { + instance.flushInterval = std::chrono::milliseconds(flushIntervalMs); + instance.flushTimerThread = std::thread(&Writer::FlushTimerThread, &instance); + Logger::info("Writer interval flush enabled: {}ms", flushIntervalMs); + } } else { - // Explicitly set to non-buffered if buffer size is 0 instance.writeImpl = &Writer::NonBufferedWrite; } } @@ -80,52 +128,97 @@ void Writer::TryToSend(const std::string& message) instance.m_impl->Write(message); } -void Writer::ThreadSend() +void Writer::FlushTimerThread() { auto& instance = GetInstance(); - std::string message{}; - while (instance.shutdown.load() == false) + + while (true) { + // Sleep for the flush interval, or wake early on shutdown + { + std::unique_lock lock(instance.shutdownMutex); + instance.cv_shutdown.wait_for(lock, instance.flushInterval, + [&instance] { return instance.shutdown.load(); }); + } + + if (instance.shutdown.load()) + { + break; + } + + // Collect live buffers (clean up expired weak_ptrs in the same pass) + std::vector> buffersToFlush; { - std::unique_lock lock(instance.writeMutex); - instance.cv_sender.wait( - lock, [&instance] { return instance.buffer.size() >= instance.bufferSize || instance.shutdown.load(); }); - if (instance.shutdown.load() == true) + std::lock_guard regLock(instance.registryMutex); + for (auto it = instance.threadBufferRegistry.begin(); + it != instance.threadBufferRegistry.end();) { - return; + if (auto buf = it->lock()) + { + buffersToFlush.push_back(std::move(buf)); + ++it; + } + else + { + it = instance.threadBufferRegistry.erase(it); + } + } + } + + // Drain each thread-local buffer + for (auto& buf : buffersToFlush) + { + std::string toSend; + { + std::lock_guard lock(buf->mutex); + if (!buf->data.empty()) + { + toSend = std::move(buf->data); + buf->data.clear(); + } + } + if (!toSend.empty()) + { + instance.TryToSend(toSend); } - message = std::move(instance.buffer); - instance.buffer = std::string(); - instance.buffer.reserve(instance.bufferSize); } - instance.cv_receiver.notify_one(); - instance.TryToSend(message); } } -void Writer::BufferedWrite(const std::string& message) +void Writer::ThreadLocalBufferedWrite(const std::string& message) { auto& instance = GetInstance(); + + // Initialise this thread's buffer on first use and register it + if (!tl_buffer) { - std::unique_lock lock(instance.writeMutex); - // TODO: Optimize memory alloc to not exceed allocated size - instance.cv_receiver.wait( - lock, [&instance] { return instance.buffer.size() < instance.bufferSize || instance.shutdown.load(); }); - if (instance.shutdown.load()) + tl_buffer = std::make_shared(); + std::lock_guard regLock(instance.registryMutex); + instance.threadBufferRegistry.push_back(tl_buffer); + } + + std::string toSend; + { + std::lock_guard lock(tl_buffer->mutex); + tl_buffer->data.append(message); + tl_buffer->data.push_back(NEW_LINE); + + // Capacity-based flush: drain when the thread-local buffer is full + if (instance.bufferSize > 0 && tl_buffer->data.size() >= instance.bufferSize) { - Logger::info("Write operation aborted due to shutdown signal"); - return; + toSend = std::move(tl_buffer->data); + tl_buffer->data.clear(); } - instance.buffer.append(message); - instance.buffer.push_back(NEW_LINE); } - instance.buffer.size() >= instance.bufferSize ? instance.cv_sender.notify_one() : instance.cv_receiver.notify_one(); + + if (!toSend.empty()) + { + instance.TryToSend(toSend); + } } void Writer::NonBufferedWrite(const std::string& message) { - // Since this is a non-static method, we're already operating on an instance - // and can call the instance method directly this->TryToSend(message + NEW_LINE); } @@ -139,7 +232,6 @@ void Writer::Write(const std::string& message) return; } - // Call the member function using the pointer-to-member syntax (instance.*instance.writeImpl)(message); } @@ -163,4 +255,4 @@ void Writer::Close() } } -} // namespace spectator \ No newline at end of file +} // namespace spectator diff --git a/libs/writer/writer_wrapper/writer.h b/libs/writer/writer_wrapper/writer.h index 82407ca..2a64c98 100644 --- a/libs/writer/writer_wrapper/writer.h +++ b/libs/writer/writer_wrapper/writer.h @@ -3,14 +3,27 @@ #include #include +#include +#include #include +#include #include +#include +#include namespace spectator { class Writer final : public Singleton { public: + // Per-thread buffer: each worker thread gets its own; avoids global mutex on writes. + struct ThreadLocalBuffer + { + std::string data; + std::mutex mutex; + ~ThreadLocalBuffer(); // Flushes any remaining data on thread exit + }; + ~Writer() override; private: @@ -31,15 +44,17 @@ class Writer final : public Singleton // Private constructor - enforces singleton pattern Writer() = default; - static void Initialize(WriterType type, const std::string& param = "", int port = 0, unsigned int bufferSize = 0); + static void Initialize(WriterType type, const std::string& param = "", int port = 0, + unsigned int bufferSize = 0, unsigned int flushIntervalMs = 0); static void Write(const std::string& message); - void BufferedWrite(const std::string& message); + void ThreadLocalBufferedWrite(const std::string& message); void NonBufferedWrite(const std::string& message); - void ThreadSend(); + // Background thread: periodically drains all thread-local buffers + void FlushTimerThread(); void TryToSend(const std::string& message); @@ -50,22 +65,26 @@ class Writer final : public Singleton static WriterType GetWriterType() { return GetInstance().m_currentType; } std::unique_ptr m_impl; - WriterType m_currentType = WriterType::Memory; // Default type + WriterType m_currentType = WriterType::Memory; bool bufferingEnabled = false; unsigned int bufferSize = 0; - std::string buffer{}; - - // Function pointer for write strategy - member function pointer + + // Function pointer for write strategy using WriteFunction = void (Writer::*)(const std::string&); - WriteFunction writeImpl = &Writer::NonBufferedWrite; // Default to non-buffered + WriteFunction writeImpl = &Writer::NonBufferedWrite; + + // Registry of all active thread-local buffers (weak_ptrs to avoid keeping threads alive) + std::mutex registryMutex; + std::vector> threadBufferRegistry; + // Interval flush + std::chrono::milliseconds flushInterval{0}; + std::thread flushTimerThread; - // multithreading writes - std::mutex writeMutex; - std::thread sendingThread; - std::condition_variable cv_receiver; - std::condition_variable cv_sender; + // Shutdown coordination std::atomic shutdown{false}; + std::mutex shutdownMutex; + std::condition_variable cv_shutdown; }; -} // namespace spectator \ No newline at end of file +} // namespace spectator diff --git a/libs/writer/writer_wrapper/writer_test_helper.h b/libs/writer/writer_wrapper/writer_test_helper.h index 972a00b..0559843 100644 --- a/libs/writer/writer_wrapper/writer_test_helper.h +++ b/libs/writer/writer_wrapper/writer_test_helper.h @@ -14,9 +14,10 @@ class WriterTestHelper { public: // Initialize the Writer for testing purposes - static void InitializeWriter(WriterType type, const std::string& param = "", int port = 0, unsigned int bufferSize = 0) + static void InitializeWriter(WriterType type, const std::string& param = "", int port = 0, + unsigned int bufferSize = 0, unsigned int flushIntervalMs = 0) { - Writer::Initialize(type, param, port, bufferSize); + Writer::Initialize(type, param, port, bufferSize, flushIntervalMs); } // Get the Writer's implementation for testing purposes diff --git a/spectator/registry.cpp b/spectator/registry.cpp index 81d394e..d9cf27b 100644 --- a/spectator/registry.cpp +++ b/spectator/registry.cpp @@ -41,19 +41,25 @@ Registry::Registry(const Config& config) : m_config(config) if (config.GetWriterType() == WriterType::Memory) { Logger::info("Registry initializing Memory Writer"); - Writer::Initialize(config.GetWriterType(), "", 0, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), "", 0, + this->m_config.GetWriterBufferSize(), + this->m_config.GetFlushIntervalMs()); } else if (config.GetWriterType() == WriterType::UDP) { auto [ip, port] = ParseUdpAddress(this->m_config.GetWriterLocation()); Logger::info("Registry initializing UDP Writer at {}:{}", ip, port); - Writer::Initialize(config.GetWriterType(), ip, port, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), ip, port, + this->m_config.GetWriterBufferSize(), + this->m_config.GetFlushIntervalMs()); } else if (config.GetWriterType() == WriterType::Unix) { auto socketPath = ParseUnixAddress(this->m_config.GetWriterLocation()); Logger::info("Registry initializing UDS Writer at {}", socketPath); - Writer::Initialize(config.GetWriterType(), socketPath, 0, this->m_config.GetWriterBufferSize()); + Writer::Initialize(config.GetWriterType(), socketPath, 0, + this->m_config.GetWriterBufferSize(), + this->m_config.GetFlushIntervalMs()); } }