Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/

// Package buffer implements a reusable buffer abstraction.
//
// Wireguard-go's data processing is constrained by both the host's API,
// and the transformations performed during encapsulation:
//
// 1. Encryption requires tail- and headroom for extra headers and padding.
// Available via winrio, and pread(2).
// 2. Systems are moving towards coalesced reads for both TCP and UDP.
// The read data has no gaps for individual slices.
// 3. crypto.AEAD interface requires a contiguous dst []byte for Sealing.
// So we can't use scatter-gather to inject the gaps.
//
// Until one of these three conditions is changed, the encryption strategy
// is to copy on read into buffers with the required gaps.
// The buffers are right-sized for the packet to avoid memory inflation.
// To recycle said allocations, each buffer carries a recycle function
// that routes it back to its originating pool.
//
// Decryption shrinks each fragment instead of growing it, so buffers can pass
// through the pipeline without copying until the egress coalescing step.
// Depending on the chosen head of the coalesced batch, there may or may not be
// room, and reallocation is a necessary fallback until we start passing
// buffers in batches.

package buffer

import "fmt"

// Recycler holds state necessary for a correct Buffer return to its originating Source.
// Implementations route a released Buffer back to whatever pool or allocator
// produced it; Release invokes it after zeroing the Buffer's contents.
type Recycler interface {
	Recycle(*Buffer)
}

// RecycleFunc adapts arbitrary closures to the Recycler interface.
type RecycleFunc func(*Buffer)

// Recycle invokes the underlying closure with the Buffer.
func (fn RecycleFunc) Recycle(buf *Buffer) {
	fn(buf)
}

// Buffer is a reusable slice of bytes of fixed length.
// Buffer or its data must not be retained past Release.
type Buffer struct {
	// data's len tracks valid payload (any leading offset is passed out of
	// band by callers). It starts equal to the requested size for new buffers
	// and can be adjusted via SetLen; Bytes/BytesAt expose it read-only.
	// NOTE(review): the original comment referred to an exported Data field
	// and a Shift method — neither exists in this file; confirm against the
	// rest of the package before relying on either.
	data []byte
	// recycler, when non-nil, returns the Buffer to its originating pool
	// on Release.
	recycler Recycler
}

// New creates a Buffer over the provided slice, referencing the provided
// Recycler for its eventual Release.
func New(b []byte, recycler Recycler) *Buffer {
	return &Buffer{
		data:     b,
		recycler: recycler,
	}
}

// Make creates a Buffer with a new byte slice of the requested size.
// Make never returns nil: DefaultSource is a CappedSource on some platforms
// (android/ios set MaxBytesPerSource > 0) and CAN refuse a request, in which
// case Make falls back to a plain GC-managed allocation that bypasses both
// the pool and the cap accounting.
func Make(size int) *Buffer {
	buf, err := DefaultSource.Get(size)
	if err != nil {
		// Previously the error was discarded under the (incorrect)
		// assumption that the default source never errors, letting a
		// nil Buffer escape to the caller.
		return &Buffer{data: make([]byte, size)}
	}
	return buf
}

// Bytes returns the valid data held by the Buffer.
func (b *Buffer) Bytes() []byte {
	return b.BytesAt(0)
}

// BytesAt returns the valid data in the Buffer starting at offset.
// Offsets outside [0, len] panic as usual for reslicing.
func (b *Buffer) BytesAt(offset int) []byte {
	data := b.data
	return data[offset:]
}

// SetLen sets the length of the valid data in the Buffer.
// Intended for truncating the valid data after a read, or extending it
// after encryption. The capacity is not checked here; an out-of-range
// length panics as usual for reslicing.
func (b *Buffer) SetLen(l int) {
	b.data = b.data[0:l]
}

// Ensure returns a Buffer of the requested len holding the valid data of b.
// b itself is returned when its capacity suffices; otherwise a replacement is
// obtained from src (DefaultSource when src is nil) and b is released.
// Safe to call with a nil Buffer.
func Ensure(b *Buffer, size int, src Source) (*Buffer, error) {
	if src == nil {
		src = DefaultSource
	}
	if b == nil {
		return src.Get(size)
	}
	if size <= cap(b.data) {
		// In-place resize: the backing array is already large enough.
		b.data = b.data[:size]
		return b, nil
	}
	// Grow: move the payload into a fresh buffer and retire the old one.
	grown, err := src.Get(size)
	if err != nil {
		return nil, err
	}
	if n := copy(grown.data, b.data); n != len(b.data) {
		panic(fmt.Sprintf("short copy: %d != %d", n, len(b.data)))
	}
	Release(b)
	return grown, nil
}

// Release returns the Buffer to its Source for reuse, first restoring the
// full capacity and zeroing it so no payload leaks to the next user.
// Safe to call on a nil Buffer, or one with no recycler.
func Release(b *Buffer) {
	if b == nil {
		return
	}
	full := b.data[:cap(b.data)]
	clear(full)
	b.data = full
	if r := b.recycler; r != nil {
		r.Recycle(b)
	}
}

// ReleaseAll calls Release on each non-nil Buffer in the slice, and sets the
// slice elements to nil so the Buffers cannot be reused through the slice.
func ReleaseAll(bs []*Buffer) {
	for i, b := range bs {
		Release(b)
		bs[i] = nil
	}
}
10 changes: 10 additions & 0 deletions buffer/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/
package buffer

const (
	// MaxMessageSize is the largest buffer that callers may request from a Source.
	// It mirrors the platform-specific MaxSegmentSize constant.
	MaxMessageSize = MaxSegmentSize
)
13 changes: 13 additions & 0 deletions buffer/constants_android.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//go:build android

/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/

package buffer

const (
	// MaxSegmentSize is the size of the largest possible Android read.
	MaxSegmentSize = 2200
	// MaxBytesPerSource bounds the total bytes a capped Source may have
	// outstanding; see CappedSource.
	MaxBytesPerSource = 4096 * MaxSegmentSize
)
13 changes: 13 additions & 0 deletions buffer/constants_default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//go:build !android && !ios && !windows

/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/

package buffer

const (
	// MaxSegmentSize is the size of the largest possible Unix read.
	MaxSegmentSize = (1 << 16) - 1
	// MaxBytesPerSource of 0 disables capping and allows unbounded memory growth.
	MaxBytesPerSource = 0
)
17 changes: 17 additions & 0 deletions buffer/constants_ios.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:build ios

/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/

package buffer

// Fit within memory limits for iOS's Network Extension API, which has stricter requirements.
// These are vars instead of consts, because heavier network extensions might want to reduce
// them further.
var (
	// MaxBytesPerSource bounds the total bytes a capped Source may have
	// outstanding; see CappedSource.
	MaxBytesPerSource = 1024 * MaxSegmentSize
)

// MaxSegmentSize is the size of the largest possible iOS read.
const MaxSegmentSize = 1700
13 changes: 13 additions & 0 deletions buffer/constants_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//go:build windows

/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/

package buffer

const (
	// MaxSegmentSize is the size of the largest possible Windows read.
	MaxSegmentSize = 2048 - 32
	// MaxBytesPerSource of 0 disables capping and allows unbounded memory growth.
	MaxBytesPerSource = 0
)
175 changes: 175 additions & 0 deletions buffer/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/
package buffer

import (
"fmt"
"sync"
"sync/atomic"
)

const (
	// Tier sizes for PoolSource's pooled buffers.
	// NOTE(review): min and max shadow the Go 1.21 builtins of the same
	// name for this entire package; renaming (e.g. minTier/maxTier) would
	// avoid surprises if the builtins are ever needed here.
	min = 2 << 10  // 2KB, typical MTU-sized packet
	mid = 10 << 10 // 10KB, jumbo frame
	max = 65 << 10 // 65KB, max UDP datagram read
)

// Source produces new Buffers.
type Source interface {
	// Get returns a Buffer of exactly the requested len and at least the
	// requested cap.
	// Implementations may return an error if the request cannot be fulfilled.
	Get(size int) (*Buffer, error)
}

// DefaultSource is a package-level Source of Buffers.
// Used by Make and Ensure when no Source is provided.
//
// It is constructed via NewCappedSource rather than a struct literal:
// the literal form left CappedSource.recyclers without its New func, so
// on platforms with a positive MaxBytesPerSource the first successful
// Get would type-assert a nil pool entry and panic.
var DefaultSource Source = NewCappedSource(NewPoolSource(), int64(MaxBytesPerSource))

// Compile-time assertions that each implementation satisfies Source.
var (
	_ Source = (*PoolSource)(nil)
	_ Source = (*LoggingSource)(nil)
	_ Source = (*CappedSource)(nil)
)

// PoolSource is a tiered [Source] of buffers. Tiers are balanced
// to accommodate regular MTU sizes, jumbo frames, and the maximum possible UDP datagram size.
// PoolSource never errors, instead allocating a GC-managed buffer for requests that exceed the max tier size.
type PoolSource struct {
	// One sync.Pool per size tier; see the min/mid/max constants.
	// Buffers carry a recycler pointing back at their tier's pool.
	minPool sync.Pool
	midPool sync.Pool
	maxPool sync.Pool
}

// poolRecycler returns Buffers to the sync.Pool they were drawn from.
type poolRecycler struct {
	*sync.Pool
}

// Recycle puts the Buffer back into the embedded pool.
func (r *poolRecycler) Recycle(b *Buffer) {
	r.Pool.Put(b)
}

// NewPoolSource returns a PoolSource with the default tier sizes.
func NewPoolSource() *PoolSource {
	src := new(PoolSource)
	// Each tier's pool mints Buffers of its fixed size, wired back to the
	// pool itself for recycling.
	tier := func(pool *sync.Pool, size int) {
		pool.New = func() any {
			return &Buffer{data: make([]byte, size), recycler: &poolRecycler{pool}}
		}
	}
	tier(&src.minPool, min)
	tier(&src.midPool, mid)
	tier(&src.maxPool, max)
	return src
}

// Get returns a Buffer of exactly the requested len from the smallest
// fitting tier. Requests larger than the max tier receive a fresh,
// unpooled allocation. Get never returns an error.
func (p *PoolSource) Get(size int) (*Buffer, error) {
	var pool *sync.Pool
	switch {
	case size <= min:
		pool = &p.minPool
	case size <= mid:
		pool = &p.midPool
	case size <= max:
		pool = &p.maxPool
	default:
		// Oversized request: hand out a GC-managed buffer with no recycler.
		return &Buffer{data: make([]byte, size)}, nil
	}
	buf := pool.Get().(*Buffer)
	buf.data = buf.data[:size]
	return buf, nil
}

// LoggingSource is a Source that keeps track of all Buffers it has produced.
// Use when the buffers are not retained and cannot be released back individually.
// Not safe for concurrent use.
type LoggingSource struct {
	// Source is the wrapped producer; its Get is delegated to.
	Source
	// log accumulates every Buffer handed out, for bulk release.
	log []*Buffer
}

// Get obtains a Buffer from the wrapped Source and records it so
// ReleaseAll can return it later.
func (l *LoggingSource) Get(size int) (*Buffer, error) {
	b, err := l.Source.Get(size)
	if err != nil {
		return nil, err
	}
	l.log = append(l.log, b)
	return b, nil
}

// Log returns all Buffers produced by this source.
// The returned slice aliases internal state; do not mutate it while the
// source is still in use.
func (l *LoggingSource) Log() []*Buffer {
	return l.log
}

// ReleaseAll releases every tracked Buffer and empties the log, keeping
// the log's backing storage for reuse.
func (l *LoggingSource) ReleaseAll() {
	tracked := l.log
	ReleaseAll(tracked)
	l.log = tracked[:0]
}

// CappedSource is a Source that tracks the total capacity of all Buffers
// it has produced and returns an error if a request would cause the total
// to exceed a specified cap.
type CappedSource struct {
	// Used is the running total of outstanding bytes, charged by cap().
	// Public to expose metrics. Not charged on the uncapped path.
	Used atomic.Int64

	s         Source    // underlying producer of Buffers
	cap       int64     // byte limit; <= 0 disables capping
	recyclers sync.Pool // spare cappedRecyclers, avoiding a per-Get allocation
}

// NewCappedSource returns a CappedSource wrapping src with the given byte
// limit. A limit of zero or less disables the cap entirely.
func NewCappedSource(src Source, limit int64) *CappedSource {
	s := &CappedSource{s: src, cap: limit}
	// Set New in place: assigning a sync.Pool composite literal to the
	// field copies a lock-bearing value (flagged by go vet's copylocks).
	// The original also had a stray blank line and misplaced braces that
	// were not gofmt-clean, and its `cap` parameter shadowed the builtin.
	s.recyclers.New = func() any {
		return &cappedRecycler{s: s}
	}
	return s
}

// ErrSizeExceedsCap is returned by CappedSource.Get when the cap would be exceeded.
// NOTE(review): errors.New would be the idiomatic constructor here, but fmt
// is this file's only error dependency, so fmt.Errorf is retained as-is.
var ErrSizeExceedsCap = fmt.Errorf("buffer: request exceeds cap")

// Get obtains a Buffer from the wrapped Source, charging its full capacity
// against the cap. On success the Buffer's recycler is wrapped so the charge
// is refunded when the Buffer is released. Returns ErrSizeExceedsCap when
// the charge would push the total past the cap.
func (p *CappedSource) Get(size int) (*Buffer, error) {
	// Acquire the buffer first so the charge reflects the true memory
	// footprint via cap(). There is little point in optimizing this to
	// just spin faster when we're over.
	b, err := p.s.Get(size)
	if err != nil {
		return nil, err
	}
	if p.cap <= 0 {
		return b, nil // uncapped path
	}
	charge := int64(cap(b.data))
	// (renamed from `new`, which shadowed the builtin)
	if used := p.Used.Add(charge); used > p.cap {
		p.Used.Add(-charge)
		Release(b)
		return nil, ErrSizeExceedsCap
	}
	r, _ := p.recyclers.Get().(*cappedRecycler)
	if r == nil {
		// Tolerate a recyclers pool without a New func (e.g. a CappedSource
		// built as a struct literal); previously this type assertion
		// panicked on the nil pool entry.
		r = &cappedRecycler{s: p}
	}
	r.next, b.recycler = b.recycler, r // wrap recycler
	return b, nil
}

// cappedRecycler intercepts a Buffer's release to refund its byte charge
// to the owning CappedSource before handing off to the original recycler.
type cappedRecycler struct {
	s    *CappedSource // source whose Used counter is refunded on Recycle
	next Recycler      // the Buffer's original recycler, restored on Recycle
}

// Recycle restores the Buffer's original recycler, refunds the tracked
// bytes, releases the Buffer onward, and returns this wrapper to its pool.
func (r *cappedRecycler) Recycle(b *Buffer) {
	// Unwrap: hand the Buffer back its underlying recycler (may be nil).
	b.recycler = r.next
	r.next = nil
	// Refund the charge taken in CappedSource.Get.
	r.s.Used.Add(-int64(cap(b.data)))
	Release(b) // safe release even with a nil recycler
	r.s.recyclers.Put(r)
}
Loading
Loading