Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions ext/io/event/selector/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,108 @@ static VALUE IO_Event_Selector_KQueue_io_write_compatible(int argc, VALUE *argv,
return IO_Event_Selector_KQueue_io_write(self, argv[0], argv[1], argv[2], argv[3], _offset);
}

VALUE IO_Event_Selector_KQueue_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE _from, VALUE _length, VALUE _offset) {
struct IO_Event_Selector_KQueue *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, selector);

int descriptor = IO_Event_Selector_io_descriptor(io);

void *base;
size_t size;
rb_io_buffer_get_bytes_for_writing(buffer, &base, &size);

off_t from = NUM2OFFT(_from);
size_t length = NUM2SIZET(_length);
size_t offset = NUM2SIZET(_offset);
size_t total = 0;

if (offset > size) {
return rb_fiber_scheduler_io_result(-1, EINVAL);
}

if (offset == size || length == 0) {
return rb_fiber_scheduler_io_result(0, 0);
}

RB_OBJ_WRITTEN(self, Qundef, fiber);

size_t maximum_size = size - offset;
while (maximum_size) {
ssize_t result = pread(descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
offset += result;
from += result;
if ((size_t)result >= length) break;
length -= result;
} else if (result == 0) {
break;
} else if (length > 0 && IO_Event_try_again(errno)) {
IO_Event_Selector_KQueue_io_wait(self, fiber, io, RB_INT2NUM(IO_EVENT_READABLE));
} else {
return rb_fiber_scheduler_io_result(-1, errno);
}

maximum_size = size - offset;
}

return rb_fiber_scheduler_io_result(total, 0);
}

VALUE IO_Event_Selector_KQueue_io_pwrite(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE _from, VALUE _length, VALUE _offset) {
struct IO_Event_Selector_KQueue *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, selector);

int descriptor = IO_Event_Selector_io_descriptor(io);

const void *base;
size_t size;
rb_io_buffer_get_bytes_for_reading(buffer, &base, &size);

off_t from = NUM2OFFT(_from);
size_t length = NUM2SIZET(_length);
size_t offset = NUM2SIZET(_offset);
size_t total = 0;

if (length > size) {
rb_raise(rb_eRuntimeError, "Length exceeds size of buffer!");
}

if (offset > size) {
return rb_fiber_scheduler_io_result(-1, EINVAL);
}

if (offset == size || length == 0) {
return rb_fiber_scheduler_io_result(0, 0);
}

RB_OBJ_WRITTEN(self, Qundef, fiber);

size_t maximum_size = size - offset;
while (maximum_size) {
ssize_t result = pwrite(descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
offset += result;
from += result;
if ((size_t)result >= length) break;
length -= result;
} else if (result == 0) {
break;
} else if (length > 0 && IO_Event_try_again(errno)) {
IO_Event_Selector_KQueue_io_wait(self, fiber, io, RB_INT2NUM(IO_EVENT_WRITABLE));
} else {
return rb_fiber_scheduler_io_result(-1, errno);
}

maximum_size = size - offset;
}

return rb_fiber_scheduler_io_result(total, 0);
}

#endif

static
Expand Down Expand Up @@ -1094,6 +1196,8 @@ void Init_IO_Event_Selector_KQueue(VALUE IO_Event_Selector) {
#ifdef HAVE_RUBY_IO_BUFFER_H
rb_define_method(IO_Event_Selector_KQueue, "io_read", IO_Event_Selector_KQueue_io_read_compatible, -1);
rb_define_method(IO_Event_Selector_KQueue, "io_write", IO_Event_Selector_KQueue_io_write_compatible, -1);
rb_define_method(IO_Event_Selector_KQueue, "io_pread", IO_Event_Selector_KQueue_io_pread, 6);
rb_define_method(IO_Event_Selector_KQueue, "io_pwrite", IO_Event_Selector_KQueue_io_pwrite, 6);
#endif

rb_define_method(IO_Event_Selector_KQueue, "process_wait", IO_Event_Selector_KQueue_process_wait, 3);
Expand Down