Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions include/asio/detail/impl/io_uring_service.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
#ifndef ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
#define ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP

#if defined(__SANITIZE_THREAD__) || (defined(__has_feature) && __has_feature(thread_sanitizer))
# include <sanitizer/tsan_interface.h>
# define TSAN_ACQUIRE(addr) __tsan_acquire(addr)
# define TSAN_RELEASE(addr) __tsan_release(addr)
#else
# define TSAN_ACQUIRE(addr)
# define TSAN_RELEASE(addr)
#endif

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
Expand Down Expand Up @@ -156,6 +165,8 @@ void io_uring_service::notify_fork(
break;
if (void* ptr = ::io_uring_cqe_get_data(cqe))
{
TSAN_ACQUIRE(ptr);

if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
{
io_queue* io_q = static_cast<io_queue*>(ptr);
Expand Down Expand Up @@ -234,6 +245,9 @@ void io_uring_service::register_internal_io_object(
{
op->prepare(sqe);
::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);

TSAN_RELEASE(&io_obj->queues_[op_type]);

post_submit_sqes_op(lock);
}
else
Expand Down Expand Up @@ -296,6 +310,9 @@ void io_uring_service::start_op(int op_type,
{
op->prepare(sqe);
::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);

TSAN_RELEASE(&io_obj->queues_[op_type]);

scheduler_.work_started();
post_submit_sqes_op(lock);
}
Expand Down Expand Up @@ -433,6 +450,9 @@ void io_uring_service::run(long usec, op_queue<operation>& ops)
++local_ops;
::io_uring_prep_timeout(sqe, &ts, 0, 0);
::io_uring_sqe_set_data(sqe, &ts);

TSAN_RELEASE(&ts);

submit_sqes();
}
}
Expand All @@ -452,6 +472,9 @@ void io_uring_service::run(long usec, op_queue<operation>& ops)
++local_ops;
::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
::io_uring_sqe_set_data(sqe, &ts);

TSAN_RELEASE(&ts);

submit_sqes();
}
}
Expand All @@ -465,6 +488,8 @@ void io_uring_service::run(long usec, op_queue<operation>& ops)
{
if (void* ptr = ::io_uring_cqe_get_data(cqe))
{
TSAN_ACQUIRE(ptr);

if (ptr == this)
{
// The io_uring service was interrupted.
Expand Down Expand Up @@ -510,6 +535,9 @@ void io_uring_service::run(long usec, op_queue<operation>& ops)
{
::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
::io_uring_sqe_set_data(sqe, &timeout_);

TSAN_RELEASE(&timeout_);

push_submit_sqes_op(ops);
}
}
Expand All @@ -523,6 +551,8 @@ void io_uring_service::interrupt()
{
::io_uring_prep_nop(sqe);
::io_uring_sqe_set_data(sqe, this);

TSAN_RELEASE(this);
}
submit_sqes();
}
Expand Down Expand Up @@ -686,6 +716,8 @@ void io_uring_service::update_timeout()
{
::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0);
::io_uring_sqe_set_data(sqe, &timer_queues_);

TSAN_RELEASE(&timer_queues_);
}
}

Expand Down Expand Up @@ -860,6 +892,9 @@ operation* io_uring_service::io_queue::perform_io(int result)
{
op_queue_.front()->prepare(sqe);
::io_uring_sqe_set_data(sqe, this);

TSAN_RELEASE(this);

service->post_submit_sqes_op(lock);
}
else
Expand Down