Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ext/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
have_header("sys/wait.h")

have_header("sys/eventfd.h")
have_header("sys/signalfd.h")
$srcs << "io/event/interrupt.c"

have_func("rb_io_descriptor")
Expand Down
98 changes: 87 additions & 11 deletions ext/io/event/selector/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "pidfd.c"
#include "../interrupt.h"
#include "process_wait_signalfd.c"

enum {
DEBUG = 0,
Expand Down Expand Up @@ -476,51 +477,126 @@ VALUE process_wait_ensure(VALUE _arguments) {

struct IO_Event_List_Type IO_Event_Selector_EPoll_process_wait_list_type = {};

#ifdef HAVE_SYS_SIGNALFD_H
struct process_wait_signalfd_arguments {
struct IO_Event_Selector_EPoll *selector;
struct IO_Event_Selector_EPoll_Waiting waiting;
pid_t pid;
int flags;
int descriptor;
VALUE fiber;
sigset_t old_mask;
};

static
VALUE process_wait_signalfd_transfer(VALUE _arguments) {
struct process_wait_signalfd_arguments *arguments = (struct process_wait_signalfd_arguments *)_arguments;

while (1) {
arguments->waiting = (struct IO_Event_Selector_EPoll_Waiting){
.list = {.type = &IO_Event_Selector_EPoll_process_wait_list_type},
.fiber = arguments->fiber,
.events = IO_EVENT_READABLE,
};

int result = IO_Event_Selector_EPoll_Waiting_register(arguments->selector, PIDT2NUM(arguments->pid), arguments->descriptor, &arguments->waiting);
if (result == -1) {
rb_sys_fail("process_wait_signalfd:IO_Event_Selector_EPoll_Waiting_register");
}

IO_Event_Selector_loop_yield(&arguments->selector->backend);

IO_Event_Selector_EPoll_Waiting_cancel(&arguments->waiting);

if (!arguments->waiting.ready) return Qfalse;

VALUE status = process_wait_signalfd_check(arguments->descriptor, arguments->pid, arguments->flags);
if (status != Qnil) return status;
}
}

static
VALUE process_wait_signalfd_ensure(VALUE _arguments) {
struct process_wait_signalfd_arguments *arguments = (struct process_wait_signalfd_arguments *)_arguments;

IO_Event_List_free(&arguments->waiting.list);
arguments->waiting.fiber = 0;

process_wait_signalfd_close(arguments->descriptor, &arguments->old_mask);

return Qnil;
}
#endif

VALUE IO_Event_Selector_EPoll_process_wait(VALUE self, VALUE fiber, VALUE _pid, VALUE _flags) {
struct IO_Event_Selector_EPoll *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_EPoll, &IO_Event_Selector_EPoll_Type, selector);

pid_t pid = NUM2PIDT(_pid);
int flags = NUM2INT(_flags);

int descriptor = pidfd_open(pid, 0);

if (descriptor == -1) {
#ifdef HAVE_SYS_SIGNALFD_H
if (errno == EPERM) {
// pidfd_open can fail with EPERM inside confined environments (e.g. snap).
// Fall back to signalfd with SIGCHLD:
VALUE status;
sigset_t old_mask;
int signalfd_descriptor = process_wait_signalfd_open(pid, flags, &old_mask, &status);
if (signalfd_descriptor < 0) return status;

struct process_wait_signalfd_arguments signalfd_arguments = {
.selector = selector,
.waiting = {},
.pid = pid,
.flags = flags,
.descriptor = signalfd_descriptor,
.fiber = fiber,
.old_mask = old_mask,
};

RB_OBJ_WRITTEN(self, Qundef, fiber);

return rb_ensure(process_wait_signalfd_transfer, (VALUE)&signalfd_arguments, process_wait_signalfd_ensure, (VALUE)&signalfd_arguments);
}
#endif
rb_sys_fail("IO_Event_Selector_EPoll_process_wait:pidfd_open");
}

rb_update_max_fd(descriptor);

// `pidfd_open` (above) may be edge triggered, so we need to check if the process is already exited, and if so, return immediately, otherwise we will block indefinitely.
VALUE status = IO_Event_Selector_process_status_wait(pid, flags);
if (status != Qnil) {
close(descriptor);
return status;
}

struct IO_Event_Selector_EPoll_Waiting waiting = {
.list = {.type = &IO_Event_Selector_EPoll_process_wait_list_type},
.fiber = fiber,
.events = IO_EVENT_READABLE,
};

RB_OBJ_WRITTEN(self, Qundef, fiber);

int result = IO_Event_Selector_EPoll_Waiting_register(selector, _pid, descriptor, &waiting);

if (result == -1) {
close(descriptor);
rb_sys_fail("IO_Event_Selector_EPoll_process_wait:IO_Event_Selector_EPoll_Waiting_register");
}

struct process_wait_arguments process_wait_arguments = {
.selector = selector,
.pid = pid,
.flags = flags,
.descriptor = descriptor,
.waiting = &waiting,
};

return rb_ensure(process_wait_transfer, (VALUE)&process_wait_arguments, process_wait_ensure, (VALUE)&process_wait_arguments);
}

Expand Down
64 changes: 64 additions & 0 deletions ext/io/event/selector/process_wait_signalfd.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Released under the MIT License.
// Copyright, 2026, by Samuel Williams.

// Fallback for process_wait when pidfd_open(2) returns EPERM, e.g. inside snap
// confinement (pre-snapd 2.75). Uses signalfd(2) + SIGCHLD instead.
//
// Included (not compiled separately) by epoll.c and uring.c, like pidfd.c.

#ifdef HAVE_SYS_SIGNALFD_H
#include <sys/signalfd.h>
#include <signal.h>

// Block SIGCHLD for this thread and create a signalfd.
//
// If the process has already exited, stores the status in *result and returns -1.
// Otherwise returns the signalfd descriptor (>= 0).
static int
process_wait_signalfd_open(pid_t pid, int flags, sigset_t *old_mask, VALUE *result)
{
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGCHLD);
pthread_sigmask(SIG_BLOCK, &mask, old_mask);

int descriptor = signalfd(-1, &mask, SFD_CLOEXEC | SFD_NONBLOCK);
if (descriptor == -1) {
pthread_sigmask(SIG_SETMASK, old_mask, NULL);
rb_sys_fail("process_wait_signalfd_open:signalfd");
}
rb_update_max_fd(descriptor);

// Check if the process has already exited:
*result = IO_Event_Selector_process_status_wait(pid, flags);
if (*result != Qnil) {
close(descriptor);
pthread_sigmask(SIG_SETMASK, old_mask, NULL);
return -1;
}

return descriptor;
}

// Drain the signalfd and check whether a specific process has exited.
//
// Returns the process status, or Qnil if it hasn't exited yet (the SIGCHLD was
// for a different child).
static VALUE
process_wait_signalfd_check(int descriptor, pid_t pid, int flags)
{
struct signalfd_siginfo info;
while (read(descriptor, &info, sizeof(info)) > 0) {}

return IO_Event_Selector_process_status_wait(pid, flags);
}

// Close the signalfd and restore the original signal mask.
static void
process_wait_signalfd_close(int descriptor, sigset_t *old_mask)
{
close(descriptor);
pthread_sigmask(SIG_SETMASK, old_mask, NULL);
}

#endif
89 changes: 81 additions & 8 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <time.h>

#include "pidfd.c"
#include "process_wait_signalfd.c"

#include <linux/version.h>

Expand Down Expand Up @@ -457,41 +458,113 @@ VALUE process_wait_ensure(VALUE _arguments) {
return Qnil;
}

#ifdef HAVE_SYS_SIGNALFD_H
struct process_wait_signalfd_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting waiting;
pid_t pid;
int flags;
int descriptor;
VALUE fiber;
sigset_t old_mask;
};

static
VALUE process_wait_signalfd_transfer(VALUE _arguments) {
struct process_wait_signalfd_arguments *arguments = (struct process_wait_signalfd_arguments *)_arguments;

while (1) {
arguments->waiting = (struct IO_Event_Selector_URing_Waiting){.fiber = arguments->fiber};

struct IO_Event_Selector_URing_Completion *completion = IO_Event_Selector_URing_Completion_acquire(arguments->selector, &arguments->waiting);

struct io_uring_sqe *sqe = io_get_sqe(arguments->selector);
io_uring_prep_poll_add(sqe, arguments->descriptor, POLLIN|POLLHUP|POLLERR);
io_uring_sqe_set_data(sqe, completion);
io_uring_submit_pending(arguments->selector);

IO_Event_Selector_loop_yield(&arguments->selector->backend);

IO_Event_Selector_URing_Waiting_cancel(&arguments->waiting);

if (!arguments->waiting.result) return Qfalse;

VALUE status = process_wait_signalfd_check(arguments->descriptor, arguments->pid, arguments->flags);
if (status != Qnil) return status;
}
}

static
VALUE process_wait_signalfd_ensure(VALUE _arguments) {
struct process_wait_signalfd_arguments *arguments = (struct process_wait_signalfd_arguments *)_arguments;

IO_Event_Selector_URing_Waiting_cancel(&arguments->waiting);

process_wait_signalfd_close(arguments->descriptor, &arguments->old_mask);

return Qnil;
}
#endif

VALUE IO_Event_Selector_URing_process_wait(VALUE self, VALUE fiber, VALUE _pid, VALUE _flags) {
struct IO_Event_Selector_URing *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);

pid_t pid = NUM2PIDT(_pid);
int flags = NUM2INT(_flags);

int descriptor = pidfd_open(pid, 0);
if (descriptor < 0) {
#ifdef HAVE_SYS_SIGNALFD_H
if (errno == EPERM) {
// pidfd_open can fail with EPERM inside confined environments (e.g. snap).
// Fall back to signalfd with SIGCHLD:
VALUE status;
sigset_t old_mask;
int signalfd_descriptor = process_wait_signalfd_open(pid, flags, &old_mask, &status);
if (signalfd_descriptor < 0) return status;

struct process_wait_signalfd_arguments signalfd_arguments = {
.selector = selector,
.waiting = {},
.pid = pid,
.flags = flags,
.descriptor = signalfd_descriptor,
.fiber = fiber,
.old_mask = old_mask,
};

RB_OBJ_WRITTEN(self, Qundef, fiber);

return rb_ensure(process_wait_signalfd_transfer, (VALUE)&signalfd_arguments, process_wait_signalfd_ensure, (VALUE)&signalfd_arguments);
}
#endif
rb_syserr_fail(errno, "IO_Event_Selector_URing_process_wait:pidfd_open");
}
rb_update_max_fd(descriptor);

struct IO_Event_Selector_URing_Waiting waiting = {
.fiber = fiber,
};

RB_OBJ_WRITTEN(self, Qundef, fiber);

struct IO_Event_Selector_URing_Completion *completion = IO_Event_Selector_URing_Completion_acquire(selector, &waiting);

struct process_wait_arguments process_wait_arguments = {
.selector = selector,
.waiting = &waiting,
.pid = pid,
.flags = flags,
.descriptor = descriptor,
};

if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_poll_add(%p)\n", (void*)fiber);
struct io_uring_sqe *sqe = io_get_sqe(selector);
io_uring_prep_poll_add(sqe, descriptor, POLLIN|POLLHUP|POLLERR);
io_uring_sqe_set_data(sqe, completion);
io_uring_submit_pending(selector);

return rb_ensure(process_wait_transfer, (VALUE)&process_wait_arguments, process_wait_ensure, (VALUE)&process_wait_arguments);
}

Expand Down
1 change: 1 addition & 0 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@
gem "bake-test"
gem "bake-test-external"
gem "async"
gem "fiddle"
end
Loading