-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathring_buffer.ts
More file actions
121 lines (99 loc) · 3.72 KB
/
ring_buffer.ts
File metadata and controls
121 lines (99 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/// <reference path="typings/es6-promise.d.ts" />
/**
 * Minimal structural contract for the numeric array views handled in this file.
 *
 * The member signatures mirror those of the built-in typed arrays
 * (`Uint8Array`, `Float32Array`, ...), so any of them can be passed wherever
 * a `TypedArray` is expected.
 */
interface TypedArray extends ArrayBufferView, ArrayLike<number> {
    /** Number of elements in the view. */
    length: number;

    /**
     * Returns a view over the elements in `[begin, end)`.
     * `end` may be omitted.
     */
    subarray(begin: number, end?: number): TypedArray;

    /**
     * Copies the elements of `array` into this view, starting at `offset`
     * (which may be omitted).
     */
    set(array: TypedArray, offset?: number): void;
}
/**
 * Fixed-capacity FIFO ring buffer layered over a caller-supplied typed array.
 *
 * Writes go through `append`, which returns a promise. If the data does not
 * fit, the part that fits is stored immediately and the rest is queued; the
 * queued part is moved into the buffer by later `read_some` calls as space is
 * freed, and the promise resolves once everything has been stored. Only one
 * write may be queued at a time.
 */
class RingBuffer {
    /**
     * Threshold that the (disabled) position-rebasing logic was written
     * against. No active code in this class references it; it is kept because
     * it is part of the public static surface.
     */
    static MAX_POS = (1 << 16);

    /** Backing storage; its length is the buffer capacity. */
    private buf: TypedArray;
    /** Count of elements written since construction or the last `clear()`. */
    private wpos = 0;
    /** Count of elements read since construction or the last `clear()`. */
    private rpos = 0;
    /**
     * Queued write: the data that has not yet fit into the buffer and the
     * `resolve` callback of the promise returned by the `append` call that
     * queued it. `null` when no write is waiting.
     */
    private remaining_write_data: [TypedArray, {():void}] = null;

    /**
     * @param buffer Typed array used as backing storage. It is used directly,
     *               not copied.
     */
    constructor(buffer: TypedArray) {
        this.buf = buffer;
    }

    /**
     * Writes `data` into the buffer.
     *
     * @param data Elements to store.
     * @returns A promise that resolves once every element of `data` has been
     *          copied into the buffer. It rejects with an `Error` if an
     *          earlier `append` is still waiting for space.
     */
    append(data: TypedArray): Promise<any> {
        return new Promise((resolve, reject) => {
            // Only one queued write is supported: refuse while one is pending.
            if (this.remaining_write_data) {
                reject(new Error('RingBuffer.append: a previous write is still pending'));
                return;
            }
            var written = this._append_some(data);
            if (written === data.length) {
                resolve();
                return;
            }
            // Not enough free space: queue the rest so that read_some can
            // store it as room becomes available.
            this.remaining_write_data = [data.subarray(written), resolve];
        });
    }

    /**
     * Reads up to `output.length` elements into `output`.
     *
     * While a queued write exists, the space freed by reading is refilled
     * from it and reading continues, so the call keeps going until the queued
     * write is fully stored, `output` is full, or no progress can be made.
     *
     * @param output Destination array.
     * @returns Number of elements copied into `output`.
     */
    read_some(output: TypedArray): number {
        var ret = this._read_some(output);
        while (this.remaining_write_data) {
            // Reading freed space; move queued data into the buffer.
            this._append_remaining_data();
            if (ret >= output.length) break;
            var moved = this._read_some(output.subarray(ret));
            // Guards against spinning forever on a zero-capacity buffer.
            if (moved === 0) break;
            ret += moved;
        }
        return ret;
    }

    /**
     * Copies as much of `data` as currently fits.
     *
     * @returns Number of elements stored (0 when the buffer is full or
     *          `data` is empty).
     */
    private _append_some(data: TypedArray): number {
        var total_size = Math.min(data.length, this.available());
        if (total_size === 0) return 0;

        // First segment: from the write position up to the end of storage.
        var pos = this.wpos % this.buf.length;
        var size = Math.min(total_size, this.buf.length - pos);
        this.buf.set(data.subarray(0, size), pos);

        // Hit the end of storage with data left over: continue from index 0.
        if (size < total_size) {
            this.buf.set(data.subarray(size, total_size), 0);
        }

        this.wpos += total_size;
        return total_size;
    }

    /**
     * Tries to store the queued write. Resolves its promise when all of it
     * fits; otherwise re-queues whatever is still left.
     * Must only be called while a write is queued.
     */
    private _append_remaining_data() {
        var data = this.remaining_write_data[0];
        var resolve = this.remaining_write_data[1];
        this.remaining_write_data = null;

        var size = this._append_some(data);
        if (size === data.length) {
            resolve();
        } else {
            this.remaining_write_data = [data.subarray(size), resolve];
        }
    }

    /**
     * Copies up to `output.length` buffered elements into the start of
     * `output` and consumes them.
     *
     * @returns Number of elements copied (0 when the buffer is empty or
     *          `output` has zero length).
     */
    private _read_some(output: TypedArray): number {
        var total_size = Math.min(output.length, this.size());
        if (total_size === 0) return 0;

        // First segment: from the read position towards the end of storage.
        var pos = this.rpos % this.buf.length;
        var size = Math.min(total_size, this.buf.length - pos);
        output.set(this.buf.subarray(pos, pos + size), 0);

        // Hit the end of storage with more to read: continue from index 0.
        if (size < total_size) {
            output.set(this.buf.subarray(0, total_size - size), size);
        }

        this.rpos += total_size;
        return total_size;
    }

    /**
     * Discards all buffered data and any queued write.
     *
     * NOTE(review): the promise of a discarded queued write is neither
     * resolved nor rejected, so it never settles. Confirm that callers do not
     * wait on it after calling `clear()`.
     */
    clear(): void {
        this.rpos = this.wpos = 0;
        this.remaining_write_data = null;
    }

    /** @returns Total number of elements the buffer can hold. */
    capacity(): number {
        return this.buf.length;
    }

    /** @returns Number of elements currently stored and available to read. */
    size(): number {
        return this.wpos - this.rpos;
    }

    /** @returns Number of elements that can be written without queuing. */
    available(): number {
        return this.capacity() - this.size();
    }
}