Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions include/datadog/telemetry/telemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,24 @@ void send_configuration_change();
void capture_configuration_change(
const std::vector<tracing::ConfigMetadata>& new_configuration);

/// The `log` namespace provides functions for reporting logs.
namespace log {
/// Report internal warning message to Datadog.
///
/// @param message The warning message to log.
void report_warning_log(std::string message);
void warning(std::string message);

/// Report internal error message to Datadog.
///
/// @param message The error message.
void report_error_log(std::string message);
void error(std::string message);

/// Report internal error message to Datadog.
///
/// @param message The error message.
/// @param stacktrace Stacktrace leading to the error.
void report_error_log(std::string message, std::string stacktrace);
void error(std::string message, std::string stacktrace);
} // namespace log

/// The `counter` namespace provides functions to track values.
/// Counters can be useful for tracking the total number of an event occurring
Expand Down
8 changes: 5 additions & 3 deletions src/datadog/telemetry/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,24 @@ void capture_configuration_change(
instance());
}

void report_warning_log(std::string message) {
namespace log {
void warning(std::string message) {
std::visit(details::Overload{
[&](Telemetry& telemetry) { telemetry.log_warning(message); },
[](NoopTelemetry) {},
},
instance());
}

void report_error_log(std::string message) {
void error(std::string message) {
std::visit(details::Overload{
[&](Telemetry& telemetry) { telemetry.log_error(message); },
[](NoopTelemetry) {},
},
instance());
}

void report_error_log(std::string message, std::string stacktrace) {
void error(std::string message, std::string stacktrace) {
std::visit(details::Overload{
[&](Telemetry& telemetry) {
telemetry.log_error(message, stacktrace);
Expand All @@ -100,6 +101,7 @@ void report_error_log(std::string message, std::string stacktrace) {
},
instance());
}
} // namespace log

namespace counter {
void increment(const Counter& counter) {
Expand Down
44 changes: 23 additions & 21 deletions src/datadog/telemetry/telemetry_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,21 @@ std::string to_string(datadog::tracing::ConfigName name) {
std::abort();
}

nlohmann::json encode_log(const telemetry::LogMessage& log) {
auto encoded = nlohmann::json{
{"message", log.message},
{"level", to_string(log.level)},
{"tracer_time", log.timestamp},
};
if (log.stacktrace) {
encoded.emplace("stack_trace", *log.stacktrace);
nlohmann::json encode_logs(const std::vector<telemetry::LogMessage>& logs) {
auto encoded_logs = nlohmann::json::array();
for (auto& log : logs) {
auto encoded = nlohmann::json{
{"message", log.message},
{"level", to_string(log.level)},
{"tracer_time", log.timestamp},
};
if (log.stacktrace) {
encoded.emplace("stack_trace", *log.stacktrace);
}

encoded_logs.emplace_back(std::move(encoded));
}
return encoded;
return encoded_logs;
}

std::string_view to_string(details::MetricType type) {
Expand Down Expand Up @@ -371,13 +376,14 @@ std::string Telemetry::heartbeat_and_telemetry() {
batch_payloads.emplace_back(std::move(distributions_json));
}

if (!logs_.empty()) {
auto encoded_logs = nlohmann::json::array();
for (const auto& log : logs_) {
auto encoded = encode_log(log);
encoded_logs.emplace_back(std::move(encoded));
}
std::vector<telemetry::LogMessage> old_logs;
{
std::lock_guard l{log_mutex_};
std::swap(old_logs, logs_);
}

if (!old_logs.empty()) {
auto encoded_logs = encode_logs(old_logs);
assert(!encoded_logs.empty());

auto logs_payload = nlohmann::json::object({
Expand Down Expand Up @@ -434,12 +440,7 @@ std::string Telemetry::app_closing() {
}

if (!logs_.empty()) {
auto encoded_logs = nlohmann::json::array();
for (const auto& log : logs_) {
auto encoded = encode_log(log);
encoded_logs.emplace_back(std::move(encoded));
}

auto encoded_logs = encode_logs(logs_);
assert(!encoded_logs.empty());

auto logs_payload = nlohmann::json::object({
Expand Down Expand Up @@ -667,6 +668,7 @@ void Telemetry::log(std::string message, telemetry::LogLevel level,
auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
clock_().wall.time_since_epoch())
.count();
std::lock_guard l{log_mutex_};
logs_.emplace_back(
telemetry::LogMessage{std::move(message), level, stacktrace, timestamp});
}
Expand Down
1 change: 1 addition & 0 deletions src/datadog/telemetry/telemetry_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Telemetry final {
/// Configuration
std::vector<tracing::ConfigMetadata> configuration_snapshot_;

std::mutex log_mutex_;
std::vector<telemetry::LogMessage> logs_;

// Track sequence id per payload generated
Expand Down
160 changes: 96 additions & 64 deletions test/telemetry/test_telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,77 +672,109 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") {
}
}
}
}

SECTION("logs serialization") {
SECTION("log level is correct") {
struct TestCase {
std::string_view name;
std::string input;
Optional<std::string> stacktrace;
std::function<void(Telemetry&, const std::string&,
const Optional<std::string>& stacktrace)>
apply;
std::string expected_log_level;
};

auto test_case = GENERATE(values<TestCase>({
{
"warning log",
"This is a warning log!",
nullopt,
[](Telemetry& telemetry, const std::string& input,
const Optional<std::string>&) {
telemetry.log_warning(input);
},
"WARNING",
},
{
"error log",
"This is an error log!",
nullopt,
[](Telemetry& telemetry, const std::string& input,
const Optional<std::string>&) {
telemetry.log_error(input);
},
"ERROR",
},
{
"error log with stacktrace",
"This is an error log with a fake stacktrace!",
"error here\nthen here\nfinally here\n",
[](Telemetry& telemetry, const std::string& input,
Optional<std::string> stacktrace) {
telemetry.log_error(input, *stacktrace);
},
"ERROR",
},
}));

CAPTURE(test_case.name);
SECTION("logs reporting") {
SECTION("log level is correct") {
struct TestCase {
std::string_view name;
std::string input;
Optional<std::string> stacktrace;
std::function<void(Telemetry&, const std::string&,
const Optional<std::string>& stacktrace)>
apply;
std::string expected_log_level;
};

client->clear();
test_case.apply(telemetry, test_case.input, test_case.stacktrace);
scheduler->trigger_heartbeat();
auto test_case = GENERATE(values<TestCase>({
{
"warning log",
"This is a warning log!",
nullopt,
[](Telemetry& telemetry, const std::string& input,
const Optional<std::string>&) {
telemetry.log_warning(input);
},
"WARNING",
},
{
"error log",
"This is an error log!",
nullopt,
[](Telemetry& telemetry, const std::string& input,
const Optional<std::string>&) { telemetry.log_error(input); },
"ERROR",
},
{
"error log with stacktrace",
"This is an error log with a fake stacktrace!",
"error here\nthen here\nfinally here\n",
[](Telemetry& telemetry, const std::string& input,
Optional<std::string> stacktrace) {
telemetry.log_error(input, *stacktrace);
},
"ERROR",
},
}));

auto message_batch = nlohmann::json::parse(client->request_body);
REQUIRE(is_valid_telemetry_payload(message_batch));
REQUIRE(message_batch["payload"].size() == 2);
CAPTURE(test_case.name);

auto logs_message = message_batch["payload"][1];
REQUIRE(logs_message["request_type"] == "logs");
client->clear();
test_case.apply(telemetry, test_case.input, test_case.stacktrace);
scheduler->trigger_heartbeat();

auto logs_payload = logs_message["payload"]["logs"];
REQUIRE(logs_payload.size() == 1);
CHECK(logs_payload[0]["level"] == test_case.expected_log_level);
CHECK(logs_payload[0]["message"] == test_case.input);
CHECK(logs_payload[0].contains("tracer_time"));
auto message_batch = nlohmann::json::parse(client->request_body);
REQUIRE(is_valid_telemetry_payload(message_batch));
REQUIRE(message_batch["payload"].size() == 2);

if (test_case.stacktrace) {
CHECK(logs_payload[0]["stack_trace"] == test_case.stacktrace);
} else {
CHECK(logs_payload[0].contains("stack_trace") == false);
}
auto logs_message = message_batch["payload"][1];
REQUIRE(logs_message["request_type"] == "logs");

auto logs_payload = logs_message["payload"]["logs"];
REQUIRE(logs_payload.size() == 1);
CHECK(logs_payload[0]["level"] == test_case.expected_log_level);
CHECK(logs_payload[0]["message"] == test_case.input);
CHECK(logs_payload[0]["tracer_time"] == 1672484400);

if (test_case.stacktrace) {
CHECK(logs_payload[0]["stack_trace"] == test_case.stacktrace);
} else {
CHECK(logs_payload[0].contains("stack_trace") == false);
}

// Make sure the next heartbeat doesn't contains counters if no
// datapoint has been incremented, decremented or set.
client->clear();
scheduler->trigger_heartbeat();

auto message_batch2 = nlohmann::json::parse(client->request_body);
REQUIRE(is_valid_telemetry_payload(message_batch2) == true);
REQUIRE(message_batch2["payload"].size() == 1);

auto payload2 = message_batch["payload"][0];
CHECK(payload2["request_type"] == "app-heartbeat");
}

SECTION("dtor sends logs in `app-closing` message") {
{
Telemetry tmp_telemetry{*finalize_config(), logger, client,
scheduler, *url, clock};
tmp_telemetry.log_warning("Be careful!");
client->clear();
}

auto message_batch = nlohmann::json::parse(client->request_body);
REQUIRE(is_valid_telemetry_payload(message_batch));
REQUIRE(message_batch["payload"].size() == 2);

auto logs_message = message_batch["payload"][1];
REQUIRE(logs_message["request_type"] == "logs");

auto logs_payload = logs_message["payload"]["logs"];
REQUIRE(logs_payload.size() == 1);
CHECK(logs_payload[0]["level"] == "WARNING");
CHECK(logs_payload[0]["message"] == "Be careful!");
CHECK(logs_payload[0]["tracer_time"] == 1672484400);
}
}
}
Expand Down