Skip to content

moving from readable-streams to streamx#42

Draft
martinheidegger wants to merge 3 commits into
mafintosh:masterfrom
martinheidegger:streamx
Draft

moving from readable-streams to streamx#42
martinheidegger wants to merge 3 commits into
mafintosh:masterfrom
martinheidegger:streamx

Conversation

@martinheidegger
Copy link
Copy Markdown

Moving utp-native to work with streamx. This PR is motivated to have less dependencies in the hypercore-stack.

Comment thread binding.cc Outdated
Comment thread index.js
this._closed = true
binding.utp_napi_close(this._handle)
} else {
for (const conn of this.connections) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't understand how these lines could be missing: When you close the server, any open connections are supposed to be closed, right?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, each connection must be closed individually

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Same as tcp)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instances created in the tests are not properly torn down then, causing a lot of weird errors happen when I tried to remove this. Checking the tests for "why they fail because this is removed" is the reason this takes to long for me to continue.

Comment thread binding.cc
err = uv_udp_recv_stop(&(self->handle));
if (err < 0) UTP_NAPI_THROW(err)

uv_close((uv_handle_t *) &(self->handle), on_uv_close);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handle is now closed serially. If I leave his in parallel, it causes a segfault. Probably a timing error, but I couldn't figure out its cause.

Comment thread package.json
Comment thread lib/connection.js
module.exports = class Connection extends streamx.Duplex {
constructor (utp, port, address, handle, halfOpen) {
super({
mapWritable: Buffer.from
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This copies all buffers as Buffer.from(buf) is a copy. Check if it's a string an only convert it then.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a quick fix that I forgot to remove, thanks for reminding. Thinking about it though: it may be even better to add two options to UTP: mapWritable and mapReadable with defaults going to a String → Buffer conversion? Related: mafintosh/streamx#47

Comment thread lib/connection.js
}

this.once('error', unregister)
this.once('close', unregister)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do this in _destroy instead.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally had this in _destroy but while debugging ran into a problem where _destroy doesn't seemed to have been called. Don't remember anymore what that was or in which combination it occurred. Can't reproduce at the moment.

Comment thread lib/connection.js
function unregister () {
this.off('error', unregister)
this.off('close', unregister)
process.nextTick(() => {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is nextTick needed?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I remember this right but what I remember: the error, close triggering too quickly caused some chance of binding.utp_napi_close to be called before binding.utp_close finished.. or something alike. Preventing a proper shutdown of the UTP instance.

Comment thread lib/connection.js
}
process.nextTick(() => this.emit('connect'))
initCb()
}
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old plumbing where the _write method waiting for connect seems simpler imo, can we leave that?

Comment thread lib/connection.js

if (bufs.length > 256) return this._write(Buffer.concat(bufs), null, cb)
_write (data, cb) {
if (this.destroyed) return
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destroyed writes don't happen in streamx :)

Comment thread lib/connection.js
cb(null)
}
_writev (datas, cb) {
if (this.destroyed) return
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

@martinheidegger
Copy link
Copy Markdown
Author

Thoroughly exhaused about this PR for a while. I will not touch it for the time being, if someone else wants to give it a shot: please be my guest.

@martinheidegger martinheidegger marked this pull request as draft May 25, 2021 07:18
Comment thread index.js

module.exports = UTP

const EMPTY = Buffer.alloc(0)
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh my , i readded the EMPTY constant at the wrong place. Should be just as before. (memo)

@jimmywarting
Copy link
Copy Markdown

jimmywarting commented Jun 21, 2021

Any thoughts on using async iterator instead?

for await (const chunk of stream) {
  // yield chunk.toString()
}

@martinheidegger
Copy link
Copy Markdown
Author

The thing about streamx is that it supports AsyncIterator and streams: https://github.com/streamxorg/streamx/blob/7fae781dd08f7cb12ee7c5f41803cfd876846fa5/index.js#L689

@jimmywarting
Copy link
Copy Markdown

jimmywarting commented Jun 21, 2021

I retract my comment, didn't look into the package source code (it's a duplex stream) making it even more complicated

I think you should just simply depend on node:stream instead... it's always going to be up to date

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants