@@ -4,40 +4,98 @@ const common = require('../common');
44const { finished, addAbortSignal } = require ( 'stream' ) ;
55const { ReadableStream, WritableStream } = require ( 'stream/web' ) ;
66const assert = require ( 'assert' ) ;
7+ const { setImmediate } = require ( 'timers/promises' ) ;
8+
9+ const typeErrorPredicate = { name : 'TypeError' , code : 'ERR_INVALID_STATE' } ;
710
811function createTestReadableStream ( ) {
9- return new ReadableStream ( {
10- start ( controller ) {
12+ let controller ;
13+ const rs = new ReadableStream ( {
14+ start ( c ) {
15+ controller = c ;
1116 controller . enqueue ( 'a' ) ;
1217 controller . enqueue ( 'b' ) ;
1318 controller . enqueue ( 'c' ) ;
1419 controller . close ( ) ;
1520 }
1621 } ) ;
22+ return [ rs , controller ] ;
1723}
1824
1925function createTestWritableStream ( values ) {
20- return new WritableStream ( {
21- write ( chunk ) {
26+ let controller ;
27+ const ws = new WritableStream ( {
28+ start ( c ) { controller = c ; } ,
29+ write ( chunk , c ) {
2230 values . push ( chunk ) ;
2331 }
2432 } ) ;
33+ return [ ws , controller ] ;
34+ }
35+
36+ /**
37+ *
38+ * @param {ReadableStream } rs
39+ * @param {import('internal/webstreams/readablestream').ReadableStreamReader } reader
40+ * @param {{
41+ * isByob?: boolean,
42+ * controller?: import('internal/webstreams/readablestream').ReadableStreamController,
43+ * additionalAssertions?: () => void,
44+ * }} options
45+ */
46+ function assertReadableStreamEventuallyAborted ( rs , reader , {
47+ isByob,
48+ controller,
49+ additionalAssertions = common . mustCall ( )
50+ } = { } ) {
51+ finished ( rs , { writable : false } , common . mustCall ( ( err ) => {
52+ assert . strictEqual ( err . name , 'AbortError' ) ;
53+ assert . rejects ( reader . read ( ...( isByob ? [ new Uint8Array ( 1 ) ] : [ ] ) ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
54+ assert . rejects ( reader . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
55+ if ( controller ) {
56+ assert . throws ( ( ) => controller . close ( ) , typeErrorPredicate ) ;
57+ assert . throws ( ( ) => controller . enqueue ( isByob ? new Uint8Array ( 1 ) : 'a' ) , typeErrorPredicate ) ;
58+ controller . error ( new Error ( ) ) ; // Never throws
59+ }
60+ additionalAssertions ( ) ;
61+ } ) ) ;
62+ }
63+
64+ /**
65+ *
66+ * @param {WritableStream } ws
67+ * @param {import('internal/webstreams/writablestream').WritableStreamDefaultWriter } writer
68+ * @param {{
69+ * controller?: import('internal/webstreams/writablestream').WritableStreamDefaultController,
70+ * additionalAssertions?: () => void,
71+ * }} options
72+ */
73+ function assertWritableStreamEventuallyAborted ( ws , writer , {
74+ controller,
75+ additionalAssertions = common . mustCall ( ) ,
76+ } = { } ) {
77+ finished ( ws , { readable : false } , common . mustCall ( ( err ) => {
78+ assert . strictEqual ( err . name , 'AbortError' ) ;
79+ assert . rejects ( writer . write ( 'a' ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
80+ assert . rejects ( writer . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
81+ if ( controller ) {
82+ controller . error ( new Error ( ) ) ; // Never throws
83+ assert . strictEqual ( controller . signal . aborted , true ) ;
84+ }
85+ additionalAssertions ( ) ;
86+ } ) ) ;
2587}
2688
2789{
28- const rs = createTestReadableStream ( ) ;
90+ const [ rs , controller ] = createTestReadableStream ( ) ;
2991
3092 const reader = rs . getReader ( ) ;
3193
3294 const ac = new AbortController ( ) ;
3395
3496 addAbortSignal ( ac . signal , rs ) ;
3597
36- finished ( rs , common . mustCall ( ( err ) => {
37- assert . strictEqual ( err . name , 'AbortError' ) ;
38- assert . rejects ( reader . read ( ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
39- assert . rejects ( reader . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
40- } ) ) ;
98+ assertReadableStreamEventuallyAborted ( rs , reader , { controller } ) ;
4199
42100 reader . read ( ) . then ( common . mustCall ( ( result ) => {
43101 assert . strictEqual ( result . value , 'a' ) ;
@@ -46,7 +104,7 @@ function createTestWritableStream(values) {
46104}
47105
48106{
49- const rs = createTestReadableStream ( ) ;
107+ const [ rs ] = createTestReadableStream ( ) ;
50108
51109 const ac = new AbortController ( ) ;
52110
@@ -62,9 +120,80 @@ function createTestWritableStream(values) {
62120}
63121
64122{
65- const rs1 = createTestReadableStream ( ) ;
123+ const [ rs , controller ] = createTestReadableStream ( ) ;
124+ const reader = rs . getReader ( ) ;
125+ const ac = new AbortController ( ) ;
126+
127+ addAbortSignal ( ac . signal , rs ) ;
128+ controller . error = common . mustNotCall (
129+ 'addAbortSignal() must not call an overridden controller.error()' ) ;
66130
67- const rs2 = createTestReadableStream ( ) ;
131+ assertReadableStreamEventuallyAborted ( rs , reader ) ;
132+
133+ reader . read ( ) . then ( common . mustCall ( ( ) => {
134+ ac . abort ( ) ;
135+ } ) ) ;
136+ }
137+
138+ {
139+ let controller ;
140+ const rs = new ReadableStream ( {
141+ type : 'bytes' ,
142+ start ( c ) { controller = c ; } ,
143+ } ) ;
144+ const ac = new AbortController ( ) ;
145+ addAbortSignal ( ac . signal , rs ) ;
146+
147+ const reader = rs . getReader ( { mode : 'byob' } ) ;
148+ assertReadableStreamEventuallyAborted ( rs , reader , { controller, isByob : true } ) ;
149+
150+ ac . abort ( ) ;
151+ }
152+
153+ {
154+ /** @member {import('internal/webstreams/readablestream').ReadableByteStreamController} */
155+ let controller ;
156+ const rs = new ReadableStream ( {
157+ type : 'bytes' ,
158+ start ( c ) { controller = c ; } ,
159+ } ) ;
160+ const ac = new AbortController ( ) ;
161+
162+ addAbortSignal ( ac . signal , rs ) ;
163+ controller . error = common . mustNotCall ( 'addAbortSignal() must not call an overridden controller.error()' ) ;
164+
165+ const reader = rs . getReader ( { mode : 'byob' } ) ;
166+ assertReadableStreamEventuallyAborted ( rs , reader , { isByob : true } ) ;
167+
168+ ac . abort ( ) ;
169+ }
170+
171+ {
172+ /** @member {import('internal/webstreams/readablestream').ReadableStreamDefaultController} */
173+ let controller ;
174+ let pullPromiseWithResolvers = Promise . withResolvers ( ) ;
175+ const rs = new ReadableStream ( {
176+ start ( c ) { controller = c ; } ,
177+ pull ( ) { return pullPromiseWithResolvers . promise ; } ,
178+ } ) ;
179+ const ac = new AbortController ( ) ;
180+ addAbortSignal ( ac . signal , rs ) ;
181+
182+ const reader = rs . getReader ( ) ;
183+ assertReadableStreamEventuallyAborted ( rs , reader ) ;
184+
185+ const readPromise = reader . read ( ) ;
186+ pullPromiseWithResolvers . resolve ( setImmediate ( ) . then ( ( ) => {
187+ ac . abort ( ) ;
188+ controller . enqueue ( 'a' ) ;
189+ } ) ) ;
190+ assert . rejects ( readPromise , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
191+ assert . rejects ( pullPromiseWithResolvers . promise , typeErrorPredicate ) . then ( common . mustCall ( ) ) ;
192+ }
193+
194+ {
195+ const [ rs1 , controller1 ] = createTestReadableStream ( ) ;
196+ const [ rs2 , controller2 ] = createTestReadableStream ( ) ;
68197
69198 const ac = new AbortController ( ) ;
70199
@@ -74,23 +203,14 @@ function createTestWritableStream(values) {
74203 const reader1 = rs1 . getReader ( ) ;
75204 const reader2 = rs2 . getReader ( ) ;
76205
77- finished ( rs1 , common . mustCall ( ( err ) => {
78- assert . strictEqual ( err . name , 'AbortError' ) ;
79- assert . rejects ( reader1 . read ( ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
80- assert . rejects ( reader1 . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
81- } ) ) ;
82-
83- finished ( rs2 , common . mustCall ( ( err ) => {
84- assert . strictEqual ( err . name , 'AbortError' ) ;
85- assert . rejects ( reader2 . read ( ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
86- assert . rejects ( reader2 . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
87- } ) ) ;
206+ assertReadableStreamEventuallyAborted ( rs1 , reader1 , { controller : controller1 } ) ;
207+ assertReadableStreamEventuallyAborted ( rs2 , reader2 , { controller : controller2 } ) ;
88208
89209 ac . abort ( ) ;
90210}
91211
92212{
93- const rs = createTestReadableStream ( ) ;
213+ const [ rs , controller ] = createTestReadableStream ( ) ;
94214
95215 const { 0 : rs1 , 1 : rs2 } = rs . tee ( ) ;
96216
@@ -101,48 +221,38 @@ function createTestWritableStream(values) {
101221 const reader1 = rs1 . getReader ( ) ;
102222 const reader2 = rs2 . getReader ( ) ;
103223
104- finished ( rs1 , common . mustCall ( ( err ) => {
105- assert . strictEqual ( err . name , 'AbortError' ) ;
106- assert . rejects ( reader1 . read ( ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
107- assert . rejects ( reader1 . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
108- } ) ) ;
109-
110- finished ( rs2 , common . mustCall ( ( err ) => {
111- assert . strictEqual ( err . name , 'AbortError' ) ;
112- assert . rejects ( reader2 . read ( ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
113- assert . rejects ( reader2 . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
114- } ) ) ;
224+ assertReadableStreamEventuallyAborted ( rs1 , reader1 , { controller } ) ;
225+ assertReadableStreamEventuallyAborted ( rs2 , reader2 , { controller } ) ;
115226
116227 ac . abort ( ) ;
117228}
118229
119230{
120231 const values = [ ] ;
121- const ws = createTestWritableStream ( values ) ;
232+ const [ ws , controller ] = createTestWritableStream ( values ) ;
122233
123234 const ac = new AbortController ( ) ;
124235
125236 addAbortSignal ( ac . signal , ws ) ;
126237
127238 const writer = ws . getWriter ( ) ;
128239
129- finished ( ws , common . mustCall ( ( err ) => {
130- assert . strictEqual ( err . name , 'AbortError' ) ;
131- assert . deepStrictEqual ( values , [ 'a' ] ) ;
132- assert . rejects ( writer . write ( 'b' ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
133- assert . rejects ( writer . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
134- } ) ) ;
240+ assertWritableStreamEventuallyAborted ( ws , writer , {
241+ controller ,
242+ additionalAssertions : common . mustCall ( ( ) => {
243+ assert . deepStrictEqual ( values , [ 'a' ] ) ;
244+ } ) ,
245+ } ) ;
135246
136- writer . write ( 'a' ) . then ( ( ) => {
137- ac . abort ( ) ;
138- } ) . then ( common . mustCall ( ) ) ;
247+ writer . write ( 'a' )
248+ . then ( common . mustCall ( ( ) => { ac . abort ( ) ; } ) ) ;
139249}
140250
141251{
142252 const values = [ ] ;
143253
144- const ws1 = createTestWritableStream ( values ) ;
145- const ws2 = createTestWritableStream ( values ) ;
254+ const [ ws1 , controller1 ] = createTestWritableStream ( values ) ;
255+ const [ ws2 , controller2 ] = createTestWritableStream ( values ) ;
146256
147257 const ac = new AbortController ( ) ;
148258
@@ -152,17 +262,29 @@ function createTestWritableStream(values) {
152262 const writer1 = ws1 . getWriter ( ) ;
153263 const writer2 = ws2 . getWriter ( ) ;
154264
155- finished ( ws1 , common . mustCall ( ( err ) => {
156- assert . strictEqual ( err . name , 'AbortError' ) ;
157- assert . rejects ( writer1 . write ( 'a' ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
158- assert . rejects ( writer1 . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
159- } ) ) ;
265+ const additionalAssertions = common . mustCall ( ( ) => {
266+ assert . deepStrictEqual ( values , [ ] ) ;
267+ } , 2 ) ;
268+ assertWritableStreamEventuallyAborted ( ws1 , writer1 , { controller : controller1 , additionalAssertions } ) ;
269+ assertWritableStreamEventuallyAborted ( ws2 , writer2 , { controller : controller2 , additionalAssertions } ) ;
160270
161- finished ( ws2 , common . mustCall ( ( err ) => {
162- assert . strictEqual ( err . name , 'AbortError' ) ;
163- assert . rejects ( writer2 . write ( 'a' ) , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
164- assert . rejects ( writer2 . closed , / A b o r t E r r o r / ) . then ( common . mustCall ( ) ) ;
165- } ) ) ;
271+ ac . abort ( ) ;
272+ }
273+
274+ {
275+ /** @member {import('internal/webstreams/writablestream').WritableStreamDefaultController} */
276+ let controller ;
277+ const ws = new WritableStream ( {
278+ start ( c ) { controller = c ; } ,
279+ } ) ;
280+ const ac = new AbortController ( ) ;
281+ addAbortSignal ( ac . signal , ws ) ;
282+
283+ controller . abort = common . mustNotCall ( 'addAbortSignal() must not call an overridden controller.abort()' ) ;
284+ controller . error = common . mustNotCall ( 'addAbortSignal() must not call an overridden controller.error()' ) ;
285+
286+ const writer = ws . getWriter ( ) ;
287+ assertWritableStreamEventuallyAborted ( ws , writer ) ;
166288
167289 ac . abort ( ) ;
168290}
0 commit comments