moving from readable-streams to streamx#42
Conversation
| this._closed = true | ||
| binding.utp_napi_close(this._handle) | ||
| } else { | ||
| for (const conn of this.connections) { |
There was a problem hiding this comment.
I didn't understand how these lines could be missing: When you close the server, any open connections are supposed to be closed, right?
There was a problem hiding this comment.
No, each connection must be closed individually
There was a problem hiding this comment.
The instances created in the tests are not properly torn down then, causing a lot of weird errors happen when I tried to remove this. Checking the tests for "why they fail because this is removed" is the reason this takes to long for me to continue.
| err = uv_udp_recv_stop(&(self->handle)); | ||
| if (err < 0) UTP_NAPI_THROW(err) | ||
|
|
||
| uv_close((uv_handle_t *) &(self->handle), on_uv_close); |
There was a problem hiding this comment.
This handle is now closed serially. If I leave his in parallel, it causes a segfault. Probably a timing error, but I couldn't figure out its cause.
| module.exports = class Connection extends streamx.Duplex { | ||
| constructor (utp, port, address, handle, halfOpen) { | ||
| super({ | ||
| mapWritable: Buffer.from |
There was a problem hiding this comment.
This copies all buffers as Buffer.from(buf) is a copy. Check if it's a string an only convert it then.
There was a problem hiding this comment.
This was a quick fix that I forgot to remove, thanks for reminding. Thinking about it though: it may be even better to add two options to UTP: mapWritable and mapReadable with defaults going to a String → Buffer conversion? Related: mafintosh/streamx#47
| } | ||
|
|
||
| this.once('error', unregister) | ||
| this.once('close', unregister) |
There was a problem hiding this comment.
I originally had this in _destroy but while debugging ran into a problem where _destroy doesn't seemed to have been called. Don't remember anymore what that was or in which combination it occurred. Can't reproduce at the moment.
| function unregister () { | ||
| this.off('error', unregister) | ||
| this.off('close', unregister) | ||
| process.nextTick(() => { |
There was a problem hiding this comment.
I am not sure I remember this right but what I remember: the error, close triggering too quickly caused some chance of binding.utp_napi_close to be called before binding.utp_close finished.. or something alike. Preventing a proper shutdown of the UTP instance.
| } | ||
| process.nextTick(() => this.emit('connect')) | ||
| initCb() | ||
| } |
There was a problem hiding this comment.
the old plumbing where the _write method waiting for connect seems simpler imo, can we leave that?
|
|
||
| if (bufs.length > 256) return this._write(Buffer.concat(bufs), null, cb) | ||
| _write (data, cb) { | ||
| if (this.destroyed) return |
There was a problem hiding this comment.
destroyed writes don't happen in streamx :)
| cb(null) | ||
| } | ||
| _writev (datas, cb) { | ||
| if (this.destroyed) return |
|
Thoroughly exhaused about this PR for a while. I will not touch it for the time being, if someone else wants to give it a shot: please be my guest. |
|
|
||
| module.exports = UTP | ||
|
|
||
| const EMPTY = Buffer.alloc(0) |
There was a problem hiding this comment.
Oh my , i readded the EMPTY constant at the wrong place. Should be just as before. (memo)
|
Any thoughts on using async iterator instead? for await (const chunk of stream) {
// yield chunk.toString()
} |
|
The thing about streamx is that it supports AsyncIterator and streams: https://github.com/streamxorg/streamx/blob/7fae781dd08f7cb12ee7c5f41803cfd876846fa5/index.js#L689 |
|
I retract my comment, didn't look into the package source code (it's a duplex stream) making it even more complicated I think you should just simply depend on |
Moving utp-native to work with streamx. This PR is motivated to have less dependencies in the hypercore-stack.