Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ ifdef BAZEL_REMOTE_CACHE
BAZEL_BUILD_OPTS += --remote_cache=$(BAZEL_REMOTE_CACHE)
endif

BAZEL_TEST_OPTS ?= --jobs=HOST_RAM*.0003 --test_timeout=300 --local_test_jobs=1 --flaky_test_attempts=3
BAZEL_TEST_OPTS ?= --jobs=HOST_RAM*.0002 --test_timeout=100 --local_test_jobs=1 --flaky_test_attempts=3
BAZEL_TEST_OPTS += --test_output=errors

BUILDARCH := $(subst aarch64,arm64,$(subst x86_64,amd64,$(shell uname -m)))
Expand Down
12 changes: 12 additions & 0 deletions cilium/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "versioned_lib",
hdrs = ["versioned.h"],
repository = "@envoy",
deps = [
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@envoy//source/common/common:assert_lib",
],
)

envoy_cc_library(
name = "network_policy_lib",
srcs = [
Expand All @@ -45,6 +56,7 @@ envoy_cc_library(
"//cilium:conntrack_lib",
"//cilium:grpc_subscription_lib",
"//cilium:ipcache_lib",
"//cilium:versioned_lib",
"//cilium/api:npds_cc_proto",
"@envoy//envoy/config:subscription_interface",
"@envoy//envoy/singleton:manager_interface",
Expand Down
6 changes: 6 additions & 0 deletions cilium/api/bpf_metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,10 @@ message BpfMetadata {

// Configuration for the source of NPDS updates. Currently this field is not supported.
envoy.config.core.v3.ConfigSource npds_config = 16;

// Use delta NPDS rather than the state-of-the-world protocol.
// Even with delta NPDS, each new stream starts with a full dump.
// Only wildcard subscriptions are supported.
// All listeners on the node must agree on this setting.
bool use_delta_npds = 17;
}
33 changes: 33 additions & 0 deletions cilium/api/npds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "validate/validate.proto";
// [#protodoc-title: Network policy management and NPDS]

// Each resource name is a network policy identifier.
// Deprecated: This service will be removed when Cilium 1.20 is the oldest supported release.
service NetworkPolicyDiscoveryService {
option (envoy.annotations.resource).type = "cilium.NetworkPolicy";

Expand All @@ -33,6 +34,32 @@ service NetworkPolicyDiscoveryService {
}
}

// Policy and selector resource names are exact-match identifiers in delta NPDS.
service NetworkPolicyResourceDiscoveryService {
option (envoy.annotations.resource).type = "cilium.NetworkPolicyResource";

rpc DeltaNetworkPolicyResources(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
returns (stream envoy.service.discovery.v3.DeltaDiscoveryResponse) {
}
}

// A delta NPDS resource that carries either an endpoint policy or a shared selector.
message NetworkPolicyResource {
oneof resource {
NetworkPolicy policy = 1;
Selector selector = 2;
}
}

// A shared set of remote identities referenced by selector resource name.
// Unlike the old state-of-the-world remote identity lists, an empty selector
// matches nothing.
message Selector {
// The set of numeric remote security IDs selected by this selector.
// If empty, this selector selects no remote identities.
repeated uint32 remote_identities = 1;
}

// A network policy that is enforced by a filter on the network flows to/from
// associated hosts.
message NetworkPolicy {
Expand Down Expand Up @@ -153,6 +180,12 @@ message PortNetworkPolicyRule {
// Optional. If not specified, any remote host is matched by this predicate.
repeated uint32 remote_policies = 7;

// Optional selector resource names that can be resolved to shared remote
// policy sets in delta NPDS.
// Selector references are matched by exact selector resource name.
// Optional. If not specified, any remote host is matched by this predicate.
repeated string selectors = 11;

// Optional downstream TLS context. If present, the incoming connection must
// be a TLS connection.
TLSContext downstream_tls_context = 3;
Expand Down
37 changes: 14 additions & 23 deletions cilium/bpf_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,27 +176,6 @@ SINGLETON_MANAGER_REGISTRATION(cilium_bpf_conntrack);
SINGLETON_MANAGER_REGISTRATION(cilium_host_map);
SINGLETON_MANAGER_REGISTRATION(cilium_network_policy);

namespace {

std::shared_ptr<const Cilium::PolicyHostMap>
createHostMap(Server::Configuration::ListenerFactoryContext& context) {
return context.serverFactoryContext().singletonManager().getTyped<const Cilium::PolicyHostMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_host_map), [&context] {
auto map = std::make_shared<Cilium::PolicyHostMap>(context.serverFactoryContext());
map->startSubscription(context.serverFactoryContext());
return map;
});
}

std::shared_ptr<const Cilium::NetworkPolicyMap>
createPolicyMap(Server::Configuration::FactoryContext& context) {
return context.serverFactoryContext().singletonManager().getTyped<const Cilium::NetworkPolicyMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_network_policy),
[&context] { return std::make_shared<Cilium::NetworkPolicyMap>(context, true); });
}

} // namespace

Config::Config(const ::cilium::BpfMetadata& config,
Server::Configuration::ListenerFactoryContext& context)
: so_linger_(config.has_original_source_so_linger_time()
Expand Down Expand Up @@ -236,7 +215,13 @@ Config::Config(const ::cilium::BpfMetadata& config,
}

if (config.use_nphds()) {
hosts_ = createHostMap(context);
hosts_ =
context.serverFactoryContext().singletonManager().getTyped<const Cilium::PolicyHostMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_host_map), [&context] {
auto map = std::make_shared<Cilium::PolicyHostMap>(context.serverFactoryContext());
map->startSubscription(context.serverFactoryContext());
return map;
});
}

// Note: all instances use the bpf root of the first filter with non-empty
Expand Down Expand Up @@ -273,7 +258,13 @@ Config::Config(const ::cilium::BpfMetadata& config,
// instances!
// Only created if either ipcache_ or hosts_ map exists
if (ipcache_ || hosts_) {
npmap_ = createPolicyMap(context);
npmap_ =
context.serverFactoryContext().singletonManager().getTyped<const Cilium::NetworkPolicyMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_network_policy), [&context, &config] {
return std::make_shared<Cilium::NetworkPolicyMap>(context, true,
config.use_delta_npds());
});
npmap_->setUseDeltaXds(config.use_delta_npds());
}
}

Expand Down
146 changes: 122 additions & 24 deletions cilium/grpc_subscription.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@
#include <fmt/format.h>

#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "envoy/annotations/resource.pb.h"
#include "envoy/common/exception.h"
#include "envoy/common/random_generator.h"
#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/config/custom_config_validators.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/config/subscription.h"
#include "envoy/config/subscription_factory.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/async_client.h"
#include "envoy/local_info/local_info.h"
#include "envoy/server/factory_context.h"
#include "envoy/stats/scope.h"
#include "envoy/upstream/cluster_manager.h"

Expand All @@ -27,7 +28,9 @@
#include "source/common/grpc/common.h"
#include "source/common/protobuf/protobuf.h" // IWYU pragma: keep
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"

#include "absl/container/flat_hash_map.h"
#include "absl/status/statusor.h"
Expand All @@ -40,6 +43,77 @@ namespace Cilium {

namespace {

class StreamTrackedGrpcMux {
public:
StreamTrackedGrpcMux(std::function<void()> on_transport_established,
std::function<void()> on_transport_close)
: on_transport_established_(std::move(on_transport_established)),
on_transport_close_(std::move(on_transport_close)) {}

virtual ~StreamTrackedGrpcMux() = default;

bool streamConnected() const { return stream_connected_; }

void onStreamEstablished() {
stream_connected_ = true;
on_transport_established_();
}

void onEstablishmentFailure() {
const bool was_connected = stream_connected_;
stream_connected_ = false;
if (was_connected) {
on_transport_close_();
}
}

private:
bool stream_connected_{false};
std::function<void()> on_transport_established_;
std::function<void()> on_transport_close_;
};

class SotwGrpcMuxImpl : public Config::GrpcMuxImpl, public StreamTrackedGrpcMux {
public:
SotwGrpcMuxImpl(Config::GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node,
std::function<void()> on_transport_established,
std::function<void()> on_transport_close)
: Config::GrpcMuxImpl(grpc_mux_context, skip_subsequent_node),
StreamTrackedGrpcMux(std::move(on_transport_established), std::move(on_transport_close)) {}
~SotwGrpcMuxImpl() override = default;

void onStreamEstablished() override {
Config::GrpcMuxImpl::onStreamEstablished();
StreamTrackedGrpcMux::onStreamEstablished();
}

void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override {
Config::GrpcMuxImpl::onEstablishmentFailure(next_attempt_may_send_initial_resource_version);
StreamTrackedGrpcMux::onEstablishmentFailure();
}
};

class DeltaGrpcMuxImpl : public Config::NewGrpcMuxImpl, public StreamTrackedGrpcMux {
public:
explicit DeltaGrpcMuxImpl(Config::GrpcMuxContext& grpc_mux_context,
std::function<void()> on_transport_established,
std::function<void()> on_transport_close)
: Config::NewGrpcMuxImpl(grpc_mux_context),
StreamTrackedGrpcMux(std::move(on_transport_established), std::move(on_transport_close)) {}

~DeltaGrpcMuxImpl() override = default;

void onStreamEstablished() override {
Config::NewGrpcMuxImpl::onStreamEstablished();
StreamTrackedGrpcMux::onStreamEstablished();
}

void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override {
Config::NewGrpcMuxImpl::onEstablishmentFailure(next_attempt_may_send_initial_resource_version);
StreamTrackedGrpcMux::onEstablishmentFailure();
}
};

// service RPC method fully qualified names.
struct Service {
std::string sotw_grpc_method_;
Expand All @@ -59,6 +133,7 @@ TypeUrlToServiceMap* buildTypeUrlToServiceMap() {
// https://www.mail-archive.com/protobuf@googlegroups.com/msg04540.html.
for (absl::string_view name : {
"cilium.NetworkPolicyDiscoveryService",
"cilium.NetworkPolicyResourceDiscoveryService",
"cilium.NetworkPolicyHostsDiscoveryService",
}) {
const auto* service_desc =
Expand Down Expand Up @@ -122,35 +197,52 @@ const Protobuf::MethodDescriptor& sotwGrpcMethod(absl::string_view type_url) {

// Hard-coded Cilium gRPC cluster
// Note: No rate-limit settings are used, consider if needed.
envoy::config::core::v3::ConfigSource getCiliumXDSAPIConfig() {
envoy::config::core::v3::ConfigSource getCiliumXDSAPIConfig(bool use_delta_xds = false) {
auto config_source = envoy::config::core::v3::ConfigSource();
/* config_source.initial_fetch_timeout is set to 50 millliseconds.
/* config_source.initial_fetch_timeout is set to 50 milliseconds.
* This applies only to SDS Secrets for now, as for NPDS and NPHDS we explicitly set the timeout
* as 0 (no timeout).
*/
config_source.mutable_initial_fetch_timeout()->set_nanos(50000000);
config_source.set_resource_api_version(envoy::config::core::v3::ApiVersion::V3);
auto api_config_source = config_source.mutable_api_config_source();
api_config_source->set_set_node_on_first_message_only(true);
api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC);
api_config_source->set_api_type(use_delta_xds
? envoy::config::core::v3::ApiConfigSource::DELTA_GRPC
: envoy::config::core::v3::ApiConfigSource::GRPC);
api_config_source->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3);
api_config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("xds-grpc-cilium");
return config_source;
}

envoy::config::core::v3::ConfigSource cilium_xds_api_config = getCiliumXDSAPIConfig();

std::unique_ptr<Config::GrpcSubscriptionImpl>
subscribe(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher,
Random::RandomGenerator& random, Stats::Scope& scope,
Config::SubscriptionCallbacks& callbacks,
Config::OpaqueResourceDecoderSharedPtr resource_decoder,
std::chrono::milliseconds init_fetch_timeout) {
bool grpcStreamConnected(Config::Subscription* subscription) {
auto* sub = dynamic_cast<Config::GrpcSubscriptionImpl*>(subscription);
if (!sub) {
return false;
}

auto* grpc_mux = dynamic_cast<StreamTrackedGrpcMux*>(sub->grpcMux().get());
if (grpc_mux == nullptr) {
return false;
}

return grpc_mux->streamConnected();
}

std::unique_ptr<Config::Subscription>
subscribe(const std::string& type_url, Server::Configuration::CommonFactoryContext& context,
Stats::Scope& scope, Config::SubscriptionCallbacks& callbacks,
Config::OpaqueResourceDecoderSharedPtr resource_decoder, bool use_delta_xds,
std::chrono::milliseconds init_fetch_timeout,
std::function<void()> on_transport_established,
std::function<void()> on_transport_close) {
const envoy::config::core::v3::ConfigSource config_source = getCiliumXDSAPIConfig(use_delta_xds);
const envoy::config::core::v3::ApiConfigSource& api_config_source =
cilium_xds_api_config.api_config_source();
config_source.api_config_source();
THROW_IF_NOT_OK(Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(
cm.primaryClusters(), api_config_source));
context.clusterManager().primaryClusters(), api_config_source));

Config::SubscriptionStats stats = Config::Utility::generateStats(scope);
Envoy::Config::SubscriptionOptions options;
Expand All @@ -159,7 +251,7 @@ subscribe(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
std::make_unique<NopConfigValidatorsImpl>();
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
cm.grpcAsyncClientManager(), api_config_source, scope, true, 0, false);
context.clusterManager().grpcAsyncClientManager(), api_config_source, scope, true, 0, false);
THROW_IF_NOT_OK_REF(factory_or_error.status());

absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
Expand All @@ -170,9 +262,9 @@ subscribe(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
/*async_client_=*/THROW_OR_RETURN_VALUE(
factory_or_error.value()->createUncachedRawAsyncClient(), Grpc::RawAsyncClientPtr),
/*failover_async_client_=*/nullptr,
/*dispatcher_=*/dispatcher,
/*service_method_=*/sotwGrpcMethod(type_url),
/*local_info_=*/local_info,
/*dispatcher_=*/context.mainThreadDispatcher(),
/*service_method_=*/use_delta_xds ? deltaGrpcMethod(type_url) : sotwGrpcMethod(type_url),
/*local_info_=*/context.localInfo(),
/*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
/*scope_=*/scope,
/*config_validators_=*/std::move(nop_config_validators),
Expand All @@ -181,16 +273,22 @@ subscribe(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
/*backoff_strategy_=*/
std::make_unique<JitteredExponentialBackOffStrategy>(
Config::SubscriptionFactory::RetryInitialDelayMs,
Config::SubscriptionFactory::RetryMaxDelayMs, random),
Config::SubscriptionFactory::RetryMaxDelayMs, context.api().randomGenerator()),
/*target_xds_authority_=*/"",
/*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS.
};

std::shared_ptr<Config::GrpcMux> grpc_mux =
use_delta_xds ? std::static_pointer_cast<Config::GrpcMux>(std::make_shared<DeltaGrpcMuxImpl>(
grpc_mux_context, std::move(on_transport_established),
std::move(on_transport_close)))
: std::static_pointer_cast<Config::GrpcMux>(std::make_shared<SotwGrpcMuxImpl>(
grpc_mux_context, api_config_source.set_node_on_first_message_only(),
std::move(on_transport_established), std::move(on_transport_close)));

return std::make_unique<Config::GrpcSubscriptionImpl>(
std::make_shared<GrpcMuxImpl>(grpc_mux_context,
api_config_source.set_node_on_first_message_only()),
callbacks, resource_decoder, stats, type_url, dispatcher, init_fetch_timeout,
/*is_aggregated*/ false, options);
grpc_mux, callbacks, resource_decoder, stats, type_url, context.mainThreadDispatcher(),
init_fetch_timeout, /*is_aggregated*/ false, options);
}

} // namespace Cilium
Expand Down
Loading
Loading