Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,26 @@ add_executable(test_aggregate_drop_budget tests/unit/flow/test_aggregate_drop_bu
target_link_libraries(test_aggregate_drop_budget PRIVATE openpenny)
add_test(NAME aggregate_drop_budget COMMAND test_aggregate_drop_budget)

add_executable(test_terminal_snapshot_resolution tests/unit/flow/test_terminal_snapshot_resolution.cpp)
target_link_libraries(test_terminal_snapshot_resolution PRIVATE openpenny)
add_test(NAME terminal_snapshot_resolution COMMAND test_terminal_snapshot_resolution)

add_executable(test_aggregate_pending_resolution tests/unit/flow/test_aggregate_pending_resolution.cpp)
target_link_libraries(test_aggregate_pending_resolution PRIVATE openpenny)
add_test(NAME aggregate_pending_resolution COMMAND test_aggregate_pending_resolution)

add_executable(test_aggregate_freeze_at_drop_limit tests/unit/flow/test_aggregate_freeze_at_drop_limit.cpp)
target_link_libraries(test_aggregate_freeze_at_drop_limit PRIVATE openpenny)
add_test(NAME aggregate_freeze_at_drop_limit COMMAND test_aggregate_freeze_at_drop_limit)

add_executable(test_aggregate_duplicate_fallback tests/unit/flow/test_aggregate_duplicate_fallback.cpp)
target_link_libraries(test_aggregate_duplicate_fallback PRIVATE openpenny)
add_test(NAME aggregate_duplicate_fallback COMMAND test_aggregate_duplicate_fallback)

add_executable(test_flow_evaluation_phase_gate tests/unit/flow/test_flow_evaluation_phase_gate.cpp)
target_link_libraries(test_flow_evaluation_phase_gate PRIVATE openpenny)
add_test(NAME flow_evaluation_phase_gate COMMAND test_flow_evaluation_phase_gate)

add_executable(test_cli_options tests/unit/cli/test_cli_options.cpp)
target_link_libraries(test_cli_options PRIVATE openpenny)
add_test(NAME cli_options COMMAND test_cli_options)
Expand All @@ -178,6 +198,10 @@ add_executable(test_traffic_match tests/unit/net/test_traffic_match.cpp)
target_link_libraries(test_traffic_match PRIVATE openpenny)
add_test(NAME traffic_match COMMAND test_traffic_match)

add_executable(test_packet_parser tests/unit/net/test_packet_parser.cpp)
target_link_libraries(test_packet_parser PRIVATE openpenny)
add_test(NAME packet_parser COMMAND test_packet_parser)

add_executable(test_control_planner tests/unit/control/test_control_planner.cpp)
target_link_libraries(test_control_planner PRIVATE openpenny)
add_test(NAME control_planner COMMAND test_control_planner)
Expand Down
64 changes: 64 additions & 0 deletions include/openpenny/agg/FlowKey.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// SPDX-License-Identifier: BSD-2-Clause

#pragma once

#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <unordered_set>

namespace openpenny {

struct FlowKey {
/**
* @brief Protocol-aware flow tuple in host byte order.
*
* Encodes IPv4 source/destination, L4 ports, and the IPv4 protocol
* number so TCP/UDP traffic with the same addresses/ports do not
* alias to the same key.
*/
std::uint32_t src{0};
std::uint32_t dst{0};
std::uint16_t sport{0};
std::uint16_t dport{0};
std::uint8_t ip_proto{0};

bool operator==(const FlowKey& o) const noexcept {
return src == o.src &&
dst == o.dst &&
sport == o.sport &&
dport == o.dport &&
ip_proto == o.ip_proto;
}
};

struct FlowKeyHash {
/**
* @brief Mix all FlowKey fields into a single hash using 64-bit avalanching.
*/
std::size_t operator()(const FlowKey& k) const noexcept {
const std::uint64_t addr_pair =
(static_cast<std::uint64_t>(k.src) << 32) | k.dst;
const std::uint64_t ports_proto =
(static_cast<std::uint64_t>(k.sport) << 24) |
(static_cast<std::uint64_t>(k.dport) << 8) |
static_cast<std::uint64_t>(k.ip_proto);

std::uint64_t v =
addr_pair ^ (ports_proto + 0x9e3779b97f4a7c15ULL +
(addr_pair << 6) + (addr_pair >> 2));
v ^= (v >> 33);
v *= 0xff51afd7ed558ccdULL;
v ^= (v >> 33);
v *= 0xc4ceb9fe1a85ec53ULL;
v ^= (v >> 33);
return static_cast<std::size_t>(v);
}
};

template <typename T>
using FlowMap = std::unordered_map<FlowKey, T, FlowKeyHash>;

using FlowSet = std::unordered_set<FlowKey, FlowKeyHash>;

} // namespace openpenny
29 changes: 3 additions & 26 deletions include/openpenny/agg/Stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,18 @@
* @file Stats.h
* @brief Per-flow and aggregated statistics with a striped hash table.
*/
#include "openpenny/agg/FlowKey.h"

#include <atomic>
#include <cstdint>
#include <string>
#include <vector>
#include <unordered_map>
#include <shared_mutex>
#include <mutex>
#include <chrono>

namespace openpenny {

struct FlowKey {
/**
* @brief Tuple identifying a TCP/UDP flow in host byte order.
*/
uint32_t src; uint32_t dst; uint16_t sport; uint16_t dport;
bool operator==(const FlowKey& o) const noexcept {
return src==o.src && dst==o.dst && sport==o.sport && dport==o.dport;
}
};

struct FlowKeyHash {
/**
* @brief Mix all FlowKey fields into a single hash using 64-bit avalanching.
*/
size_t operator()(const FlowKey& k) const noexcept {
uint64_t v = (static_cast<uint64_t>(k.src) << 32) ^ k.dst;
v ^= (static_cast<uint64_t>(k.sport) << 16) ^ k.dport;
v ^= (v >> 33); v *= 0xff51afd7ed558ccdULL;
v ^= (v >> 33); v *= 0xc4ceb9fe1a85ec53ULL;
v ^= (v >> 33);
return static_cast<size_t>(v);
}
};

/**
* @brief Per-flow counters that mirror the BPF-side stats exposed to users.
*/
Expand Down Expand Up @@ -91,7 +68,7 @@ class FlowTable {
private:
struct Shard {
mutable std::shared_mutex mutex;
std::unordered_map<FlowKey, Counters, FlowKeyHash> map;
FlowMap<Counters> map;
};
std::vector<Shard> shards_;
FlowKeyHash hash_;
Expand Down
15 changes: 13 additions & 2 deletions include/openpenny/app/core/ActiveTestPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,17 @@ class ActiveTestPipelineRunner : public IPipelineStrategy {
/** Expire idle flows based on configured timeout. */
void expire_idle_flows(const std::chrono::steady_clock::time_point& now);

/** Sweep pending snapshots and expire those past timeout. */
void sweep_expired_snapshots(const std::chrono::steady_clock::time_point& now);
/** Return true once the aggregate phase has completed and per-flow tests may run. */
bool individual_flow_evaluation_enabled() const;

/** Evaluate already-tracked flows once per-flow testing becomes active. */
void evaluate_individual_flows_if_enabled();

/** Complete terminal flows once all pending drop snapshots are resolved. */
void complete_resolved_terminal_flows();

/** Complete a flow and preserve a printable closed-loop summary if applicable. */
void complete_flow_with_summary(const FlowKey& key, const char* reason);

// -------------------------------------------------------------------------
// Member state
Expand Down Expand Up @@ -232,6 +241,8 @@ class ActiveTestPipelineRunner : public IPipelineStrategy {
*/
std::size_t total_pkts_forwarded_{0};
std::size_t total_forward_errors_{0};
std::vector<std::string> closed_loop_flow_summaries_;
std::vector<std::string> duplicate_exceeded_flow_summaries_;

/**
* Last time we logged global stats (prevents log flooding).
Expand Down
50 changes: 20 additions & 30 deletions include/openpenny/app/core/DropCollectorBinding.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,48 @@
#include "openpenny/app/core/OpenpennyPipelineDriver.h"
#include "openpenny/agg/Stats.h"

#include <mutex>
#include <string>
#include <unordered_map>

namespace openpenny::penny {
class FlowEngine;
}
#include <utility>
#include <vector>

namespace openpenny::app {

/**
* @brief Maintains FlowEngine -> DropCollector bindings and installs the
* snapshot hook so drop events are mirrored into the shared collector.
* @brief Mirrors per-flow drop snapshots into the shared collector.
*
* New drops are inserted one at a time via upsert(). Snapshot state changes
* that affect a suffix of the per-flow snapshot vector (duplicate/rtx/expire)
* are mirrored via refresh_from() so the collector can rescan the already
* contiguous, append-only snapshot storage directly.
*/
class DropCollectorBinding {
public:
static DropCollectorBinding& instance();

// Ensure the global timer snapshot hook is installed exactly once.
void ensure_snapshot_hook();

void bind(penny::FlowEngine* flow,
DropCollectorPtr collector,
const std::string& thread_name,
std::size_t shard_index);

void unbind(penny::FlowEngine* flow);

void upsert(DropCollectorPtr collector,
const std::string& thread_name,
std::size_t shard_index,
const FlowKey& key,
penny::PacketDropId packet_id,
const penny::PacketDropSnapshot& snap);

private:
struct BindingContext {
DropCollectorPtr collector;
std::string thread_name;
std::size_t shard_index{0};
};
void refresh_from(
DropCollectorPtr collector,
const std::string& thread_name,
std::size_t shard_index,
const FlowKey& key,
const std::vector<std::pair<penny::PacketDropId, penny::PacketDropSnapshot>>& snapshots,
std::size_t start_index);

private:
DropCollectorBinding() = default;
BindingContext lookup(penny::FlowEngine* flow) const;
void upsert_locked(const BindingContext& binding,

void upsert_locked(DropCollector& collector,
DropCollector::Shard& shard,
const std::string& thread_name,
const FlowKey& key,
penny::PacketDropId packet_id,
const penny::PacketDropSnapshot& snap);

mutable std::mutex mtx_;
std::once_flag hook_once_;
std::unordered_map<penny::FlowEngine*, BindingContext> bindings_;
};

} // namespace openpenny::app
9 changes: 8 additions & 1 deletion include/openpenny/app/core/OpenpennyPipelineDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#pragma once

#include "openpenny/config/Config.h"
#include "openpenny/agg/Stats.h"
#include "openpenny/agg/FlowKey.h"
#include "openpenny/egress/PacketSink.h"
#include "openpenny/penny/flow/state/PennySnapshot.h"
#include "openpenny/penny/flow/state/PacketDropId.h"
Expand Down Expand Up @@ -124,6 +124,10 @@ struct DropCollector {

std::atomic<bool> accepting{true};
std::size_t shard_count{1};
std::size_t snapshot_limit{0};
std::atomic<std::size_t> accepted_snapshot_count{0};
mutable std::mutex frozen_aggregate_counters_mtx;
std::optional<openpenny::app::AggregatedCounters> frozen_aggregate_counters;
std::array<Shard, kMaxShards> shards{};

std::size_t clamp_shard_index(std::size_t idx) const noexcept {
Expand Down Expand Up @@ -160,10 +164,13 @@ struct ModeResult {
std::size_t flows_tracked_data = 0;
bool penny_completed = false; // True when Penny heuristics triggered shutdown.
bool aggregates_penny_completed = false; // Flag representing aggregate Penny status.
bool closed_loop_stop_hit = false; // True when the configured min_closed_loop_flows threshold was observed.
// Passive-mode gap summary.
std::size_t passive_flows_with_open_gaps = 0;
std::size_t passive_open_gaps = 0;
std::vector<std::string> passive_gap_summaries;
std::vector<std::string> closed_loop_flow_summaries;
std::vector<std::string> duplicate_exceeded_flow_summaries;
std::size_t passive_flows_rst = 0;
std::size_t passive_flows_syn_only = 0;
std::size_t passive_flows_finished = 0;
Expand Down
12 changes: 6 additions & 6 deletions include/openpenny/app/core/PassiveTestPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#pragma once

#include "openpenny/agg/Stats.h"
#include "openpenny/agg/FlowKey.h"
#include "openpenny/app/core/OpenpennyPipelineDriver.h"
#include "openpenny/app/core/PipelineRunner.h"
#include "openpenny/config/Config.h"
Expand All @@ -13,8 +13,6 @@
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

namespace openpenny {
Expand Down Expand Up @@ -75,17 +73,19 @@ class PassiveTestPipelineRunner : public IPipelineStrategy {
void finalize(ModeResult& result) override;

private:
void reserve_for_config();

const Config& cfg_;
const PipelineOptions& opts_;
FlowMatcher matcher_;
net::PacketSourcePtr source_;
std::unordered_map<FlowKey, PassiveFlowState, FlowKeyHash> flows_;
FlowMap<PassiveFlowState> flows_;
std::chrono::steady_clock::time_point start_time_{std::chrono::steady_clock::now()};
std::size_t flows_seen_{0};
std::size_t flows_finished_{0};
std::vector<PassiveFlowState> finished_flows_;
std::unordered_map<FlowKey, std::size_t, FlowKeyHash> finished_index_;
std::unordered_set<FlowKey, FlowKeyHash> finished_keys_;
FlowMap<std::size_t> finished_index_;
FlowSet finished_keys_;
bool stop_grace_active_{false};
std::chrono::steady_clock::time_point stop_grace_start_{};
bool stop_requested_{false};
Expand Down
2 changes: 1 addition & 1 deletion include/openpenny/app/core/PerThreadStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <string>
#include <vector>

#include "openpenny/agg/Stats.h" // for FlowKey
#include "openpenny/agg/FlowKey.h"
#include "openpenny/penny/flow/state/PacketDropId.h"

namespace openpenny::app {
Expand Down
9 changes: 9 additions & 0 deletions include/openpenny/app/core/RuntimeSetup.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@ const RuntimeSetupSnapshot& current_runtime_setup();
// Mutable view for helpers that need to update status fields.
RuntimeSetupSnapshot& runtime_setup_mutable();

bool current_aggregates_active() noexcept;
void set_current_aggregates_active(bool value) noexcept;

RuntimeStatus::AggregatesStatus current_aggregates_status() noexcept;
void set_current_aggregates_status(RuntimeStatus::AggregatesStatus status) noexcept;

bool current_has_aggregate_eval() noexcept;
void set_current_has_aggregate_eval(bool value) noexcept;

} // namespace openpenny
6 changes: 3 additions & 3 deletions include/openpenny/egress/PacketSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ class PacketSink {
* @brief Emit a parsed packet. Must be thread-safe.
*
* Returns true on a successful write, false on any error. Transient
* EAGAIN/EWOULDBLOCK are counted as errors==0 (pipeline drops the
* packet) because the pipeline is not responsible for reliable
* delivery -- it's a passive mirror.
* EAGAIN/EWOULDBLOCK still mean the packet was dropped; sinks may count
* those in stats_.errors as backpressure-induced loss so operators can
* distinguish real reinjection congestion from intentional Penny drops.
*/
virtual bool write(const net::PacketView& packet) = 0;

Expand Down
Loading
Loading