From 9dfd6b717a791bc1941b2f1528485fbd65953a5e Mon Sep 17 00:00:00 2001 From: Kevin Krakauer Date: Fri, 20 Feb 2026 15:11:32 -0800 Subject: [PATCH 1/4] disable clang format, It's run automatically by many automated tools and wreaks havoc on commit diffs. We should, maybe, go back later and re-enable it. --- check_all_options.c | 3 +++ check_all_options.h | 3 +++ coef.c | 3 +++ coef.h | 3 +++ common.c | 3 +++ common.h | 3 +++ control_plane.c | 3 +++ control_plane.h | 3 +++ countdown_cond.h | 3 +++ cpuinfo.c | 3 +++ cpuinfo.h | 3 +++ define_all_flags.c | 3 +++ define_all_flags.h | 3 +++ flags.c | 3 +++ flags.h | 3 +++ flow.c | 3 +++ flow.h | 3 +++ hexdump.c | 3 +++ hexdump.h | 3 +++ histo.c | 3 +++ histo.h | 3 +++ lib.h | 3 +++ logging.c | 3 +++ logging.h | 3 +++ loop.c | 3 +++ loop.h | 3 +++ or_die.c | 3 +++ or_die.h | 3 +++ parse.c | 3 +++ parse.h | 3 +++ percentiles.c | 3 +++ percentiles.h | 3 +++ pq.c | 3 +++ pq.h | 3 +++ print.c | 3 +++ print.h | 3 +++ psp_crr.c | 3 +++ psp_crr_main.c | 3 +++ psp_kernel.h | 3 +++ psp_lib.c | 3 +++ psp_lib.h | 3 +++ psp_rr.c | 5 ++++- psp_rr_main.c | 3 +++ psp_stream.c | 5 ++++- psp_stream_main.c | 3 +++ rr.c | 3 +++ rr.h | 3 +++ rusage.c | 3 +++ rusage.h | 3 +++ snaps.c | 3 +++ snaps.h | 3 +++ socket.c | 3 +++ socket.h | 3 +++ stats.c | 3 +++ stats.h | 3 +++ stream.c | 3 +++ stream.h | 3 +++ tcp_crr.c | 3 +++ tcp_crr_main.c | 3 +++ tcp_rr.c | 3 +++ tcp_rr_main.c | 3 +++ tcp_stream.c | 3 +++ tcp_stream_main.c | 3 +++ thread.c | 3 +++ thread.h | 3 +++ udp_rr.c | 3 +++ udp_rr_main.c | 3 +++ udp_stream.c | 3 +++ udp_stream_main.c | 3 +++ version.c | 3 +++ version.h | 3 +++ 71 files changed, 215 insertions(+), 2 deletions(-) diff --git a/check_all_options.c b/check_all_options.c index eaafe10..5d49c66 100644 --- a/check_all_options.c +++ b/check_all_options.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include "common.h" #include "check_all_options.h" @@ -132,3 +134,4 @@ void check_options_psp_common(struct options *opts, struct callbacks *cb) "PSP server requires server IP (-H) for device lookup."); } } +/* clang-format on */ diff --git a/check_all_options.h b/check_all_options.h index cb0ebd6..349e9f6 100644 --- a/check_all_options.h +++ b/check_all_options.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_CHECK_ALL_OPTIONS_H_ #define THIRD_PARTY_NEPER_CHECK_ALL_OPTIONS_H_ @@ -34,3 +36,4 @@ void check_options_psp_common(struct options *opts, struct callbacks *cb); #endif // THIRD_PARTY_NEPER_CHECK_ALL_OPTIONS_H_ +/* clang-format on */ diff --git a/coef.c b/coef.c index d2764bd..b7da234 100644 --- a/coef.c +++ b/coef.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "coef.h" #include "common.h" @@ -102,3 +104,4 @@ struct neper_coef *neper_coef(void) return coef; } +/* clang-format on */ diff --git a/coef.h b/coef.h index 5fc392a..d0b8840 100644 --- a/coef.h +++ b/coef.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_COEF_H #define THIRD_PARTY_NEPER_COEF_H @@ -44,3 +46,4 @@ struct neper_coef { struct neper_coef *neper_coef(void); #endif +/* clang-format on */ diff --git a/common.c b/common.c index 94c2e7b..02735a9 100644 --- a/common.c +++ b/common.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include #include #include @@ -346,3 +348,4 @@ int create_suicide_timeout(int sec_to_suicide) } return 0; } +/* clang-format on */ diff --git a/common.h b/common.h index 16a9011..6590441 100644 --- a/common.h +++ b/common.h @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_COMMON_H #define THIRD_PARTY_NEPER_COMMON_H @@ -169,3 +171,4 @@ const struct rate_conversion *auto_unit(const double throughput, const struct rate_conversion *opt, struct callbacks *); #endif +/* clang-format on */ diff --git a/control_plane.c b/control_plane.c index 1341870..b22301c 100644 --- a/control_plane.c +++ b/control_plane.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "control_plane.h" #include #include @@ -514,3 +516,4 @@ void control_plane_destroy(struct control_plane *cp) { free(cp); } +/* clang-format on */ diff --git a/control_plane.h b/control_plane.h index 8e05289..7497c69 100644 --- a/control_plane.h +++ b/control_plane.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_CONTROL_PLANE_H #define THIRD_PARTY_NEPER_CONTROL_PLANE_H @@ -36,3 +38,4 @@ int control_plane_incidents(struct control_plane *cp); void control_plane_destroy(struct control_plane *cp); #endif +/* clang-format on */ diff --git a/countdown_cond.h b/countdown_cond.h index 5b4f60a..81c86c1 100644 --- a/countdown_cond.h +++ b/countdown_cond.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef _COUNTDOWN_COND_H #define _COUNTDOWN_COND_H @@ -92,3 +94,4 @@ static inline void countdown_cond_wait(struct countdown_cond *cc) } #endif +/* clang-format on */ diff --git a/cpuinfo.c b/cpuinfo.c index d2a63ac..dd1f90f 100644 --- a/cpuinfo.c +++ b/cpuinfo.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include #include #include @@ -87,3 +89,4 @@ int get_cpuinfo(struct cpuinfo *cpus, int max_cpus, struct callbacks *cb) fclose(f); return n; } +/* clang-format on */ diff --git a/cpuinfo.h b/cpuinfo.h index 52c542e..39aa615 100644 --- a/cpuinfo.h +++ b/cpuinfo.h @@ -13,6 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +/* clang-format off */ #include "lib.h" #ifndef THIRD_PARTY_NEPER_CPUINFO_H @@ -35,3 +37,4 @@ struct cpuinfo { int get_cpuinfo(struct cpuinfo *cpus, int max_cpus, struct callbacks *cb); #endif +/* clang-format on */ diff --git a/define_all_flags.c b/define_all_flags.c index ac576d4..cb0cc95 100644 --- a/define_all_flags.c +++ b/define_all_flags.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "flags.h" #include "lib.h" @@ -183,3 +185,4 @@ struct flags_parser *add_flags_udp_stream(struct flags_parser *fp) return (fp); } +/* clang-format on */ diff --git a/define_all_flags.h b/define_all_flags.h index 4d399f7..0b9327f 100644 --- a/define_all_flags.h +++ b/define_all_flags.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_DEFINE_ALL_FLAGS_H #define THIRD_PARTY_NEPER_DEFINE_ALL_FLAGS_H @@ -32,3 +34,4 @@ struct flags_parser *add_flags_udp_rr(struct flags_parser *fp); struct flags_parser *add_flags_udp_stream(struct flags_parser *fp); #endif +/* clang-format on */ diff --git a/flags.c b/flags.c index db456a8..7344b87 100644 --- a/flags.c +++ b/flags.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "flags.h" #include @@ -369,3 +371,4 @@ void flags_parser_dump(struct flags_parser *fp) print_flag(flag, fp->cb); } } +/* clang-format on */ diff --git a/flags.h b/flags.h index ae72354..1f89a59 100644 --- a/flags.h +++ b/flags.h @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_FLAGS_H #define THIRD_PARTY_NEPER_FLAGS_H @@ -85,3 +87,4 @@ void flags_parser_destroy(struct flags_parser *fp); } while (0) #endif +/* clang-format on */ diff --git a/flow.c b/flow.c index 016c516..ce3743e 100644 --- a/flow.c +++ b/flow.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include #include #include @@ -394,3 +396,4 @@ ssize_t flow_recv_zerocopy(struct flow *f, void *copybuf, size_t copybuf_len) { #endif return n_read; } +/* clang-format on */ diff --git a/flow.h b/flow.h index 2e0332d..5bba298 100644 --- a/flow.h +++ b/flow.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_FLOW_H #define THIRD_PARTY_NEPER_FLOW_H @@ -77,3 +79,4 @@ void flow_init_rx_zerocopy(struct flow *f, int buffer_size, ssize_t flow_recv_zerocopy(struct flow *f, void *copybuf, size_t copybuf_len); #endif +/* clang-format on */ diff --git a/hexdump.c b/hexdump.c index 237fe98..3ec8c6a 100644 --- a/hexdump.c +++ b/hexdump.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "hexdump.h" #include #include @@ -51,3 +53,4 @@ char *hexdump(const char *in, size_t in_len, char *out, size_t out_len) out[sizeof(row_buf) * num_rows] = '\0'; return out; } +/* clang-format on */ diff --git a/hexdump.h b/hexdump.h index a1d8a14..2a45178 100644 --- a/hexdump.h +++ b/hexdump.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_HEXDUMP_H #define THIRD_PARTY_NEPER_HEXDUMP_H @@ -22,3 +24,4 @@ char *hexdump(const char *in, size_t in_len, char *out, size_t out_len); #endif +/* clang-format on */ diff --git a/histo.c b/histo.c index 9378cdf..89fa3e0 100644 --- a/histo.c +++ b/histo.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include "histo.h" #include "common.h" #include "logging.h" @@ -319,3 +321,4 @@ struct neper_histo *neper_histo_new(const struct thread *t, uint8_t k_bits) return ret; } +/* clang-format on */ diff --git a/histo.h b/histo.h index 95ba9f3..8234ba3 100644 --- a/histo.h +++ b/histo.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_HISTO_H #define THIRD_PARTY_NEPER_HISTO_H @@ -74,3 +76,4 @@ void neper_histo_print(struct neper_histo *); void neper_histo_delete(struct neper_histo *); #endif +/* clang-format on */ diff --git a/lib.h b/lib.h index c0e9e46..1fb69de 100644 --- a/lib.h +++ b/lib.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_LIB_H #define THIRD_PARTY_NEPER_LIB_H @@ -156,3 +158,4 @@ int psp_crr(struct options *, struct callbacks *); #endif #endif +/* clang-format on */ diff --git a/logging.c b/logging.c index a1fef8b..bd6969e 100644 --- a/logging.c +++ b/logging.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "logging.h" #include #include @@ -246,3 +248,4 @@ void logging_exit(struct callbacks *cb) { close_log(); } +/* clang-format on */ diff --git a/logging.h b/logging.h index e86e65c..5b3f74f 100644 --- a/logging.h +++ b/logging.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_LOGGING_H #define THIRD_PARTY_NEPER_LOGGING_H @@ -41,3 +43,4 @@ void logging_exit(struct callbacks *); LOG_FATAL(cb, fmt, ##args) #endif +/* clang-format on */ diff --git a/loop.c b/loop.c index b62c88b..21de376 100644 --- a/loop.c +++ b/loop.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include #include "common.h" @@ -100,3 +102,4 @@ void *loop(struct thread *t) /* This is technically a thread callback so it must return a (void *) */ return NULL; } +/* clang-format on */ diff --git a/loop.h b/loop.h index 15ea7cd..8dc4179 100644 --- a/loop.h +++ b/loop.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_LOOP_H #define THIRD_PARTY_NEPER_LOOP_H @@ -22,3 +24,4 @@ struct thread; void *loop(struct thread *); #endif +/* clang-format on */ diff --git a/or_die.c b/or_die.c index 9de1688..4d5f96e 100644 --- a/or_die.c +++ b/or_die.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "or_die.h" @@ -104,3 +106,4 @@ int socket_or_die(int domain, int type, int protocol, struct callbacks *cb) PLOG_FATAL(cb, "socket"); return s; } +/* clang-format on */ diff --git a/or_die.h b/or_die.h index 4b038a6..8e89c6f 100644 --- a/or_die.h +++ b/or_die.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_OR_DIE_H #define THIRD_PARTY_NEPER_OR_DIE_H @@ -36,3 +38,4 @@ void *realloc_or_die(void *ptr, size_t, struct callbacks *); int socket_or_die(int domain, int type, int protocol, struct callbacks *); #endif +/* clang-format on */ diff --git a/parse.c b/parse.c index 04341df..5332279 100644 --- a/parse.c +++ b/parse.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include #include @@ -117,3 +119,4 @@ void parse_duration(const char *arg, void *out, struct callbacks *cb) /* Never found a matching suffix. */ LOG_FATAL(cb, "invalid suffix %s", suffix); } +/* clang-format on */ diff --git a/parse.h b/parse.h index 9162d52..eb7e363 100644 --- a/parse.h +++ b/parse.h @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_PARSE_H #define THIRD_PARTY_NEPER_PARSE_H @@ -24,3 +26,4 @@ void parse_unit(const char *arg, void *out, struct callbacks *); void parse_duration(const char *arg, void *out, struct callbacks *cb); #endif +/* clang-format on */ diff --git a/percentiles.c b/percentiles.c index 3c24627..26b61da 100644 --- a/percentiles.c +++ b/percentiles.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "percentiles.h" #include #include @@ -92,3 +94,4 @@ void percentiles_print(const char *name, const void *var, struct callbacks *cb) PRINT(cb, name, "%s", s); free(s); } +/* clang-format on */ diff --git a/percentiles.h b/percentiles.h index 8fb725a..25dbd43 100644 --- a/percentiles.h +++ b/percentiles.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_PERCENTILES_H #define THIRD_PARTY_NEPER_PERCENTILES_H @@ -28,3 +30,4 @@ void percentiles_parse(const char *arg, void *out, struct callbacks *); void percentiles_print(const char *name, const void *var, struct callbacks *); #endif +/* clang-format on */ diff --git a/pq.c b/pq.c index 55af588..0218e87 100644 --- a/pq.c +++ b/pq.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "pq.h" #include "common.h" @@ -139,3 +141,4 @@ struct neper_pq *neper_pq(double (*cmp)(void *, void *), uint32_t maxlen, return pq; } +/* clang-format on */ diff --git a/pq.h b/pq.h index 82c2f28..5ffe6a1 100644 --- a/pq.h +++ b/pq.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_PQ_H #define THIRD_PARTY_NEPER_PQ_H @@ -44,3 +46,4 @@ struct neper_pq *neper_pq(double (*cmp)(void *, void *), uint32_t maxlen, struct callbacks *); #endif +/* clang-format on */ diff --git a/print.c b/print.c index fe9b213..7377037 100644 --- a/print.c +++ b/print.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include "print.h" #include "common.h" @@ -62,3 +64,4 @@ void print_rusage(FILE *csv, const struct rusage *rusage, const char *nl) rusage->ru_nvcsw, rusage->ru_nivcsw); fprintf(csv, "%s", nl); } +/* clang-format on */ diff --git a/print.h b/print.h index ab2ae6d..e3e2a87 100644 --- a/print.h +++ b/print.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_PRINT_H #define THIRD_PARTY_NEPER_PRINT_H @@ -31,3 +33,4 @@ void print_latency_header(FILE *csv, const struct percentiles *); void print_rusage(FILE *csv, const struct rusage *, const char *nl); #endif +/* clang-format on */ diff --git a/psp_crr.c b/psp_crr.c index 1ffc6e0..2946c59 100644 --- a/psp_crr.c +++ b/psp_crr.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "psp_lib.h" #include "rr.h" @@ -46,3 +48,4 @@ int psp_crr(struct options *opts, struct callbacks *cb) opts->nostats = true; return run_main_thread(opts, cb, fn); } +/* clang-format on */ diff --git a/psp_crr_main.c b/psp_crr_main.c index 110a37c..e2162a6 100644 --- a/psp_crr_main.c +++ b/psp_crr_main.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "flags.h" #include "lib.h" @@ -65,3 +67,4 @@ int main(int argc, char **argv) logging_exit(&cb); return exit_code; } +/* clang-format on */ diff --git a/psp_kernel.h b/psp_kernel.h index e879a30..6937008 100644 --- a/psp_kernel.h +++ b/psp_kernel.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef PSP_KERNEL_H_ #define PSP_KERNEL_H_ @@ -47,3 +49,4 @@ struct psp_spi_tuple { }; #endif // PSP_KERNEL_H_ +/* clang-format on */ diff --git a/psp_lib.c b/psp_lib.c index d1c064f..249a525 100644 --- a/psp_lib.c +++ b/psp_lib.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include "common.h" #include "psp_lib.h" #include "socket.h" @@ -265,3 +267,4 @@ void psp_post_listen(struct thread *t, int s, struct addrinfo *ai) { pthread_mutex_unlock(&psp_key_lock); } +/* clang-format on */ diff --git a/psp_lib.h b/psp_lib.h index 38f3c85..54df3d9 100644 --- a/psp_lib.h +++ b/psp_lib.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_PSP_LIB_H_ #define THIRD_PARTY_NEPER_PSP_LIB_H_ @@ -45,3 +47,4 @@ void psp_pre_connect(struct thread *t, int s, struct addrinfo *ai); void psp_post_listen(struct thread *t, int s, struct addrinfo *ai); #endif // THIRD_PARTY_NEPER_PSP_LIB_H_ +/* clang-format on */ diff --git a/psp_rr.c b/psp_rr.c index b14a7ef..3a88fcc 100644 --- a/psp_rr.c +++ b/psp_rr.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "psp_lib.h" #include "rr.h" @@ -42,4 +44,5 @@ int psp_rr(struct options *opts, struct callbacks *cb) { const struct neper_fn *fn = opts->client ? &client_fn : &server_fn; return run_main_thread(opts, cb, fn); -} \ No newline at end of file +} +/* clang-format on */ diff --git a/psp_rr_main.c b/psp_rr_main.c index 97ea85e..e039e97 100644 --- a/psp_rr_main.c +++ b/psp_rr_main.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "flags.h" #include "lib.h" @@ -63,3 +65,4 @@ int main(int argc, char **argv) logging_exit(&cb); return exit_code; } +/* clang-format on */ diff --git a/psp_stream.c b/psp_stream.c index 43f0916..ee4859d 100644 --- a/psp_stream.c +++ b/psp_stream.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "psp_lib.h" #include "socket.h" @@ -42,4 +44,5 @@ int psp_stream(struct options *opts, struct callbacks *cb) { const struct neper_fn *fn = opts->client ? 
&client_fn : &server_fn; return run_main_thread(opts, cb, fn); -} \ No newline at end of file +} +/* clang-format on */ diff --git a/psp_stream_main.c b/psp_stream_main.c index 1f6b8af..8026a98 100644 --- a/psp_stream_main.c +++ b/psp_stream_main.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "flags.h" #include "parse.h" @@ -86,3 +88,4 @@ int main(int argc, char **argv) logging_exit(&cb); return exit_code; } +/* clang-format on */ diff --git a/rr.c b/rr.c index 214ba7a..f5b3689 100644 --- a/rr.c +++ b/rr.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + /* * This file implements six different state machines: * TCP_RR client/server, UDP_RR client/server, and TCP_CRR client/server @@ -620,3 +622,4 @@ int rr_report_stats(struct thread *tinfo) return 0; } +/* clang-format on */ diff --git a/rr.h b/rr.h index 3d11ef6..1954552 100644 --- a/rr.h +++ b/rr.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_RR_H #define THIRD_PARTY_NEPER_RR_H @@ -24,3 +26,4 @@ void rr_flow_init(struct thread *, int fd); int rr_report_stats(struct thread *); #endif +/* clang-format on */ diff --git a/rusage.c b/rusage.c index 2d73e34..0ceec93 100644 --- a/rusage.c +++ b/rusage.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "rusage.h" #include "common.h" @@ -167,3 +169,4 @@ struct neper_rusage *neper_rusage(double interval) return rusage; } +/* clang-format on */ diff --git a/rusage.h b/rusage.h index dd541cb..0f61a37 100644 --- a/rusage.h +++ b/rusage.h @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_RUSAGE_H #define THIRD_PARTY_NEPER_RUSAGE_H @@ -43,3 +45,4 @@ void set_getrusage_enhanced(bool stime_use_proc, int num_threads); int getrusage_enhanced(int who, struct rusage *usage); #endif +/* clang-format on */ diff --git a/snaps.c b/snaps.c index 1939b78..e444bdb 100644 --- a/snaps.c +++ b/snaps.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "snaps.h" #include "common.h" #include "print.h" @@ -128,3 +130,4 @@ struct neper_snaps *neper_snaps_init(struct neper_rusage *rusage, return snaps; } +/* clang-format on */ diff --git a/snaps.h b/snaps.h index b7616b7..6856c00 100644 --- a/snaps.h +++ b/snaps.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_SNAPS_H #define THIRD_PARTY_NEPER_SNAPS_H @@ -77,3 +79,4 @@ struct neper_snaps *neper_snaps_init(struct neper_rusage *, int total, int extra); #endif +/* clang-format on */ diff --git a/socket.c b/socket.c index b1694be..af6fac0 100644 --- a/socket.c +++ b/socket.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "flow.h" #include "socket.h" @@ -352,3 +354,4 @@ void socket_connect_all(struct thread *t) t->fn->fn_flow_init(t, s); } } +/* clang-format on */ diff --git a/socket.h b/socket.h index 6f5b0f9..14a2b24 100644 --- a/socket.h +++ b/socket.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_SOCKET_H #define THIRD_PARTY_NEPER_SOCKET_H @@ -25,3 +27,4 @@ int socket_connect_one(struct thread *, int flags); void socket_connect_all(struct thread *); #endif +/* clang-format on */ diff --git a/stats.c b/stats.c index 6527b17..2c72443 100644 --- a/stats.c +++ b/stats.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include "stats.h" #include "coef.h" #include "common.h" @@ -289,3 +291,4 @@ struct neper_stats *neper_stats_init(struct callbacks *cb) return stats; } +/* clang-format on */ diff --git a/stats.h b/stats.h index d8c51e4..a830786 100644 --- a/stats.h +++ b/stats.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_STATS_H #define THIRD_PARTY_NEPER_STATS_H @@ -79,3 +81,4 @@ struct neper_stats { struct neper_stats *neper_stats_init(struct callbacks *); #endif +/* clang-format on */ diff --git a/stream.c b/stream.c index c057e75..a82c985 100644 --- a/stream.c +++ b/stream.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "stream.h" #include "coef.h" @@ -223,3 +225,4 @@ void stream_flow_init(struct thread *t, int fd) if (t->opts->rx_zerocopy) flow_init_rx_zerocopy(f, t->opts->buffer_size, t->cb); } +/* clang-format on */ diff --git a/stream.h b/stream.h index 8f20fc9..de30309 100644 --- a/stream.h +++ b/stream.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_STREAM_H #define THIRD_PARTY_NEPER_STREAM_H @@ -27,3 +29,4 @@ void stream_handler(struct flow *, uint32_t events); int stream_report(struct thread *); #endif +/* clang-format on */ diff --git a/tcp_crr.c b/tcp_crr.c index 0fe8bd1..85150ae 100644 --- a/tcp_crr.c +++ b/tcp_crr.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "rr.h" #include "socket.h" @@ -41,3 +43,4 @@ int tcp_crr(struct options *opts, struct callbacks *cb) opts->nostats = true; return run_main_thread(opts, cb, fn); } +/* clang-format on */ diff --git a/tcp_crr_main.c b/tcp_crr_main.c index fcd7b9e..09bd501 100644 --- a/tcp_crr_main.c +++ b/tcp_crr_main.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include "common.h" #include "flags.h" #include "lib.h" @@ -64,3 +66,4 @@ int main(int argc, char **argv) logging_exit(&cb); return exit_code; } +/* clang-format on */ diff --git a/tcp_rr.c b/tcp_rr.c index c51abc1..98334fe 100644 --- a/tcp_rr.c +++ b/tcp_rr.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "rr.h" #include "socket.h" @@ -38,3 +40,4 @@ int tcp_rr(struct options *opts, struct callbacks *cb) const struct neper_fn *fn = opts->client ? &client_fn : &server_fn; return run_main_thread(opts, cb, fn); } +/* clang-format on */ diff --git a/tcp_rr_main.c b/tcp_rr_main.c index 32bcfc9..858c259 100644 --- a/tcp_rr_main.c +++ b/tcp_rr_main.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "flags.h" #include "lib.h" @@ -62,3 +64,4 @@ int main(int argc, char **argv) logging_exit(&cb); return exit_code; } +/* clang-format on */ diff --git a/tcp_stream.c b/tcp_stream.c index 6c52333..b382058 100644 --- a/tcp_stream.c +++ b/tcp_stream.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "socket.h" #include "stream.h" @@ -38,3 +40,4 @@ int tcp_stream(struct options *opts, struct callbacks *cb) const struct neper_fn *fn = opts->client ? &client_fn : &server_fn; return run_main_thread(opts, cb, fn); } +/* clang-format on */ diff --git a/tcp_stream_main.c b/tcp_stream_main.c index d2b6ada..56cbde1 100644 --- a/tcp_stream_main.c +++ b/tcp_stream_main.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "flags.h" #include "parse.h" @@ -84,3 +86,4 @@ int main(int argc, char **argv) logging_exit(&cb); return exit_code; } +/* clang-format on */ diff --git a/thread.c b/thread.c index 63d4bda..2a56015 100644 --- a/thread.c +++ b/thread.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +/* clang-format off */ + #include #include #include @@ -689,3 +691,4 @@ int run_main_thread(struct options *opts, struct callbacks *cb, free(data_pending); return ret; } +/* clang-format on */ diff --git a/thread.h b/thread.h index bb926b4..21b58f2 100644 --- a/thread.h +++ b/thread.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_THREAD_H #define THIRD_PARTY_NEPER_THREAD_H @@ -158,3 +160,4 @@ void thread_flush_stat(struct thread *); void thread_clear_flow_or_die(struct thread*, struct flow *); #endif +/* clang-format on */ diff --git a/udp_rr.c b/udp_rr.c index 206aea6..101f330 100644 --- a/udp_rr.c +++ b/udp_rr.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "rr.h" #include "socket.h" @@ -38,3 +40,4 @@ int udp_rr(struct options *opts, struct callbacks *cb) const struct neper_fn *fn = opts->client ? &client_fn : &server_fn; return run_main_thread(opts, cb, fn); } +/* clang-format on */ diff --git a/udp_rr_main.c b/udp_rr_main.c index 194c605..b6b38e8 100644 --- a/udp_rr_main.c +++ b/udp_rr_main.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "countdown_cond.h" #include "flags.h" @@ -63,3 +65,4 @@ int main(int argc, char **argv) logging_exit(&cb); return exit_code; } +/* clang-format on */ diff --git a/udp_stream.c b/udp_stream.c index 31c4ba5..fa52b2c 100644 --- a/udp_stream.c +++ b/udp_stream.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "socket.h" #include "stream.h" @@ -38,3 +40,4 @@ int udp_stream(struct options *opts, struct callbacks *cb) const struct neper_fn *fn = opts->client ? 
&client_fn : &server_fn; return run_main_thread(opts, cb, fn); } +/* clang-format on */ diff --git a/udp_stream_main.c b/udp_stream_main.c index 29697fe..b68dd28 100644 --- a/udp_stream_main.c +++ b/udp_stream_main.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "common.h" #include "flags.h" #include "parse.h" @@ -70,3 +72,4 @@ int main(int argc, char **argv) logging_exit(&cb); return exit_code; } +/* clang-format on */ diff --git a/version.c b/version.c index c796452..5af9392 100644 --- a/version.c +++ b/version.c @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #include "version.h" #include @@ -37,3 +39,4 @@ void show_version(void) { printf("%s\n", get_version()); } +/* clang-format on */ diff --git a/version.h b/version.h index e66a4af..926c859 100644 --- a/version.h +++ b/version.h @@ -14,6 +14,8 @@ * limitations under the License. */ +/* clang-format off */ + #ifndef THIRD_PARTY_NEPER_VERSION_H #define THIRD_PARTY_NEPER_VERSION_H @@ -29,3 +31,4 @@ void show_version(void); #endif #endif +/* clang-format on */ From 771ef229b47a67c09021fd5e97cc8f90a335368b Mon Sep 17 00:00:00 2001 From: Kevin Krakauer Date: Fri, 20 Feb 2026 15:31:36 -0800 Subject: [PATCH 2/4] zerocopy send support with MSG_ZEROCOPY Tested: I ran ./tcp_stream -l 5 on the server and ./tcp_stream -H 127.0.0.1 -c -Z -l 3 on the client. The test completed successfully, transferring data with a throughput of 15014.27 Mbit/s (remote_throughput=15014266767). 
--- common.c | 86 ++++++++++++++++++++++++++++++++++++++++++++++ common.h | 3 ++ define_all_flags.c | 2 +- flow.c | 13 +++++++ flow.h | 9 +++++ rr.c | 35 +++++++++++++++++++ stream.c | 4 +++ 7 files changed, 151 insertions(+), 1 deletion(-) diff --git a/common.c b/common.c index 02735a9..9eeb9c0 100644 --- a/common.c +++ b/common.c @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -25,9 +26,11 @@ #include #include "common.h" +#include "flow.h" #include "lib.h" #include "logging.h" #include "or_die.h" +#include "thread.h" #define kilo (1000) #define kibi (1024) @@ -348,4 +351,87 @@ int create_suicide_timeout(int sec_to_suicide) } return 0; } + +static void process_cmsg(struct flow *f, struct callbacks *cb, + struct cmsghdr *cmsg) +{ + struct sock_extended_err *err; + + switch(cmsg->cmsg_level) { + case SOL_IP: + case SOL_IPV6: + switch (cmsg->cmsg_type) { + case IP_RECVERR: + case IPV6_RECVERR: + { + err = (struct sock_extended_err *)CMSG_DATA(cmsg); + switch (err->ee_origin) { + case SO_EE_ORIGIN_ZEROCOPY: + { + uint32_t lo = err->ee_info; + uint32_t hi = err->ee_data; + uint64_t stat_zcopies = 0; + int ee_copy_event = 0; + + if (err->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) + ee_copy_event = 1; + else + stat_zcopies = hi - lo + 1; + + flow_update_zstat(f, stat_zcopies, + ee_copy_event); + break; + } + default: + LOG_ERROR(cb, "unsupported ee_origin %u\n", + err->ee_origin); + break; + } + break; + } + default: + LOG_ERROR(cb, "unsupported cmsg type: %u\n", + cmsg->cmsg_type); + } + break; + + default: + LOG_ERROR(cb, "unsupported cmsg level: %u\n", cmsg->cmsg_level); + } +} + +// This function is used to process error queue when TX_ZEROCOPY is used. +int do_recvmsg_errqueue(struct thread *t, struct flow *f, uint32_t events) +{ + /* + * Right now we only process ZEROCOPY related cmsg. 
+ */ + char control[CMSG_SPACE(sizeof(struct sockaddr_in6)) + + CMSG_SPACE(sizeof(struct sock_extended_err))]; + struct msghdr msg = { + .msg_control = control, + .msg_controllen = sizeof(control), + }; + struct cmsghdr *cmsg; + int fd = flow_fd(f); + ssize_t n; + + if (events & EPOLLERR) { + do { + n = recvmsg(fd, &msg, MSG_ERRQUEUE); + } while(n == -1 && errno == EINTR); + if (n == -1) { + if (errno != EAGAIN) + LOG_WARN(t->cb, + "recvmsg() on ERRQUEUE failed: %s\n", + strerror(errno)); + return errno; + } + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; + cmsg = CMSG_NXTHDR(&msg, cmsg)) + process_cmsg(f, t->cb, cmsg); + } + return 0; +} + /* clang-format on */ diff --git a/common.h b/common.h index 6590441..844158a 100644 --- a/common.h +++ b/common.h @@ -170,5 +170,8 @@ void print_unit(const char *name, const void *var, struct callbacks *); const struct rate_conversion *auto_unit(const double throughput, const struct rate_conversion *opt, struct callbacks *); + +int do_recvmsg_errqueue(struct thread *t, struct flow *f, uint32_t events); + #endif /* clang-format on */ diff --git a/define_all_flags.c b/define_all_flags.c index cb0cc95..8105193 100644 --- a/define_all_flags.c +++ b/define_all_flags.c @@ -56,6 +56,7 @@ struct flags_parser *add_flags_common(struct flags_parser *fp) DEFINE_FLAG(fp, const char *, all_samples, NULL, 'A', "Print all samples? If yes, this is the output file name"); DEFINE_FLAG_HAS_OPTIONAL_ARGUMENT(fp, all_samples); DEFINE_FLAG_PARSER(fp, all_samples, parse_all_samples); + DEFINE_FLAG_NAMED(fp, bool, tx_zerocopy, false, "zerocopy", 'Z', "Set MSG_ZEROCOPY when sending"); DEFINE_FLAG(fp, bool, time_wait, false, 0, "Do not set SO_LINGER 0. Close gracefully. 
Active peer will enter TIME_WAIT state"); DEFINE_FLAG(fp, unsigned long, iostat_ms, 0, 0, "Print io stats snapshot every this many ms"); DEFINE_FLAG(fp, unsigned long, wait_start, 0, 0, "Wait this many seconds before starting any data flows."); @@ -114,7 +115,6 @@ struct flags_parser *add_flags_stream(struct flags_parser *fp) DEFINE_FLAG(fp, int, test_length, 10, 'l', "Test length in seconds"); DEFINE_FLAG(fp, bool, edge_trigger, false, 'E', "Edge-triggered epoll"); DEFINE_FLAG(fp, bool, reuseaddr, false, 'R', "Use SO_REUSEADDR on sockets"); - DEFINE_FLAG_NAMED(fp, bool, tx_zerocopy, false, "zerocopy", 'Z', "Set MSG_ZEROCOPY when sending"); DEFINE_FLAG(fp, const struct rate_conversion *, throughput_opt, neper_units_mb_pointer_hack, 0, "Units to display for throughput"); DEFINE_FLAG_PARSER(fp, throughput_opt, parse_unit); DEFINE_FLAG_PRINTER(fp, throughput_opt, print_unit); diff --git a/flow.c b/flow.c index ce3743e..cc18099 100644 --- a/flow.c +++ b/flow.c @@ -54,6 +54,8 @@ struct flow { void *f_rx_zerocopy_buffer; size_t f_rx_zerocopy_buffer_sz; + struct zerocopy_stat z_stat; + long rtt_log_count; }; @@ -347,6 +349,17 @@ void flow_update_next_event(struct flow *f, uint64_t duration) f->f_next_event += duration; } +void flow_update_zstat(struct flow *f, uint64_t zerocopied_bytes, int ee_copied) +{ + f->z_stat.bytes += zerocopied_bytes; + f->z_stat.ee_copied_events += ee_copied; +} + +const struct zerocopy_stat *flow_get_zstat(const struct flow *f) +{ + return &f->z_stat; +} + ssize_t flow_recv_zerocopy(struct flow *f, void *copybuf, size_t copybuf_len) { struct tcp_zerocopy_receive zc = {0}; socklen_t zc_len = sizeof(zc); diff --git a/flow.h b/flow.h index 5bba298..a997eb2 100644 --- a/flow.h +++ b/flow.h @@ -65,6 +65,15 @@ struct flow_create_args { struct neper_stat *(*stat)(struct flow *); /* stats callback */ }; +struct zerocopy_stat { + uint64_t bytes; + int ee_copied_events; +}; + +void flow_update_zstat(struct flow *f, uint64_t zerocopied_bytes, + int 
ee_copied); +const struct zerocopy_stat *flow_get_zstat(const struct flow *f); + struct flow *flow_create(const struct flow_create_args *); long flow_rtt_log_count(const struct flow *f); void flow_increment_rtt_log_count(struct flow *f); diff --git a/rr.c b/rr.c index f5b3689..006a63e 100644 --- a/rr.c +++ b/rr.c @@ -246,6 +246,8 @@ static bool rr_do_send(struct flow *f, uint32_t events, rr_send_t rr_send) len = opts->buffer_size; flags |= MSG_MORE; } + if (opts->tx_zerocopy) + flags |= MSG_ZEROCOPY; ssize_t n = rr_send(f, flow_mbuf(f), len, flags); if (n == -1) { @@ -373,6 +375,11 @@ static void rr_client_state_1(struct flow *f, uint32_t events) { struct thread *t = flow_thread(f); + if (t->opts->tx_zerocopy && (events & EPOLLERR)) { + do_recvmsg_errqueue(t, f, events); + return; + } + if (rr_do_recv(f, events, rr_fn_recv)) { struct rr_state *rr = flow_opaque(f); @@ -393,6 +400,11 @@ static void rr_client_state_0(struct flow *f, uint32_t events) { struct thread *t = flow_thread(f); + if (t->opts->tx_zerocopy && (events & EPOLLERR)) { + do_recvmsg_errqueue(t, f, events); + return; + } + if (t->data_pending && countdown_cond_dec(t->data_pending) < 0) { /* data vs time mode and no more transactons to send */ return; @@ -407,6 +419,13 @@ static void rr_client_state_0(struct flow *f, uint32_t events) static void crr_client_state_1(struct flow *f, uint32_t events) { + struct thread *t = flow_thread(f); + + if (t->opts->tx_zerocopy && (events & EPOLLERR)) { + do_recvmsg_errqueue(t, f, events); + return; + } + if (rr_do_recv(f, events, rr_fn_recv)) { struct rr_state *rr = flow_opaque(f); @@ -420,6 +439,11 @@ static void crr_client_state_0(struct flow *f, uint32_t events) { struct thread *t = flow_thread(f); + if (t->opts->tx_zerocopy && (events & EPOLLERR)) { + do_recvmsg_errqueue(t, f, events); + return; + } + if (t->data_pending && countdown_cond_dec(t->data_pending) < 0) { /* data vs time mode and no more transactons to send */ return; @@ -437,6 +461,11 @@ static void 
rr_server_state_2(struct flow *f, uint32_t events) struct neper_stat *stat = flow_stat(f); struct neper_histo *histo = stat ? stat->histo(stat) : NULL; + if (t->opts->tx_zerocopy && (events & EPOLLERR)) { + do_recvmsg_errqueue(t, f, events); + return; + } + if (rr_do_send(f, events, rr->rr_send)) { if (stat) { /* rr server has no meaningful latency to measure. */ @@ -455,6 +484,12 @@ static void rr_server_state_1(struct flow *f) static void rr_server_state_0(struct flow *f, uint32_t events) { struct rr_state *rr = flow_opaque(f); + struct thread *t = flow_thread(f); + + if (t->opts->tx_zerocopy && (events & EPOLLERR)) { + do_recvmsg_errqueue(t, f, events); + return; + } if (rr_do_recv(f, events, rr->rr_recv)) rr_server_state_1(f); diff --git a/stream.c b/stream.c index a82c985..adb36e6 100644 --- a/stream.c +++ b/stream.c @@ -151,6 +151,10 @@ void stream_handler(struct flow *f, uint32_t events) * e.g. Linux kernel tools/testing/selftests/net/msg_zerocopy.c */ } + // Process error queue for TX_ZEROCOPY case. + if (events & EPOLLERR) + do_recvmsg_errqueue(t, f, events); + if (opts->split_bidir && !opts->client && events & EPOLLOUT && events & EPOLLOUT) { /* See comments in flow.c on bidirectional traffic: From 0eb5193981a533074fbcd67769586e824f5560e0 Mon Sep 17 00:00:00 2001 From: Kevin Krakauer Date: Fri, 20 Feb 2026 16:01:19 -0800 Subject: [PATCH 3/4] print more IRQ stats Tested: Ran tcp_stream locally and saw nonlocal values for everything but TX IRQs (which is zero because we're running over loopback). 
--- Makefile | 1 + irq.c | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ irq.h | 35 ++++++++++++++++++++++++++++ thread.c | 23 +++++++++++++++++++ 4 files changed, 128 insertions(+) create mode 100644 irq.c create mode 100644 irq.h diff --git a/Makefile b/Makefile index 586eeea..769a4e0 100644 --- a/Makefile +++ b/Makefile @@ -31,6 +31,7 @@ lib := \ flow.o \ hexdump.o \ histo.o \ + irq.o \ logging.o \ loop.o \ or_die.o \ diff --git a/irq.c b/irq.c new file mode 100644 index 0000000..8ed76e7 --- /dev/null +++ b/irq.c @@ -0,0 +1,69 @@ +/* + * Copyright 2021 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* clang-format off */ + +#include "irq.h" +#include "common.h" + + +void get_proc_interrupts(struct stats_irq *irqs) +{ + /* Get the hardware and software interrupt by scraping /proc/stat */ + bool assigned_softirq = false; + bool assigned_irq = false; + char *line = NULL; + size_t len = 0; + ssize_t read; + + FILE *f = fopen("/proc/stat", "r"); + if (!f) { + fprintf(stderr, "Could not open /proc/stat\n"); + exit(1); + } + + while ((read = getline(&line, &len, f)) != -1) { + /* Get NET_TX_SOFTIRQ and NET_RX_SOFTIRQ */ + if (!assigned_softirq) + assigned_softirq = sscanf(line, "softirq %*u %*u %*u %u %u", + &irqs->tx_softirq, &irqs->rx_softirq) == 2; + /* Record the first number (sum of all intr) from the intr line */ + if (!assigned_irq) + assigned_irq = sscanf(line, "intr %lu", &irqs->hardirq) == 1; + + if (assigned_softirq && assigned_irq) { + break; + } + } + + if (!assigned_softirq) + fprintf(stderr, + "IO or parser error while reading /proc/stat softirq!\n"); + + if (!assigned_irq) + fprintf(stderr, + "IO or parser error while reading /proc/stat irq!\n"); + + if (line) + free(line); + + fclose(f); + + if (!assigned_softirq || !assigned_irq) + exit(1); +} + +/* clang-format on */ diff --git a/irq.h b/irq.h new file mode 100644 index 0000000..9aebe3d --- /dev/null +++ b/irq.h @@ -0,0 +1,35 @@ +/* + * Copyright 2021 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* clang-format off */ + +#ifndef THIRD_PARTY_NEPER_IRQ_H +#define THIRD_PARTY_NEPER_IRQ_H + +#include +#include + +struct stats_irq { + unsigned long hardirq; + unsigned int tx_softirq; + unsigned int rx_softirq; +}; + +void get_proc_interrupts(struct stats_irq *irqs); + +#endif + +/* clang-format on */ diff --git a/thread.c b/thread.c index 2a56015..7665e7a 100644 --- a/thread.c +++ b/thread.c @@ -27,6 +27,7 @@ #include "cpuinfo.h" #include "flow.h" #include "histo.h" +#include "irq.h" #include "logging.h" #include "loop.h" #include "percentiles.h" @@ -570,6 +571,11 @@ int run_main_thread(struct options *opts, struct callbacks *cb, struct rusage rusage_start; /* updated when first packet comes */ struct rusage rusage_end; /* local to this function, never pass out */ + struct stats_irq irqs_start; /* updated when first packet comes */ + struct stats_irq irqs_end; /* local to this function, never pass out */ + struct timespec time_end; + double elapsed_time; + struct addrinfo *ai; struct thread *ts; /* worker threads */ struct control_plane *cp; @@ -610,6 +616,8 @@ int run_main_thread(struct options *opts, struct callbacks *cb, if (opts->client) sleep(opts->wait_start); + get_proc_interrupts(&irqs_start); + /* start threads *after* control plane is up, to reuse addrinfo. 
*/ reset_port(ai, atoi(opts->port), cb); ts = calloc(opts->num_threads, sizeof(struct thread)); @@ -630,6 +638,9 @@ int run_main_thread(struct options *opts, struct callbacks *cb, stop_worker_threads(cb, opts->num_threads, ts, &ready_barrier, &loop_init_c, &loop_init_m); LOG_INFO(cb, "stopped worker threads"); + get_proc_interrupts(&irqs_end); + common_gettime(&time_end); + elapsed_time = seconds_between(&time_start, &time_end); PRINT(cb, "invalid_secret_count", "%d", control_plane_incidents(cp));
@@ -656,6 +667,18 @@ int run_main_thread(struct options *opts, struct callbacks *cb,
 	PRINT(cb, "nvcsw_end", "%ld", rusage_end.ru_nvcsw);
 	PRINT(cb, "nivcsw_start", "%ld", rusage_start.ru_nivcsw);
 	PRINT(cb, "nivcsw_end", "%ld", rusage_end.ru_nivcsw);
+	PRINT(cb, "hw_irq_start", "%lu", irqs_start.hardirq);	/* hardirq is unsigned long */
+	PRINT(cb, "hw_irq_end", "%lu", irqs_end.hardirq);
+	PRINT(cb, "tx_sw_irq_start", "%u", irqs_start.tx_softirq);
+	PRINT(cb, "tx_sw_irq_end", "%u", irqs_end.tx_softirq);
+	PRINT(cb, "rx_sw_irq_start", "%u", irqs_start.rx_softirq);
+	PRINT(cb, "rx_sw_irq_end", "%u", irqs_end.rx_softirq);
+	PRINT(cb, "hw_irq_rate", "%f",
+	      (irqs_end.hardirq - irqs_start.hardirq) / elapsed_time);
+	PRINT(cb, "tx_sw_irq_rate", "%f",
+	      (irqs_end.tx_softirq - irqs_start.tx_softirq) / elapsed_time);
+	PRINT(cb, "rx_sw_irq_rate", "%f",
+	      (irqs_end.rx_softirq - irqs_start.rx_softirq) / elapsed_time);
pthread_mutex_unlock(&time_start_mutex); /* end printing rusage */ From b115ea2f7bbcc2aa830b90207849c6d3f86bee54 Mon Sep 17 00:00:00 2001 From: Kevin Krakauer Date: Fri, 20 Feb 2026 16:45:58 -0800 Subject: [PATCH 4/4] io_uring support Tested: `./tcp_stream -l 2 -H 127.0.0.1 -O` and `./tcp_stream -c -H 127.0.0.1 -l 2 -O`. Also tested with only the client and only the server using -O. 
--- Makefile | 5 + common.c | 13 ++ common.h | 6 + define_all_flags.c | 3 + flow.c | 63 +++++++- flow.h | 28 +++- lib.h | 4 + loop.c | 33 +++- psp_stream.c | 4 + rr.c | 364 +++++++++++++++++++++++++++++++++++++++------ tcp_crr.c | 4 + tcp_rr.c | 4 + tcp_stream.c | 4 + thread.c | 19 ++- thread.h | 10 ++ udp_rr.c | 4 + udp_stream.c | 4 + 17 files changed, 523 insertions(+), 49 deletions(-) diff --git a/Makefile b/Makefile index 769a4e0..78fe2f6 100644 --- a/Makefile +++ b/Makefile @@ -64,6 +64,11 @@ psp_rr-objs := psp_rr_main.o psp_rr.o rr.o psp_lib.o $(lib) ext-libs := -lm -lrt -lpthread +ifeq ($(WITH_IO_URING),1) + CFLAGS += -DWITH_IO_URING + ext-libs += -luring +endif + tcp_rr: $(tcp_rr-objs) $(CC) $(LDFLAGS) -o $@ $^ $(ext-libs) diff --git a/common.c b/common.c index 9eeb9c0..ccc72fb 100644 --- a/common.c +++ b/common.c @@ -352,6 +352,19 @@ int create_suicide_timeout(int sec_to_suicide) return 0; } +#ifdef WITH_IO_URING +static struct io_uring s_main_ring; +void io_uring_init_main_ring(struct options *opts) +{ + io_uring_queue_init(opts->uring_size, &s_main_ring, 0); +} + +struct io_uring *io_uring_get_main_ring() +{ + return &s_main_ring; +} +#endif + static void process_cmsg(struct flow *f, struct callbacks *cb, struct cmsghdr *cmsg) { diff --git a/common.h b/common.h index 844158a..c28d6cb 100644 --- a/common.h +++ b/common.h @@ -173,5 +173,11 @@ const struct rate_conversion *auto_unit(const double throughput, int do_recvmsg_errqueue(struct thread *t, struct flow *f, uint32_t events); +#ifdef WITH_IO_URING +struct io_uring; +void io_uring_init_main_ring(struct options *opts); +struct io_uring *io_uring_get_main_ring(); +#endif + #endif /* clang-format on */ diff --git a/define_all_flags.c b/define_all_flags.c index 8105193..92a65cc 100644 --- a/define_all_flags.c +++ b/define_all_flags.c @@ -60,6 +60,8 @@ struct flags_parser *add_flags_common(struct flags_parser *fp) DEFINE_FLAG(fp, bool, time_wait, false, 0, "Do not set SO_LINGER 0. Close gracefully. 
Active peer will enter TIME_WAIT state"); DEFINE_FLAG(fp, unsigned long, iostat_ms, 0, 0, "Print io stats snapshot every this many ms"); DEFINE_FLAG(fp, unsigned long, wait_start, 0, 0, "Wait this many seconds before starting any data flows."); + DEFINE_FLAG(fp, bool, use_uring, false, 'O', "Should use uring for both tx/rx path"); + DEFINE_FLAG(fp, int, uring_size, 128, 0, "io_uring size"); /* Return the updated fp */ return (fp); @@ -132,6 +134,7 @@ struct flags_parser *add_flags_tcp_rr(struct flags_parser *fp) DEFINE_FLAG(fp, bool, reuseaddr, false, 0, "Use SO_REUSEADDR on sockets"); DEFINE_FLAG(fp, unsigned long, noburst, 0, 0, "noburst interval in ns (default), us, ms, or s"); DEFINE_FLAG_PARSER(fp, noburst, parse_duration); + DEFINE_FLAG(fp, int, iov_len, 0, 0, "Number of bytes to put in 1 iovec"); /* Return the updated fp */ return (fp); diff --git a/flow.c b/flow.c index cc18099..04079dc 100644 --- a/flow.c +++ b/flow.c @@ -22,7 +22,13 @@ #include #include #include +#include #include +#include +#ifdef WITH_IO_URING +#include +#include +#endif #include "common.h" #include "flow.h" @@ -57,6 +63,10 @@ struct flow { struct zerocopy_stat z_stat; long rtt_log_count; + +#ifdef WITH_IO_URING + void *user_data; +#endif }; int flow_fd(const struct flow *f) @@ -163,7 +173,53 @@ void flow_increment_rtt_log_count(struct flow *f) f->rtt_log_count++; } -struct flow *flow_create(const struct flow_create_args *args)
+#ifdef WITH_IO_URING
+void *flow_get_user_data(const struct flow *f) {
+	return f->user_data;
+}
+
+void flow_set_user_data(struct flow *f, void *user_data) {
+	f->user_data = user_data;
+}
+
+/* Tag sqe with f_req for the completion handler; the SQE is not submitted to the kernel here. */
+int flow_uring_submit_sqe(struct flow_uring_req *f_req, struct io_uring_sqe *sqe) {
+	io_uring_sqe_set_data(sqe, f_req);
+	return 0;
+}
+
+/* Dispatch at most one completion to its request callback; returns <0 only when submitting to or waiting on the ring fails. */
+int flow_uring_handle_completions(struct thread *t, struct timespec *timeout) {
+	struct flow_uring_req *f_req;
+	struct io_uring_cqe *cqe;
+	int ret = io_uring_peek_cqe(&t->ring, &cqe);
+
+	if (ret < 0) {
+		struct __kernel_timespec ts;
+		if (!timeout)
+			return io_uring_submit_and_wait(&t->ring, 1);
+		/* A zero timeout is a poll: flush queued SQEs, and "nothing ready yet" is not an error. */
+		if (timespec_is_zero(timeout)) {
+			ret = io_uring_submit(&t->ring);
+			return ret < 0 ? ret : 0;
+		}
+		ts.tv_sec = timeout->tv_sec;
+		ts.tv_nsec = timeout->tv_nsec;
+		ret = io_uring_submit_and_wait_timeout(&t->ring, &cqe, 1, &ts, NULL);
+		if (ret < 0)
+			return ret;
+	}
+
+	f_req = (struct flow_uring_req*) io_uring_cqe_get_data(cqe);
+	ret = cqe->res;
+	io_uring_cqe_seen(&t->ring, cqe);
+	if (f_req->cb)	/* a failed request (ret < 0) is the callback's to handle, not a ring error */
+		f_req->cb(f_req, ret);
+	return 0;
+}
+#endif
+
+void *flow_create(const struct flow_create_args *args)
{ struct thread *t = args->thread; struct flow *f = calloc_or_die(1, sizeof(struct flow), t->cb); @@ -206,6 +262,11 @@ struct flow *flow_create(const struct flow_create_args *args) if (t->opts->split_bidir && t->opts->client) events &= (f->f_id & 1) ? EPOLLOUT : EPOLLIN; +#ifdef WITH_IO_URING + if (t->use_uring) + return f; +#endif + flow_ctl(f, EPOLL_CTL_ADD, args->handler, events, true); return f; } diff --git a/flow.h b/flow.h index a997eb2..5149e69 100644 --- a/flow.h +++ b/flow.h @@ -24,6 +24,10 @@ #include #include #include +#ifdef WITH_IO_URING +#include +#include +#endif struct flow; /* note: struct is defined opaquely within flow.c */ struct neper_stat; @@ -36,6 +40,12 @@ struct rtt_log { struct timespec timestamp; }; +#ifdef WITH_IO_URING +struct flow_uring_req; + +typedef void (*req_handler)(struct flow_uring_req *, int); +#endif + typedef void (*flow_handler)(struct flow *, uint32_t); /* Simple accessors. 
*/ @@ -70,11 +80,27 @@ struct zerocopy_stat { int ee_copied_events; }; +#ifdef WITH_IO_URING +struct flow_uring_req { + struct flow *f; + req_handler cb; + void *user_data; +}; +#endif + void flow_update_zstat(struct flow *f, uint64_t zerocopied_bytes, int ee_copied); const struct zerocopy_stat *flow_get_zstat(const struct flow *f); -struct flow *flow_create(const struct flow_create_args *); +#ifdef WITH_IO_URING +void *flow_get_user_data(const struct flow *f); +void flow_set_user_data(struct flow *f, void *user_data); +int flow_uring_submit_sqe(struct flow_uring_req *f_req, + struct io_uring_sqe *sqe); +int flow_uring_handle_completions(struct thread *t, struct timespec *timeout); +#endif + +void *flow_create(const struct flow_create_args *); long flow_rtt_log_count(const struct flow *f); void flow_increment_rtt_log_count(struct flow *f); void flow_delete(struct flow *); diff --git a/lib.h b/lib.h index 1fb69de..a61f43b 100644 --- a/lib.h +++ b/lib.h @@ -138,6 +138,10 @@ struct options { /* rr */ const char *log_rtt; int logrtt_entries_per_flow; + + bool use_uring; + int uring_size; + int iov_len; }; #ifdef __cplusplus diff --git a/loop.c b/loop.c index 21de376..ae1db53 100644 --- a/loop.c +++ b/loop.c @@ -18,6 +18,10 @@ #include +#ifdef WITH_IO_URING +#include +#endif + #include "common.h" #include "flow.h" #include "loop.h" @@ -50,7 +54,10 @@ void *loop(struct thread *t) .stat = NULL }; - flow_create(&args); +#ifdef WITH_IO_URING + if (!t->use_uring) +#endif + flow_create(&args); /* Server sockets must be created in order * so that the ebpf filter works. @@ -82,6 +89,25 @@ void *loop(struct thread *t) /* Passing a NULL timeout causes us to block indefinitely. */ struct timespec *poll_timeout = indefinite ? 
NULL : &timeout;
+#ifdef WITH_IO_URING
+		if (t->use_uring) {
+			eventfd_t stop_val;	/* eventfd_read() stores a full 64-bit counter */
+			int ret;
+
+			ret = flow_uring_handle_completions(t, poll_timeout);
+			if (ret < 0 && ret != -EINTR && ret != -ETIME) {
+				PLOG_FATAL(t->cb, "flow_uring_handle_completions ret %d\n", ret);
+			}
+			/* Read eventfd if there is a stop. Hacky, but avoids an io_uring
+			 * request just for the eventfd. NOTE(review): stop_efd comes from
+			 * eventfd(0, 0), so this read presumably blocks until a stop - confirm.
+			 */
+			if (eventfd_read(t->stop_efd, &stop_val) == 0)
+				t->stop = 1;
+			continue;
+		}
+#endif
+
int nfds = t->poll_func(t->epfd, events, opts->maxevents, poll_timeout); int i; @@ -95,7 +121,10 @@ void *loop(struct thread *t) } thread_flush_stat(t); free(events); - do_close(t->epfd); +#ifdef WITH_IO_URING + if (!t->use_uring) +#endif + do_close(t->epfd); /* TODO: The first flow object is leaking here... */ diff --git a/psp_stream.c b/psp_stream.c index ee4859d..592f3e0 100644 --- a/psp_stream.c +++ b/psp_stream.c @@ -43,6 +43,10 @@ static const struct neper_fn server_fn = { int psp_stream(struct options *opts, struct callbacks *cb) { const struct neper_fn *fn = opts->client ? 
&client_fn : &server_fn; +#ifdef WITH_IO_URING + if (opts->use_uring) + io_uring_init_main_ring(opts); +#endif return run_main_thread(opts, cb, fn); } /* clang-format on */ diff --git a/rr.c b/rr.c index 006a63e..f5c2d54 100644 --- a/rr.c +++ b/rr.c @@ -42,6 +42,12 @@ #include "stats.h" #include "thread.h" +#include +#include +#ifndef PAGE_SIZE +#define PAGE_SIZE ((size_t)sysconf(_SC_PAGESIZE)) +#endif + #define NEPER_EPOLL_MASK (EPOLLHUP | EPOLLRDHUP | EPOLLERR) typedef ssize_t (*rr_send_t)(struct flow *, const char *, size_t, int); @@ -59,6 +65,11 @@ struct rr_state { struct sockaddr_storage rr_peer; /* for UDP servers */ socklen_t rr_peerlen; + +#ifdef WITH_IO_URING + struct msghdr msg; /* sendmsg/recvmsg for UDP with io_uring */ + struct iovec iov; +#endif }; struct rr_snap_opaque { @@ -142,7 +153,20 @@ static void *rr_alloc(struct thread *t) len = MAX(opts->request_size, opts->response_size); len = MIN(len, opts->buffer_size); - t->f_mbuf = calloc_or_die(len, sizeof(char), t->cb); +#ifdef WITH_IO_URING + if (opts->iov_len) { + int num_of_pages = (len + opts->iov_len - 1) / opts->iov_len; + size_t alloc_size = num_of_pages * PAGE_SIZE; + void *addr = memalign(PAGE_SIZE, alloc_size); + + if (!addr) + PLOG_FATAL(t->cb, "memalign"); + t->f_mbuf = addr; + } else +#endif + { + t->f_mbuf = calloc_or_die(len, sizeof(char), t->cb); + } return t->f_mbuf; } @@ -196,6 +220,8 @@ static void rr_state_init(struct thread *t, int fd, break; } + struct flow *f; + const struct flow_create_args args = { .thread = t, .fd = fd, @@ -206,7 +232,28 @@ static void rr_state_init(struct thread *t, int fd, .stat = rr_latency_init }; - flow_create(&args); + f = flow_create(&args); + +#ifdef WITH_IO_URING + if (t->use_uring) { + struct flow_uring_req *f_req; + + f_req = calloc_or_die(1, sizeof(struct flow_uring_req), t->cb); + f_req->f = f; + f_req->user_data = NULL; + f_req->cb = NULL; + flow_set_user_data(f, f_req); + + // Setup for udp_rr + if (t->fn->fn_type == SOCK_DGRAM) { + 
rr->msg.msg_iov = &rr->iov; + rr->msg.msg_iovlen = 1; + rr->msg.msg_name = (void *)&rr->rr_peer; + rr->msg.msg_namelen = sizeof(struct sockaddr_storage); + } + state(f, 0); + } +#endif if (t->opts->client && t->opts->log_rtt && t->rtt_logs == NULL) { t->rtt_log_capacity = (long)t->flow_limit * @@ -223,18 +270,44 @@ static void rr_state_init(struct thread *t, int fd, * successfully completed and the state machine may therefore advance. */
+/* Account the result n of one send; returns true once the whole message is out and the flow should start receiving. */
+static inline bool rr_do_send_done(struct flow *f, int n)
+{
+	struct thread *t = flow_thread(f);
+	struct rr_state *rr = flow_opaque(f);
+	if (n < 0) {	/* io_uring completions report failure as -errno, not only -1 */
+		PLOG_ERROR(t->cb, "send");
+		return false;
+	}
+	rr->rr_xfer -= n;
+	if (rr->rr_xfer)
+		return false;
+
+	// Transition to receiving.
+	rr->rr_xfer = rr_recv_size(t);
+	return true;
+}
+
static bool rr_do_send(struct flow *f, uint32_t events, rr_send_t rr_send) { struct thread *t = flow_thread(f); const struct options *opts = t->opts; struct rr_state *rr = flow_opaque(f); - if (events & ~(NEPER_EPOLL_MASK | EPOLLOUT)) - LOG_ERROR(t->cb, "%s(): unknown event(s) %x", __func__, events); - - if (events & NEPER_EPOLL_MASK) { - flow_delete(f); - return false; +#ifdef WITH_IO_URING + if (!t->use_uring) +#endif + { + /* EPOLLERR might be marked if zerocopy is used, but should + * already be processed before reaching here. 
+ */ + if (events & ~(NEPER_EPOLL_MASK | EPOLLOUT)) + LOG_ERROR(t->cb, "%s(): unknown event(s) %x", __func__, events); + + if (events & NEPER_EPOLL_MASK) { + flow_delete(f); + return false; + } } ssize_t len = rr->rr_xfer; @@ -249,50 +322,69 @@ static bool rr_do_send(struct flow *f, uint32_t events, rr_send_t rr_send) if (opts->tx_zerocopy) flags |= MSG_ZEROCOPY; - ssize_t n = rr_send(f, flow_mbuf(f), len, flags); - if (n == -1) { - PLOG_ERROR(t->cb, "send"); - return false;
+#ifdef WITH_IO_URING
+	if (t->use_uring) {
+		struct io_uring_sqe *sqe;
+
+		if (opts->iov_len) {
+			struct msghdr msg = {0};
+			int num_of_iov = (len + opts->iov_len - 1) / opts->iov_len;
+			struct iovec *iov_arr = calloc_or_die(num_of_iov, sizeof(struct iovec), t->cb);
+
+			for (int i = 0; i < num_of_iov; i++) {
+				iov_arr[i].iov_base = (void *)((char *)flow_mbuf(f) + i * PAGE_SIZE);	/* one iovec per page of the buffer */
+				if (i == num_of_iov - 1)
+					iov_arr[i].iov_len = len - i * opts->iov_len;
+				else
+					iov_arr[i].iov_len = opts->iov_len;
+			}
+			msg.msg_iov = iov_arr;
+			msg.msg_iovlen = num_of_iov;
+			sqe = io_uring_get_sqe(&t->ring);
+			io_uring_prep_sendmsg(sqe, flow_fd(f), &msg, flags);
+			flow_uring_submit_sqe(flow_get_user_data(f), sqe);
+			int ret = io_uring_submit(&t->ring);	/* msg and iov_arr must be consumed before they go away below */
+			if (ret < 0)
+				LOG_FATAL(t->cb, "io_uring_submit: %d", ret);
+			free(iov_arr);
+			return false;	/* the send finishes in the completion callback */
+		}
+
+		sqe = io_uring_get_sqe(&t->ring);
+		if (t->fn->fn_type == SOCK_DGRAM) {
+			rr->iov.iov_base = (void*) flow_mbuf(f);
+			rr->iov.iov_len = len;
+			io_uring_prep_sendmsg(sqe, flow_fd(f), &rr->msg, flags);
+		} else {
+			io_uring_prep_send(sqe, flow_fd(f), flow_mbuf(f), len, flags);
+		}
+		return flow_uring_submit_sqe(flow_get_user_data(f), sqe);
 	}
+#endif
- rr->rr_xfer -= n; - if (rr->rr_xfer) + ssize_t n = rr_send(f, flow_mbuf(f), len, flags); +#ifdef WITH_IO_URING + if (t->use_uring && n == 1) return false; +#endif - // Transition to receiving. 
- rr->rr_xfer = rr_recv_size(t); - return true; + return rr_do_send_done(f, n); } -static bool rr_do_recv(struct flow *f, uint32_t events, rr_recv_t rr_recv) -{ +static inline bool rr_do_recv_done(struct flow *f, int n) { struct thread *t = flow_thread(f); const struct options *opts = t->opts; struct rr_state *rr = flow_opaque(f); - if (events & ~(NEPER_EPOLL_MASK | EPOLLIN)) - LOG_ERROR(t->cb, "%s(): unknown event(s) %x", __func__, events); - - if (events & NEPER_EPOLL_MASK) { - flow_delete(f); - return false; - } - - ssize_t len = rr->rr_xfer; - - if (len > opts->buffer_size) - len = opts->buffer_size; - - ssize_t n; - do { - n = rr_recv(f, flow_mbuf(f), len); - } while(n == -1 && errno == EINTR); - if (n == -1) { PLOG_ERROR(t->cb, "read"); return false; } - if (n == 0) { + if (n == 0 +#ifdef WITH_IO_URING + || (t->use_uring && n == -ECONNRESET) +#endif + ) { flow_delete(f); return false; } @@ -313,6 +405,63 @@ static bool rr_do_recv(struct flow *f, uint32_t events, rr_recv_t rr_recv) return true; } +static bool rr_do_recv(struct flow *f, uint32_t events, rr_recv_t rr_recv) +{ + struct thread *t = flow_thread(f); + const struct options *opts = t->opts; + struct rr_state *rr = flow_opaque(f); + +#ifdef WITH_IO_URING + if (!t->use_uring) +#endif + { + /* EPOLLERR might be marked if zerocopy is used, but should + * already be processed before reaching here. 
+ */ + if (events & ~(NEPER_EPOLL_MASK | EPOLLIN)) + LOG_ERROR(t->cb, + "%s(): unknown event(s) %x", __func__, events); + + if (events & NEPER_EPOLL_MASK) { + flow_delete(f); + return false; + } + } + + ssize_t len = rr->rr_xfer; + + if (len > opts->buffer_size) + len = opts->buffer_size; + +#ifdef WITH_IO_URING + if (t->use_uring) { + struct io_uring_sqe *sqe; + + sqe = io_uring_get_sqe(&t->ring); + if (t->fn->fn_type == SOCK_DGRAM) { + rr->iov.iov_base = (void*) flow_mbuf(f); + rr->iov.iov_len = len; + io_uring_prep_recvmsg(sqe, flow_fd(f), &rr->msg, 0); + } else { + io_uring_prep_recv(sqe, flow_fd(f), flow_mbuf(f), len, 0); + } + return flow_uring_submit_sqe(flow_get_user_data(f), sqe); + } +#endif + + ssize_t n; + do { + n = rr_recv(f, flow_mbuf(f), len); + } while(n == -1 && errno == EINTR); + +#ifdef WITH_IO_URING + if (t->use_uring && n == 1) + return false; +#endif + + return rr_do_recv_done(f, n); +} + static void rr_snapshot(struct thread *t, struct neper_stat *stat, struct neper_snap *snap) { @@ -371,6 +520,34 @@ static bool rr_do_compl(struct flow *f, /* The state machine for RR clients: */ +#ifdef WITH_IO_URING +static void rr_client_state_1_done(struct flow_uring_req *f_req, int events) +{ + struct flow *f = f_req->f; + struct thread *t = flow_thread(f); + + if (t->opts->tx_zerocopy && (events & EPOLLERR)) { + do_recvmsg_errqueue(t, f, events); + return; + } + + if (rr_do_recv_done(f, events)) { + struct rr_state *rr = flow_opaque(f); + + if (rr_do_compl(f, &rr->rr_ts_1, &rr->rr_ts_2)) + return; + + bool sent = !t->opts->delay + && !t->opts->noburst + && rr_do_send(f, EPOLLOUT, rr_fn_send); + if (sent) + return; + + rr_client_state_0(f, 0); + } +} +#endif + static void rr_client_state_1(struct flow *f, uint32_t events) { struct thread *t = flow_thread(f); @@ -394,7 +571,22 @@ static void rr_client_state_1(struct flow *f, uint32_t events) flow_mod(f, rr_client_state_0, EPOLLOUT, true); } +#ifdef WITH_IO_URING + else if (t->use_uring) { + struct 
flow_uring_req *f_req = flow_get_user_data(f); + f_req->cb = rr_client_state_1_done; + } +#endif +} + +#ifdef WITH_IO_URING +static void rr_client_state_0_done(struct flow_uring_req *f_req, int events) +{ + struct flow *f = f_req->f; + if (rr_do_send_done(f, events)) + rr_client_state_1(f, 0); } +#endif static void rr_client_state_0(struct flow *f, uint32_t events) { @@ -413,10 +605,32 @@ static void rr_client_state_0(struct flow *f, uint32_t events) return; if (rr_do_send(f, events, rr_fn_send)) flow_mod(f, rr_client_state_1, EPOLLIN, true); +#ifdef WITH_IO_URING + else if (t->use_uring) { + struct flow_uring_req *f_req = flow_get_user_data(f); + f_req->cb = rr_client_state_0_done; + } +#endif } /* The state machine for CRR clients: */ +#ifdef WITH_IO_URING +static void crr_client_state_1_done(struct flow_uring_req *f_req, int events) +{ + struct flow *f = f_req->f; + + if (rr_do_recv_done(f, events)) { + struct rr_state *rr = flow_opaque(f); + + if (rr_do_compl(f, &rr->rr_ts_0, &rr->rr_ts_2)) + return; + flow_reconnect(f, crr_client_state_0, EPOLLOUT); + crr_client_state_0(f, 0); + } +} +#endif + static void crr_client_state_1(struct flow *f, uint32_t events) { struct thread *t = flow_thread(f); @@ -433,7 +647,22 @@ static void crr_client_state_1(struct flow *f, uint32_t events) return; flow_reconnect(f, crr_client_state_0, EPOLLOUT); } +#ifdef WITH_IO_URING + else if (t->use_uring) { + struct flow_uring_req *f_req = flow_get_user_data(f); + f_req->cb = crr_client_state_1_done; + } +#endif +} + +#ifdef WITH_IO_URING +static void crr_client_state_0_done(struct flow_uring_req *f_req, int events) +{ + struct flow *f = f_req->f; + if (rr_do_send_done(f, events)) + crr_client_state_1(f, 0); } +#endif static void crr_client_state_0(struct flow *f, uint32_t events) { @@ -450,10 +679,35 @@ static void crr_client_state_0(struct flow *f, uint32_t events) } if (rr_do_send(f, events, rr_fn_send)) flow_mod(f, crr_client_state_1, EPOLLIN, true); +#ifdef WITH_IO_URING + else if 
(t->use_uring) { + struct flow_uring_req *f_req = flow_get_user_data(f); + f_req->cb = crr_client_state_0_done; + } +#endif } /* The state machine for servers: */ +#ifdef WITH_IO_URING +static void rr_server_state_2_done(struct flow_uring_req *f_req, int events) +{ + struct flow *f = f_req->f; + struct thread *t = flow_thread(f); + struct neper_stat *stat = flow_stat(f); + struct neper_histo *histo = stat ? stat->histo(stat) : NULL; + + if (rr_do_send_done(f, events)) { + if (stat) { + /* rr server has no meaningful latency to measure. */ + neper_histo_event(histo, 0.0); + stat->event(t, stat, 1, false, rr_snapshot); + } + rr_server_state_0(f, 0); + } +} +#endif + static void rr_server_state_2(struct flow *f, uint32_t events) { struct rr_state *rr = flow_opaque(f); @@ -474,12 +728,33 @@ static void rr_server_state_2(struct flow *f, uint32_t events) } flow_mod(f, rr_server_state_0, EPOLLIN, false); } +#ifdef WITH_IO_URING + else if (t->use_uring) { + struct flow_uring_req *f_req = flow_get_user_data(f); + f_req->cb = rr_server_state_2_done; + } +#endif } static void rr_server_state_1(struct flow *f) { - flow_mod(f, rr_server_state_2, EPOLLOUT, false); + struct thread *t = flow_thread(f); +#ifdef WITH_IO_URING + if (t->use_uring) + rr_server_state_2(f, 0); + else +#endif + flow_mod(f, rr_server_state_2, EPOLLOUT, false); +} + +#ifdef WITH_IO_URING +static void rr_server_state_0_done(struct flow_uring_req *f_req, int events) +{ + struct flow *f = f_req->f; + if (rr_do_recv_done(f, events)) + rr_server_state_2(f, 0); } +#endif static void rr_server_state_0(struct flow *f, uint32_t events) { @@ -493,6 +768,12 @@ static void rr_server_state_0(struct flow *f, uint32_t events) if (rr_do_recv(f, events, rr->rr_recv)) rr_server_state_1(f); +#ifdef WITH_IO_URING + else if (t->use_uring) { + struct flow_uring_req *f_req = flow_get_user_data(f); + f_req->cb = rr_server_state_0_done; + } +#endif } /* These functions point the state machines at their first handler functions. 
*/ @@ -529,11 +810,6 @@ void rr_flow_init(struct thread *t, int fd) rr_state_init(t, fd, state, event); } -/* - * Statistics. Ignore everything below this line, which (a) has not been - * changed and (b) is about to be completely replaced. - */ - static void rr_print_snap(struct thread *t, int flow_index, const struct neper_snap *snap, FILE *csv) { diff --git a/tcp_crr.c b/tcp_crr.c index 85150ae..f22525b 100644 --- a/tcp_crr.c +++ b/tcp_crr.c @@ -41,6 +41,10 @@ int tcp_crr(struct options *opts, struct callbacks *cb) /* tcp_crr server doesn't collect stats, as it uses too much memory. */ if (!opts->client) opts->nostats = true; +#ifdef WITH_IO_URING + if (opts->use_uring) + io_uring_init_main_ring(opts); +#endif return run_main_thread(opts, cb, fn); } /* clang-format on */ diff --git a/tcp_rr.c b/tcp_rr.c index 98334fe..910fdd4 100644 --- a/tcp_rr.c +++ b/tcp_rr.c @@ -38,6 +38,10 @@ static const struct neper_fn server_fn = { int tcp_rr(struct options *opts, struct callbacks *cb) { const struct neper_fn *fn = opts->client ? &client_fn : &server_fn; +#ifdef WITH_IO_URING + if (opts->use_uring) + io_uring_init_main_ring(opts); +#endif return run_main_thread(opts, cb, fn); } /* clang-format on */ diff --git a/tcp_stream.c b/tcp_stream.c index b382058..0f0598c 100644 --- a/tcp_stream.c +++ b/tcp_stream.c @@ -38,6 +38,10 @@ static const struct neper_fn server_fn = { int tcp_stream(struct options *opts, struct callbacks *cb) { const struct neper_fn *fn = opts->client ? 
&client_fn : &server_fn; +#ifdef WITH_IO_URING + if (opts->use_uring) + io_uring_init_main_ring(opts); +#endif return run_main_thread(opts, cb, fn); } /* clang-format on */ diff --git a/thread.c b/thread.c index 7665e7a..ef605cd 100644 --- a/thread.c +++ b/thread.c @@ -418,7 +418,11 @@ void start_worker_threads(struct options *opts, struct callbacks *cb, t[i].fn = fn; t[i].ai_socktype = fn->fn_type; t[i].ai = copy_addrinfo(ai); - t[i].epfd = epoll_create1_or_die(cb); +#ifdef WITH_IO_URING + t[i].use_uring = opts->use_uring; + if (!t[i].use_uring) +#endif + t[i].epfd = epoll_create1_or_die(cb); t[i].stop_efd = eventfd(0, 0); if (t[i].stop_efd == -1) PLOG_FATAL(cb, "eventfd"); @@ -455,6 +459,15 @@ void start_worker_threads(struct options *opts, struct callbacks *cb, t[i].rl.next_event = ~0ULL; t[i].rl.next_noburst_slot = 0; +#ifdef WITH_IO_URING + if (t[i].use_uring) { + /* 128 is a random guess based on maxevents default 1000 */ + s = io_uring_queue_init(128, &t[i].ring, 0); + if (s < 0) + LOG_FATAL(cb, "io_uring_queue_init: %s", strerror(-s)); + } +#endif + s = pthread_create(&t[i].id, &attr, thread_func, &t[i]); if (s != 0) LOG_FATAL(cb, "pthread_create: %s", strerror(s)); @@ -549,6 +562,10 @@ static void free_worker_threads(int num_threads, struct thread *t) int i; for (i = 0; i < num_threads; i++) { +#ifdef WITH_IO_URING + if (t[i].use_uring) + io_uring_queue_exit(&t[i].ring); +#endif do_close(t[i].stop_efd); free(t[i].ai); t[i].rusage->fini(t[i].rusage); diff --git a/thread.h b/thread.h index 21b58f2..3cbd50e 100644 --- a/thread.h +++ b/thread.h @@ -22,6 +22,11 @@ #include #include +#ifdef WITH_IO_URING +#include +#include +#endif + #include "lib.h" struct addrinfo; @@ -138,6 +143,11 @@ struct thread { int64_t gap_ns; struct rtt_log *rtt_logs; long rtt_log_capacity; + +#ifdef WITH_IO_URING + int use_uring; + struct io_uring ring; +#endif }; int thread_stats_events(const struct thread *); diff --git a/udp_rr.c b/udp_rr.c index 101f330..559b9d6 100644 --- 
a/udp_rr.c +++ b/udp_rr.c @@ -38,6 +38,10 @@ static const struct neper_fn server_fn = { int udp_rr(struct options *opts, struct callbacks *cb) { const struct neper_fn *fn = opts->client ? &client_fn : &server_fn; +#ifdef WITH_IO_URING + if (opts->use_uring) + io_uring_init_main_ring(opts); +#endif return run_main_thread(opts, cb, fn); } /* clang-format on */ diff --git a/udp_stream.c b/udp_stream.c index fa52b2c..7a7e285 100644 --- a/udp_stream.c +++ b/udp_stream.c @@ -38,6 +38,10 @@ static const struct neper_fn server_fn = { int udp_stream(struct options *opts, struct callbacks *cb) { const struct neper_fn *fn = opts->client ? &client_fn : &server_fn; +#ifdef WITH_IO_URING + if (opts->use_uring) + io_uring_init_main_ring(opts); +#endif return run_main_thread(opts, cb, fn); } /* clang-format on */