Skip to content

Commit b09ebd1

Browse files
committed
stream: preserve half-open duplexes in async iteration
Signed-off-by: Efe Karasakal <hi@efe.dev>
1 parent 8979001 commit b09ebd1

3 files changed

Lines changed: 129 additions & 1 deletion

File tree

lib/internal/streams/readable.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1418,9 +1418,16 @@ async function* createAsyncIterator(stream, options) {
14181418
error = aggregateTwoErrors(error, err);
14191419
throw error;
14201420
} finally {
1421+
const preserveHalfOpenDuplex =
1422+
error === null &&
1423+
stream.allowHalfOpen === true &&
1424+
stream.writable === true &&
1425+
stream.writableEnded !== true;
1426+
14211427
if (
14221428
(error || options?.destroyOnReturn !== false) &&
1423-
(error === undefined || stream._readableState.autoDestroy)
1429+
(error === undefined || stream._readableState.autoDestroy) &&
1430+
!preserveHalfOpenDuplex
14241431
) {
14251432
destroyImpl.destroyer(stream, null);
14261433
} else {
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const net = require('net');
6+
7+
(async function() {
8+
let resolveServerSocket;
9+
const serverSocketPromise = new Promise((resolve) => {
10+
resolveServerSocket = resolve;
11+
});
12+
13+
const server = net.createServer({
14+
allowHalfOpen: true,
15+
}, common.mustCall((socket) => {
16+
resolveServerSocket(socket);
17+
}));
18+
19+
server.on('error', common.mustNotCall());
20+
server.on('close', common.mustCall());
21+
22+
await new Promise((resolve) => {
23+
server.listen(0, common.localhostIPv4, resolve);
24+
});
25+
26+
const clientSocket = await new Promise((resolve) => {
27+
const socket = net.createConnection({
28+
allowHalfOpen: true,
29+
port: server.address().port,
30+
host: server.address().address,
31+
}, common.mustCall(() => {
32+
resolve(socket);
33+
}));
34+
socket.on('error', common.mustNotCall());
35+
});
36+
37+
const serverSocket = await serverSocketPromise;
38+
serverSocket.on('error', common.mustNotCall());
39+
40+
await new Promise((resolve, reject) => {
41+
clientSocket.write('data written to client socket', (err) => {
42+
if (err) reject(err);
43+
else resolve();
44+
});
45+
});
46+
47+
await new Promise((resolve) => {
48+
clientSocket.end(resolve);
49+
});
50+
51+
let serverRead = '';
52+
for await (const chunk of serverSocket) {
53+
serverRead += chunk;
54+
}
55+
56+
assert.strictEqual(serverRead, 'data written to client socket');
57+
assert.strictEqual(serverSocket.destroyed, false);
58+
59+
await new Promise((resolve, reject) => {
60+
serverSocket.write('data written to server socket', (err) => {
61+
if (err) reject(err);
62+
else resolve();
63+
});
64+
});
65+
66+
await new Promise((resolve) => {
67+
serverSocket.end(resolve);
68+
});
69+
70+
let clientRead = '';
71+
for await (const chunk of clientSocket) {
72+
clientRead += chunk;
73+
}
74+
75+
assert.strictEqual(clientRead, 'data written to server socket');
76+
77+
await new Promise((resolve) => {
78+
server.close(resolve);
79+
});
80+
})().then(common.mustCall());
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const {
6+
Duplex,
7+
} = require('stream');
8+
9+
{
10+
let written = '';
11+
12+
const duplex = new Duplex({
13+
allowHalfOpen: true,
14+
read() {
15+
this.push('hello');
16+
this.push(null);
17+
},
18+
write(chunk, encoding, callback) {
19+
written += chunk;
20+
callback();
21+
},
22+
});
23+
24+
duplex.on('error', common.mustNotCall());
25+
duplex.on('close', common.mustCall());
26+
27+
(async () => {
28+
let read = '';
29+
for await (const chunk of duplex) {
30+
read += chunk;
31+
}
32+
33+
assert.strictEqual(read, 'hello');
34+
assert.strictEqual(duplex.destroyed, false);
35+
36+
duplex.write('world', common.mustSucceed(() => {
37+
assert.strictEqual(written, 'world');
38+
duplex.end();
39+
}));
40+
})().then(common.mustCall());
41+
}

0 commit comments

Comments
 (0)