-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbytes.go
More file actions
199 lines (165 loc) · 4.07 KB
/
bytes.go
File metadata and controls
199 lines (165 loc) · 4.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package bytespool
import (
"container/list"
"context"
"errors"
"io"
"runtime"
"sync"
)
/* Bytes */

const (
	// unLimitedBytesCapacity marks a Bytes with no per-instance capacity
	// limit (see Bytes.isUnlimited); such a Bytes is bounded only by the
	// pool it draws segments from.
	unLimitedBytesCapacity int = 0
)

// ErrBytesCapacityTooSmall is returned by Bytes.Write when the write would
// exceed the Bytes' own capacity (in segments, converted to bytes).
var ErrBytesCapacityTooSmall = errors.New("bytes capacity is too small")
// Bytes is a FIFO byte container backed by fixed-size segments borrowed
// from a Pool: Write appends at the back of the list, Read consumes from
// the front. All exported methods are safe for concurrent use via mux.
type Bytes struct {
	mux      sync.Mutex
	list     *list.List // elements are *segment, in write order
	pool     *Pool      // source of segments; attached via withPool
	capacity int        // max segments; unLimitedBytesCapacity (0) means no limit
	size     int        // current data size that wrote to the list
}
// withPool attaches p as the backing segment pool and returns bs so the
// call can be chained after newBytes.
func (bs *Bytes) withPool(p *Pool) *Bytes {
	bs.pool = p
	return bs
}
// capacityInBytes converts the per-Bytes capacity (a segment count) into
// bytes using the pool's segment size.
func (bs *Bytes) capacityInBytes() int {
	return bs.capacity * bs.pool.segmentSize
}
// isUnlimited reports whether this Bytes has no capacity limit of its own
// (it may still be bounded by the pool's capacity).
func (bs *Bytes) isUnlimited() bool {
	return bs.capacity == unLimitedBytesCapacity
}
// Write appends p, borrowing segments from the pool as needed. It
// implements io.Writer.
//
// The capacity checks are all-or-nothing: if p would overflow the pool's
// total capacity, ErrPoolTooSmall is returned; if it would overflow this
// Bytes' own capacity, ErrBytesCapacityTooSmall is returned. In both
// cases nothing is written. A zero-length write succeeds without
// borrowing any segment.
func (bs *Bytes) Write(p []byte) (n int, err error) {
	bs.mux.Lock()
	defer bs.mux.Unlock()
	if !bs.pool.isUnlimited() && bs.size+len(p) > bs.pool.capacityInBytes() {
		return 0, ErrPoolTooSmall
	}
	if !bs.isUnlimited() && bs.size+len(p) > bs.capacityInBytes() {
		return 0, ErrBytesCapacityTooSmall
	}
	for len(p) > 0 {
		back := bs.list.Back()
		// Borrow a fresh segment when there is none yet or the last one
		// is already full. With the capacity checks above this Get is not
		// expected to fail, but the error is propagated with a partial n
		// just in case.
		if back == nil || back.Value.(*segment).size == bs.pool.segmentSize {
			buf, err := bs.pool.Get(context.TODO())
			if err != nil {
				return n, err
			}
			back = bs.list.PushBack(buf)
		}
		seg := back.Value.(*segment)
		c := copy(seg.data[seg.size:], p)
		n += c
		bs.size += c
		seg.size += c
		p = p[c:]
	}
	return n, nil
}
// Read fills p with buffered data in FIFO order, returning io.EOF when
// the buffer runs dry before p is full. It implements io.Reader and is
// safe for concurrent use; the actual work happens in read.
func (bs *Bytes) Read(p []byte) (n int, err error) {
	bs.mux.Lock()
	defer bs.mux.Unlock()
	return bs.read(p)
}
// read drains buffered data from the front of the segment list into p.
// Callers must hold bs.mux.
//
// It returns io.EOF when p could not be filled completely because the
// list ran out of data; n may still be greater than zero in that case.
func (bs *Bytes) read(p []byte) (n int, err error) {
	for len(p) > 0 && bs.list.Len() > 0 {
		front := bs.list.Front()
		seg := front.Value.(*segment)
		h := min(seg.offset+len(p), seg.size)
		c := copy(p, seg.data[seg.offset:h])
		n += c
		seg.offset += c
		p = p[c:]
		if seg.offset == seg.size {
			// Only when a segment is fully consumed and removed from the
			// list is bs.size decreased. Do the accounting BEFORE handing
			// the segment back: once Put has it, the pool may reset or
			// reuse it, making seg.size unreliable.
			bs.size -= seg.size
			bs.list.Remove(front)
			bs.pool.Put(seg)
		}
	}
	if len(p) > 0 {
		err = io.EOF
	}
	return n, err
}
// Free returns every segment held by this Bytes to the pool and resets
// its size to zero. Calling it is optional — the finalizer installed by
// newBytes performs the same cleanup when the Bytes is garbage collected —
// but calling it promptly lets the pool reuse the segments sooner.
func (bs *Bytes) Free() {
	bs.mux.Lock()
	defer bs.mux.Unlock()
	for front := bs.list.Front(); front != nil; front = bs.list.Front() {
		bs.list.Remove(front)
		bs.pool.Put(front.Value.(*segment))
	}
	bs.size = 0
}
// newBytes builds a Bytes limited to capacity segments (0 means
// unlimited) and registers a finalizer so that any segments still held
// when the Bytes becomes unreachable are returned to the pool. The
// caller is expected to attach a pool via withPool before use.
func newBytes(capacity int) *Bytes {
	bs := &Bytes{
		list:     list.New(),
		capacity: capacity,
	}
	runtime.SetFinalizer(bs, func(b *Bytes) {
		b.mux.Lock()
		defer b.mux.Unlock()
		for e := b.list.Front(); e != nil; e = b.list.Front() {
			b.list.Remove(e)
			b.pool.Put(e.Value.(*segment))
		}
	})
	return bs
}
// BytesPool is a pool of Bytes: a factory that hands out Bytes instances
// all sharing one underlying segment Pool (see NewBytes).
type BytesPool struct {
	pool *Pool // shared segment pool backing every Bytes created here
}
// NewBytesPool create a new BytesPool
// maxMemory is the threshold value (in byte) of memory the BytesPool could increase,
// eg. 1024 * 1024 * 1024 (1G)
// if maxMemory == 0, the capacity of the pool is unLimited
// segmentSize (byte) is the size of the segment, eg. 1024 * 4,
// maxMemory / segmentSize is the capacity of the pool which means
// the max number of the segments in the pool
// Panics if segmentSize <= 0.
func NewBytesPool(maxMemory, segmentSize int) *BytesPool {
	if segmentSize <= 0 {
		panic("segmentSize must be greater than 0")
	}
	capacity := unLimitedPoolCapacity
	if maxMemory > 0 {
		capacity = maxMemory / segmentSize
		// A positive budget smaller than one segment must not silently
		// become an unlimited pool; clamp it to a single segment instead.
		if capacity == 0 {
			capacity = 1
		}
	}
	return &BytesPool{
		pool: newPool(capacity, segmentSize),
	}
}
// NewBytes create a new Bytes something like []byte
// length is the max length of the Bytes (in byte) eg, 1024 * 1024 (1M)
// data in Bytes is a list of []byte, the size of []byte is pool's segmentSize, like 4k
// so, the capacity in segments is length / segmentSize, rounded up
// if length == 0, it is not limited, it may use all of the bytes in the pool
func (bp *BytesPool) NewBytes(length int) *Bytes {
	capacity := unLimitedBytesCapacity
	if length > 0 {
		// Ceiling division: a partial trailing segment still counts.
		capacity = (length + bp.pool.segmentSize - 1) / bp.pool.segmentSize
	}
	return newBytes(capacity).withPool(bp.pool)
}