diff --git a/include/asio/detail/impl/io_uring_service.ipp b/include/asio/detail/impl/io_uring_service.ipp index c43b65d7e8..5c7d7f98ae 100644 --- a/include/asio/detail/impl/io_uring_service.ipp +++ b/include/asio/detail/impl/io_uring_service.ipp @@ -11,6 +11,15 @@ #ifndef ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP #define ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP +#if defined(__SANITIZE_THREAD__) || (defined(__has_feature) && __has_feature(thread_sanitizer)) +# include +# define TSAN_ACQUIRE(addr) __tsan_acquire(addr) +# define TSAN_RELEASE(addr) __tsan_release(addr) +#else +# define TSAN_ACQUIRE(addr) +# define TSAN_RELEASE(addr) +#endif + #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) @@ -156,6 +165,8 @@ void io_uring_service::notify_fork( break; if (void* ptr = ::io_uring_cqe_get_data(cqe)) { + TSAN_ACQUIRE(ptr); + if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_) { io_queue* io_q = static_cast(ptr); @@ -234,6 +245,9 @@ void io_uring_service::register_internal_io_object( { op->prepare(sqe); ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]); + + TSAN_RELEASE(&io_obj->queues_[op_type]); + post_submit_sqes_op(lock); } else @@ -296,6 +310,9 @@ void io_uring_service::start_op(int op_type, { op->prepare(sqe); ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]); + + TSAN_RELEASE(&io_obj->queues_[op_type]); + scheduler_.work_started(); post_submit_sqes_op(lock); } @@ -433,6 +450,9 @@ void io_uring_service::run(long usec, op_queue& ops) ++local_ops; ::io_uring_prep_timeout(sqe, &ts, 0, 0); ::io_uring_sqe_set_data(sqe, &ts); + + TSAN_RELEASE(&ts); + submit_sqes(); } } @@ -452,6 +472,9 @@ void io_uring_service::run(long usec, op_queue& ops) ++local_ops; ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0); ::io_uring_sqe_set_data(sqe, &ts); + + TSAN_RELEASE(&ts); + submit_sqes(); } } @@ -465,6 +488,8 @@ void io_uring_service::run(long usec, op_queue& ops) { if (void* ptr = ::io_uring_cqe_get_data(cqe)) { + TSAN_ACQUIRE(ptr); + if (ptr == this) { // The io_uring service was interrupted. @@ -510,6 +535,9 @@ void io_uring_service::run(long usec, op_queue& ops) { ::io_uring_prep_timeout(sqe, &timeout_, 0, 0); ::io_uring_sqe_set_data(sqe, &timeout_); + + TSAN_RELEASE(&timeout_); + push_submit_sqes_op(ops); } } @@ -523,6 +551,8 @@ void io_uring_service::interrupt() { ::io_uring_prep_nop(sqe); ::io_uring_sqe_set_data(sqe, this); + + TSAN_RELEASE(this); } submit_sqes(); } @@ -686,6 +716,8 @@ void io_uring_service::update_timeout() { ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0); ::io_uring_sqe_set_data(sqe, &timer_queues_); + + TSAN_RELEASE(&timer_queues_); } } @@ -860,6 +892,9 @@ operation* io_uring_service::io_queue::perform_io(int result) { op_queue_.front()->prepare(sqe); ::io_uring_sqe_set_data(sqe, this); + + TSAN_RELEASE(this); + service->post_submit_sqes_op(lock); } else