Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions docs/api-reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,15 @@ RDMA transport settings:

### Flows

`rx.flows:` — Flow rules that steer packets to specific queues based on match criteria.
`rx.flows:` — Static startup flow rules that steer packets to specific queues based on
match criteria. This sequence may be omitted; a queues-only RX config can add DPDK RX
flows later with the dynamic flow API.

- **`name`**: Flow name.
- type: `string`
- **`id`**: Flow ID. Retrievable at runtime via `get_packet_flow_id()`.
- **`id`**: Non-zero static flow ID. Retrievable at runtime via `get_packet_flow_id()`.
Static IDs are reserved for the lifetime of the process and are not deletable through
the dynamic flow API.
- type: `integer`
- **`action`**: What to do with matched packets.
- **`type`**: Action type. Only `queue` is currently supported.
Expand All @@ -201,11 +205,23 @@ RDMA transport settings:
### Flow Isolation

`rx.flow_isolation:` — When `true`, only packets matching an explicit flow rule are delivered.
Unmatched packets are dropped. When `false`, unmatched packets go to a default queue.
Unmatched packets are dropped. When `false`, unmatched packets go to a default queue. A
queues-only config can set `flow_isolation: true` and then install dynamic RX flows after
`daqiri_init()`.

- type: `boolean`
- default: `false`

### Dynamic Flow Capacity

`rx.dynamic_flow_capacity:` — DPDK template-table capacity reserved for dynamic RX flow
rules on this interface. DAQIRI uses this when the DPDK template/async fast path is
available; legacy fallback paths still accept dynamic RX flow operations but do not use a
template table.

- type: `integer`
- default: `1024`

### Hardware Timestamps

`rx.hardware_timestamps:` — Enable per-packet hardware RX timestamps for Raw Ethernet
Expand Down
67 changes: 65 additions & 2 deletions docs/api-reference/cpp.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ For a single-segment configuration (CPU-only or batched GPU):
for (int i = 0; i < daqiri::get_num_packets(burst); i++) {
void *pkt = daqiri::get_packet_ptr(burst, i);
uint32_t len = daqiri::get_packet_length(burst, i);
uint16_t flow = daqiri::get_packet_flow_id(burst, i);
daqiri::FlowId flow = daqiri::get_packet_flow_id(burst, i);
uint64_t rx_ts_ns = 0;
if (daqiri::get_packet_rx_timestamp(burst, i, &rx_ts_ns) == daqiri::Status::SUCCESS) {
// rx_ts_ns is in the NIC timestamp clock domain.
Expand Down Expand Up @@ -130,6 +130,61 @@ daqiri::free_all_segment_packets(burst, seg);
daqiri::free_rx_burst(burst);
```

## Dynamic RX Flows

DPDK RX flows can be added and deleted after `daqiri_init()`. This supports
queues-only startup configs, including `rx.flow_isolation: true` with no
initial `rx.flows`. Static YAML flows still use explicit configured IDs and are
not deletable through this API.

```cpp
daqiri::FlowRuleConfig flow;
flow.name_ = "udp_5000";
flow.action_.type_ = daqiri::FlowType::QUEUE;
flow.action_.id_ = 0;
flow.match_.type_ = daqiri::FlowMatchType::NORMAL;
flow.match_.udp_dst_ = 5000;

daqiri::FlowOpId add_op = 0;
auto st = daqiri::add_rx_flow_async(0, flow, &add_op);
if (st != daqiri::Status::SUCCESS) {
// invalid port/queue/match, unsupported backend, or no flow IDs available
}

daqiri::FlowId flow_id = 0;
daqiri::FlowOpResult result;
while (flow_id == 0) {
st = daqiri::poll_flow_op(&result);
if (st == daqiri::Status::NOT_READY) {
continue;
}
if (st != daqiri::Status::SUCCESS) {
// handle poll error
break;
}
if (result.op_id_ == add_op) {
if (result.status_ != daqiri::Status::SUCCESS) {
// handle flow create failure
break;
}
flow_id = result.flow_id_;
}
}
```

Packets matching a dynamic rule are marked with the same `FlowId` returned by
the add completion, so `get_packet_flow_id()` gives the handle to pass to
`delete_flow_async()`. `poll_flow_op()` returns `Status::NOT_READY` when no flow
operation has completed yet.

```cpp
daqiri::FlowOpId delete_op = 0;
auto delete_status = daqiri::delete_flow_async(flow_id, &delete_op);
```

Dynamic flow support is RX-only in v1. Socket, RDMA, and software loopback
managers return `NOT_SUPPORTED`.

## Reordered RX Bursts

For an overview of what RX reorder is and when to use it, see
Expand Down Expand Up @@ -411,9 +466,17 @@ workflow sections above show the common call order and ownership rules.
| `get_segment_packet_ptr(burst, seg, idx)` | Return a packet pointer for a specific segment. |
| `get_packet_length(burst, idx)` | Return the logical packet length. |
| `get_segment_packet_length(burst, seg, idx)` | Return the length of one packet segment. |
| `get_packet_flow_id(burst, idx)` | Return the matched flow ID, or `0` when no flow matched. |
| `get_packet_flow_id(burst, idx)` | Return the matched `FlowId`, or `0` when no flow matched. |
| `get_packet_rx_timestamp(burst, idx, &timestamp_ns)` | Return the hardware RX timestamp when enabled and available. |

### Dynamic RX Flow Lifecycle

| Function | Purpose |
| --- | --- |
| `add_rx_flow_async(port, flow, &op_id)` | Enqueue a dynamic RX flow create. The add completion returns the allocated `FlowId`. |
| `delete_flow_async(flow_id, &op_id)` | Enqueue deletion of an active dynamic flow. Static YAML flows and unknown IDs return `INVALID_PARAMETER`. |
| `poll_flow_op(&result)` | Return one completed flow operation, or `NOT_READY` when none are ready. |

### RX and Reorder

| Function | Purpose |
Expand Down
9 changes: 8 additions & 1 deletion docs/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,14 @@ Flow rules are only available in Raw Ethernet (`stream_type: "raw"`).
A flow's match can combine fields such as `udp_src`, `udp_dst`, and
`ipv4_len`; multiple flows can target the same queue, and the matching
flow's ID is available at runtime so the application can distinguish
them. Flows are configured under `rx.flows` in the YAML.
them.

Flows can be static or dynamic. Static flows are configured under
`rx.flows` in the YAML and keep their configured IDs for the process lifetime.
Dynamic RX flows are added after `daqiri_init()` with `add_rx_flow_async()`;
their non-zero `FlowId` is allocated by DAQIRI, returned in the add completion,
and used as the packet mark returned by `get_packet_flow_id()`. Only dynamic
flows can be deleted dynamically. TX dynamic flows are not part of v1.

### Flow Steering

Expand Down
2 changes: 1 addition & 1 deletion docs/tutorials/configuration-walkthrough.md
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ flows:
udp_dst: 5000
```

1. **`id`** · `integer` · *required* — Flow tag attached to matching packets. Set to a non-zero value here so the `reorder_configs:` block below can reference it via `flow_ids:` to select which packets to reorder.
1. **`id`** · `integer` · *required* — Static flow tag attached to matching packets. Set to a non-zero value here so the `reorder_configs:` block below can reference it via `flow_ids:` to select which packets to reorder. Dynamic RX flows are added after initialization and are not attached to reorder configs in v1.

**The `reorder_configs:` block.** The core of the feature — sits inside the `rx:` section alongside `queues` and `flows`.

Expand Down
61 changes: 54 additions & 7 deletions include/daqiri/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,42 @@ uint32_t get_packet_length(BurstParams *burst, int idx);
*
* @param burst Burst structure containing packets
* @param idx Index of packet
* @return uint16_t Flow ID
* @return FlowId Flow ID, or 0 when no flow matched
*/
uint16_t get_packet_flow_id(BurstParams* burst, int idx);
FlowId get_packet_flow_id(BurstParams* burst, int idx);

/**
* @brief Enqueue creation of a dynamic RX flow rule.
*
* The flow ID is allocated by DAQIRI and returned in the completion result from
* poll_flow_op(). Packets matching the dynamic rule are marked with the same
* ID, so get_packet_flow_id() can be used to identify and later delete the rule.
*
* @param port Port ID of interface
* @param flow Rule match and queue action to install
* @param op_id Output operation ID used to track completion
* @return Status indicating whether the operation was accepted
*/
Status add_rx_flow_async(int port, const FlowRuleConfig &flow, FlowOpId *op_id);

/**
* @brief Enqueue deletion of a dynamic flow rule.
*
* Only flows created by add_rx_flow_async() are deletable through this API.
*
* @param flow_id Dynamic flow ID returned by an add completion
* @param op_id Output operation ID used to track completion
* @return Status indicating whether the operation was accepted
*/
Status delete_flow_async(FlowId flow_id, FlowOpId *op_id);

/**
* @brief Poll one dynamic flow operation completion.
*
* @param result Output completion details
* @return SUCCESS when a completion was returned, NOT_READY when none are ready
*/
Status poll_flow_op(FlowOpResult *result);

/**
* @brief Get the hardware RX timestamp of a packet in nanoseconds
Expand Down Expand Up @@ -1111,6 +1144,13 @@ template <> struct YAML::convert<daqiri::NetworkConfig> {
rx_cfg.flow_isolation_ = rx["flow_isolation"].as<bool>();
} catch (const std::exception& e) { rx_cfg.flow_isolation_ = false; }

try {
rx_cfg.dynamic_flow_capacity_ =
rx["dynamic_flow_capacity"].as<uint32_t>();
} catch (const std::exception& e) {
rx_cfg.dynamic_flow_capacity_ = daqiri::DEFAULT_DYNAMIC_FLOW_CAPACITY;
}

for (const auto &q_item : rx["queues"]) {
daqiri::RxQueueConfig q;
if (!parse_rx_queue_config(
Expand All @@ -1128,13 +1168,20 @@ template <> struct YAML::convert<daqiri::NetworkConfig> {
rx_cfg.queues_.emplace_back(std::move(q));
}

for (const auto &flow_item : rx["flows"]) {
daqiri::FlowConfig flow;
if (!parse_flow_config(flow_item, flow)) {
DAQIRI_LOG_ERROR("Failed to parse FlowConfig");
if (rx["flows"].IsDefined()) {
if (!rx["flows"].IsSequence()) {
DAQIRI_LOG_ERROR("'rx.flows' must be a sequence for interface '{}'",
ifcfg.name_);
return false;
}
rx_cfg.flows_.emplace_back(std::move(flow));
for (const auto &flow_item : rx["flows"]) {
daqiri::FlowConfig flow;
if (!parse_flow_config(flow_item, flow)) {
DAQIRI_LOG_ERROR("Failed to parse FlowConfig");
return false;
}
rx_cfg.flows_.emplace_back(std::move(flow));
}
}

try {
Expand Down
52 changes: 38 additions & 14 deletions include/daqiri/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ static inline constexpr uint32_t MAX_INTERFACES = 4;
static inline constexpr int MAX_NUM_SEGS = 4;
static inline constexpr uint32_t DAQIRI_BURST_FLAG_REORDERED = (1U << 28);
static inline constexpr uint32_t DAQIRI_BURST_FLAG_REORDER_TIMEOUT = (1U << 29);
static inline constexpr uint32_t DEFAULT_DYNAMIC_FLOW_CAPACITY = 1024;

using FlowId = uint32_t;
using FlowOpId = uint64_t;

struct ReorderBurstInfo {
uint64_t batch_id;
Expand Down Expand Up @@ -541,14 +545,14 @@ struct TxQueueConfig {
enum class FlowType { QUEUE };

struct FlowAction {
FlowType type_;
uint16_t id_;
FlowType type_ = FlowType::QUEUE;
uint16_t id_ = 0;
};

struct FlexItemMatch {
uint16_t flex_item_id_;
uint32_t val_;
uint32_t mask_;
uint16_t flex_item_id_ = 0;
uint32_t val_ = 0;
uint32_t mask_ = 0;
};

enum class FlowMatchType {
Expand All @@ -557,20 +561,39 @@ enum class FlowMatchType {
};

struct FlowMatch {
FlowMatchType type_;
uint16_t udp_src_;
uint16_t udp_dst_;
uint16_t ipv4_len_;
in_addr_t ipv4_src_;
in_addr_t ipv4_dst_;
FlowMatchType type_ = FlowMatchType::NORMAL;
uint16_t udp_src_ = 0;
uint16_t udp_dst_ = 0;
uint16_t ipv4_len_ = 0;
in_addr_t ipv4_src_ = INADDR_ANY;
in_addr_t ipv4_dst_ = INADDR_ANY;
FlexItemMatch flex_item_match_;
};
struct FlowConfig {
std::string name_;
uint16_t id_;
FlowId id_ = 0;
FlowAction action_;
FlowMatch match_;
void* backend_config_ = nullptr; // Filled in by operator
};

struct FlowRuleConfig {
std::string name_;
FlowAction action_;
FlowMatch match_;
void* backend_config_; // Filled in by operator
void* backend_config_ = nullptr; // Filled in by operator
};

enum class FlowOpType {
ADD_RX,
DELETE,
};

struct FlowOpResult {
FlowOpId op_id_ = 0;
FlowOpType type_ = FlowOpType::ADD_RX;
Status status_ = Status::NOT_READY;
FlowId flow_id_ = 0;
};

struct CommonConfig {
Expand Down Expand Up @@ -756,7 +779,7 @@ struct ReorderConfig {
std::string reorder_type_;
std::string memory_region_;
uint32_t payload_byte_offset_ = 0;
std::vector<uint16_t> flow_ids_;
std::vector<FlowId> flow_ids_;
ReorderMethod method_ = ReorderMethod::INVALID;
ReorderSeqBatchNumberConfig seq_batch_number_;
ReorderSeqPacketsPerBatchConfig seq_packets_per_batch_;
Expand All @@ -766,6 +789,7 @@ struct ReorderConfig {
struct RxConfig {
bool flow_isolation_ = false;
bool hardware_timestamps_ = false;
uint32_t dynamic_flow_capacity_ = DEFAULT_DYNAMIC_FLOW_CAPACITY;
std::vector<RxQueueConfig> queues_;
std::vector<FlowConfig> flows_;
std::vector<FlexItemConfig> flex_items_;
Expand Down
Loading
Loading