boost/asio/detail/impl/io_uring_service.ipp
//
// detail/impl/io_uring_service.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
#define BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include <boost/asio/detail/config.hpp>
#if defined(BOOST_ASIO_HAS_IO_URING)
#include <cstddef>
#include <sys/eventfd.h>
#include <boost/asio/detail/io_uring_service.hpp>
#include <boost/asio/detail/reactor_op.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/detail/push_options.hpp>
namespace boost {
namespace asio {
namespace detail {
io_uring_service::io_uring_service(boost::asio::execution_context& ctx)
: execution_context_service_base<io_uring_service>(ctx),
scheduler_(use_service<scheduler>(ctx)),
mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
outstanding_work_(0),
submit_sqes_op_(this),
pending_sqes_(0),
pending_submit_sqes_op_(false),
shutdown_(false),
timeout_(),
registration_mutex_(mutex_.enabled()),
reactor_(use_service<reactor>(ctx)),
reactor_data_(),
event_fd_(-1)
{
reactor_.init_task();
init_ring();
register_with_reactor();
}
io_uring_service::~io_uring_service()
{
if (ring_.ring_fd != -1)
::io_uring_queue_exit(&ring_);
if (event_fd_ != -1)
::close(event_fd_);
}
void io_uring_service::shutdown()
{
mutex::scoped_lock lock(mutex_);
shutdown_ = true;
lock.unlock();
op_queue<operation> ops;
// Cancel all outstanding operations.
while (io_object* io_obj = registered_io_objects_.first())
{
for (int i = 0; i < max_ops; ++i)
{
if (!io_obj->queues_[i].op_queue_.empty())
{
ops.push(io_obj->queues_[i].op_queue_);
if (::io_uring_sqe* sqe = get_sqe())
::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
}
}
io_obj->shutdown_ = true;
registered_io_objects_.free(io_obj);
}
// Cancel the timeout operation.
if (::io_uring_sqe* sqe = get_sqe())
::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
submit_sqes();
// Wait for all completions to come back.
for (; outstanding_work_ > 0; --outstanding_work_)
{
::io_uring_cqe* cqe = 0;
if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
break;
}
timer_queues_.get_all_timers(ops);
scheduler_.abandon_operations(ops);
}
void io_uring_service::notify_fork(
boost::asio::execution_context::fork_event fork_ev)
{
switch (fork_ev)
{
case boost::asio::execution_context::fork_prepare:
{
// Cancel all outstanding operations. They will be restarted
// after the fork completes.
mutex::scoped_lock registration_lock(registration_mutex_);
for (io_object* io_obj = registered_io_objects_.first();
io_obj != 0; io_obj = io_obj->next_)
{
mutex::scoped_lock io_object_lock(io_obj->mutex_);
for (int i = 0; i < max_ops; ++i)
{
if (!io_obj->queues_[i].op_queue_.empty()
&& !io_obj->queues_[i].cancel_requested_)
{
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
}
}
}
// Cancel the timeout operation.
{
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
submit_sqes();
}
// Wait for all completions to come back, and post all completed I/O
// queues to the scheduler. Note that some operations may have already
// completed, or were explicitly cancelled. All others will be
// automatically restarted.
op_queue<operation> ops;
for (; outstanding_work_ > 0; --outstanding_work_)
{
::io_uring_cqe* cqe = 0;
if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
break;
if (void* ptr = ::io_uring_cqe_get_data(cqe))
{
if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
{
io_queue* io_q = static_cast<io_queue*>(ptr);
io_q->set_result(cqe->res);
ops.push(io_q);
}
}
}
scheduler_.post_deferred_completions(ops);
// Restart and eventfd operation.
register_with_reactor();
}
break;
case boost::asio::execution_context::fork_parent:
// Restart the timeout and eventfd operations.
update_timeout();
register_with_reactor();
break;
case boost::asio::execution_context::fork_child:
{
// The child process gets a new io_uring instance.
::io_uring_queue_exit(&ring_);
init_ring();
register_with_reactor();
}
break;
default:
break;
}
}
void io_uring_service::init_task()
{
scheduler_.init_task();
}
void io_uring_service::register_io_object(
io_uring_service::per_io_object_data& io_obj)
{
io_obj = allocate_io_object();
mutex::scoped_lock io_object_lock(io_obj->mutex_);
io_obj->service_ = this;
io_obj->shutdown_ = false;
for (int i = 0; i < max_ops; ++i)
{
io_obj->queues_[i].io_object_ = io_obj;
io_obj->queues_[i].cancel_requested_ = false;
}
}
void io_uring_service::register_internal_io_object(
io_uring_service::per_io_object_data& io_obj,
int op_type, io_uring_operation* op)
{
io_obj = allocate_io_object();
mutex::scoped_lock io_object_lock(io_obj->mutex_);
io_obj->service_ = this;
io_obj->shutdown_ = false;
for (int i = 0; i < max_ops; ++i)
{
io_obj->queues_[i].io_object_ = io_obj;
io_obj->queues_[i].cancel_requested_ = false;
}
io_obj->queues_[op_type].op_queue_.push(op);
io_object_lock.unlock();
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
{
op->prepare(sqe);
::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
post_submit_sqes_op(lock);
}
else
{
boost::system::error_code ec(ENOBUFS,
boost::asio::error::get_system_category());
boost::asio::detail::throw_error(ec, "io_uring_get_sqe");
}
}
void io_uring_service::register_buffers(const ::iovec* v, unsigned n)
{
int result = ::io_uring_register_buffers(&ring_, v, n);
if (result < 0)
{
boost::system::error_code ec(-result,
boost::asio::error::get_system_category());
boost::asio::detail::throw_error(ec, "io_uring_register_buffers");
}
}
void io_uring_service::unregister_buffers()
{
(void)::io_uring_unregister_buffers(&ring_);
}
void io_uring_service::start_op(int op_type,
io_uring_service::per_io_object_data& io_obj,
io_uring_operation* op, bool is_continuation)
{
if (!io_obj)
{
op->ec_ = boost::asio::error::bad_descriptor;
post_immediate_completion(op, is_continuation);
return;
}
mutex::scoped_lock io_object_lock(io_obj->mutex_);
if (io_obj->shutdown_)
{
io_object_lock.unlock();
post_immediate_completion(op, is_continuation);
return;
}
if (io_obj->queues_[op_type].op_queue_.empty())
{
if (op->perform(false))
{
io_object_lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
}
else
{
io_obj->queues_[op_type].op_queue_.push(op);
io_object_lock.unlock();
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
{
op->prepare(sqe);
::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
scheduler_.work_started();
post_submit_sqes_op(lock);
}
else
{
lock.unlock();
io_obj->queues_[op_type].set_result(-ENOBUFS);
post_immediate_completion(&io_obj->queues_[op_type], is_continuation);
}
}
}
else
{
io_obj->queues_[op_type].op_queue_.push(op);
scheduler_.work_started();
}
}
void io_uring_service::cancel_ops(io_uring_service::per_io_object_data& io_obj)
{
if (!io_obj)
return;
mutex::scoped_lock io_object_lock(io_obj->mutex_);
op_queue<operation> ops;
do_cancel_ops(io_obj, ops);
io_object_lock.unlock();
scheduler_.post_deferred_completions(ops);
}
void io_uring_service::cancel_ops_by_key(
io_uring_service::per_io_object_data& io_obj,
int op_type, void* cancellation_key)
{
if (!io_obj)
return;
mutex::scoped_lock io_object_lock(io_obj->mutex_);
bool first = true;
op_queue<operation> ops;
op_queue<io_uring_operation> other_ops;
while (io_uring_operation* op = io_obj->queues_[op_type].op_queue_.front())
{
io_obj->queues_[op_type].op_queue_.pop();
if (op->cancellation_key_ == cancellation_key)
{
if (first)
{
other_ops.push(op);
if (!io_obj->queues_[op_type].cancel_requested_)
{
io_obj->queues_[op_type].cancel_requested_ = true;
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
{
::io_uring_prep_cancel(sqe, &io_obj->queues_[op_type], 0);
submit_sqes();
}
}
}
else
{
op->ec_ = boost::asio::error::operation_aborted;
ops.push(op);
}
}
else
other_ops.push(op);
first = false;
}
io_obj->queues_[op_type].op_queue_.push(other_ops);
io_object_lock.unlock();
scheduler_.post_deferred_completions(ops);
}
void io_uring_service::deregister_io_object(
io_uring_service::per_io_object_data& io_obj)
{
if (!io_obj)
return;
mutex::scoped_lock io_object_lock(io_obj->mutex_);
if (!io_obj->shutdown_)
{
op_queue<operation> ops;
do_cancel_ops(io_obj, ops);
io_obj->shutdown_ = true;
io_object_lock.unlock();
scheduler_.post_deferred_completions(ops);
// Leave io_obj set so that it will be freed by the subsequent
// call to cleanup_io_obj.
}
else
{
// We are shutting down, so prevent cleanup_io_object from freeing
// the I/O object and let the destructor free it instead.
io_obj = 0;
}
}
void io_uring_service::cleanup_io_object(
io_uring_service::per_io_object_data& io_obj)
{
if (io_obj)
{
free_io_object(io_obj);
io_obj = 0;
}
}
void io_uring_service::run(long usec, op_queue<operation>& ops)
{
__kernel_timespec ts;
int local_ops = 0;
if (usec > 0)
{
ts.tv_sec = usec / 1000000;
ts.tv_nsec = (usec % 1000000) * 1000;
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
{
++local_ops;
::io_uring_prep_timeout(sqe, &ts, 0, 0);
::io_uring_sqe_set_data(sqe, &ts);
submit_sqes();
}
}
::io_uring_cqe* cqe = 0;
int result = (usec == 0)
? ::io_uring_peek_cqe(&ring_, &cqe)
: ::io_uring_wait_cqe(&ring_, &cqe);
if (result == 0 && usec > 0)
{
if (::io_uring_cqe_get_data(cqe) != &ts)
{
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
{
++local_ops;
::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
submit_sqes();
}
}
}
bool check_timers = false;
int count = 0;
while (result == 0)
{
if (void* ptr = ::io_uring_cqe_get_data(cqe))
{
if (ptr == this)
{
// The io_uring service was interrupted.
}
else if (ptr == &timer_queues_)
{
check_timers = true;
}
else if (ptr == &timeout_)
{
check_timers = true;
timeout_.tv_sec = 0;
timeout_.tv_nsec = 0;
}
else if (ptr == &ts)
{
--local_ops;
}
else
{
io_queue* io_q = static_cast<io_queue*>(ptr);
io_q->set_result(cqe->res);
ops.push(io_q);
}
}
::io_uring_cqe_seen(&ring_, cqe);
result = (++count < complete_batch_size || local_ops > 0)
? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
}
decrement(outstanding_work_, count);
if (check_timers)
{
mutex::scoped_lock lock(mutex_);
timer_queues_.get_ready_timers(ops);
if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
{
timeout_ = get_timeout();
if (::io_uring_sqe* sqe = get_sqe())
{
::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
::io_uring_sqe_set_data(sqe, &timeout_);
push_submit_sqes_op(ops);
}
}
}
}
void io_uring_service::interrupt()
{
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
{
::io_uring_prep_nop(sqe);
::io_uring_sqe_set_data(sqe, this);
}
submit_sqes();
}
void io_uring_service::init_ring()
{
int result = ::io_uring_queue_init(ring_size, &ring_, 0);
if (result < 0)
{
ring_.ring_fd = -1;
boost::system::error_code ec(-result,
boost::asio::error::get_system_category());
boost::asio::detail::throw_error(ec, "io_uring_queue_init");
}
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
event_fd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (event_fd_ < 0)
{
boost::system::error_code ec(-result,
boost::asio::error::get_system_category());
::io_uring_queue_exit(&ring_);
boost::asio::detail::throw_error(ec, "eventfd");
}
result = ::io_uring_register_eventfd(&ring_, event_fd_);
if (result < 0)
{
::close(event_fd_);
::io_uring_queue_exit(&ring_);
boost::system::error_code ec(-result,
boost::asio::error::get_system_category());
boost::asio::detail::throw_error(ec, "io_uring_queue_init");
}
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
class io_uring_service::event_fd_read_op :
public reactor_op
{
public:
event_fd_read_op(io_uring_service* s)
: reactor_op(boost::system::error_code(),
&event_fd_read_op::do_perform, event_fd_read_op::do_complete),
service_(s)
{
}
static status do_perform(reactor_op* base)
{
event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
for (;;)
{
// Only perform one read. The kernel maintains an atomic counter.
uint64_t counter(0);
errno = 0;
int bytes_read = ::read(o->service_->event_fd_,
&counter, sizeof(uint64_t));
if (bytes_read < 0 && errno == EINTR)
continue;
break;
}
op_queue<operation> ops;
o->service_->run(0, ops);
o->service_->scheduler_.post_deferred_completions(ops);
return not_done;
}
static void do_complete(void* /*owner*/, operation* base,
const boost::system::error_code& /*ec*/,
std::size_t /*bytes_transferred*/)
{
event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
delete o;
}
private:
io_uring_service* service_;
};
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
void io_uring_service::register_with_reactor()
{
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
reactor_.register_internal_descriptor(reactor::read_op,
event_fd_, reactor_data_, new event_fd_read_op(this));
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}
io_uring_service::io_object* io_uring_service::allocate_io_object()
{
mutex::scoped_lock registration_lock(registration_mutex_);
return registered_io_objects_.alloc(
BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
REACTOR_IO, scheduler_.concurrency_hint()));
}
void io_uring_service::free_io_object(io_uring_service::io_object* io_obj)
{
mutex::scoped_lock registration_lock(registration_mutex_);
registered_io_objects_.free(io_obj);
}
void io_uring_service::do_cancel_ops(
per_io_object_data& io_obj, op_queue<operation>& ops)
{
bool cancel_op = false;
for (int i = 0; i < max_ops; ++i)
{
if (io_uring_operation* first_op = io_obj->queues_[i].op_queue_.front())
{
cancel_op = true;
io_obj->queues_[i].op_queue_.pop();
while (io_uring_operation* op = io_obj->queues_[i].op_queue_.front())
{
op->ec_ = boost::asio::error::operation_aborted;
io_obj->queues_[i].op_queue_.pop();
ops.push(op);
}
io_obj->queues_[i].op_queue_.push(first_op);
}
}
if (cancel_op)
{
mutex::scoped_lock lock(mutex_);
for (int i = 0; i < max_ops; ++i)
{
if (!io_obj->queues_[i].op_queue_.empty()
&& !io_obj->queues_[i].cancel_requested_)
{
io_obj->queues_[i].cancel_requested_ = true;
if (::io_uring_sqe* sqe = get_sqe())
::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
}
}
submit_sqes();
}
}
void io_uring_service::do_add_timer_queue(timer_queue_base& queue)
{
mutex::scoped_lock lock(mutex_);
timer_queues_.insert(&queue);
}
void io_uring_service::do_remove_timer_queue(timer_queue_base& queue)
{
mutex::scoped_lock lock(mutex_);
timer_queues_.erase(&queue);
}
void io_uring_service::update_timeout()
{
if (::io_uring_sqe* sqe = get_sqe())
{
::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0);
::io_uring_sqe_set_data(sqe, &timer_queues_);
}
}
__kernel_timespec io_uring_service::get_timeout() const
{
__kernel_timespec ts;
long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
ts.tv_sec = usec / 1000000;
ts.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
return ts;
}
::io_uring_sqe* io_uring_service::get_sqe()
{
::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
if (!sqe)
{
submit_sqes();
sqe = ::io_uring_get_sqe(&ring_);
}
if (sqe)
++pending_sqes_;
return sqe;
}
void io_uring_service::submit_sqes()
{
if (pending_sqes_ != 0)
{
int result = ::io_uring_submit(&ring_);
if (result > 0)
{
pending_sqes_ -= result;
increment(outstanding_work_, result);
}
}
}
void io_uring_service::post_submit_sqes_op(mutex::scoped_lock& lock)
{
if (pending_sqes_ >= submit_batch_size)
{
submit_sqes();
}
else if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
{
pending_submit_sqes_op_ = true;
lock.unlock();
scheduler_.post_immediate_completion(&submit_sqes_op_, false);
}
}
void io_uring_service::push_submit_sqes_op(op_queue<operation>& ops)
{
if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
{
pending_submit_sqes_op_ = true;
ops.push(&submit_sqes_op_);
scheduler_.compensating_work_started();
}
}
io_uring_service::submit_sqes_op::submit_sqes_op(io_uring_service* s)
: operation(&io_uring_service::submit_sqes_op::do_complete),
service_(s)
{
}
void io_uring_service::submit_sqes_op::do_complete(void* owner, operation* base,
const boost::system::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
{
if (owner)
{
submit_sqes_op* o = static_cast<submit_sqes_op*>(base);
mutex::scoped_lock lock(o->service_->mutex_);
o->service_->submit_sqes();
if (o->service_->pending_sqes_ != 0)
o->service_->scheduler_.post_immediate_completion(o, true);
else
o->service_->pending_submit_sqes_op_ = false;
}
}
io_uring_service::io_queue::io_queue()
: operation(&io_uring_service::io_queue::do_complete)
{
}
struct io_uring_service::perform_io_cleanup_on_block_exit
{
explicit perform_io_cleanup_on_block_exit(io_uring_service* s)
: service_(s), first_op_(0)
{
}
~perform_io_cleanup_on_block_exit()
{
if (first_op_)
{
// Post the remaining completed operations for invocation.
if (!ops_.empty())
service_->scheduler_.post_deferred_completions(ops_);
// A user-initiated operation has completed, but there's no need to
// explicitly call work_finished() here. Instead, we'll take advantage of
// the fact that the scheduler will call work_finished() once we return.
}
else
{
// No user-initiated operations have completed, so we need to compensate
// for the work_finished() call that the scheduler will make once this
// operation returns.
service_->scheduler_.compensating_work_started();
}
}
io_uring_service* service_;
op_queue<operation> ops_;
operation* first_op_;
};
operation* io_uring_service::io_queue::perform_io(int result)
{
perform_io_cleanup_on_block_exit io_cleanup(io_object_->service_);
mutex::scoped_lock io_object_lock(io_object_->mutex_);
if (result != -ECANCELED || cancel_requested_)
{
if (io_uring_operation* op = op_queue_.front())
{
if (result < 0)
{
op->ec_.assign(-result, boost::asio::error::get_system_category());
op->bytes_transferred_ = 0;
}
else
{
op->ec_.assign(0, op->ec_.category());
op->bytes_transferred_ = static_cast<std::size_t>(result);
}
}
while (io_uring_operation* op = op_queue_.front())
{
if (op->perform(io_cleanup.ops_.empty()))
{
op_queue_.pop();
io_cleanup.ops_.push(op);
}
else
break;
}
}
cancel_requested_ = false;
if (!op_queue_.empty())
{
io_uring_service* service = io_object_->service_;
mutex::scoped_lock lock(service->mutex_);
if (::io_uring_sqe* sqe = service->get_sqe())
{
op_queue_.front()->prepare(sqe);
::io_uring_sqe_set_data(sqe, this);
service->post_submit_sqes_op(lock);
}
else
{
lock.unlock();
while (io_uring_operation* op = op_queue_.front())
{
op->ec_ = boost::asio::error::no_buffer_space;
op_queue_.pop();
io_cleanup.ops_.push(op);
}
}
}
// The first operation will be returned for completion now. The others will
// be posted for later by the io_cleanup object's destructor.
io_cleanup.first_op_ = io_cleanup.ops_.front();
io_cleanup.ops_.pop();
return io_cleanup.first_op_;
}
void io_uring_service::io_queue::do_complete(void* owner, operation* base,
const boost::system::error_code& ec, std::size_t bytes_transferred)
{
if (owner)
{
io_queue* io_q = static_cast<io_queue*>(base);
int result = static_cast<int>(bytes_transferred);
if (operation* op = io_q->perform_io(result))
{
op->complete(owner, ec, 0);
}
}
}
io_uring_service::io_object::io_object(bool locking)
: mutex_(locking)
{
}
} // namespace detail
} // namespace asio
} // namespace boost
#include <boost/asio/detail/pop_options.hpp>
#endif // defined(BOOST_ASIO_HAS_IO_URING)
#endif // BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP