Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions rabbitmq/functional_tests/basic_chaos/rabbitmq_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <userver/components/component_context.hpp>
#include <userver/components/minimal_server_component_list.hpp>
#include <userver/concurrent/variable.hpp>
#include <userver/formats/json.hpp>
#include <userver/formats/parse/common_containers.hpp>
#include <userver/formats/serialize/common_containers.hpp>
#include <userver/server/handlers/http_handler_base.hpp>
#include <userver/server/handlers/tests_control.hpp>
Expand All @@ -26,8 +28,7 @@ class ChaosProducer final : public components::LoggableComponentBase {

ChaosProducer(const components::ComponentConfig& config, const components::ComponentContext& context)
: components::LoggableComponentBase{config, context},
rabbit_client_{context.FindComponent<components::RabbitMQ>("chaos-rabbit").GetClient()}
{
rabbit_client_{context.FindComponent<components::RabbitMQ>("chaos-rabbit").GetClient()} {
const auto setup_deadline = engine::Deadline::FromDuration(kDefaultOperationTimeout);

auto admin_channel = rabbit_client_->GetAdminChannel(setup_deadline);
Expand Down Expand Up @@ -77,9 +78,7 @@ class ChaosConsumer final : public components::ComponentBase {
static constexpr std::string_view kName{"chaos-consumer"};

ChaosConsumer(const components::ComponentConfig& config, const components::ComponentContext& context)
: components::ComponentBase{config, context},
consumer_{config, context, messages_}
{
: components::ComponentBase{config, context}, consumer_{config, context, messages_} {
Start();
}

Expand Down Expand Up @@ -119,8 +118,7 @@ class ChaosConsumer final : public components::ComponentBase {
)
: urabbitmq::
ConsumerBase{context.FindComponent<components::RabbitMQ>(config["rabbit_name"].As<std::string>()).GetClient(), ParseSettings(config)},
messages_{messages}
{}
messages_{messages} {}

protected:
void Process(urabbitmq::ConsumedMessage msg) override {
Expand Down Expand Up @@ -150,8 +148,7 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
ChaosHandler(const components::ComponentConfig& config, const components::ComponentContext& context)
: server::handlers::HttpHandlerBase{config, context},
producer_{context.FindComponent<ChaosProducer>()},
consumer_{context.FindComponent<ChaosConsumer>()}
{}
consumer_{context.FindComponent<ChaosConsumer>()} {}

std::string HandleRequestThrow(const server::http::HttpRequest& request, server::request::RequestContext&)
const override {
Expand All @@ -178,6 +175,13 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
throw server::handlers::ClientError{server::handlers::ExternalBody{"No 'message' query argument"}};
}
urabbitmq::Envelope envelope{message, urabbitmq::MessageType::kTransient, {}, {}, {}};
if (!request.RequestBody().empty()) {
const auto request_json = formats::json::FromString(request.RequestBody());
if (request_json.HasMember("headers")) {
envelope
.headers = request_json["headers"].As<std::unordered_map<std::string, urabbitmq::HeaderValue>>();
}
}
const auto& correlation_id = request.GetArg("correlation_id");
if (!correlation_id.empty()) {
envelope.correlation_id = correlation_id;
Expand Down Expand Up @@ -219,16 +223,17 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
}

std::string HandleGet() const {
formats::json::ValueBuilder messages_builder;
urabbitmq::HeaderValue::Builder messages_builder;
for (const auto& item : consumer_.GetMessages()) {
formats::json::ValueBuilder item_builder;
urabbitmq::HeaderValue::Builder item_builder;
item_builder["message"] = item.message;
if (item.correlation_id.has_value()) {
item_builder["correlation_id"] = item.correlation_id;
}
if (item.reply_to.has_value()) {
item_builder["reply_to"] = item.reply_to;
}
item_builder["headers"] = item.headers;
messages_builder.PushBack(std::move(item_builder));
}
return formats::json::ToString(messages_builder.ExtractValue());
Expand Down
1 change: 1 addition & 0 deletions rabbitmq/functional_tests/basic_chaos/static_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ components_manager:
min_pool_size: 1
max_pool_size: 1
max_in_flight_requests: 5
heartbeat_interval_seconds: 1
use_secure_connection: false

secdist: {} # Component that stores configuration of hosts and passwords
Expand Down
88 changes: 86 additions & 2 deletions rabbitmq/functional_tests/basic_chaos/tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ async def _clear_messages(service_client):
assert response.status_code == 200


def _strip_headers(messages):
return [{key: value for key, value in message.items() if key != 'headers'} for message in messages]


async def _publish_and_consume(testpoint, client):
@testpoint('message_consumed')
def message_consumed(data):
Expand All @@ -51,7 +55,7 @@ def message_consumed(data):
response = await client.get('/v1/messages')
assert response.status_code == 200

assert response.json() == MESSAGES
assert _strip_headers(response.json()) == MESSAGES


async def test_rabbitmq_happy(testpoint, service_client, gate):
Expand All @@ -60,6 +64,55 @@ async def test_rabbitmq_happy(testpoint, service_client, gate):
await _publish_and_consume(testpoint, service_client)


async def test_rabbitmq_headers(testpoint, service_client, gate):
await _clear_messages(service_client)

@testpoint('message_consumed')
def message_consumed(data):
pass

expected_headers = {
'x-bool': True,
'x-int': -10,
'x-uint': 10,
'x-double': 2.5,
'x-array': [-7, 'array-value', {'enabled': False, 'nullable': None}],
'x-object': {
'count': 42,
'name': 'nested-object',
'array': [-7, 'array-value', {'enabled': False, 'nullable': None}],
},
'x-null': None,
}

response = await service_client.post(
'/v1/messages?message=headers&reliable=1&reply_to=reply&correlation_id=corr-id',
json={'headers': expected_headers},
)
assert response.status_code == 200

await message_consumed.wait_call()

response = await service_client.get('/v1/messages')
assert response.status_code == 200
messages = response.json()
assert len(messages) == 1

consumed = messages[0]
assert consumed['message'] == 'headers'
assert consumed['reply_to'] == 'reply'
assert consumed['correlation_id'] == 'corr-id'
assert consumed['headers']['x-bool'] is True
assert consumed['headers']['x-int'] == -10
assert consumed['headers']['x-uint'] == 10
assert consumed['headers']['x-double'] == 2.5
assert consumed['headers']['x-array'] == expected_headers['x-array']
assert consumed['headers']['x-object'] == expected_headers['x-object']
assert consumed['headers']['x-null'] is None
assert consumed['headers']['u-trace-id']
assert consumed['headers']['u-parent-span-id']


@pytest.mark.skip(reason='std::terminate is called, fix in TAXICOMMON-6995')
async def test_consumer_reconnects(testpoint, service_client, gate):
await _clear_messages(service_client)
Expand All @@ -85,4 +138,35 @@ def message_consumed(data):
response = await service_client.get('/v1/messages')
assert response.status_code == 200

assert response.json() == MESSAGES
assert _strip_headers(response.json()) == MESSAGES


async def test_rabbitmq_heartbeat_reconnects(testpoint, service_client, gate):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если вдруг отключить/сломать хартбит, то почему тест должен начать проваливаться?

await _clear_messages(service_client)

@testpoint('message_consumed')
def message_consumed(data):
pass

response = await service_client.post('/v1/messages?message=before-heartbeat')
assert response.status_code == 200
await message_consumed.wait_call()

await gate.to_server_noop()
await gate.to_client_noop()
await asyncio.sleep(2.5)
await gate.to_server_pass()
await gate.to_client_pass()
await gate.sockets_close()

await gate.wait_for_connections(timeout=10.0)
await asyncio.sleep(1.0)

response = await service_client.post('/v1/messages?message=after-heartbeat')
assert response.status_code == 200

await message_consumed.wait_call()

response = await service_client.get('/v1/messages')
assert response.status_code == 200
assert any(message['message'] == 'after-heartbeat' for message in response.json())
5 changes: 5 additions & 0 deletions rabbitmq/include/userver/urabbitmq/client_settings.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <chrono>
#include <cstddef>
#include <optional>
#include <string>
Expand Down Expand Up @@ -74,6 +75,10 @@ struct PoolSettings final {
/// (tcp error/protocol error/write timeout) leads to a errors burst:
/// all outstanding request will fails at once
size_t max_in_flight_requests = 5;

/// Requested AMQP heartbeat interval in seconds.
/// Set to 0 to disable heartbeats.
size_t heartbeat_interval_seconds = 60;
};

class TestsHelper;
Expand Down
11 changes: 11 additions & 0 deletions rabbitmq/include/userver/urabbitmq/typedefs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
/// @brief Convenient typedefs for RabbitMQ entities.

#include <chrono>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

#include <userver/formats/json/value.hpp>
#include <userver/utils/strong_typedef.hpp>

USERVER_NAMESPACE_BEGIN
Expand Down Expand Up @@ -63,6 +68,10 @@ enum class MessageType {
kTransient,
};

/// JSON-like representation of an AMQP header value.
/// This is not JSON, but a convenient tree representation for AMQP field values.
using HeaderValue = formats::json::Value;

/// @brief Structure holding an AMQP message body along with some of its
/// metadata fields. This struct is used to pass messages to the end user,
/// hiding the actual AMQP message object implementation.
Expand All @@ -75,6 +84,7 @@ struct ConsumedMessage {
Metadata metadata;
std::optional<std::string> reply_to{};
std::optional<std::string> correlation_id{};
std::unordered_map<std::string, HeaderValue> headers{};
};

/// @brief Structure holding an AMQP message body along with some of its
Expand All @@ -86,6 +96,7 @@ struct Envelope {
std::optional<std::string> reply_to{};
std::optional<std::string> correlation_id{};
std::optional<std::chrono::milliseconds> expiration{};
std::optional<std::unordered_map<std::string, HeaderValue>> headers{};
};

} // namespace urabbitmq
Expand Down
125 changes: 125 additions & 0 deletions rabbitmq/src/tests/header_value_rmqtest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#include <amqpcpp.h>

#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

#include <userver/formats/common/type.hpp>
#include <userver/formats/json/value_builder.hpp>
#include <userver/utest/utest.hpp>

#include <urabbitmq/impl/header_value.hpp>

USERVER_NAMESPACE_BEGIN

namespace {

template <typename T>
urabbitmq::HeaderValue MakeHeaderValue(T&& value) {
return urabbitmq::HeaderValue::Builder{std::forward<T>(value)}.ExtractValue();
}

urabbitmq::HeaderValue MakeNestedArrayValue() {
urabbitmq::HeaderValue::Builder builder{formats::common::Type::kArray};
builder.PushBack(std::int64_t{-7});
builder.PushBack("array-value");

urabbitmq::HeaderValue::Builder nested_object{formats::common::Type::kObject};
nested_object["enabled"] = false;
nested_object["nullable"] = urabbitmq::HeaderValue::Builder{};
builder.PushBack(std::move(nested_object));

return builder.ExtractValue();
}

urabbitmq::HeaderValue MakeNestedObjectValue() {
urabbitmq::HeaderValue::Builder builder{formats::common::Type::kObject};
builder["count"] = std::uint64_t{42};
builder["name"] = "nested-object";
builder["array"] = urabbitmq::HeaderValue::Builder{MakeNestedArrayValue()};

return builder.ExtractValue();
}

void ExpectHeadersEqual(
const std::unordered_map<std::string, urabbitmq::HeaderValue>& actual,
const std::unordered_map<std::string, urabbitmq::HeaderValue>& expected
) {
ASSERT_EQ(actual.size(), expected.size());
for (const auto& [key, expected_value] : expected) {
const auto it = actual.find(key);
ASSERT_NE(it, actual.end()) << "Missing key: " << key;
EXPECT_EQ(it->second, expected_value) << "Unexpected value for key: " << key;
}
}

} // namespace

UTEST(HeaderValue, ConvertsNestedAmqpTypes) {
AMQP::Table headers;
headers.set("string", "value");
headers.set("bool", true);
headers.set("signed", static_cast<std::int16_t>(-10));
headers.set("unsigned", static_cast<std::uint16_t>(10));
headers.set("double", AMQP::Double{1.5});
headers.set("null", nullptr);

AMQP::Array nested_array;
nested_array.push_back(AMQP::LongLong{-7});
nested_array.push_back(AMQP::LongString{"array-value"});
AMQP::Table nested_array_object;
nested_array_object.set("enabled", false);
nested_array_object.set("nullable", nullptr);
nested_array.push_back(nested_array_object);
headers.set("array", nested_array);

AMQP::Table nested_object;
nested_object.set("count", static_cast<std::uint32_t>(42));
nested_object.set("name", "nested-object");
nested_object.set("array", nested_array);
headers.set("object", nested_object);

const std::unordered_map<std::string, urabbitmq::HeaderValue> expected{
{"string", MakeHeaderValue("value")},
{"bool", MakeHeaderValue(true)},
{"signed", MakeHeaderValue(-10)},
{"unsigned", MakeHeaderValue(10u)},
{"double", MakeHeaderValue(1.5)},
{"null", urabbitmq::HeaderValue::Builder{}.ExtractValue()},
{"array", MakeNestedArrayValue()},
{"object", MakeNestedObjectValue()},
};

const auto actual = urabbitmq::impl::TableToHeaders(headers);
ExpectHeadersEqual(actual, expected);
EXPECT_TRUE(actual.at("signed").IsInt());
EXPECT_TRUE(actual.at("unsigned").IsUInt());
}

UTEST(HeaderValue, RoundTripsHeaders) {
const std::unordered_map<std::string, urabbitmq::HeaderValue> expected{
{"string", MakeHeaderValue("value")},
{"bool", MakeHeaderValue(false)},
{"signed", MakeHeaderValue(-123456789)},
{"signed64", MakeHeaderValue(std::int64_t{-1234567890123})},
{"unsigned", MakeHeaderValue(123456789u)},
{"unsigned64", MakeHeaderValue(std::uint64_t{1234567890123})},
{"double", MakeHeaderValue(3.25)},
{"null", urabbitmq::HeaderValue::Builder{}.ExtractValue()},
{"array", MakeNestedArrayValue()},
{"object", MakeNestedObjectValue()},
};

AMQP::Table table;
urabbitmq::impl::AddHeadersToTable(table, expected);

const auto actual = urabbitmq::impl::TableToHeaders(table);
ExpectHeadersEqual(actual, expected);
EXPECT_TRUE(actual.at("signed").IsInt());
EXPECT_TRUE(actual.at("signed64").IsInt64());
EXPECT_TRUE(actual.at("unsigned").IsUInt());
EXPECT_TRUE(actual.at("unsigned64").IsUInt64());
}

USERVER_NAMESPACE_END
Loading
Loading