Skip to content
Merged
8 changes: 4 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

# 0.28.2 2026-02-20

- Fix `Stream#get_string`
- Fix `Stream#read`

# 0.28.1 2026-02-20

Expand Down Expand Up @@ -169,9 +169,9 @@

# 2025-06-03 Version 0.12

- Add buffer, maxlen params to `Stream#get_line`
- Add buffer param to `Stream#get_string`
- Remove `Stream#resp_get_line`, `Stream#resp_get_string` methods
- Add buffer, maxlen params to `Stream#read_line`
- Add buffer param to `Stream#read`
- Remove `Stream#resp_read_line`, `Stream#resp_read` methods

# 2025-06-02 Version 0.11.1

Expand Down
84 changes: 46 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ implementation that allows integration with the entire Ruby ecosystem.
- Excellent performance characteristics for concurrent I/O-bound applications.
- `Fiber::Scheduler` implementation to automatically integrate with the Ruby
ecosystem in a transparent fashion.
- Read streams with automatic buffer management.
- [Connection](#connections) class with automatic buffer management for reading.
- Optimized I/O for encrypted SSL connections.

## Design
Expand Down Expand Up @@ -286,64 +286,70 @@ fiber = Fiber.schedule do
end
```

## Read Streams
## Connections

A UringMachine stream is used to efficiently read from a socket or other file
descriptor. Streams are ideal for implementing the read side of protocols, and
provide an API that is useful for both line-based protocols and binary
(frame-based) protocols.
`UringMachine::Connection` is a class designed for efficiently read from and
write to a socket or other file descriptor. Connections are ideal for
implementing the read side of protocols, and provide an API that is useful for
both line-based protocols and binary (frame-based) protocols.

A stream is associated with a UringMachine instance and a target file descriptor
(see also [stream modes](#stream-modes) below). Behind the scenes, streams take
advantage of io_uring's registered buffers feature, and more recently, the
introduction of [incremental buffer
A connection is associated with a UringMachine instance and a target file
descriptor (or SSL socket, see also [connection modes](#connection-modes)
below). Behind the scenes, connections take advantage of io_uring's registered
buffers feature, and more recently, the introduction of [incremental buffer
consumption](https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.11-and-6.12#incremental-provided-buffer-consumption).

When streams are used, UringMachine automatically manages the buffers it
When connections are used, UringMachine automatically manages the buffers it
provides to the kernel, maximizing buffer reuse and minimizing allocations.
UringMachine also responds to stress conditions (increased incoming traffic) by
automatically provisioning additional buffers.

To create a stream for a given fd, use `UM#stream`:
To create a connection for a given fd, use `UM#connection`:

```ruby
stream = machine.stream(fd)
conn = machine.connection(fd)

# you can also provide a block that will be passed the stream instance:
machine.stream(fd) { |s| do_something_with(s) }
# you can also provide a block that will be passed the connection instance:
machine.connection(fd) { |c| do_something_with(c) }

# you can also instantiate a stream directly:
stream = UM::Stream.new(machine, fd)
# you can also instantiate a connection directly:
conn = UM::Connection.new(machine, fd)
```

The following API is used to interact with the stream:
The following API is used to interact with the connection:

```ruby
# Read until a newline character is encountered:
line = stream.get_line(0)
line = conn.read_line(0)

# Read line with a maximum length of 13 bytes:
line = stream.get_line(13)
line = conn.read_line(13)

# Read all data:
buf = stream.get_string(0)
buf = conn.read(0)

# Read exactly 13 bytes:
buf = stream.get_string(13)
buf = conn.read(13)

# Read up to 13 bytes:
buf = stream.get_string(-13)
buf = conn.read(-13)

# Read continuously until EOF
conn.read_each { |data| ... }

# Skip 3 bytes:
stream.skip(3)
conn.skip(3)

# Write
conn.write('foo', 'bar', 'baz')
```

Here's an example of a how a basic HTTP request parser might be implemented
using a stream:
using a connection:

```ruby
def parse_http_request_headers(stream)
request_line = stream.get_line(0)
def parse_http_request_headers(conn)
request_line = conn.read_line(0)
m = request_line.match(REQUEST_LINE_RE)
return nil if !m

Expand All @@ -354,7 +360,7 @@ def parse_http_request_headers(stream)
}

while true
line = stream.get_line(0)
line = conn.read_line(0)
break if !line || line.empty?

m = line.match(HEADER_RE)
Expand All @@ -364,24 +370,26 @@ def parse_http_request_headers(stream)
end
```

### Stream modes
### Connection modes

Stream modes allow streams to be transport agnostic. Currently streams support
three modes:
Connection modes allow connections to be transport agnostic. Currently
connections support three modes:

- `:bp_read` - use the buffer pool, read data using multishot read
- `:fd` - use the buffer pool, read data using multishot read
(this is the default mode).
- `:bp_recv` - use the buffer pool, read data using multishot recv.
- `:socket` - use the buffer pool, read data using multishot recv.
- `:ssl` - read from an `SSLSocket` object.

The mode is specified as an additional argument to `Stream.new`:
The mode is specified as an additional argument to `Connection.new`:

```ruby
# stream using recv:
stream = machine.stream(fd, :bp_recv)
# using recv/send:
conn = machine.connection(fd, :socket)

# stream on an SSL socket:
stream = machine.stream(ssl, :ssl)
# SSL I/O:
conn = machine.connection(ssl, :ssl)
# or simply:
conn = machine.connection(ssl)
```

## Performance
Expand Down
33 changes: 19 additions & 14 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,44 @@ What if instead of `Stream` we had something called `Link`, which serves for
both reading and writing:

```ruby
link = machine.link(fd)
while l = link.read_line
link.write(l, '\n')
conn = machine.connection(fd)
while l = conn.read_line
conn.write(l, '\n')
end
# or:
buf = link.read(42)
buf = conn.read(42)
```

RESP:

```ruby
link.resp_write(['foo', 'bar'])
reply = link.resp_read
conn.resp_write(['foo', 'bar'])
reply = conn.resp_read
```

HTTP:

```ruby
r = link.http_read_request
link.http_write_response({ ':status' => 200 }, 'foo')
r = conn.http_read_request
conn.http_write_response({ ':status' => 200 }, 'foo')

# or:
link.http_write_request({ ':method' => 'GET', ':path' => '/foo' }, nil)
conn.http_write_request({ ':method' => 'GET', ':path' => '/foo' }, nil)
```

Plan of action:

- Rename methods:
- rename `#get_line` to `#read_line`
- rename `#get_string` to `#read`
- rename `#get_to_delim` to `#read_to_delim`
- rename `#resp_decode` to `#resp_read`
- Rename `Stream` to `Link`
- [v] rename `#read_line` to `#read_line`
- [v] rename `#read` to `#read`
- [v] rename `#read_to_delim` to `#read_to_delim`
- [v] rename `#each` to `#read_each`
- [v] rename `#resp_decode` to `#resp_read`
- Rename modes:
- [v] :fd to :fd
- [v] :socket to :socket
- [v] auto detect SSL
- Rename `Stream` to `Connection`
- Add methods:
- `#write(*bufs)`
- `#resp_write(obj)`
Expand Down
14 changes: 7 additions & 7 deletions benchmark/gets.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ def um_read
end
end

@fd_stream = @machine.open('/dev/random', UM::O_RDONLY)
@stream = UM::Stream.new(@machine, @fd_stream)
def um_stream_get_line
@stream.get_line(0)
@fd_connection = @machine.open('/dev/random', UM::O_RDONLY)
@conn = UM::Connection.new(@machine, @fd_connection)
def um_connection_read_line
@conn.read_line(0)
end

Benchmark.ips do |x|
x.report('IO#gets') { io_gets }
x.report('UM#read+buf') { um_read }
x.report('UM::Stream') { um_stream_get_line }
x.report('IO#gets') { io_gets }
x.report('UM#read+buf') { um_read }
x.report('UM::Connection') { um_connection_read_line }

x.compare!(order: :baseline)
end
20 changes: 10 additions & 10 deletions benchmark/gets_concurrent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,40 +83,40 @@ def um_read
stop_server
end

@total_stream = 0
def um_stream_do
@total_connection = 0
def um_connection_do
# fd = @machine.open('/dev/random', UM::O_RDONLY)
fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
@machine.connect(fd, '127.0.0.1', 1234)
stream = UM::Stream.new(@machine, fd)
N.times { @total_stream += stream.get_line(0)&.bytesize || 0 }
conn = UM::Connection.new(@machine, fd)
N.times { @total_connection += conn.read_line(0)&.bytesize || 0 }
rescue => e
p e
p e.backtrace
ensure
stream.clear
conn.clear
@machine.close(fd)
end

def um_stream
def um_connection
start_server
ff = C.times.map {
@machine.snooze
@machine.spin { um_stream_do }
@machine.spin { um_connection_do }
}
@machine.await(ff)
pp total: @total_stream
pp total: @total_connection
ensure
stop_server
end

p(C:, N:)
um_stream
um_connection
pp @machine.metrics
exit

Benchmark.bm do
it.report('Thread/IO#gets') { io_gets }
it.report('Fiber/UM#read+buf') { um_read }
it.report('Fiber/UM::Stream') { um_stream }
it.report('Fiber/UM::Stream') { um_connection }
end
Loading
Loading