Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Config
const std::string& GetWriterLocation() const noexcept { return m_writerConfig.GetLocation(); }
const WriterType& GetWriterType() const noexcept { return m_writerConfig.GetType(); }
const unsigned int GetWriterBufferSize() const noexcept { return m_writerConfig.GetBufferSize(); }
const unsigned int GetFlushIntervalMs() const noexcept { return m_writerConfig.GetFlushIntervalMs(); }


private:
Expand Down
7 changes: 7 additions & 0 deletions libs/writer/writer_config/writer_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@ WriterConfig::WriterConfig(const std::string& type, const unsigned int bufferSiz
Logger::info("WriterConfig buffering enabled with size: {}", m_bufferSize);
}

WriterConfig::WriterConfig(const std::string& type, const unsigned int bufferSize, const unsigned int flushIntervalMs)
: WriterConfig(type, bufferSize) // Constructor delegation
{
m_flushIntervalMs = flushIntervalMs;
Logger::info("WriterConfig flush interval enabled with interval: {}ms", m_flushIntervalMs);
}

} // namespace spectator
3 changes: 3 additions & 0 deletions libs/writer/writer_config/writer_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ class WriterConfig
public:
explicit WriterConfig(const std::string& type);
WriterConfig(const std::string& type, unsigned int bufferSize);
WriterConfig(const std::string& type, unsigned int bufferSize, unsigned int flushIntervalMs);

[[nodiscard]] const WriterType& GetType() const noexcept { return m_type; }
[[nodiscard]] unsigned int GetBufferSize() const noexcept { return m_bufferSize; }
[[nodiscard]] const std::string& GetLocation() const noexcept { return m_location; }
[[nodiscard]] unsigned int GetFlushIntervalMs() const noexcept { return m_flushIntervalMs; }

private:
WriterType m_type;
std::string m_location;
unsigned int m_bufferSize = 0;
unsigned int m_flushIntervalMs = 0;
};

} // namespace spectator
100 changes: 100 additions & 0 deletions libs/writer/writer_wrapper/test_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,106 @@ TEST_F(WriterWrapperUDSWriterTest, MultithreadedWrite)
EXPECT_EQ(actualIncrements, expectedIncrements);
}

// Verify that metrics written infrequently are still flushed via the interval mechanism
// even when the buffer is not full.
TEST_F(WriterWrapperUDSWriterTest, IntervalFlushWithIdleThread)
{
Logger::info("Starting interval flush test...");

// Use a large buffer so capacity-based flush never triggers,
// but set a short flush interval so metrics get sent in time.
const std::string unixUrl = "/tmp/test_uds_socket";
constexpr unsigned int largeBuffer = 4096;
constexpr unsigned int flushIntervalMs = 200;
WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, largeBuffer, flushIntervalMs);

MeterId meterId("interval.test.counter");
Counter counter(meterId);
counter.Increment();

// Wait for at least two flush intervals to pass
std::this_thread::sleep_for(std::chrono::milliseconds(flushIntervalMs * 3));

auto msgs = get_uds_messages();
EXPECT_FALSE(msgs.empty()) << "Expected metrics to be flushed by interval timer";

std::regex counter_regex(R"(c:interval\.test\.counter:1.000000)");
bool found = false;
for (const auto& msg : msgs)
{
std::stringstream ss(msg);
std::string line;
while (std::getline(ss, line))
{
if (std::regex_match(line, counter_regex))
{
found = true;
}
}
}
EXPECT_TRUE(found) << "Expected interval.test.counter metric to be received";
}

// Verify that multiple worker threads do not block each other: each thread uses its own
// local buffer, flushing by capacity. The interval timer catches any tail data that
// does not fill the buffer before the threads exit.
TEST_F(WriterWrapperUDSWriterTest, ThreadLocalBufferNoMutexContention)
{
Logger::info("Starting thread-local buffer contention test...");

const std::string unixUrl = "/tmp/test_uds_socket";
// Small buffer so capacity-based flush fires frequently; interval timer catches the tail
WriterTestHelper::InitializeWriter(WriterType::Unix, unixUrl, 0, 64, 100);

constexpr auto numThreads = 8;
constexpr auto incrementsPerThread = 20;

auto worker = [&](int threadId)
{
std::string name = fmt::format("tl.counter.{}", threadId);
MeterId meterId(name);
Counter counter(meterId);
for (int j = 0; j < incrementsPerThread; j++)
{
counter.Increment();
}
};

std::vector<std::thread> threads;
for (int i = 0; i < numThreads; i++)
{
threads.emplace_back(worker, i);
}
for (auto& t : threads)
{
t.join();
}

// Allow the interval timer to flush any remaining buffered data
std::this_thread::sleep_for(std::chrono::milliseconds(300));

auto msgs = get_uds_messages();
EXPECT_FALSE(msgs.empty());

int totalIncrements = 0;
std::regex counter_regex(R"(c:tl\.counter\.\d+:1.000000)");
for (const auto& msg : msgs)
{
std::stringstream ss(msg);
std::string line;
while (std::getline(ss, line))
{
if (!line.empty())
{
EXPECT_TRUE(std::regex_match(line, counter_regex))
<< "Unexpected line: " << line;
totalIncrements++;
}
}
}
EXPECT_EQ(totalIncrements, numThreads * incrementsPerThread);
}

// This is a unique test that attempts to create messages of exactly 10 bytes in size
// and writes to a buffer of size 10 bytes from multiple threads. The NDrive team discovered
// a deadlock scenario in this specific case where the buffer size matched the message size
Expand Down
180 changes: 136 additions & 44 deletions libs/writer/writer_wrapper/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,55 @@ namespace spectator {

static constexpr auto NEW_LINE = '\n';

// Each worker thread owns a shared_ptr to its buffer.
// The registry in Writer holds weak_ptrs so threads can exit freely.
thread_local std::shared_ptr<Writer::ThreadLocalBuffer> tl_buffer;

// Flush any data remaining in the buffer when the thread exits.
Writer::ThreadLocalBuffer::~ThreadLocalBuffer()
{
if (data.empty())
{
return;
}
try
{
auto& instance = Writer::GetInstance();
if (instance.m_impl)
{
instance.TryToSend(data);
}
}
catch (...)
{
// Best-effort: ignore errors during thread teardown
}
}

Writer::~Writer()
{
auto& instance = GetInstance();

if (instance.bufferingEnabled)
// Signal all background threads to stop
{
std::lock_guard<std::mutex> lock(instance.shutdownMutex);
instance.shutdown.store(true);
instance.cv_receiver.notify_all();
instance.cv_sender.notify_all();
if (instance.sendingThread.joinable()) {
instance.sendingThread.join();
}
}
instance.cv_shutdown.notify_all();

if (instance.flushTimerThread.joinable())
{
instance.flushTimerThread.join();
}

this->Close();
}

void Writer::Initialize(WriterType type, const std::string& param, int port, unsigned int bufferSize)
void Writer::Initialize(WriterType type, const std::string& param, int port,
unsigned int bufferSize, unsigned int flushIntervalMs)
{
// Get the singleton instance directly
auto& instance = GetInstance();

// Create the new writer based on type
try
{
switch (type)
Expand All @@ -51,19 +78,40 @@ void Writer::Initialize(WriterType type, const std::string& param, int port, uns
}

instance.m_currentType = type;

if (bufferSize > 0)

// Stop any pre-existing timer thread before re-initialising
{
std::lock_guard<std::mutex> lock(instance.shutdownMutex);
instance.shutdown.store(true);
}
instance.cv_shutdown.notify_all();
if (instance.flushTimerThread.joinable())
{
instance.flushTimerThread.join();
}
// Clear the thread-local buffer registry from any previous init
{
std::lock_guard<std::mutex> regLock(instance.registryMutex);
instance.threadBufferRegistry.clear();
}
instance.shutdown.store(false);
instance.bufferingEnabled = false;

if (bufferSize > 0 || flushIntervalMs > 0)
{
instance.bufferingEnabled = true;
instance.bufferSize = bufferSize;
instance.buffer.reserve(bufferSize);
instance.writeImpl = &Writer::BufferedWrite;
// Create a thread with proper binding to the instance method
instance.sendingThread = std::thread(&Writer::ThreadSend, &instance);
instance.writeImpl = &Writer::ThreadLocalBufferedWrite;

if (flushIntervalMs > 0)
{
instance.flushInterval = std::chrono::milliseconds(flushIntervalMs);
instance.flushTimerThread = std::thread(&Writer::FlushTimerThread, &instance);
Logger::info("Writer interval flush enabled: {}ms", flushIntervalMs);
}
}
else
{
// Explicitly set to non-buffered if buffer size is 0
instance.writeImpl = &Writer::NonBufferedWrite;
}
}
Expand All @@ -80,52 +128,97 @@ void Writer::TryToSend(const std::string& message)
instance.m_impl->Write(message);
}

void Writer::ThreadSend()
void Writer::FlushTimerThread()
{
auto& instance = GetInstance();
std::string message{};
while (instance.shutdown.load() == false)

while (true)
{
// Sleep for the flush interval, or wake early on shutdown
{
std::unique_lock<std::mutex> lock(instance.shutdownMutex);
instance.cv_shutdown.wait_for(lock, instance.flushInterval,
[&instance] { return instance.shutdown.load(); });
}

if (instance.shutdown.load())
{
break;
}

// Collect live buffers (clean up expired weak_ptrs in the same pass)
std::vector<std::shared_ptr<ThreadLocalBuffer>> buffersToFlush;
{
std::unique_lock<std::mutex> lock(instance.writeMutex);
instance.cv_sender.wait(
lock, [&instance] { return instance.buffer.size() >= instance.bufferSize || instance.shutdown.load(); });
if (instance.shutdown.load() == true)
std::lock_guard<std::mutex> regLock(instance.registryMutex);
for (auto it = instance.threadBufferRegistry.begin();
it != instance.threadBufferRegistry.end();)
{
return;
if (auto buf = it->lock())
{
buffersToFlush.push_back(std::move(buf));
++it;
}
else
{
it = instance.threadBufferRegistry.erase(it);
}
}
}

// Drain each thread-local buffer
for (auto& buf : buffersToFlush)
{
std::string toSend;
{
std::lock_guard<std::mutex> lock(buf->mutex);
if (!buf->data.empty())
{
toSend = std::move(buf->data);
buf->data.clear();
}
}
if (!toSend.empty())
{
instance.TryToSend(toSend);
}
message = std::move(instance.buffer);
instance.buffer = std::string();
instance.buffer.reserve(instance.bufferSize);
}
instance.cv_receiver.notify_one();
instance.TryToSend(message);
}
}

void Writer::BufferedWrite(const std::string& message)
void Writer::ThreadLocalBufferedWrite(const std::string& message)
{
auto& instance = GetInstance();

// Initialise this thread's buffer on first use and register it
if (!tl_buffer)
{
std::unique_lock<std::mutex> lock(instance.writeMutex);
// TODO: Optimize memory alloc to not exceed allocated size
instance.cv_receiver.wait(
lock, [&instance] { return instance.buffer.size() < instance.bufferSize || instance.shutdown.load(); });
if (instance.shutdown.load())
tl_buffer = std::make_shared<ThreadLocalBuffer>();
std::lock_guard<std::mutex> regLock(instance.registryMutex);
instance.threadBufferRegistry.push_back(tl_buffer);
}

std::string toSend;
{
std::lock_guard<std::mutex> lock(tl_buffer->mutex);
tl_buffer->data.append(message);
tl_buffer->data.push_back(NEW_LINE);

// Capacity-based flush: drain when the thread-local buffer is full
if (instance.bufferSize > 0 && tl_buffer->data.size() >= instance.bufferSize)
{
Logger::info("Write operation aborted due to shutdown signal");
return;
toSend = std::move(tl_buffer->data);
tl_buffer->data.clear();
}
instance.buffer.append(message);
instance.buffer.push_back(NEW_LINE);
}
instance.buffer.size() >= instance.bufferSize ? instance.cv_sender.notify_one() : instance.cv_receiver.notify_one();

if (!toSend.empty())
{
instance.TryToSend(toSend);
}
}

void Writer::NonBufferedWrite(const std::string& message)
{
// Since this is a non-static method, we're already operating on an instance
// and can call the instance method directly
this->TryToSend(message + NEW_LINE);
}

Expand All @@ -139,7 +232,6 @@ void Writer::Write(const std::string& message)
return;
}

// Call the member function using the pointer-to-member syntax
(instance.*instance.writeImpl)(message);
}

Expand All @@ -163,4 +255,4 @@ void Writer::Close()
}
}

} // namespace spectator
} // namespace spectator
Loading
Loading