Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions src/puter-js/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import { PuterJSFileSystemModule } from './modules/FileSystem/index.js';
import FSItem from './modules/FSItem.js';
import Hosting from './modules/Hosting.js';
import KV from './modules/KV.js';
import { PSocket } from './modules/networking/PSocket.js';
import { PTLSSocket } from './modules/networking/PTLS.js';
import { pFetch } from './modules/networking/requests.js';
import { netAPI } from './modules/networking/index.js';
import OS from './modules/OS.js';
import Perms from './modules/Perms.js';
import UI from './modules/UI.js';
Expand Down Expand Up @@ -411,24 +409,7 @@ const puterInit = (function () {
// Flag that indicates if a request to `/rao` has been made
this.rao_requested_ = false;

this.net = {
generateWispV1URL: async () => {
const { token: wispToken, server: wispServer } = (await (await fetch(`${this.APIOrigin }/wisp/relay-token/create`, {
method: 'POST',
headers: {
Authorization: `Bearer ${this.authToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({}),
})).json());
return `${wispServer}/${wispToken}/`;
},
Socket: PSocket,
tls: {
TLSSocket: PTLSSocket,
},
fetch: pFetch,
};
this.net = netAPI;

this.workers = new WorkersHandler(this.authToken);

Expand Down
321 changes: 250 additions & 71 deletions src/puter-js/src/modules/networking/PSocket.js
Original file line number Diff line number Diff line change
@@ -1,87 +1,266 @@
import EventListener from '../../lib/EventListener.js';
import { errors } from './parsers.js';
import { PWispHandler } from './PWispHandler.js';
const texten = new TextEncoder();
const requireAuth = false; // for initial launch
import { clearEpoxyClientCache, getEpoxyClient } from './index.js';

export let wispInfo = {
server: 'wss://puter.cafe/', // Unused currently
handler: undefined,
};
const textEncoder = new TextEncoder();

function normalizeWriteData (data) {
if ( typeof data === 'string' ) {
return textEncoder.encode(data);
}

if ( data instanceof ArrayBuffer ) {
return new Uint8Array(data);
}

if ( ArrayBuffer.isView(data) ) {
return new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
}

throw new Error('Invalid data type (not TypedArray, ArrayBuffer or String).');
}

function normalizeErrorReason (reason) {
if ( reason instanceof Error ) {
return reason.message;
}

return String(reason);
}

export class PSocket extends EventListener {
_events = new Map();
_streamID;
constructor (host, port) {
#host;
#port;
#useTls;

#reader;
#writer;

#open = false;
#closing = false;
#closed = false;
#pendingWrites = [];

constructor (host, port, options = {}) {
super(['data', 'drain', 'open', 'error', 'close', 'tlsdata', 'tlsopen', 'tlsclose']);

(async () => {
if ( !puter.authToken && puter.env === 'web' && requireAuth ) {
try {
await puter.ui.authenticateWithPuter();
this.#host = host;
this.#port = Number(port);
this.#useTls = Boolean(options.tls);

} catch (e) {
// if authentication fails, throw an error
throw (e);
}
}
if ( ! wispInfo.handler ) {
// first launch -- lets init the socket
const { token: wispToken, server: wispServer } = (await (await fetch(`${puter.APIOrigin }/wisp/relay-token/create`, {
method: 'POST',
headers: {
Authorization: puter.authToken ? `Bearer ${puter.authToken}` : '',
'Content-Type': 'application/json',
},
body: JSON.stringify({}),
})).json());

wispInfo.handler = new PWispHandler(wispServer, wispToken);
// Wait for websocket to fully open
await new Promise((res, req) => {
wispInfo.handler.onReady = res;
});
}
void this.#connect();
}

on (event, callback) {
if ( this.#useTls && (event === 'open' || event === 'data' || event === 'close') ) {
return super.on(`tls${event}`, callback);
}

const callbacks = {
dataCallBack: (data) => {
this.emit('data', data);
},
closeCallBack: (reason) => {
if ( reason !== 0x02 ) {
this.emit('error', new Error(errors[reason]));
this.emit('close', true);
return;
}
this.emit('close', false);
},
};

this._streamID = wispInfo.handler.register(host, port, callbacks);
setTimeout(() => {
this.emit('open', undefined);
}, 0);

})();
return super.on(event, callback);
}

addListener (...args) {
this.on(...args);
return this.on(...args);
}

write (data, callback) {
if ( data.buffer ) { // TypedArray
wispInfo.handler.write(this._streamID, data);
if ( callback ) callback();
} else if ( data.resize ) { // ArrayBuffer
data.write(this._streamID, new Uint8Array(data));
if ( callback ) callback();
} else if ( typeof (data) === 'string' ) {
wispInfo.handler.write(this._streamID, texten.encode(data));
if ( callback ) callback();
} else {
throw new Error('Invalid data type (not TypedArray, ArrayBuffer or String!!)');
const payload = normalizeWriteData(data);

if ( this.#closed ) {
throw new Error('Socket is already closed.');
}

if ( ! this.#writer ) {
this.#pendingWrites.push({ payload, callback });
return;
}

void this.#writePayload(payload, callback);
}

close () {
wispInfo.handler.close(this._streamID);
if ( this.#closing || this.#closed ) {
return;
}

this.#closing = true;
void this.#closeStreams(false);
}

async #connect () {
try {
await this.#connectWithClient(false);
} catch {
try {
await this.#connectWithClient(true);
} catch ( retryError ) {
clearEpoxyClientCache();
this.#emitErrorAndClose(retryError);
}
}
}

async #connectWithClient (refresh) {
if ( this.#closing || this.#closed ) {
return;
}

const client = await getEpoxyClient({ refresh });
const stream = await this.#openStream(client);

if ( this.#closing || this.#closed ) {
try {
await stream.read.cancel();
} catch {
// ignored
}
try {
await stream.write.abort();
} catch {
// ignored
}
return;
}

this.#reader = stream.read.getReader();
this.#writer = stream.write.getWriter();
this.#open = true;

this.emit(this.#eventName('open'));
await this.#flushPendingWrites();
void this.#readLoop();
}

async #openStream (client) {
if ( this.#useTls ) {
return await client.connectTls(this.#host, this.#port);
}

return await client.connect(this.#host, this.#port);
}

async #flushPendingWrites () {
while ( this.#pendingWrites.length && !this.#closed && !this.#closing ) {
const { payload, callback } = this.#pendingWrites.shift();
await this.#writePayload(payload, callback);
}
}

async #writePayload (payload, callback) {
if ( !this.#writer || this.#closed || this.#closing ) {
return;
}

try {
await this.#writer.write(payload);
if ( callback ) {
callback();
}
} catch ( error ) {
clearEpoxyClientCache();
this.#emitErrorAndClose(error);
}
}

async #readLoop () {
if ( ! this.#reader ) {
return;
}

try {
while ( !this.#closing && !this.#closed ) {
const { done, value } = await this.#reader.read();
if ( done ) {
break;
}

if ( value ) {
this.emit(this.#eventName('data'), value);
}
}

this.#emitClose(false);
} catch ( error ) {
if ( this.#closing ) {
this.#emitClose(false);
} else {
clearEpoxyClientCache();
this.#emitErrorAndClose(error);
}
} finally {
try {
this.#reader.releaseLock();
} catch {
// ignored
}
}
}

async #closeStreams (hadError) {
this.#pendingWrites = [];

if ( ! this.#open ) {
this.#emitClose(hadError);
return;
}

try {
if ( this.#reader ) {
await this.#reader.cancel();
}
} catch {
// ignored
}

try {
if ( this.#writer ) {
await this.#writer.close();
}
} catch {
// ignored
}

try {
if ( this.#writer ) {
this.#writer.releaseLock();
}
} catch {
// ignored
}

this.#open = false;
this.#emitClose(hadError);
}

#emitErrorAndClose (reason) {
if ( this.#closed ) {
return;
}

this.emit('error', normalizeErrorReason(reason));
this.#closing = true;
void this.#closeStreams(true);
}

#emitClose (hadError) {
if ( this.#closed ) {
return;
}

this.#closed = true;
this.emit(this.#eventName('close'), Boolean(hadError));
}

#eventName (event) {
if ( this.#useTls && (event === 'open' || event === 'data' || event === 'close') ) {
return `tls${event}`;
}

return event;
}
}

export class PTLSSocket extends PSocket {
constructor (host, port) {
super(host, port, { tls: true });
}
}
}
Loading
Loading