From 5fad558aa45607f23a3919cecf69bf00d816681e Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Thu, 4 Jun 2026 22:39:56 +0000 Subject: [PATCH] Add dynamic RX flow lifecycle Signed-off-by: Cliff Burdick --- docs/api-reference/configuration.md | 22 +- docs/api-reference/cpp.md | 67 +- docs/concepts.md | 9 +- docs/tutorials/configuration-walkthrough.md | 2 +- include/daqiri/common.h | 61 +- include/daqiri/types.h | 52 +- python/daqiri_common_pybind.cpp | 42 + src/common.cpp | 21 +- src/manager.cpp | 20 + src/manager.h | 5 +- src/managers/dpdk/daqiri_dpdk_mgr.cpp | 879 +++++++++++++++++--- src/managers/dpdk/daqiri_dpdk_mgr.h | 87 +- src/managers/rdma/daqiri_rdma_mgr.h | 2 +- src/managers/socket/daqiri_socket_mgr.cpp | 2 +- src/managers/socket/daqiri_socket_mgr.h | 2 +- 15 files changed, 1118 insertions(+), 155 deletions(-) diff --git a/docs/api-reference/configuration.md b/docs/api-reference/configuration.md index 47d6519..461932e 100644 --- a/docs/api-reference/configuration.md +++ b/docs/api-reference/configuration.md @@ -172,11 +172,15 @@ RDMA transport settings: ### Flows -`rx.flows:` — Flow rules that steer packets to specific queues based on match criteria. +`rx.flows:` — Static startup flow rules that steer packets to specific queues based on +match criteria. This sequence may be omitted; a queues-only RX config can add DPDK RX +flows later with the dynamic flow API. - **`name`**: Flow name. - type: `string` -- **`id`**: Flow ID. Retrievable at runtime via `get_packet_flow_id()`. +- **`id`**: Non-zero static flow ID. Retrievable at runtime via `get_packet_flow_id()`. + Static IDs are reserved for the lifetime of the process and are not deletable through + the dynamic flow API. - type: `integer` - **`action`**: What to do with matched packets. - **`type`**: Action type. Only `queue` is currently supported. 
@@ -201,11 +205,23 @@ RDMA transport settings: ### Flow Isolation `rx.flow_isolation:` — When `true`, only packets matching an explicit flow rule are delivered. -Unmatched packets are dropped. When `false`, unmatched packets go to a default queue. +Unmatched packets are dropped. When `false`, unmatched packets go to a default queue. A +queues-only config can set `flow_isolation: true` and then install dynamic RX flows after +`daqiri_init()`. - type: `boolean` - default: `false` +### Dynamic Flow Capacity + +`rx.dynamic_flow_capacity:` — DPDK template-table capacity reserved for dynamic RX flow +rules on this interface. DAQIRI uses this when the DPDK template/async fast path is +available; legacy fallback paths still accept dynamic RX flow operations but do not use a +template table. + +- type: `integer` +- default: `1024` + ### Hardware Timestamps `rx.hardware_timestamps:` — Enable per-packet hardware RX timestamps for Raw Ethernet diff --git a/docs/api-reference/cpp.md b/docs/api-reference/cpp.md index 0af6728..5cbab65 100644 --- a/docs/api-reference/cpp.md +++ b/docs/api-reference/cpp.md @@ -77,7 +77,7 @@ For a single-segment configuration (CPU-only or batched GPU): for (int i = 0; i < daqiri::get_num_packets(burst); i++) { void *pkt = daqiri::get_packet_ptr(burst, i); uint32_t len = daqiri::get_packet_length(burst, i); - uint16_t flow = daqiri::get_packet_flow_id(burst, i); + daqiri::FlowId flow = daqiri::get_packet_flow_id(burst, i); uint64_t rx_ts_ns = 0; if (daqiri::get_packet_rx_timestamp(burst, i, &rx_ts_ns) == daqiri::Status::SUCCESS) { // rx_ts_ns is in the NIC timestamp clock domain. @@ -130,6 +130,61 @@ daqiri::free_all_segment_packets(burst, seg); daqiri::free_rx_burst(burst); ``` +## Dynamic RX Flows + +DPDK RX flows can be added and deleted after `daqiri_init()`. This supports +queues-only startup configs, including `rx.flow_isolation: true` with no +initial `rx.flows`. 
Static YAML flows still use explicit configured IDs and are +not deletable through this API. + +```cpp +daqiri::FlowRuleConfig flow; +flow.name_ = "udp_5000"; +flow.action_.type_ = daqiri::FlowType::QUEUE; +flow.action_.id_ = 0; +flow.match_.type_ = daqiri::FlowMatchType::NORMAL; +flow.match_.udp_dst_ = 5000; + +daqiri::FlowOpId add_op = 0; +auto st = daqiri::add_rx_flow_async(0, flow, &add_op); +if (st != daqiri::Status::SUCCESS) { + // invalid port/queue/match, unsupported backend, or no flow IDs available +} + +daqiri::FlowId flow_id = 0; +daqiri::FlowOpResult result; +while (flow_id == 0) { + st = daqiri::poll_flow_op(&result); + if (st == daqiri::Status::NOT_READY) { + continue; + } + if (st != daqiri::Status::SUCCESS) { + // handle poll error + break; + } + if (result.op_id_ == add_op) { + if (result.status_ != daqiri::Status::SUCCESS) { + // handle flow create failure + break; + } + flow_id = result.flow_id_; + } +} +``` + +Packets matching a dynamic rule are marked with the same `FlowId` returned by +the add completion, so `get_packet_flow_id()` gives the handle to pass to +`delete_flow_async()`. `poll_flow_op()` returns `Status::NOT_READY` when no flow +operation has completed yet. + +```cpp +daqiri::FlowOpId delete_op = 0; +auto delete_status = daqiri::delete_flow_async(flow_id, &delete_op); +``` + +Dynamic flow support is RX-only in v1. Socket, RDMA, and software loopback +managers return `NOT_SUPPORTED`. + ## Reordered RX Bursts For an overview of what RX reorder is and when to use it, see @@ -411,9 +466,17 @@ workflow sections above show the common call order and ownership rules. | `get_segment_packet_ptr(burst, seg, idx)` | Return a packet pointer for a specific segment. | | `get_packet_length(burst, idx)` | Return the logical packet length. | | `get_segment_packet_length(burst, seg, idx)` | Return the length of one packet segment. | -| `get_packet_flow_id(burst, idx)` | Return the matched flow ID, or `0` when no flow matched. 
| +| `get_packet_flow_id(burst, idx)` | Return the matched `FlowId`, or `0` when no flow matched. | | `get_packet_rx_timestamp(burst, idx, &timestamp_ns)` | Return the hardware RX timestamp when enabled and available. | +### Dynamic RX Flow Lifecycle + +| Function | Purpose | +| --- | --- | +| `add_rx_flow_async(port, flow, &op_id)` | Enqueue a dynamic RX flow create. The add completion returns the allocated `FlowId`. | +| `delete_flow_async(flow_id, &op_id)` | Enqueue deletion of an active dynamic flow. Static YAML flows and unknown IDs return `INVALID_PARAMETER`. | +| `poll_flow_op(&result)` | Return one completed flow operation, or `NOT_READY` when none are ready. | + ### RX and Reorder | Function | Purpose | diff --git a/docs/concepts.md b/docs/concepts.md index c9ca280..6978615 100644 --- a/docs/concepts.md +++ b/docs/concepts.md @@ -222,7 +222,14 @@ Flow rules are only available in Raw Ethernet (`stream_type: "raw"`). A flow's match can combine fields such as `udp_src`, `udp_dst`, and `ipv4_len`; multiple flows can target the same queue, and the matching flow's ID is available at runtime so the application can distinguish -them. Flows are configured under `rx.flows` in the YAML. +them. + +Flows can be static or dynamic. Static flows are configured under +`rx.flows` in the YAML and keep their configured IDs for the process lifetime. +Dynamic RX flows are added after `daqiri_init()` with `add_rx_flow_async()`; +their non-zero `FlowId` is allocated by DAQIRI, returned in the add completion, +and used as the packet mark returned by `get_packet_flow_id()`. Only dynamic +flows can be deleted dynamically. TX dynamic flows are not part of v1. ### Flow Steering diff --git a/docs/tutorials/configuration-walkthrough.md b/docs/tutorials/configuration-walkthrough.md index bc4a04c..7d462b6 100644 --- a/docs/tutorials/configuration-walkthrough.md +++ b/docs/tutorials/configuration-walkthrough.md @@ -368,7 +368,7 @@ flows: udp_dst: 5000 ``` -1. 
**`id`** · `integer` · *required* — Flow tag attached to matching packets. Set to a non-zero value here so the `reorder_configs:` block below can reference it via `flow_ids:` to select which packets to reorder. +1. **`id`** · `integer` · *required* — Static flow tag attached to matching packets. Set to a non-zero value here so the `reorder_configs:` block below can reference it via `flow_ids:` to select which packets to reorder. Dynamic RX flows are added after initialization and are not attached to reorder configs in v1. **The `reorder_configs:` block.** The core of the feature — sits inside the `rx:` section alongside `queues` and `flows`. diff --git a/include/daqiri/common.h b/include/daqiri/common.h index dee1b5d..8d7c920 100644 --- a/include/daqiri/common.h +++ b/include/daqiri/common.h @@ -167,9 +167,42 @@ uint32_t get_packet_length(BurstParams *burst, int idx); * * @param burst Burst structure containing packets * @param idx Index of packet - * @return uint16_t Flow ID + * @return FlowId Flow ID, or 0 when no flow matched */ -uint16_t get_packet_flow_id(BurstParams* burst, int idx); +FlowId get_packet_flow_id(BurstParams* burst, int idx); + +/** + * @brief Enqueue creation of a dynamic RX flow rule. + * + * The flow ID is allocated by DAQIRI and returned in the completion result from + * poll_flow_op(). Packets matching the dynamic rule are marked with the same + * ID, so get_packet_flow_id() can be used to identify and later delete the rule. + * + * @param port Port ID of interface + * @param flow Rule match and queue action to install + * @param op_id Output operation ID used to track completion + * @return Status indicating whether the operation was accepted + */ +Status add_rx_flow_async(int port, const FlowRuleConfig &flow, FlowOpId *op_id); + +/** + * @brief Enqueue deletion of a dynamic flow rule. + * + * Only flows created by add_rx_flow_async() are deletable through this API. 
+ * + * @param flow_id Dynamic flow ID returned by an add completion + * @param op_id Output operation ID used to track completion + * @return Status indicating whether the operation was accepted + */ +Status delete_flow_async(FlowId flow_id, FlowOpId *op_id); + +/** + * @brief Poll one dynamic flow operation completion. + * + * @param result Output completion details + * @return SUCCESS when a completion was returned, NOT_READY when none are ready + */ +Status poll_flow_op(FlowOpResult *result); /** * @brief Get the hardware RX timestamp of a packet in nanoseconds @@ -1111,6 +1144,13 @@ template <> struct YAML::convert { rx_cfg.flow_isolation_ = rx["flow_isolation"].as(); } catch (const std::exception& e) { rx_cfg.flow_isolation_ = false; } + try { + rx_cfg.dynamic_flow_capacity_ = + rx["dynamic_flow_capacity"].as(); + } catch (const std::exception& e) { + rx_cfg.dynamic_flow_capacity_ = daqiri::DEFAULT_DYNAMIC_FLOW_CAPACITY; + } + for (const auto &q_item : rx["queues"]) { daqiri::RxQueueConfig q; if (!parse_rx_queue_config( @@ -1128,13 +1168,20 @@ template <> struct YAML::convert { rx_cfg.queues_.emplace_back(std::move(q)); } - for (const auto &flow_item : rx["flows"]) { - daqiri::FlowConfig flow; - if (!parse_flow_config(flow_item, flow)) { - DAQIRI_LOG_ERROR("Failed to parse FlowConfig"); + if (rx["flows"].IsDefined()) { + if (!rx["flows"].IsSequence()) { + DAQIRI_LOG_ERROR("'rx.flows' must be a sequence for interface '{}'", + ifcfg.name_); return false; } - rx_cfg.flows_.emplace_back(std::move(flow)); + for (const auto &flow_item : rx["flows"]) { + daqiri::FlowConfig flow; + if (!parse_flow_config(flow_item, flow)) { + DAQIRI_LOG_ERROR("Failed to parse FlowConfig"); + return false; + } + rx_cfg.flows_.emplace_back(std::move(flow)); + } } try { diff --git a/include/daqiri/types.h b/include/daqiri/types.h index a8a4041..885e3ff 100644 --- a/include/daqiri/types.h +++ b/include/daqiri/types.h @@ -44,6 +44,10 @@ static inline constexpr uint32_t MAX_INTERFACES = 4; 
static inline constexpr int MAX_NUM_SEGS = 4; static inline constexpr uint32_t DAQIRI_BURST_FLAG_REORDERED = (1U << 28); static inline constexpr uint32_t DAQIRI_BURST_FLAG_REORDER_TIMEOUT = (1U << 29); +static inline constexpr uint32_t DEFAULT_DYNAMIC_FLOW_CAPACITY = 1024; + +using FlowId = uint32_t; +using FlowOpId = uint64_t; struct ReorderBurstInfo { uint64_t batch_id; @@ -541,14 +545,14 @@ struct TxQueueConfig { enum class FlowType { QUEUE }; struct FlowAction { - FlowType type_; - uint16_t id_; + FlowType type_ = FlowType::QUEUE; + uint16_t id_ = 0; }; struct FlexItemMatch { - uint16_t flex_item_id_; - uint32_t val_; - uint32_t mask_; + uint16_t flex_item_id_ = 0; + uint32_t val_ = 0; + uint32_t mask_ = 0; }; enum class FlowMatchType { @@ -557,20 +561,39 @@ enum class FlowMatchType { }; struct FlowMatch { - FlowMatchType type_; - uint16_t udp_src_; - uint16_t udp_dst_; - uint16_t ipv4_len_; - in_addr_t ipv4_src_; - in_addr_t ipv4_dst_; + FlowMatchType type_ = FlowMatchType::NORMAL; + uint16_t udp_src_ = 0; + uint16_t udp_dst_ = 0; + uint16_t ipv4_len_ = 0; + in_addr_t ipv4_src_ = INADDR_ANY; + in_addr_t ipv4_dst_ = INADDR_ANY; FlexItemMatch flex_item_match_; }; struct FlowConfig { std::string name_; - uint16_t id_; + FlowId id_ = 0; + FlowAction action_; + FlowMatch match_; + void* backend_config_ = nullptr; // Filled in by operator +}; + +struct FlowRuleConfig { + std::string name_; FlowAction action_; FlowMatch match_; - void* backend_config_; // Filled in by operator + void* backend_config_ = nullptr; // Filled in by operator +}; + +enum class FlowOpType { + ADD_RX, + DELETE, +}; + +struct FlowOpResult { + FlowOpId op_id_ = 0; + FlowOpType type_ = FlowOpType::ADD_RX; + Status status_ = Status::NOT_READY; + FlowId flow_id_ = 0; }; struct CommonConfig { @@ -756,7 +779,7 @@ struct ReorderConfig { std::string reorder_type_; std::string memory_region_; uint32_t payload_byte_offset_ = 0; - std::vector flow_ids_; + std::vector flow_ids_; ReorderMethod method_ = 
ReorderMethod::INVALID; ReorderSeqBatchNumberConfig seq_batch_number_; ReorderSeqPacketsPerBatchConfig seq_packets_per_batch_; @@ -766,6 +789,7 @@ struct ReorderConfig { struct RxConfig { bool flow_isolation_ = false; bool hardware_timestamps_ = false; + uint32_t dynamic_flow_capacity_ = DEFAULT_DYNAMIC_FLOW_CAPACITY; std::vector queues_; std::vector flows_; std::vector flex_items_; diff --git a/python/daqiri_common_pybind.cpp b/python/daqiri_common_pybind.cpp index fac4ab7..c8b1fcf 100644 --- a/python/daqiri_common_pybind.cpp +++ b/python/daqiri_common_pybind.cpp @@ -377,6 +377,10 @@ void bind_enums(py::module_ &m) { .value("NORMAL", FlowMatchType::NORMAL) .value("FLEX_ITEM", FlowMatchType::FLEX_ITEM); + py::enum_(m, "FlowOpType") + .value("ADD_RX", FlowOpType::ADD_RX) + .value("DELETE", FlowOpType::DELETE); + py::enum_(m, "ReorderMethod") .value("INVALID", ReorderMethod::INVALID) .value("SEQ_BATCH_NUMBER", ReorderMethod::SEQ_BATCH_NUMBER) @@ -525,6 +529,19 @@ void bind_config_types(py::module_ &m) { .def_readwrite("action", &FlowConfig::action_) .def_readwrite("match", &FlowConfig::match_); + py::class_(m, "FlowRuleConfig") + .def(py::init<>()) + .def_readwrite("name", &FlowRuleConfig::name_) + .def_readwrite("action", &FlowRuleConfig::action_) + .def_readwrite("match", &FlowRuleConfig::match_); + + py::class_(m, "FlowOpResult") + .def(py::init<>()) + .def_readwrite("op_id", &FlowOpResult::op_id_) + .def_readwrite("type", &FlowOpResult::type_) + .def_readwrite("status", &FlowOpResult::status_) + .def_readwrite("flow_id", &FlowOpResult::flow_id_); + py::class_(m, "CommonConfig") .def(py::init<>()) .def_readwrite("version", &CommonConfig::version) @@ -607,6 +624,8 @@ void bind_config_types(py::module_ &m) { .def(py::init<>()) .def_readwrite("flow_isolation", &RxConfig::flow_isolation_) .def_readwrite("hardware_timestamps", &RxConfig::hardware_timestamps_) + .def_readwrite("dynamic_flow_capacity", + &RxConfig::dynamic_flow_capacity_) .def_readwrite("queues", 
&RxConfig::queues_) .def_readwrite("flows", &RxConfig::flows_) .def_readwrite("flex_items", &RxConfig::flex_items_) @@ -929,6 +948,29 @@ PYBIND11_MODULE(_daqiri, m) { m.def("get_port_id", &get_port_id, "key"_a); m.def("drop_all_traffic", &drop_all_traffic, "port"_a); m.def("allow_all_traffic", &allow_all_traffic, "port"_a); + m.def( + "add_rx_flow_async", + [](int port, const FlowRuleConfig &flow) { + FlowOpId op_id = 0; + const Status status = add_rx_flow_async(port, flow, &op_id); + return py::make_tuple(status, op_id); + }, + "port"_a, "flow"_a); + m.def( + "delete_flow_async", + [](FlowId flow_id) { + FlowOpId op_id = 0; + const Status status = delete_flow_async(flow_id, &op_id); + return py::make_tuple(status, op_id); + }, + "flow_id"_a); + m.def( + "poll_flow_op", + []() { + FlowOpResult result; + const Status status = poll_flow_op(&result); + return py::make_tuple(status, result); + }); m.def("get_num_rx_queues", &get_num_rx_queues, "port_id"_a); m.def("flush_port_queue", &flush_port_queue, "port"_a, "queue"_a); diff --git a/src/common.cpp b/src/common.cpp index bca3d31..e8cac58 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -103,11 +103,26 @@ uint32_t get_packet_length(BurstParams* burst, int idx) { return g_daqiri_mgr->get_packet_length(burst, idx); } -uint16_t get_packet_flow_id(BurstParams* burst, int idx) { +FlowId get_packet_flow_id(BurstParams* burst, int idx) { ASSERT_DAQIRI_MGR_INITIALIZED(); return g_daqiri_mgr->get_packet_flow_id(burst, idx); } +Status add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id) { + ASSERT_DAQIRI_MGR_INITIALIZED(); + return g_daqiri_mgr->add_rx_flow_async(port, flow, op_id); +} + +Status delete_flow_async(FlowId flow_id, FlowOpId* op_id) { + ASSERT_DAQIRI_MGR_INITIALIZED(); + return g_daqiri_mgr->delete_flow_async(flow_id, op_id); +} + +Status poll_flow_op(FlowOpResult* result) { + ASSERT_DAQIRI_MGR_INITIALIZED(); + return g_daqiri_mgr->poll_flow_op(result); +} + Status 
get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) { ASSERT_DAQIRI_MGR_INITIALIZED(); return g_daqiri_mgr->get_packet_rx_timestamp(burst, idx, timestamp_ns); @@ -589,7 +604,7 @@ bool YAML::convert::parse_flow_config( struct in_addr addr; try { flow.name_ = flow_item["name"].as(); - flow.id_ = flow_item["id"].as(); + flow.id_ = flow_item["id"].as(); flow.action_.type_ = daqiri::FlowType::QUEUE; flow.action_.id_ = flow_item["action"]["id"].as(); } catch (const std::exception& e) { @@ -743,7 +758,7 @@ bool YAML::convert::parse_reorder_config( } for (const auto& flow_id_node : reorder_item["flow_ids"]) { - reorder_config.flow_ids_.push_back(flow_id_node.as()); + reorder_config.flow_ids_.push_back(flow_id_node.as()); } if (reorder_config.flow_ids_.empty()) { DAQIRI_LOG_ERROR("Reorder config '{}' requires at least one flow ID", diff --git a/src/manager.cpp b/src/manager.cpp index f1223a1..d50abb2 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -784,6 +784,26 @@ Status Manager::allow_all_traffic(int port) { return Status::NOT_SUPPORTED; } +Status Manager::add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id) { + (void)port; + (void)flow; + (void)op_id; + DAQIRI_LOG_ERROR("add_rx_flow_async not implemented for this manager type"); + return Status::NOT_SUPPORTED; +} + +Status Manager::delete_flow_async(FlowId flow_id, FlowOpId* op_id) { + (void)flow_id; + (void)op_id; + DAQIRI_LOG_ERROR("delete_flow_async not implemented for this manager type"); + return Status::NOT_SUPPORTED; +} + +Status Manager::poll_flow_op(FlowOpResult* result) { + (void)result; + return Status::NOT_SUPPORTED; +} + Status Manager::get_rx_burst(BurstParams** burst, int port_id) { // Check if the port_id is valid if (port_id < 0 || port_id >= static_cast(cfg_.ifs_.size())) { diff --git a/src/manager.h b/src/manager.h index cf63319..31cb423 100644 --- a/src/manager.h +++ b/src/manager.h @@ -47,7 +47,7 @@ class Manager { virtual uint32_t 
get_packet_length(BurstParams* burst, int idx) = 0; virtual void* get_segment_packet_ptr(BurstParams* burst, int seg, int idx) = 0; virtual uint32_t get_segment_packet_length(BurstParams* burst, int seg, int idx) = 0; - virtual uint16_t get_packet_flow_id(BurstParams* burst, int idx) = 0; + virtual FlowId get_packet_flow_id(BurstParams* burst, int idx) = 0; virtual Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) = 0; virtual void* get_packet_extra_info(BurstParams* burst, int idx) = 0; virtual Status get_tx_packet_burst(BurstParams* burst) = 0; @@ -89,6 +89,9 @@ class Manager { virtual Status get_mac_addr(int port, char* mac) = 0; virtual Status drop_all_traffic(int port); virtual Status allow_all_traffic(int port); + virtual Status add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id); + virtual Status delete_flow_async(FlowId flow_id, FlowOpId* op_id); + virtual Status poll_flow_op(FlowOpResult* result); virtual int get_port_id(const std::string& key) final; // NOLINT(readability/inheritance) virtual bool validate_config() const; virtual uint16_t get_num_rx_queues(int port_id) const; diff --git a/src/managers/dpdk/daqiri_dpdk_mgr.cpp b/src/managers/dpdk/daqiri_dpdk_mgr.cpp index 029f45c..3885a3b 100644 --- a/src/managers/dpdk/daqiri_dpdk_mgr.cpp +++ b/src/managers/dpdk/daqiri_dpdk_mgr.cpp @@ -302,7 +302,7 @@ bool DpdkMgr::init_reorder_queue_state(const InterfaceConfig& intf, const RxQueu const auto key = generate_queue_key(intf.port_id_, qcfg.common_.id_); ReorderQueueState qstate; - std::unordered_map flow_id_to_queue; + std::unordered_map flow_id_to_queue; for (const auto& flow : intf.rx_.flows_) { if (flow_id_to_queue.find(flow.id_) != flow_id_to_queue.end()) { DAQIRI_LOG_ERROR("Duplicate flow ID {} in interface '{}'", flow.id_, intf.name_); @@ -314,7 +314,7 @@ bool DpdkMgr::init_reorder_queue_state(const InterfaceConfig& intf, const RxQueu for (const auto& reorder_cfg : intf.rx_.reorder_configs_) { const 
bool use_gpu_backend = reorder_cfg.reorder_type_ == "gpu"; int flow_queue_id = -1; - std::vector queue_flow_ids; + std::vector queue_flow_ids; queue_flow_ids.reserve(reorder_cfg.flow_ids_.size()); for (const auto flow_id : reorder_cfg.flow_ids_) { @@ -1297,7 +1297,7 @@ Status DpdkMgr::process_burst_for_reorder(uint32_t key, ReorderQueueState& qstat qstate.unmatched_count = 0; for (int i = 0; i < num_pkts; ++i) { - const uint16_t flow_id = get_packet_flow_id(burst, i); + const FlowId flow_id = get_packet_flow_id(burst, i); const auto flow_it = qstate.flow_id_to_plan.find(flow_id); if (flow_it == qstate.flow_id_to_plan.end()) { if (qstate.unmatched_count >= unmatched_indices.size()) { @@ -1855,6 +1855,11 @@ void DpdkMgr::initialize() { for (auto& conf : local_port_conf) { conf = conf_eth_port; } + if (!reserve_static_flow_ids()) { + DAQIRI_LOG_CRITICAL("Failed to reserve static flow IDs"); + return; + } + /* Initialize DPDK params */ constexpr int max_nargs = 32; constexpr int max_arg_size = 64; @@ -2270,10 +2275,9 @@ void DpdkMgr::initialize() { DAQIRI_LOG_INFO("Successfully configured ethdev"); } - ret = - rte_eth_dev_adjust_nb_rx_tx_desc(intf.port_id_, - &default_num_rx_desc, - &default_num_tx_desc); + ret = rte_eth_dev_adjust_nb_rx_tx_desc(intf.port_id_, + &default_num_rx_desc, + &default_num_tx_desc); if (ret < 0) { DAQIRI_LOG_CRITICAL( "Cannot adjust number of descriptors: err={}, port={}", ret, intf.port_id_); @@ -2285,6 +2289,8 @@ void DpdkMgr::initialize() { rte_eth_macaddr_get(intf.port_id_, &conf_ports_eth_addr[intf.port_id_]); + configure_flow_api_for_port(intf.port_id_, rx.dynamic_flow_capacity_); + if (intf.rx_.flow_isolation_) { struct rte_flow_error error; ret = rte_flow_isolate(intf.port_id_, 1, &error); @@ -2384,7 +2390,8 @@ void DpdkMgr::initialize() { for (const auto& flow : rx.flows_) { DAQIRI_LOG_INFO("Adding RX flow {}", flow.name_); if (flow.match_.type_ == FlowMatchType::FLEX_ITEM) { - add_flex_item_flow(intf.port_id_, 
flow.match_.flex_item_match_, flow.action_.id_); + add_flex_item_flow( + intf.port_id_, flow.match_.flex_item_match_, flow.action_.id_, flow.id_); } else { add_flow(intf.port_id_, flow); } @@ -2544,6 +2551,683 @@ int DpdkMgr::setup_pools_and_rings(int max_rx_batch, int max_tx_batch) { #define MAX_PATTERN_NUM 5 #define MAX_ACTION_NUM 4 +static constexpr uint8_t kFlowPatternIpv4Len = 1U << 0; +static constexpr uint8_t kFlowPatternIpv4Src = 1U << 1; +static constexpr uint8_t kFlowPatternIpv4Dst = 1U << 2; +static constexpr uint8_t kFlowPatternUdpSrc = 1U << 3; +static constexpr uint8_t kFlowPatternUdpDst = 1U << 4; +static constexpr uint8_t kNormalFlowPatternTemplateCount = 32; +static constexpr uint32_t kDynamicFlowQueueId = 0; +static constexpr uint32_t kRxFlowGroup = 3; +static constexpr uint32_t kRxFlowPriority = 1; + +static void build_normal_template_pattern(uint8_t pattern_index, + struct rte_flow_item pattern[MAX_PATTERN_NUM]) { + static struct rte_flow_item_ipv4 ip_mask; + static struct rte_flow_item_udp udp_mask; + + memset(pattern, 0, sizeof(struct rte_flow_item) * MAX_PATTERN_NUM); + memset(&ip_mask, 0, sizeof(ip_mask)); + memset(&udp_mask, 0, sizeof(udp_mask)); + + pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; + pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; + pattern[2].type = RTE_FLOW_ITEM_TYPE_UDP; + pattern[3].type = RTE_FLOW_ITEM_TYPE_END; + + if ((pattern_index & kFlowPatternIpv4Len) != 0) { ip_mask.hdr.total_length = 0xffff; } + if ((pattern_index & kFlowPatternIpv4Src) != 0) { ip_mask.hdr.src_addr = 0xffffffff; } + if ((pattern_index & kFlowPatternIpv4Dst) != 0) { ip_mask.hdr.dst_addr = 0xffffffff; } + if ((pattern_index & (kFlowPatternIpv4Len | kFlowPatternIpv4Src | kFlowPatternIpv4Dst)) != 0) { + pattern[1].mask = &ip_mask; + } + + if ((pattern_index & kFlowPatternUdpSrc) != 0) { udp_mask.hdr.src_port = 0xffff; } + if ((pattern_index & kFlowPatternUdpDst) != 0) { udp_mask.hdr.dst_port = 0xffff; } + if ((pattern_index & (kFlowPatternUdpSrc | 
kFlowPatternUdpDst)) != 0) { + pattern[2].mask = &udp_mask; + } +} + +void* DpdkMgr::flow_op_user_data(FlowOpId op_id) { + return reinterpret_cast(static_cast(op_id)); +} + +FlowOpId DpdkMgr::flow_op_id_from_user_data(void* user_data) { + return static_cast(reinterpret_cast(user_data)); +} + +FlowOpId DpdkMgr::allocate_flow_op_id() { + if (next_flow_op_id_ == 0) { next_flow_op_id_ = 1; } + return next_flow_op_id_++; +} + +FlowId DpdkMgr::allocate_dynamic_flow_id() { + while (next_dynamic_flow_id_ != 0) { + const FlowId candidate = next_dynamic_flow_id_++; + if (candidate == 0) { continue; } + if (static_flow_ids_.find(candidate) != static_flow_ids_.end()) { continue; } + if (dynamic_flows_.find(candidate) != dynamic_flows_.end()) { continue; } + return candidate; + } + return 0; +} + +bool DpdkMgr::reserve_static_flow_ids() { + static_flow_ids_.clear(); + for (const auto& intf : cfg_.ifs_) { + for (const auto& flow : intf.rx_.flows_) { + if (flow.id_ == 0) { + DAQIRI_LOG_ERROR("Static flow '{}' on interface '{}' uses reserved flow ID 0", + flow.name_, + intf.name_); + return false; + } + if (!static_flow_ids_.insert(flow.id_).second) { + DAQIRI_LOG_ERROR("Duplicate static flow ID {}", flow.id_); + return false; + } + } + } + next_dynamic_flow_id_ = 1; + return true; +} + +bool DpdkMgr::is_valid_rx_queue(int port, uint16_t queue_id) const { + if (port < 0 || port >= static_cast(cfg_.ifs_.size())) { return false; } + const auto& queues = cfg_.ifs_[port].rx_.queues_; + return std::any_of(queues.begin(), queues.end(), [queue_id](const RxQueueConfig& q) { + return q.common_.id_ == queue_id; + }); +} + +uint8_t DpdkMgr::normal_flow_pattern_index(const FlowMatch& match) const { + uint8_t pattern_index = 0; + if (match.ipv4_len_ > 0) { pattern_index |= kFlowPatternIpv4Len; } + if (match.ipv4_src_ != INADDR_ANY) { pattern_index |= kFlowPatternIpv4Src; } + if (match.ipv4_dst_ != INADDR_ANY) { pattern_index |= kFlowPatternIpv4Dst; } + if (match.udp_src_ > 0) { pattern_index 
|= kFlowPatternUdpSrc; } + if (match.udp_dst_ > 0) { pattern_index |= kFlowPatternUdpDst; } + return pattern_index; +} + +bool DpdkMgr::is_normal_flow_match(const FlowMatch& match) const { + return match.type_ == FlowMatchType::NORMAL && normal_flow_pattern_index(match) != 0; +} + +bool DpdkMgr::validate_dynamic_rx_flow(int port, const FlowRuleConfig& flow) const { + if (!initialized_) { + DAQIRI_LOG_ERROR("Cannot add dynamic RX flow before DAQIRI initialization"); + return false; + } + if (loopback_ == LoopbackType::LOOPBACK_TYPE_SW) { + DAQIRI_LOG_ERROR("Dynamic RX flows are not supported in software loopback mode"); + return false; + } + if (port < 0 || port >= static_cast(cfg_.ifs_.size())) { + DAQIRI_LOG_ERROR("Invalid dynamic RX flow port {}", port); + return false; + } + if (flow.action_.type_ != FlowType::QUEUE) { + DAQIRI_LOG_ERROR("Dynamic RX flow action type is not supported"); + return false; + } + if (!is_valid_rx_queue(port, flow.action_.id_)) { + DAQIRI_LOG_ERROR("Dynamic RX flow targets invalid port/queue {}/{}", port, flow.action_.id_); + return false; + } + if (is_normal_flow_match(flow.match_)) { return true; } + if (flow.match_.type_ == FlowMatchType::FLEX_ITEM) { + const auto& flex_items = cfg_.ifs_[port].rx_.flex_items_; + if (flow.match_.flex_item_match_.flex_item_id_ >= flex_items.size()) { + DAQIRI_LOG_ERROR("Dynamic RX flow references invalid flex item ID {}", + flow.match_.flex_item_match_.flex_item_id_); + return false; + } + return true; + } + + DAQIRI_LOG_ERROR("Dynamic RX flow must define a normal IPv4/UDP or flex-item match"); + return false; +} + +void DpdkMgr::build_normal_flow_pattern(const FlowMatch& match, + struct rte_flow_item pattern[MAX_PATTERN_NUM]) const { + static struct rte_flow_item_ipv4 ip_spec; + static struct rte_flow_item_udp udp_spec; + + memset(pattern, 0, sizeof(struct rte_flow_item) * MAX_PATTERN_NUM); + memset(&ip_spec, 0, sizeof(ip_spec)); + memset(&udp_spec, 0, sizeof(udp_spec)); + + pattern[0].type = 
RTE_FLOW_ITEM_TYPE_ETH; + pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; + pattern[2].type = RTE_FLOW_ITEM_TYPE_UDP; + pattern[3].type = RTE_FLOW_ITEM_TYPE_END; + + bool has_ip_match = false; + if (match.ipv4_len_ > 0) { + ip_spec.hdr.total_length = htons(match.ipv4_len_); + has_ip_match = true; + } + if (match.ipv4_src_ != INADDR_ANY) { + ip_spec.hdr.src_addr = match.ipv4_src_; + has_ip_match = true; + } + if (match.ipv4_dst_ != INADDR_ANY) { + ip_spec.hdr.dst_addr = match.ipv4_dst_; + has_ip_match = true; + } + if (has_ip_match) { pattern[1].spec = &ip_spec; } + + bool has_udp_match = false; + if (match.udp_src_ > 0) { + udp_spec.hdr.src_port = htons(match.udp_src_); + has_udp_match = true; + } + if (match.udp_dst_ > 0) { + udp_spec.hdr.dst_port = htons(match.udp_dst_); + has_udp_match = true; + } + if (has_udp_match) { pattern[2].spec = &udp_spec; } +} + +void DpdkMgr::build_mark_queue_actions(FlowId flow_id, + uint16_t queue_id, + struct rte_flow_action action[MAX_ACTION_NUM]) const { + static struct rte_flow_action_mark mark; + static struct rte_flow_action_queue queue; + + memset(action, 0, sizeof(struct rte_flow_action) * MAX_ACTION_NUM); + memset(&mark, 0, sizeof(mark)); + memset(&queue, 0, sizeof(queue)); + + mark.id = flow_id; + queue.index = queue_id; + action[0].type = RTE_FLOW_ACTION_TYPE_MARK; + action[0].conf = &mark; + action[1].type = RTE_FLOW_ACTION_TYPE_QUEUE; + action[1].conf = &queue; + action[2].type = RTE_FLOW_ACTION_TYPE_END; +} + +Status DpdkMgr::enqueue_software_flow_completion(const FlowOpResult& result) { + ready_flow_ops_.push(result); + return Status::SUCCESS; +} + +bool DpdkMgr::configure_flow_api_for_port(uint16_t port, uint32_t capacity) { + if (port >= flow_template_states_.size()) { return false; } + auto& state = flow_template_states_[port]; + if (state.configured) { return true; } + + struct rte_flow_error error; + struct rte_flow_port_info port_info; + struct rte_flow_queue_info queue_info; + memset(&error, 0, sizeof(error)); + 
memset(&port_info, 0, sizeof(port_info)); + memset(&queue_info, 0, sizeof(queue_info)); + + int ret = rte_flow_info_get(port, &port_info, &queue_info, &error); + if (ret < 0 || port_info.max_nb_queues == 0) { + DAQIRI_LOG_WARN("DPDK async flow API is not available on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return false; + } + + state.capacity = capacity == 0 ? DEFAULT_DYNAMIC_FLOW_CAPACITY : capacity; + struct rte_flow_port_attr port_attr; + struct rte_flow_queue_attr queue_attr; + memset(&port_attr, 0, sizeof(port_attr)); + memset(&queue_attr, 0, sizeof(queue_attr)); + + if ((port_info.supported_flags & RTE_FLOW_PORT_FLAG_STRICT_QUEUE) != 0) { + port_attr.flags |= RTE_FLOW_PORT_FLAG_STRICT_QUEUE; + } + queue_attr.size = std::max(64, state.capacity * 2); + if (queue_info.max_size > 0) { queue_attr.size = std::min(queue_attr.size, queue_info.max_size); } + const struct rte_flow_queue_attr* queue_attrs[] = {&queue_attr}; + + ret = rte_flow_configure(port, &port_attr, 1, queue_attrs, &error); + if (ret < 0) { + DAQIRI_LOG_WARN("Failed to configure DPDK async flow API on port {}: {}", + port, + error.message ? 
error.message : rte_strerror(rte_errno)); + return false; + } + + state.configured = true; + state.flow_queue_id = kDynamicFlowQueueId; + DAQIRI_LOG_INFO("Configured DPDK async flow API on port {} with queue size {}", + port, + queue_attr.size); + return true; +} + +bool DpdkMgr::ensure_normal_flow_template_table(uint16_t port) { + if (port >= flow_template_states_.size()) { return false; } + auto& state = flow_template_states_[port]; + if (state.templates_ready) { return true; } + if (!state.configured) { return false; } + + struct rte_flow_error error; + memset(&error, 0, sizeof(error)); + + state.normal_pattern_templates.assign(kNormalFlowPatternTemplateCount, nullptr); + for (uint8_t pattern_idx = 0; pattern_idx < kNormalFlowPatternTemplateCount; ++pattern_idx) { + struct rte_flow_pattern_template_attr pattern_attr; + struct rte_flow_item pattern[MAX_PATTERN_NUM]; + memset(&pattern_attr, 0, sizeof(pattern_attr)); + pattern_attr.ingress = 1; + pattern_attr.relaxed_matching = 1; + build_normal_template_pattern(pattern_idx, pattern); + + state.normal_pattern_templates[pattern_idx] = + rte_flow_pattern_template_create(port, &pattern_attr, pattern, &error); + if (state.normal_pattern_templates[pattern_idx] == nullptr) { + DAQIRI_LOG_WARN("Failed to create RX flow pattern template {} on port {}: {}", + pattern_idx, + port, + error.message ? 
error.message : rte_strerror(rte_errno)); + for (auto* tmpl : state.normal_pattern_templates) { + if (tmpl != nullptr) { rte_flow_pattern_template_destroy(port, tmpl, &error); } + } + state.normal_pattern_templates.clear(); + return false; + } + } + + struct rte_flow_actions_template_attr actions_attr; + struct rte_flow_action actions[MAX_ACTION_NUM]; + struct rte_flow_action masks[MAX_ACTION_NUM]; + struct rte_flow_action_mark mark; + struct rte_flow_action_queue queue; + struct rte_flow_action_mark mark_mask; + struct rte_flow_action_queue queue_mask; + memset(&actions_attr, 0, sizeof(actions_attr)); + memset(actions, 0, sizeof(actions)); + memset(masks, 0, sizeof(masks)); + memset(&mark, 0, sizeof(mark)); + memset(&queue, 0, sizeof(queue)); + memset(&mark_mask, 0, sizeof(mark_mask)); + memset(&queue_mask, 0, sizeof(queue_mask)); + actions_attr.ingress = 1; + actions[0].type = RTE_FLOW_ACTION_TYPE_MARK; + actions[0].conf = &mark; + actions[1].type = RTE_FLOW_ACTION_TYPE_QUEUE; + actions[1].conf = &queue; + actions[2].type = RTE_FLOW_ACTION_TYPE_END; + masks[0].type = RTE_FLOW_ACTION_TYPE_MARK; + masks[0].conf = &mark_mask; + masks[1].type = RTE_FLOW_ACTION_TYPE_QUEUE; + masks[1].conf = &queue_mask; + masks[2].type = RTE_FLOW_ACTION_TYPE_END; + + state.mark_queue_actions_template = + rte_flow_actions_template_create(port, &actions_attr, actions, masks, &error); + if (state.mark_queue_actions_template == nullptr) { + DAQIRI_LOG_WARN("Failed to create RX flow actions template on port {}: {}", + port, + error.message ? 
error.message : rte_strerror(rte_errno)); + for (auto* tmpl : state.normal_pattern_templates) { + if (tmpl != nullptr) { rte_flow_pattern_template_destroy(port, tmpl, &error); } + } + state.normal_pattern_templates.clear(); + return false; + } + + struct rte_flow_template_table_attr table_attr; + memset(&table_attr, 0, sizeof(table_attr)); + table_attr.flow_attr.ingress = 1; + table_attr.flow_attr.group = kRxFlowGroup; + table_attr.flow_attr.priority = kRxFlowPriority; + table_attr.nb_flows = state.capacity; + table_attr.insertion_type = RTE_FLOW_TABLE_INSERTION_TYPE_PATTERN; + table_attr.hash_func = RTE_FLOW_TABLE_HASH_FUNC_DEFAULT; + + std::vector action_templates = { + state.mark_queue_actions_template}; + state.normal_table = + rte_flow_template_table_create(port, + &table_attr, + state.normal_pattern_templates.data(), + static_cast(state.normal_pattern_templates.size()), + action_templates.data(), + static_cast(action_templates.size()), + &error); + if (state.normal_table == nullptr) { + DAQIRI_LOG_WARN("Failed to create RX flow template table on port {}: {}", + port, + error.message ? 
error.message : rte_strerror(rte_errno)); + rte_flow_actions_template_destroy(port, state.mark_queue_actions_template, &error); + state.mark_queue_actions_template = nullptr; + for (auto* tmpl : state.normal_pattern_templates) { + if (tmpl != nullptr) { rte_flow_pattern_template_destroy(port, tmpl, &error); } + } + state.normal_pattern_templates.clear(); + return false; + } + + state.templates_ready = true; + DAQIRI_LOG_INFO("Created RX flow template table on port {} with capacity {}", + port, + state.capacity); + return true; +} + +Status DpdkMgr::add_rx_flow_legacy_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId op_id) { + FlowConfig cfg; + cfg.name_ = flow.name_; + cfg.id_ = flow_id; + cfg.action_ = flow.action_; + cfg.match_ = flow.match_; + cfg.backend_config_ = flow.backend_config_; + + struct rte_flow* rte_flow = nullptr; + if (cfg.match_.type_ == FlowMatchType::FLEX_ITEM) { + rte_flow = add_flex_item_flow(port, cfg.match_.flex_item_match_, cfg.action_.id_, cfg.id_); + } else { + rte_flow = add_flow(port, cfg); + } + + FlowOpResult result; + result.op_id_ = op_id; + result.type_ = FlowOpType::ADD_RX; + result.flow_id_ = flow_id; + + if (rte_flow == nullptr) { + result.status_ = Status::INTERNAL_ERROR; + enqueue_software_flow_completion(result); + return Status::SUCCESS; + } + + DynamicFlowEntry entry; + entry.flow_id = flow_id; + entry.port = static_cast(port); + entry.queue = flow.action_.id_; + entry.flow = rte_flow; + entry.backend = DynamicFlowBackend::LEGACY; + entry.state = DynamicFlowState::ACTIVE; + dynamic_flows_[flow_id] = entry; + + result.status_ = Status::SUCCESS; + enqueue_software_flow_completion(result); + return Status::SUCCESS; +} + +Status DpdkMgr::add_rx_flow_template_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId op_id) { + if (!ensure_rx_flow_jump_rule(static_cast(port)) || + !ensure_normal_flow_template_table(static_cast(port))) { + return add_rx_flow_legacy_locked(port, flow, 
flow_id, op_id); + } + + auto& state = flow_template_states_[port]; + struct rte_flow_item pattern[MAX_PATTERN_NUM]; + struct rte_flow_action action[MAX_ACTION_NUM]; + struct rte_flow_op_attr op_attr; + struct rte_flow_error error; + memset(&op_attr, 0, sizeof(op_attr)); + memset(&error, 0, sizeof(error)); + build_normal_flow_pattern(flow.match_, pattern); + build_mark_queue_actions(flow_id, flow.action_.id_, action); + + const uint8_t pattern_index = normal_flow_pattern_index(flow.match_); + struct rte_flow* rte_flow = + rte_flow_async_create(static_cast(port), + state.flow_queue_id, + &op_attr, + state.normal_table, + pattern, + pattern_index, + action, + 0, + flow_op_user_data(op_id), + &error); + if (rte_flow == nullptr) { + DAQIRI_LOG_WARN("Failed to enqueue async RX flow create on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return add_rx_flow_legacy_locked(port, flow, flow_id, op_id); + } + + if (rte_flow_push(static_cast(port), state.flow_queue_id, &error) < 0) { + DAQIRI_LOG_WARN("Failed to push async RX flow create on port {}: {}", + port, + error.message ? 
error.message : rte_strerror(rte_errno));
+    // op_attr was zeroed, so postpone == 0 and rte_flow_async_create() above has already
+    // submitted the rule; its completion will still be returned by rte_flow_pull().
+    // Keep tracking the operation and let that completion decide the outcome. Reporting
+    // a failure here and dropping the handle would leave a possibly installed rule that
+    // is untracked and can never be deleted.
+  }
+
+  // Record the rule as ADDING; it becomes ACTIVE (or is dropped) when the completion
+  // for op_id is pulled.
+  DynamicFlowEntry entry;
+  entry.flow_id = flow_id;
+  entry.port = static_cast<uint16_t>(port);
+  entry.queue = flow.action_.id_;
+  entry.flow = rte_flow;
+  entry.backend = DynamicFlowBackend::TEMPLATE;
+  entry.state = DynamicFlowState::ADDING;
+  entry.flow_queue_id = state.flow_queue_id;
+  dynamic_flows_[flow_id] = entry;
+
+  // Pending record used to translate the DPDK completion back into a FlowOpResult.
+  PendingFlowOp pending;
+  pending.flow_id = flow_id;
+  pending.result.op_id_ = op_id;
+  pending.result.type_ = FlowOpType::ADD_RX;
+  pending.result.status_ = Status::NOT_READY;
+  pending.result.flow_id_ = flow_id;
+  pending_flow_ops_[op_id] = pending;
+  return Status::SUCCESS;
+}
+
+// Destroys a rule that was installed through the synchronous rte_flow API and queues a
+// software completion for `op_id` carrying the outcome.
+Status DpdkMgr::delete_flow_legacy_locked(DynamicFlowEntry& entry, FlowOpId op_id) {
+  struct rte_flow_error error;
+  memset(&error, 0, sizeof(error));
+
+  // Copy the identifying fields now; the result outlives `entry`.
+  FlowOpResult result;
+  result.op_id_ = op_id;
+  result.type_ = FlowOpType::DELETE;
+  result.flow_id_ = entry.flow_id;
+  result.status_ = Status::SUCCESS;
+
+  if (rte_flow_destroy(entry.port, entry.flow, &error) != 0) {
+    DAQIRI_LOG_ERROR("Failed to destroy dynamic RX flow {} on port {}: {}",
+                     entry.flow_id,
+                     entry.port,
+                     error.message ?
error.message : rte_strerror(rte_errno));
+    result.status_ = Status::INTERNAL_ERROR;
+  }
+
+  // Forget the entry only when the rule is really gone, so a failed destroy can be
+  // retried and cleanup_dynamic_flows() still sees the handle. Erase by the copied ID:
+  // `entry` refers to the map element that is being removed.
+  if (result.status_ == Status::SUCCESS) { dynamic_flows_.erase(result.flow_id_); }
+  enqueue_software_flow_completion(result);
+  return Status::SUCCESS;
+}
+
+// Enqueues an asynchronous destroy for a template-table rule and registers a pending
+// operation so the completion pulled later is reported under `op_id`.
+// Returns INTERNAL_ERROR only when the destroy could not be enqueued at all.
+Status DpdkMgr::delete_flow_template_locked(DynamicFlowEntry& entry, FlowOpId op_id) {
+  struct rte_flow_op_attr op_attr;
+  struct rte_flow_error error;
+  memset(&op_attr, 0, sizeof(op_attr));
+  memset(&error, 0, sizeof(error));
+
+  if (rte_flow_async_destroy(entry.port,
+                             entry.flow_queue_id,
+                             &op_attr,
+                             entry.flow,
+                             flow_op_user_data(op_id),
+                             &error) < 0) {
+    DAQIRI_LOG_ERROR("Failed to enqueue async RX flow destroy for {} on port {}: {}",
+                     entry.flow_id,
+                     entry.port,
+                     error.message ? error.message : rte_strerror(rte_errno));
+    return Status::INTERNAL_ERROR;
+  }
+  if (rte_flow_push(entry.port, entry.flow_queue_id, &error) < 0) {
+    // op_attr.postpone is 0, so the destroy above was already submitted and its
+    // completion will still arrive. Bailing out here would leave an ACTIVE entry whose
+    // handle is being torn down and a completion nobody is waiting for, so log the
+    // problem and keep tracking the operation instead.
+    DAQIRI_LOG_ERROR("Failed to push async RX flow destroy for {} on port {}: {}",
+                     entry.flow_id,
+                     entry.port,
+                     error.message ? error.message : rte_strerror(rte_errno));
+  }
+
+  // DELETING blocks further deletes of this flow until the completion is pulled.
+  entry.state = DynamicFlowState::DELETING;
+  PendingFlowOp pending;
+  pending.flow_id = entry.flow_id;
+  pending.result.op_id_ = op_id;
+  pending.result.type_ = FlowOpType::DELETE;
+  pending.result.status_ = Status::NOT_READY;
+  pending.result.flow_id_ = entry.flow_id;
+  pending_flow_ops_[op_id] = pending;
+  return Status::SUCCESS;
+}
+
+// Drains DPDK completions from every configured port's flow queue, in batches of 16,
+// and converts them into entries of ready_flow_ops_.
+void DpdkMgr::poll_dpdk_flow_completions_locked() {
+  struct rte_flow_error error;
+  struct rte_flow_op_result results[16];
+
+  for (uint16_t port = 0; port < flow_template_states_.size(); ++port) {
+    auto& state = flow_template_states_[port];
+    if (!state.configured) { continue; }
+
+    // Keep pulling until the queue reports nothing more (or an error).
+    while (true) {
+      memset(&error, 0, sizeof(error));
+      memset(results, 0, sizeof(results));
+      const int ret = rte_flow_pull(port, state.flow_queue_id, results, 16, &error);
+      if (ret <= 0) { break; }
+
+      for (int i = 0; i < ret; ++i) {
+        const FlowOpId op_id = flow_op_id_from_user_data(results[i].user_data);
+        const auto pending_it = pending_flow_ops_.find(op_id);
+        // Completions without a pending record are not reported to the caller.
+        if (pending_it == pending_flow_ops_.end()) { continue; }
+
+        FlowOpResult result = pending_it->second.result;
+        result.status_ = results[i].status == RTE_FLOW_OP_SUCCESS ? Status::SUCCESS
+                                                                  : Status::INTERNAL_ERROR;
+
+        const auto flow_it = dynamic_flows_.find(pending_it->second.flow_id);
+        if (flow_it != dynamic_flows_.end()) {
+          if (result.type_ == FlowOpType::ADD_RX && result.status_ == Status::SUCCESS) {
+            flow_it->second.state = DynamicFlowState::ACTIVE;
+          } else {
+            if (result.type_ == FlowOpType::ADD_RX && flow_it->second.flow != nullptr) {
+              // The DPDK guide requires a handle returned by rte_flow_async_create() to
+              // be released with rte_flow_async_destroy() even when the rule was
+              // rejected, so release it before dropping the entry. The release is
+              // enqueued under op ID 0 so its completion matches no pending record.
+              // NOTE(review): assumes op ID 0 is never allocated and that
+              // flow_op_user_data() round-trips it -- confirm.
+              struct rte_flow_op_attr release_attr;
+              struct rte_flow_error release_error;
+              memset(&release_attr, 0, sizeof(release_attr));
+              memset(&release_error, 0, sizeof(release_error));
+              if (rte_flow_async_destroy(flow_it->second.port,
+                                         flow_it->second.flow_queue_id,
+                                         &release_attr,
+                                         flow_it->second.flow,
+                                         flow_op_user_data(0),
+                                         &release_error) < 0) {
+                DAQIRI_LOG_WARN("Failed to release rejected RX flow {} on port {}: {}",
+                                flow_it->second.flow_id,
+                                flow_it->second.port,
+                                release_error.message ? release_error.message
+                                                      : rte_strerror(rte_errno));
+              }
+            }
+            // Rejected adds and every completed delete drop the tracking entry.
+            dynamic_flows_.erase(flow_it);
+          }
+        }
+
+        pending_flow_ops_.erase(pending_it);
+        ready_flow_ops_.push(result);
+      }
+    }
+  }
+}
+
+// Starts installing a dynamic RX flow on `port`.
+// On SUCCESS, *op_id identifies the operation whose result is later returned by
+// poll_flow_op(). Returns NULL_PTR for a null op_id, INVALID_PARAMETER when the
+// flow fails validation, and NO_SPACE_AVAILABLE when no flow ID can be allocated;
+// *op_id is 0 in those cases.
+Status DpdkMgr::add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id) {
+  if (op_id == nullptr) { return Status::NULL_PTR; }
+  *op_id = 0;
+  std::lock_guard<std::mutex> guard(flow_lock_);
+  if (!validate_dynamic_rx_flow(port, flow)) { return Status::INVALID_PARAMETER; }
+
+  const FlowId flow_id = allocate_dynamic_flow_id();
+  if (flow_id == 0) { return Status::NO_SPACE_AVAILABLE; }
+  const FlowOpId new_op_id = allocate_flow_op_id();
+  *op_id = new_op_id;
+
+  // "Normal" matches go through the template table; everything else uses the
+  // synchronous rte_flow path.
+  if (is_normal_flow_match(flow.match_)) {
+    return add_rx_flow_template_locked(port, flow, flow_id, new_op_id);
+  }
+  return add_rx_flow_legacy_locked(port, flow, flow_id, new_op_id);
+}
+
+// Starts deleting a dynamic RX flow. Only flows that are currently ACTIVE can be
+// deleted; static flow IDs and unknown IDs are rejected with INVALID_PARAMETER.
+Status DpdkMgr::delete_flow_async(FlowId flow_id, FlowOpId* op_id) {
+  if (op_id == nullptr) { return Status::NULL_PTR; }
+  *op_id = 0;
+  std::lock_guard<std::mutex> guard(flow_lock_);
+  if (!initialized_) { return Status::NOT_READY; }
+  if (static_flow_ids_.find(flow_id) != static_flow_ids_.end()) { return Status::INVALID_PARAMETER; }
+  const auto flow_it = dynamic_flows_.find(flow_id);
+  if (flow_it == dynamic_flows_.end() || flow_it->second.state != DynamicFlowState::ACTIVE) {
+    return Status::INVALID_PARAMETER;
+  }
+
+  const FlowOpId new_op_id = allocate_flow_op_id();
+  if (flow_it->second.backend ==
DynamicFlowBackend::TEMPLATE) { + const Status status = delete_flow_template_locked(flow_it->second, new_op_id); + if (status == Status::SUCCESS) { *op_id = new_op_id; } + return status; + } + const Status status = delete_flow_legacy_locked(flow_it->second, new_op_id); + if (status == Status::SUCCESS) { *op_id = new_op_id; } + return status; +} + +Status DpdkMgr::poll_flow_op(FlowOpResult* result) { + if (result == nullptr) { return Status::NULL_PTR; } + std::lock_guard guard(flow_lock_); + poll_dpdk_flow_completions_locked(); + if (ready_flow_ops_.empty()) { return Status::NOT_READY; } + *result = ready_flow_ops_.front(); + ready_flow_ops_.pop(); + return Status::SUCCESS; +} + +void DpdkMgr::cleanup_dynamic_flows() { + std::lock_guard guard(flow_lock_); + struct rte_flow_error error; + memset(&error, 0, sizeof(error)); + + pending_flow_ops_.clear(); + while (!ready_flow_ops_.empty()) { ready_flow_ops_.pop(); } + + for (auto& [flow_id, entry] : dynamic_flows_) { + if (entry.flow != nullptr) { rte_flow_destroy(entry.port, entry.flow, &error); } + } + dynamic_flows_.clear(); + + for (uint16_t port = 0; port < drop_all_traffic_flow.size(); ++port) { + if (drop_all_traffic_flow[port].drop != nullptr) { + rte_flow_destroy(port, drop_all_traffic_flow[port].drop, &error); + drop_all_traffic_flow[port].drop = nullptr; + } + drop_all_traffic_flow[port].jump = nullptr; + } + + for (uint16_t port = 0; port < rx_flow_jump_rules_.size(); ++port) { + auto& state = flow_template_states_[port]; + if (state.normal_table != nullptr) { + rte_flow_template_table_destroy(port, state.normal_table, &error); + state.normal_table = nullptr; + } + if (state.mark_queue_actions_template != nullptr) { + rte_flow_actions_template_destroy(port, state.mark_queue_actions_template, &error); + state.mark_queue_actions_template = nullptr; + } + for (auto* tmpl : state.normal_pattern_templates) { + if (tmpl != nullptr) { rte_flow_pattern_template_destroy(port, tmpl, &error); } + } + 
state.normal_pattern_templates.clear(); + state.templates_ready = false; + state.configured = false; + + if (rx_flow_jump_rules_[port] != nullptr) { + rte_flow_destroy(port, rx_flow_jump_rules_[port], &error); + rx_flow_jump_rules_[port] = nullptr; + } + } +} + struct rte_flow_item_flex_handle *DpdkMgr::create_flex_flow_rule( int port, int offset, struct rte_flow_item *udp_item, struct rte_flow_item *end_pattern) { static struct rte_flow_item_flex_handle *item_handle = NULL; @@ -2553,36 +3237,6 @@ struct rte_flow_item_flex_handle *DpdkMgr::create_flex_flow_rule( return item_handle; } - { - struct rte_flow_error jump_error; - struct rte_flow_attr jump_attr; - jump_attr.group = 0; - jump_attr.ingress = 1; - struct rte_flow_action_jump jump_v; - jump_v.group = 1; - struct rte_flow_action jump_actions[2]; - jump_actions[0].type = RTE_FLOW_ACTION_TYPE_JUMP; - jump_actions[0].conf = &jump_v; - jump_actions[1].type = RTE_FLOW_ACTION_TYPE_END; - - struct rte_flow_item jump_pattern[2]; - jump_pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; - jump_pattern[0].spec = 0; - jump_pattern[0].mask = 0; - jump_pattern[1].type = RTE_FLOW_ITEM_TYPE_END; - - int res = rte_flow_validate(port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (!res) { - struct rte_flow* flow = rte_flow_create( - port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (flow == NULL) { - printf("rte_flow_create failed"); - } - } else { - printf("Failed flow validation: %d\n", res); - } - } - struct rte_flow_item_flex_conf flex_conf; flex_conf.tunnel = FLEX_TUNNEL_MODE_SINGLE; memset(&flex_conf.next_header, 0, sizeof(flex_conf.next_header)); @@ -2621,7 +3275,7 @@ struct rte_flow_item_flex_handle *DpdkMgr::create_flex_flow_rule( } struct rte_flow* DpdkMgr::add_flex_item_flow( - int port, const FlexItemMatch& match_info, uint16_t queue_id) { + int port, const FlexItemMatch& match_info, uint16_t queue_id, FlowId mark_id) { /* Declaring structs being used. 
8< */ struct rte_flow_attr attr; struct rte_flow_item pattern[MAX_PATTERN_NUM]; @@ -2636,6 +3290,8 @@ struct rte_flow* DpdkMgr::add_flex_item_flow( int res; const auto& flex_item_config = cfg_.ifs_[port].rx_.flex_items_[match_info.flex_item_id_]; + if (!ensure_rx_flow_jump_rule(static_cast(port))) { return nullptr; } + memset(pattern, 0, sizeof(pattern)); memset(action, 0, sizeof(action)); memset(&attr, 0, sizeof(struct rte_flow_attr)); @@ -2644,15 +3300,18 @@ struct rte_flow* DpdkMgr::add_flex_item_flow( memset(&udp_spec, 0, sizeof(struct rte_flow_item_udp)); memset(&udp_mask, 0, sizeof(struct rte_flow_item_udp)); - // struct rte_flow_action_mark mark; - // mark.id = 0x40 + queue_id; - - action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE; - action[0].conf = &queue; - action[1].type = RTE_FLOW_ACTION_TYPE_END; - // action[1].type = RTE_FLOW_ACTION_TYPE_MARK; - // action[1].conf = &mark; - // action[2].type = RTE_FLOW_ACTION_TYPE_END; + struct rte_flow_action_mark mark = {.id = mark_id}; + if (mark_id != 0) { + action[0].type = RTE_FLOW_ACTION_TYPE_MARK; + action[0].conf = &mark; + action[1].type = RTE_FLOW_ACTION_TYPE_QUEUE; + action[1].conf = &queue; + action[2].type = RTE_FLOW_ACTION_TYPE_END; + } else { + action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE; + action[0].conf = &queue; + action[1].type = RTE_FLOW_ACTION_TYPE_END; + } pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4; @@ -2719,7 +3378,7 @@ struct rte_flow* DpdkMgr::add_flex_item_flow( attr.ingress = 1; attr.priority = 0; - attr.group = 1; + attr.group = kRxFlowGroup; /* Validate the rule and create it */ res = rte_flow_validate(port, &attr, pattern, action, &error); @@ -2742,6 +3401,46 @@ struct rte_flow* DpdkMgr::add_flex_item_flow( // Taken from flow_block.c DPDK example */ +bool DpdkMgr::ensure_rx_flow_jump_rule(uint16_t port) { + if (port >= rx_flow_jump_rules_.size()) { return false; } + if (rx_flow_jump_rules_[port] != nullptr) { return true; } + + struct rte_flow_error 
error; + struct rte_flow_attr attr; + struct rte_flow_action_jump jump = {.group = kRxFlowGroup}; + struct rte_flow_action actions[] = { + {.type = RTE_FLOW_ACTION_TYPE_JUMP, .conf = &jump}, + {.type = RTE_FLOW_ACTION_TYPE_END}, + }; + struct rte_flow_item pattern[] = { + {.type = RTE_FLOW_ITEM_TYPE_ETH, .spec = nullptr, .mask = nullptr}, + {.type = RTE_FLOW_ITEM_TYPE_END}, + }; + + memset(&error, 0, sizeof(error)); + memset(&attr, 0, sizeof(attr)); + attr.group = 0; + attr.ingress = 1; + + int ret = rte_flow_validate(port, &attr, pattern, actions, &error); + if (ret != 0) { + DAQIRI_LOG_ERROR("Failed to validate RX flow jump rule on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return false; + } + + rx_flow_jump_rules_[port] = rte_flow_create(port, &attr, pattern, actions, &error); + if (rx_flow_jump_rules_[port] == nullptr) { + DAQIRI_LOG_ERROR("Failed to create RX flow jump rule on port {}: {}", + port, + error.message ? error.message : rte_strerror(rte_errno)); + return false; + } + + return true; +} + struct rte_flow* DpdkMgr::add_flow(int port, const FlowConfig& cfg) { /* Declaring structs being used. 
8< */ struct rte_flow_attr attr; @@ -2757,33 +3456,7 @@ struct rte_flow* DpdkMgr::add_flow(int port, const FlowConfig& cfg) { struct rte_flow_item_ipv4 ip_mask; int res; - // HWS requires using a non-zero group, so we make a jump event to group 3 for all ethernet - // packets - { - struct rte_flow_error jump_error; - struct rte_flow_attr jump_attr{.group = 0, .ingress = 1}; - struct rte_flow_action_jump jump_v = {.group = 3}; - struct rte_flow_action jump_actions[] = { - { .type = RTE_FLOW_ACTION_TYPE_JUMP, .conf = &jump_v}, - { .type = RTE_FLOW_ACTION_TYPE_END} - }; - - struct rte_flow_item jump_pattern[] = { - { .type = RTE_FLOW_ITEM_TYPE_ETH, .spec = 0, .mask = 0}, - { .type = RTE_FLOW_ITEM_TYPE_END}, - }; - - auto res = rte_flow_validate(port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (!res) { - struct rte_flow* flow = rte_flow_create( - port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (flow == nullptr) { - DAQIRI_LOG_ERROR("rte_flow_create failed"); - } - } else { - DAQIRI_LOG_ERROR("Failed flow validation: {}", res); - } - } + if (!ensure_rx_flow_jump_rule(static_cast(port))) { return nullptr; } memset(pattern, 0, sizeof(pattern)); memset(action, 0, sizeof(action)); @@ -2863,8 +3536,8 @@ struct rte_flow* DpdkMgr::add_flow(int port, const FlowConfig& cfg) { attr.ingress = 1; - attr.priority = 1; // Lower priority to allow drop_traffic (priority 0) to take precedence - attr.group = 3; + attr.priority = kRxFlowPriority; // Lower priority to allow drop_traffic to take precedence + attr.group = kRxFlowGroup; pattern[3].type = RTE_FLOW_ITEM_TYPE_END; @@ -2881,32 +3554,10 @@ Status DpdkMgr::drop_all_traffic(int port) { struct rte_flow_error error; DropTrafficConfig config; - // Initialize the jump rule to group 3 (required by HWS) - { - struct rte_flow_error jump_error; - struct rte_flow_attr jump_attr{.group = 0, .ingress = 1}; - struct rte_flow_action_jump jump_v = {.group = 3}; - struct rte_flow_action jump_actions[] = { - { .type 
= RTE_FLOW_ACTION_TYPE_JUMP, .conf = &jump_v}, - { .type = RTE_FLOW_ACTION_TYPE_END} - }; - - struct rte_flow_item jump_pattern[] = { - { .type = RTE_FLOW_ITEM_TYPE_ETH, .spec = 0, .mask = 0}, - { .type = RTE_FLOW_ITEM_TYPE_END}, - }; - - auto res = rte_flow_validate(port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (!res) { - config.jump = rte_flow_create( - port, &jump_attr, jump_pattern, jump_actions, &jump_error); - if (config.jump == nullptr) { - DAQIRI_LOG_ERROR("rte_flow_create failed for jump rule in drop_all_traffic"); - } - } else { - DAQIRI_LOG_ERROR("Failed flow validation for jump rule in drop_all_traffic: {}", res); - } + if (!ensure_rx_flow_jump_rule(static_cast(port))) { + return Status::INTERNAL_ERROR; } + config.jump = nullptr; // Clear all structures memset(pattern, 0, sizeof(pattern)); @@ -2921,10 +3572,10 @@ Status DpdkMgr::drop_all_traffic(int port) { pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH; pattern[1].type = RTE_FLOW_ITEM_TYPE_END; - // Set highest priority (0) and use group 3 (consistent with add_flow) + // Set highest priority (0) and use the shared RX flow group. attr.ingress = 1; attr.priority = 0; // Highest priority - blocks all traffic - attr.group = 3; + attr.group = kRxFlowGroup; DAQIRI_LOG_INFO("Creating drop all traffic rule on port {} with priority {}", port, attr.priority); @@ -2935,7 +3586,6 @@ Status DpdkMgr::drop_all_traffic(int port) { if (config.drop == nullptr) { DAQIRI_LOG_ERROR("Failed to create drop all traffic flow rule on port {}: {}", port, error.message ? 
error.message : "unknown error"); - rte_flow_destroy(port, config.jump, &error); return Status::INTERNAL_ERROR; } else { DAQIRI_LOG_INFO("Successfully created drop all traffic rule on port {}", port); @@ -2956,22 +3606,15 @@ Status DpdkMgr::allow_all_traffic(int port) { // Tell the RX threads they can keep processing packets flush_rx_queues.store(false); - struct rte_flow_error jump_error; struct rte_flow_error drop_error; int drop_ret = rte_flow_destroy(port, drop_all_traffic_flow[port].drop, &drop_error); - int jump_ret = rte_flow_destroy(port, drop_all_traffic_flow[port].jump, &jump_error); if (drop_ret != 0) { DAQIRI_LOG_ERROR("Failed to destroy drop all traffic flow rule on port {}: {}", port, drop_error.message ? drop_error.message : "unknown error"); } - if (jump_ret != 0) { - DAQIRI_LOG_ERROR("Failed to destroy jump all traffic flow rule on port {}: {}", - port, jump_error.message ? jump_error.message : "unknown error"); - } - - if (drop_ret != 0 || jump_ret != 0) { + if (drop_ret != 0) { return Status::INTERNAL_ERROR; } @@ -3133,6 +3776,7 @@ void DpdkMgr::PrintDpdkStats(int port) { DpdkMgr::~DpdkMgr() { cleanup_reorder_state(); + if (eal_initialized_) { cleanup_dynamic_flows(); } // shutdown() handles ring cleanup BEFORE rte_eal_cleanup(), so the maps // are empty by the time we get here on the normal exit path. 
The loops @@ -3159,7 +3803,7 @@ bool DpdkMgr::validate_config() const { if (!Manager::validate_config()) { return false; } for (const auto& intf : cfg_.ifs_) { - std::unordered_map flow_to_queue; + std::unordered_map flow_to_queue; for (const auto& flow : intf.rx_.flows_) { if (flow_to_queue.find(flow.id_) != flow_to_queue.end()) { DAQIRI_LOG_ERROR("Duplicate flow ID {} on interface '{}'", flow.id_, intf.name_); @@ -3168,7 +3812,7 @@ bool DpdkMgr::validate_config() const { flow_to_queue.emplace(flow.id_, flow.action_.id_); } - std::unordered_set reorder_flow_ids; + std::unordered_set reorder_flow_ids; for (const auto& reorder_cfg : intf.rx_.reorder_configs_) { const bool use_gpu_backend = reorder_cfg.reorder_type_ == "gpu"; const bool use_cpu_backend = reorder_cfg.reorder_type_ == "cpu"; @@ -4048,7 +4692,7 @@ uint32_t DpdkMgr::get_packet_length(BurstParams* burst, int idx) { return reinterpret_cast(burst->pkts[0][idx])->pkt_len; } -uint16_t DpdkMgr::get_packet_flow_id(BurstParams* burst, int idx) { +FlowId DpdkMgr::get_packet_flow_id(BurstParams* burst, int idx) { if (burst == nullptr || idx < 0) { return 0; } if ((burst->hdr.hdr.burst_flags & kBurstFlagDpdkReordered) != 0U) { return 0; } if (idx >= static_cast(burst->hdr.hdr.num_pkts) || burst->pkts[0] == nullptr) { @@ -4416,6 +5060,7 @@ void DpdkMgr::shutdown() { stats_.Shutdown(); stats_thread_.join(); + cleanup_dynamic_flows(); // Release DPDK resources BEFORE rte_eal_cleanup(). 
Ring/mempool pointers // are owned by EAL memzones and become invalid once cleanup runs, so the diff --git a/src/managers/dpdk/daqiri_dpdk_mgr.h b/src/managers/dpdk/daqiri_dpdk_mgr.h index b72e62a..aef4dcc 100644 --- a/src/managers/dpdk/daqiri_dpdk_mgr.h +++ b/src/managers/dpdk/daqiri_dpdk_mgr.h @@ -48,8 +48,10 @@ #include #include #include +#include #include #include +#include #include "src/manager.h" #include #include "daqiri_dpdk_stats.h" @@ -104,7 +106,7 @@ class DpdkMgr : public Manager { void* get_packet_ptr(BurstParams* burst, int idx) override; uint32_t get_segment_packet_length(BurstParams* burst, int seg, int idx) override; uint32_t get_packet_length(BurstParams* burst, int idx) override; - uint16_t get_packet_flow_id(BurstParams* burst, int idx) override; + FlowId get_packet_flow_id(BurstParams* burst, int idx) override; Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) override; void* get_packet_extra_info(BurstParams* burst, int idx) override; Status get_tx_packet_burst(BurstParams* burst) override; @@ -141,6 +143,9 @@ class DpdkMgr : public Manager { Status get_mac_addr(int port, char* mac) override; Status drop_all_traffic(int port) override; Status allow_all_traffic(int port) override; + Status add_rx_flow_async(int port, const FlowRuleConfig& flow, FlowOpId* op_id) override; + Status delete_flow_async(FlowId flow_id, FlowOpId* op_id) override; + Status poll_flow_op(FlowOpResult* result) override; void shutdown() override; void print_stats() override; void adjust_memory_regions() override; @@ -163,6 +168,7 @@ class DpdkMgr : public Manager { bool calibrate_rx_timestamp_clock(uint16_t port_id); int setup_pools_and_rings(int max_rx_batch, int max_tx_batch); struct rte_flow* add_flow(int port, const FlowConfig& cfg); + bool ensure_rx_flow_jump_rule(uint16_t port); void create_dummy_rx_q(); void create_dummy_tx_q(); struct rte_flow* add_modify_flow_set(int port, int queue, const char* buf, int len, @@ -170,7 +176,73 @@ 
class DpdkMgr : public Manager { static struct rte_flow_item_flex_handle *create_flex_flow_rule( int port, int offset, struct rte_flow_item *udp_item, struct rte_flow_item *end_pattern); - struct rte_flow* add_flex_item_flow(int port, const FlexItemMatch& match, uint16_t queue_id); + struct rte_flow* add_flex_item_flow(int port, + const FlexItemMatch& match, + uint16_t queue_id, + FlowId mark_id = 0); + + enum class DynamicFlowBackend { + LEGACY, + TEMPLATE, + }; + + enum class DynamicFlowState { + ADDING, + ACTIVE, + DELETING, + }; + + struct DynamicFlowEntry { + FlowId flow_id = 0; + uint16_t port = 0; + uint16_t queue = 0; + struct rte_flow* flow = nullptr; + DynamicFlowBackend backend = DynamicFlowBackend::LEGACY; + DynamicFlowState state = DynamicFlowState::ADDING; + uint32_t flow_queue_id = 0; + }; + + struct PendingFlowOp { + FlowOpResult result; + FlowId flow_id = 0; + }; + + struct PortFlowTemplateState { + bool configured = false; + bool templates_ready = false; + uint32_t flow_queue_id = 0; + uint32_t capacity = DEFAULT_DYNAMIC_FLOW_CAPACITY; + struct rte_flow_template_table* normal_table = nullptr; + std::vector normal_pattern_templates; + struct rte_flow_actions_template* mark_queue_actions_template = nullptr; + }; + + FlowOpId allocate_flow_op_id(); + FlowId allocate_dynamic_flow_id(); + bool reserve_static_flow_ids(); + bool validate_dynamic_rx_flow(int port, const FlowRuleConfig& flow) const; + bool is_valid_rx_queue(int port, uint16_t queue_id) const; + bool is_normal_flow_match(const FlowMatch& match) const; + uint8_t normal_flow_pattern_index(const FlowMatch& match) const; + void build_normal_flow_pattern(const FlowMatch& match, struct rte_flow_item pattern[]) const; + void build_mark_queue_actions(FlowId flow_id, uint16_t queue_id, struct rte_flow_action action[]) const; + Status enqueue_software_flow_completion(const FlowOpResult& result); + Status add_rx_flow_legacy_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId 
op_id); + Status add_rx_flow_template_locked(int port, + const FlowRuleConfig& flow, + FlowId flow_id, + FlowOpId op_id); + Status delete_flow_legacy_locked(DynamicFlowEntry& entry, FlowOpId op_id); + Status delete_flow_template_locked(DynamicFlowEntry& entry, FlowOpId op_id); + bool configure_flow_api_for_port(uint16_t port, uint32_t capacity); + bool ensure_normal_flow_template_table(uint16_t port); + void poll_dpdk_flow_completions_locked(); + void cleanup_dynamic_flows(); + static void* flow_op_user_data(FlowOpId op_id); + static FlowOpId flow_op_id_from_user_data(void* user_data); void apply_tx_offloads(int port); @@ -266,7 +338,7 @@ class DpdkMgr : public Manager { struct ReorderQueueState { bool enabled = false; bool single_plan_fast_path = false; - std::unordered_map flow_id_to_plan; + std::unordered_map flow_id_to_plan; std::vector plans; std::vector> plan_pkt_indices; std::vector plan_pkt_counts; @@ -315,6 +387,8 @@ class DpdkMgr : public Manager { void release_reorder_output_context(BurstParams* burst); std::array mac_addrs; + std::array rx_flow_jump_rules_{}; + std::array flow_template_states_{}; std::unordered_map rx_rings; struct rte_ether_addr conf_ports_eth_addr[RTE_MAX_ETHPORTS]; std::unordered_map flex_item_handles_; @@ -331,6 +405,13 @@ class DpdkMgr : public Manager { std::unordered_map reorder_queue_states_; std::mutex reorder_lock_; std::array drop_all_traffic_flow; + std::mutex flow_lock_; + FlowId next_dynamic_flow_id_ = 1; + FlowOpId next_flow_op_id_ = 1; + std::unordered_set static_flow_ids_; + std::unordered_map dynamic_flows_; + std::unordered_map pending_flow_ops_; + std::queue ready_flow_ops_; int timestamp_dynfield_offset_{-1}; uint64_t rx_timestamp_dynflag_mask_{0}; uint64_t tx_timestamp_dynflag_mask_{0}; diff --git a/src/managers/rdma/daqiri_rdma_mgr.h b/src/managers/rdma/daqiri_rdma_mgr.h index b95c4e9..1577fbf 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.h +++ b/src/managers/rdma/daqiri_rdma_mgr.h @@ -107,7 +107,7 @@ class 
RdmaMgr : public Manager { void* get_packet_ptr(BurstParams* burst, int idx) override; uint32_t get_packet_length(BurstParams* burst, int idx) override; - uint16_t get_packet_flow_id(BurstParams* burst, int idx) override { return 0; } + FlowId get_packet_flow_id(BurstParams* burst, int idx) override { return 0; } Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) override { (void)burst; (void)idx; diff --git a/src/managers/socket/daqiri_socket_mgr.cpp b/src/managers/socket/daqiri_socket_mgr.cpp index da0e412..05b4f3d 100644 --- a/src/managers/socket/daqiri_socket_mgr.cpp +++ b/src/managers/socket/daqiri_socket_mgr.cpp @@ -210,7 +210,7 @@ uint32_t SocketMgr::get_segment_packet_length(BurstParams* burst, int seg, int i return burst->pkt_lens[seg][idx]; } -uint16_t SocketMgr::get_packet_flow_id(BurstParams* burst, int idx) { +FlowId SocketMgr::get_packet_flow_id(BurstParams* burst, int idx) { return 0; } diff --git a/src/managers/socket/daqiri_socket_mgr.h b/src/managers/socket/daqiri_socket_mgr.h index 569100d..8774471 100644 --- a/src/managers/socket/daqiri_socket_mgr.h +++ b/src/managers/socket/daqiri_socket_mgr.h @@ -48,7 +48,7 @@ class SocketMgr : public Manager { uint32_t get_packet_length(BurstParams* burst, int idx) override; void* get_segment_packet_ptr(BurstParams* burst, int seg, int idx) override; uint32_t get_segment_packet_length(BurstParams* burst, int seg, int idx) override; - uint16_t get_packet_flow_id(BurstParams* burst, int idx) override; + FlowId get_packet_flow_id(BurstParams* burst, int idx) override; Status get_packet_rx_timestamp(BurstParams* burst, int idx, uint64_t* timestamp_ns) override; void* get_packet_extra_info(BurstParams* burst, int idx) override;