diff --git a/src/puter-js/src/index.js b/src/puter-js/src/index.js index 1287e2d78d..03e6ab218a 100644 --- a/src/puter-js/src/index.js +++ b/src/puter-js/src/index.js @@ -13,9 +13,7 @@ import { PuterJSFileSystemModule } from './modules/FileSystem/index.js'; import FSItem from './modules/FSItem.js'; import Hosting from './modules/Hosting.js'; import KV from './modules/KV.js'; -import { PSocket } from './modules/networking/PSocket.js'; -import { PTLSSocket } from './modules/networking/PTLS.js'; -import { pFetch } from './modules/networking/requests.js'; +import { netAPI } from './modules/networking/index.js'; import OS from './modules/OS.js'; import Perms from './modules/Perms.js'; import UI from './modules/UI.js'; @@ -411,24 +409,7 @@ const puterInit = (function () { // Flag that indicates if a request to `/rao` has been made this.rao_requested_ = false; - this.net = { - generateWispV1URL: async () => { - const { token: wispToken, server: wispServer } = (await (await fetch(`${this.APIOrigin }/wisp/relay-token/create`, { - method: 'POST', - headers: { - Authorization: `Bearer ${this.authToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({}), - })).json()); - return `${wispServer}/${wispToken}/`; - }, - Socket: PSocket, - tls: { - TLSSocket: PTLSSocket, - }, - fetch: pFetch, - }; + this.net = netAPI; this.workers = new WorkersHandler(this.authToken); diff --git a/src/puter-js/src/modules/networking/PSocket.js b/src/puter-js/src/modules/networking/PSocket.js index fe03461d0d..9beb5a9d65 100644 --- a/src/puter-js/src/modules/networking/PSocket.js +++ b/src/puter-js/src/modules/networking/PSocket.js @@ -1,87 +1,266 @@ import EventListener from '../../lib/EventListener.js'; -import { errors } from './parsers.js'; -import { PWispHandler } from './PWispHandler.js'; -const texten = new TextEncoder(); -const requireAuth = false; // for initial launch +import { clearEpoxyClientCache, getEpoxyClient } from './index.js'; -export let wispInfo = { - server: 'wss://puter.cafe/', // Unused currently - handler: undefined, -}; +const textEncoder = new TextEncoder(); + +function normalizeWriteData (data) { + if ( typeof data === 'string' ) { + return textEncoder.encode(data); + } + + if ( data instanceof ArrayBuffer ) { + return new Uint8Array(data); + } + + if ( ArrayBuffer.isView(data) ) { + return new Uint8Array(data.buffer, data.byteOffset, data.byteLength); + } + + throw new Error('Invalid data type (not TypedArray, ArrayBuffer or String).'); +} + +function normalizeErrorReason (reason) { + if ( reason instanceof Error ) { + return reason.message; + } + + return String(reason); +} export class PSocket extends EventListener { - _events = new Map(); - _streamID; - constructor (host, port) { + #host; + #port; + #useTls; + + #reader; + #writer; + + #open = false; + #closing = false; + #closed = false; + #pendingWrites = []; + + constructor (host, port, options = {}) { super(['data', 'drain', 'open', 'error', 'close', 'tlsdata', 'tlsopen', 'tlsclose']); - (async () => { - if ( !puter.authToken && puter.env === 'web' && requireAuth ) { - try { - await puter.ui.authenticateWithPuter(); + this.#host = host; + this.#port = Number(port); + this.#useTls = Boolean(options.tls); - } catch (e) { - // if authentication fails, throw an error - throw (e); - } - } - if ( ! wispInfo.handler ) { - // first launch -- lets init the socket - const { token: wispToken, server: wispServer } = (await (await fetch(`${puter.APIOrigin }/wisp/relay-token/create`, { - method: 'POST', - headers: { - Authorization: puter.authToken ? `Bearer ${puter.authToken}` : '', - 'Content-Type': 'application/json', - }, - body: JSON.stringify({}), - })).json()); - - wispInfo.handler = new PWispHandler(wispServer, wispToken); - // Wait for websocket to fully open - await new Promise((res, req) => { - wispInfo.handler.onReady = res; - }); - } + void this.#connect(); + } + + on (event, callback) { + if ( this.#useTls && (event === 'open' || event === 'data' || event === 'close') ) { + return super.on(`tls${event}`, callback); + } - const callbacks = { - dataCallBack: (data) => { - this.emit('data', data); - }, - closeCallBack: (reason) => { - if ( reason !== 0x02 ) { - this.emit('error', new Error(errors[reason])); - this.emit('close', true); - return; - } - this.emit('close', false); - }, - }; - - this._streamID = wispInfo.handler.register(host, port, callbacks); - setTimeout(() => { - this.emit('open', undefined); - }, 0); - - })(); + return super.on(event, callback); } + addListener (...args) { - this.on(...args); + return this.on(...args); } + write (data, callback) { - if ( data.buffer ) { // TypedArray - wispInfo.handler.write(this._streamID, data); - if ( callback ) callback(); - } else if ( data.resize ) { // ArrayBuffer - data.write(this._streamID, new Uint8Array(data)); - if ( callback ) callback(); - } else if ( typeof (data) === 'string' ) { - wispInfo.handler.write(this._streamID, texten.encode(data)); - if ( callback ) callback(); - } else { - throw new Error('Invalid data type (not TypedArray, ArrayBuffer or String!!)'); + const payload = normalizeWriteData(data); + + if ( this.#closed ) { + throw new Error('Socket is already closed.'); + } + + if ( ! this.#writer ) { + this.#pendingWrites.push({ payload, callback }); + return; } + + void this.#writePayload(payload, callback); } + close () { - wispInfo.handler.close(this._streamID); + if ( this.#closing || this.#closed ) { + return; + } + + this.#closing = true; + void this.#closeStreams(false); + } + + async #connect () { + try { + await this.#connectWithClient(false); + } catch { + try { + await this.#connectWithClient(true); + } catch ( retryError ) { + clearEpoxyClientCache(); + this.#emitErrorAndClose(retryError); + } + } + } + + async #connectWithClient (refresh) { + if ( this.#closing || this.#closed ) { + return; + } + + const client = await getEpoxyClient({ refresh }); + const stream = await this.#openStream(client); + + if ( this.#closing || this.#closed ) { + try { + await stream.read.cancel(); + } catch { + // ignored + } + try { + await stream.write.abort(); + } catch { + // ignored + } + return; + } + + this.#reader = stream.read.getReader(); + this.#writer = stream.write.getWriter(); + this.#open = true; + + this.emit(this.#eventName('open')); + await this.#flushPendingWrites(); + void this.#readLoop(); + } + + async #openStream (client) { + if ( this.#useTls ) { + return await client.connectTls(this.#host, this.#port); + } + + return await client.connect(this.#host, this.#port); + } + + async #flushPendingWrites () { + while ( this.#pendingWrites.length && !this.#closed && !this.#closing ) { + const { payload, callback } = this.#pendingWrites.shift(); + await this.#writePayload(payload, callback); + } + } + + async #writePayload (payload, callback) { + if ( !this.#writer || this.#closed || this.#closing ) { + return; + } + + try { + await this.#writer.write(payload); + if ( callback ) { + callback(); + } + } catch ( error ) { + clearEpoxyClientCache(); + this.#emitErrorAndClose(error); + } + } + + async #readLoop () { + if ( ! this.#reader ) { + return; + } + + try { + while ( !this.#closing && !this.#closed ) { + const { done, value } = await this.#reader.read(); + if ( done ) { + break; + } + + if ( value ) { + this.emit(this.#eventName('data'), value); + } + } + + this.#emitClose(false); + } catch ( error ) { + if ( this.#closing ) { + this.#emitClose(false); + } else { + clearEpoxyClientCache(); + this.#emitErrorAndClose(error); + } + } finally { + try { + this.#reader.releaseLock(); + } catch { + // ignored + } + } + } + + async #closeStreams (hadError) { + this.#pendingWrites = []; + + if ( ! this.#open ) { + this.#emitClose(hadError); + return; + } + + try { + if ( this.#reader ) { + await this.#reader.cancel(); + } + } catch { + // ignored + } + + try { + if ( this.#writer ) { + await this.#writer.close(); + } + } catch { + // ignored + } + + try { + if ( this.#writer ) { + this.#writer.releaseLock(); + } + } catch { + // ignored + } + + this.#open = false; + this.#emitClose(hadError); + } + + #emitErrorAndClose (reason) { + if ( this.#closed ) { + return; + } + + this.emit('error', normalizeErrorReason(reason)); + this.#closing = true; + void this.#closeStreams(true); + } + + #emitClose (hadError) { + if ( this.#closed ) { + return; + } + + this.#closed = true; + this.emit(this.#eventName('close'), Boolean(hadError)); + } + + #eventName (event) { + if ( this.#useTls && (event === 'open' || event === 'data' || event === 'close') ) { + return `tls${event}`; + } + + return event; + } +} + +export class PTLSSocket extends PSocket { + constructor (host, port) { + super(host, port, { tls: true }); } -} \ No newline at end of file +} diff --git a/src/puter-js/src/modules/networking/PTLS.js b/src/puter-js/src/modules/networking/PTLS.js deleted file mode 100644 index 0a25b79186..0000000000 --- a/src/puter-js/src/modules/networking/PTLS.js +++ /dev/null @@ -1,111 +0,0 @@ -/** - * This file uses https://github.com/MercuryWorkshop/rustls-wasm authored by GitHub:@r58Playz under the MIT License - */ - -import { PSocket } from './PSocket.js'; - -let rustls = undefined; - -export class PTLSSocket extends PSocket { - constructor (...args) { - super(...args); - super.on('open', (async () => { - if ( ! rustls ) { - // Safari exists unfortunately without good ReadableStream support. Until that is fixed we need this. - if ( ! globalThis.ReadableByteStreamController ) { - await import( /* webpackIgnore: true */ 'https://unpkg.com/web-streams-polyfill@3.0.2/dist/polyfill.js'); - } - rustls = (await import( /* webpackIgnore: true */ 'https://puter-net.b-cdn.net/rustls.js')); - await rustls.default('https://puter-net.b-cdn.net/rustls.wasm'); - } - - let cancelled = false; - const readable = new ReadableStream({ - /** - * - * @param {ReadableStreamDefaultController} controller - */ - start: (controller) => { - super.on('data', (data) => { - controller.enqueue(data.buffer); - }); - super.on('close', () => { - if ( ! cancelled ) - { - controller.close(); - } - }); - - }, - pull: (controller) => { - - }, - cancel: () => { - cancelled = true; - }, - - }); - - const writable = new WritableStream({ - write: (chunk) => { - super.write(chunk); - }, - abort: () => { - super.close(); - }, - close: () => { - super.close(); - }, - }); - - let read, write; - try { - const TLSConnnection = await rustls.connect_tls(readable, writable, args[0]); - read = TLSConnnection.read; - write = TLSConnnection.write; - } catch (e) { - this.emit('error', new Error(`TLS Handshake failed: ${ e}`)); - return; - } - - this.writer = write.getWriter(); - // writer.write("GET / HTTP/1.1\r\nHost: google.com\r\n\r\n"); - let reader = read.getReader(); - let done = false; - this.emit('tlsopen', undefined); - try { - while ( !done ) { - const { done: readerDone, value } = await reader.read(); - done = readerDone; - if ( ! done ) { - this.emit('tlsdata', value); - } - } - this.emit('tlsclose', false); - } catch (e) { - this.emit('error', e); - this.emit('tlsclose', true); - } - - })); - } - on (event, callback) { - if ( event === 'data' || event === 'open' || event === 'close' ) { - return super.on(`tls${ event}`, callback); - } else { - return super.on(event, callback); - } - } - write (data, callback) { - if ( data.buffer ) { // TypedArray - this.writer.write(data.slice(0).buffer).then(callback); - } else if ( data.resize ) { // ArrayBuffer - this.writer.write(data).then(callback); - } else if ( typeof (data) === 'string' ) { - this.writer.write(data).then(callback); - } else { - throw new Error('Invalid data type (not TypedArray, ArrayBuffer or String!!)'); - } - } - -} \ No newline at end of file diff --git a/src/puter-js/src/modules/networking/PWispHandler.js b/src/puter-js/src/modules/networking/PWispHandler.js deleted file mode 100644 index 2d8d530d3d..0000000000 --- a/src/puter-js/src/modules/networking/PWispHandler.js +++ /dev/null @@ -1,91 +0,0 @@ -import { CLOSE, CONNECT, DATA, CONTINUE, INFO, TCP, UDP, createWispPacket, parseIncomingPacket, textde } from './parsers.js'; - -export class PWispHandler { - _ws; - _nextStreamID = 1; - _bufferMax; - onReady = undefined; - streamMap = new Map(); - constructor (wispURL, puterAuth) { - const setup = () => { - this._ws = new WebSocket(wispURL); - this._ws.binaryType = 'arraybuffer'; - this._ws.onmessage = (event) => { - const parsed = parseIncomingPacket(new Uint8Array(event.data)); - switch ( parsed.packetType ) { - case DATA: - this.streamMap.get(parsed.streamID).dataCallBack(parsed.payload.slice(0)); // return a copy for the user to do as they please - break; - case CONTINUE: - if ( parsed.streamID === 0 ) { - this._bufferMax = parsed.remainingBuffer; - this._ws.onclose = () => { - setTimeout(setup(), 1000); - }; - if ( this.onReady ) { - this.onReady(); - } - return; - } - this.streamMap.get(parsed.streamID).buffer = parsed.remainingBuffer; - this._continue(); - break; - case CLOSE: - if ( parsed.streamID !== 0 ) - { - this.streamMap.get(parsed.streamID).closeCallBack(parsed.reason); - } - break; - case INFO: - puterAuth && this._ws.send(createWispPacket({ - packetType: INFO, - streamID: 0, - puterAuth, - })); - break; - } - }; - }; - setup(); - } - _continue (streamID) { - const queue = this.streamMap.get(streamID).queue; - for ( let i = 0; i < queue.length; i++ ) { - this.write(streamID, queue.shift()); - } - } - register (host, port, callbacks) { - const streamID = this._nextStreamID++; - this.streamMap.set(streamID, { queue: [], streamID, buffer: this._bufferMax, dataCallBack: callbacks.dataCallBack, closeCallBack: callbacks.closeCallBack }); - this._ws.send(createWispPacket({ - packetType: CONNECT, - streamType: TCP, - streamID: streamID, - hostname: host, - port: port, - })); - return streamID; - } - - write (streamID, data) { - const streamData = this.streamMap.get(streamID); - if ( streamData.buffer > 0 ) { - streamData.buffer--; - - this._ws.send(createWispPacket({ - packetType: DATA, - streamID: streamID, - payload: data, - })); - } else { - streamData.queue.push(data); - } - } - close (streamID) { - this._ws.send(createWispPacket({ - packetType: CLOSE, - streamID: streamID, - reason: 0x02, - })); - } -} \ No newline at end of file diff --git a/src/puter-js/src/modules/networking/epoxy.js b/src/puter-js/src/modules/networking/epoxy.js new file mode 100644 index 0000000000..d8860b1427 --- /dev/null +++ b/src/puter-js/src/modules/networking/epoxy.js @@ -0,0 +1,90 @@ +let EPOXY_BASE = 'https://puter-net.b-cdn.net/epoxy/7fbb05b'; +let epoxyRuntimePromise; + +const textEncoder = new TextEncoder(); + +async function getEpoxyRuntime () { + if ( epoxyRuntimePromise ) { + return await epoxyRuntimePromise; + } + + epoxyRuntimePromise = (async () => { + const base = EPOXY_BASE; + const runtime = await import(/* webpackIgnore: true */ `${base}/full.js`); + const wasmResponse = await fetch(`${base}/full.wasm`); + if ( ! wasmResponse.ok ) { + throw new Error( + `Failed to load epoxy wasm (HTTP ${wasmResponse.status} ${wasmResponse.statusText}).`, + ); + } + await runtime.init({ module_or_path: wasmResponse }); + return runtime; + })(); + + try { + return await epoxyRuntimePromise; + } catch ( error ) { + epoxyRuntimePromise = undefined; + throw error; + } +} + +function createPuterPasswordBuilder (runtime, wispToken) { + class PuterPasswordExt extends runtime.JsProtocolExtension { + constructor (required, toSend) { + super(0x02, [], []); + this.required = required; + this.toSend = toSend; + } + + encode () { + if ( ! this.toSend ) { + return new Uint8Array(); + } + + const [_user, _pw] = this.toSend; + const user = textEncoder.encode(_user); + const pw = textEncoder.encode(_pw); + + const buffer = new Uint8Array(3 + user.byteLength + pw.byteLength); + buffer[0] = user.byteLength; + new DataView(buffer.buffer).setUint16(1, pw.byteLength, true); + buffer.set(user, 3); + buffer.set(pw, 3 + user.byteLength); + + return buffer; + } + } + + class PuterPasswordExtBuilder extends runtime.JsProtocolExtensionBuilder { + constructor (toSend) { + super(0x02); + this.toSend = toSend; + } + + buildFromBytes (bytes) { + return new PuterPasswordExt(bytes[0] !== 0); + } + + buildToExtension () { + return new PuterPasswordExt(undefined, this.toSend); + } + } + + return new PuterPasswordExtBuilder(['', wispToken]); +} + +export let initEpoxy = async ({ wispToken, wispServer }) => { + const runtime = await getEpoxyRuntime(); + + const provider = new runtime.WispSocketProvider( + new runtime.WebSocketJsProvider(), + wispServer, + () => [ + { builders: [createPuterPasswordBuilder(runtime, wispToken)] }, + [0x02], + ], + ); + + return new runtime.EpoxyClient(provider); +}; diff --git a/src/puter-js/src/modules/networking/index.js b/src/puter-js/src/modules/networking/index.js new file mode 100644 index 0000000000..254d317fa1 --- /dev/null +++ b/src/puter-js/src/modules/networking/index.js @@ -0,0 +1,99 @@ +import { initEpoxy } from './epoxy.js'; +import { PSocket, PTLSSocket } from './PSocket.js'; +import { pFetch } from './requests.js'; + +let cachedEpoxy = undefined; + +function getPuterInstance () { + const puter = globalThis.puter; + if ( ! puter ) { + throw new Error('Puter runtime is not initialized yet.'); + } + return puter; +} + +function getWispRequestHeaders () { + const puter = getPuterInstance(); + const headers = { + 'Content-Type': 'application/json', + }; + + if ( puter.authToken ) { + headers.Authorization = `Bearer ${puter.authToken}`; + } + + return headers; +} + +function getClientCacheKey () { + const puter = getPuterInstance(); + return `${puter.APIOrigin}::${puter.authToken || ''}`; +} + +export async function getWispCredentials () { + const puter = getPuterInstance(); + const response = await fetch(`${puter.APIOrigin}/wisp/relay-token/create`, { + method: 'POST', + headers: getWispRequestHeaders(), + body: JSON.stringify({}), + }); + + if ( ! response.ok ) { + throw new Error( + `Failed to create relay token (HTTP ${response.status} ${response.statusText}).`, + ); + } + + const { token: wispToken, server: wispServer } = await response.json(); + if ( !wispToken || !wispServer ) { + throw new Error('Relay token endpoint returned an invalid response.'); + } + + return { wispToken, wispServer }; +} + +export async function generateWispV1URL () { + const { wispServer, wispToken } = await getWispCredentials(); + return `${wispServer}/${wispToken}/`; +} + +export async function getEpoxyClient ({ refresh = false } = {}) { + if ( cachedEpoxy && cachedEpoxy.initting ) return await cachedEpoxy.promise; + + const nextKey = getClientCacheKey(); + if ( refresh || !(cachedEpoxy && cachedEpoxy.key === nextKey) ) { + let epoxy = { key: nextKey, initting: true }; + let promise = (async () => { + try { + const { wispToken, wispServer } = await getWispCredentials(); + let ret = await initEpoxy({ wispToken, wispServer }); + epoxy.initting = false; + return ret; + } catch { + if ( cachedEpoxy === epoxy ) { + cachedEpoxy = undefined; + } + } + })(); + epoxy.promise = promise; + + cachedEpoxy = epoxy; + } + + return await cachedEpoxy.promise; +} + +export function clearEpoxyClientCache () { + cachedEpoxy = undefined; +} + +export let netAPI = { + async generateWispV1URL () { + return await generateWispV1URL(); + }, + Socket: PSocket, + tls: { + TLSSocket: PTLSSocket, + }, + fetch: pFetch, +}; diff --git a/src/puter-js/src/modules/networking/parsers.js b/src/puter-js/src/modules/networking/parsers.js deleted file mode 100644 index f2f53ccd82..0000000000 --- a/src/puter-js/src/modules/networking/parsers.js +++ /dev/null @@ -1,159 +0,0 @@ -/* eslint-disable no-unreachable */ -/* eslint-disable no-case-declarations */ -// PACKET TYPES -export const CONNECT = 0x01; -export const DATA = 0x02; -export const CONTINUE = 0x03; -export const CLOSE = 0x04; -export const INFO = 0x05; - -// STREAM TYPES -export const TCP = 0x01; -export const UDP = 0x02; - -// Frequently used objects -export const textde = new TextDecoder(); -const texten = new TextEncoder(); -export const errors = { - 0x01: 'Reason unspecified or unknown. Returning a more specific reason should be preferred.' - , 0x03: 'Unexpected stream closure due to a network error.' - , 0x41: 'Stream creation failed due to invalid information. This could be sent if the destination was a reserved address or the port is invalid.' - , 0x42: 'Stream creation failed due to an unreachable destination host. This could be sent if the destination is an domain which does not resolve to anything.' - , 0x43: 'Stream creation timed out due to the destination server not responding.' - , 0x44: 'Stream creation failed due to the destination server refusing the connection.' - , 0x47: 'TCP data transfer timed out.' - , 0x48: 'Stream destination address/domain is intentionally blocked by the proxy server.' - , 0x49: 'Connection throttled by the server.', -}; - -/** - * @typedef {{packetType: number, streamID: number, streamType?: number, port?: number, hostname?: string, payload?: Uint8Array, reason?: number, remainingBuffer?: number}} ParsedWispPacket - */ - -/** - * Parses a wisp packet fully - * - * @param {Uint8Array} data - * @returns {ParsedWispPacket} Packet Info - */ - -export function parseIncomingPacket (data) { - const view = new DataView(data.buffer, data.byteOffset); - const packetType = view.getUint8(0); - const streamID = view.getUint32(1, true); - switch ( packetType ) { // Packet payload starts at Offset 5 - case CONNECT: - const streamType = view.getUint8(5); - const port = view.getUint16(6, true); - const hostname = textde.decode(data.subarray(8, data.length)); - return { packetType, streamID, streamType, port, hostname }; - break; - case DATA: - const payload = data.subarray(5, data.length); - return { packetType, streamID, payload }; - break; - case CONTINUE: - const remainingBuffer = view.getUint32(5, true); - return { packetType, streamID, remainingBuffer }; - break; - case CLOSE: - const reason = view.getUint8(5); - return { packetType, streamID, reason }; - break; - case INFO: - const infoObj = {}; - infoObj['version_major'] = view.getUint8(5); - infoObj['version_minor'] = view.getUint8(6); - - let ptr = 7; - while ( ptr < data.length ) { - const extType = view.getUint8(ptr); - const extLength = view.getUint32(ptr + 1, true); - const payload = data.subarray(ptr + 5, ptr + 5 + extLength); - infoObj[extType] = payload; - ptr += 5 + extLength; - } - return { packetType, streamID, infoObj }; - break; - } -} -/** - * creates a wisp packet fully - * - * @param {ParsedWispPacket} instructions - * @returns {Uint8Array} Constructed Packet - */ - -export function createWispPacket (instructions) { - let size = 5; - switch ( instructions.packetType ) { // Pass 1: determine size of packet - case CONNECT: - instructions.hostEncoded = texten.encode(instructions.hostname); - size += 3 + instructions.hostEncoded.length; - break; - case DATA: - size += instructions.payload.byteLength; - break; - case CONTINUE: - size += 4; - break; - case CLOSE: - size += 1; - break; - case INFO: - size += 2; - if ( instructions.password ) - { - size += 6; - } - if ( instructions.puterAuth ) { - instructions.passwordEncoded = texten.encode(instructions.puterAuth); - size += 8 + instructions.passwordEncoded.length; - } - break; - default: - throw new Error('Not supported'); - } - - let data = new Uint8Array(size); - const view = new DataView(data.buffer); - view.setUint8(0, instructions.packetType); - view.setUint32(1, instructions.streamID, true); - switch ( instructions.packetType ) { // Pass 2: fill out packet - case CONNECT: - view.setUint8(5, instructions.streamType); - view.setUint16(6, instructions.port, true); - data.set(instructions.hostEncoded, 8); - break; - case DATA: - data.set(instructions.payload, 5); - break; - case CONTINUE: - view.setUint32(5, instructions.remainingBuffer, true); - break; - case CLOSE: - view.setUint8(5, instructions.reason); - break; - case INFO: - // WISP 2.0 - view.setUint8(5, 2); - view.setUint8(6, 0); - - if ( instructions.password ) { - // PASSWORD AUTH REQUIRED - view.setUint8(7, 0x02); // Protocol ID (Password) - view.setUint32(8, 1, true); - view.setUint8(12, 0); // Password required? true - } - - if ( instructions.puterAuth ) { - // PASSWORD AUTH REQUIRED - view.setUint8(7, 0x02); // Protocol ID (Password) - view.setUint32(8, 5 + instructions.passwordEncoded.length, true); - view.setUint8(12, 0); - view.setUint16(13, instructions.passwordEncoded.length, true); - data.set(instructions.passwordEncoded, 15); - } - } - return data; -} \ No newline at end of file diff --git a/src/puter-js/src/modules/networking/requests.js b/src/puter-js/src/modules/networking/requests.js index add4cee4fb..df859a48c0 100644 --- a/src/puter-js/src/modules/networking/requests.js +++ b/src/puter-js/src/modules/networking/requests.js @@ -1,271 +1,84 @@ -// SO: https://stackoverflow.com/a/76332760/ under CC BY-SA 4.0 -function mergeUint8Arrays (...arrays) { - const totalSize = arrays.reduce((acc, e) => acc + e.length, 0); - const merged = new Uint8Array(totalSize); +import { clearEpoxyClientCache, getEpoxyClient } from './index.js'; - arrays.forEach((array, i, arrays) => { - const offset = arrays.slice(0, i).reduce((acc, e) => acc + e.length, 0); - merged.set(array, offset); - }); +function logFetchResult ({ params, result, error }) { + if ( ! globalThis.puter?.apiCallLogger?.isEnabled() ) { + return; + } - return merged; + globalThis.puter.apiCallLogger.logRequest({ + service: 'network', + operation: 'pFetch', + params, + result, + error, + }); } -function parseHTTPHead (head) { - const lines = head.split('\r\n'); - - const firstLine = lines.shift().split(' '); - const status = Number(firstLine[1]); - const statusText = firstLine.slice(2).join(' ') || ''; - - const headersArray = []; - for ( const header of lines ) { - const splitHeaders = header.split(': '); - const key = splitHeaders[0]; - const value = splitHeaders.slice(1).join(': '); - headersArray.push([key, value]); +function normalizeErrorMessage (error) { + if ( error instanceof Error ) { + return error.message; } - new Headers(headersArray); - return { headers: new Headers(headersArray), statusText, status }; -} - -// Trivial stream based HTTP 1.1 client -// TODO optional redirect handling - -export function pFetch (...args) { - return new Promise(async (res, rej) => { - try { - const reqObj = new Request(...args); - const parsedURL = new URL(reqObj.url); - let headers = new Headers(reqObj.headers); // Make a headers object we can modify - - // Socket creation: regular for HTTP, TLS for https - let socket; - if ( parsedURL.protocol === 'http:' ) { - socket = new puter.net.Socket(parsedURL.hostname, - parsedURL.port || 80); - } else if ( parsedURL.protocol === 'https:' ) { - socket = new puter.net.tls.TLSSocket(parsedURL.hostname, - parsedURL.port || 443); - } else { - const errorMsg = `Failed to fetch. URL scheme "${parsedURL.protocol}" is not supported.`; - - // Log the error - if ( globalThis.puter?.apiCallLogger?.isEnabled() ) { - globalThis.puter.apiCallLogger.logRequest({ - service: 'network', - operation: 'pFetch', - params: { url: reqObj.url, method: reqObj.method }, - error: { message: errorMsg }, - }); - } - - rej(errorMsg); - return; - } - - // Sending default UA - if ( ! headers.get('user-agent') ) { - headers.set('user-agent', navigator.userAgent); - } - - let reqHead = `${reqObj.method} ${parsedURL.pathname}${parsedURL.search} HTTP/1.1\r\nHost: ${parsedURL.host}\r\nConnection: close\r\n`; - for ( const [key, value] of headers ) { - reqHead += `${key}: ${value}\r\n`; - } - let requestBody; - if ( reqObj.body ) { - requestBody = new Uint8Array(await reqObj.arrayBuffer()); - // If we have a body, we need to set the content length - if ( ! headers.has('content-length') ) { - headers.set('content-length', requestBody.length); - } else if ( - headers.get('content-length') !== String(requestBody.length) - ) { - return rej('Content-Length header does not match the body length. Please check your request.'); - } - reqHead += `Content-Length: ${requestBody.length}\r\n`; - } - - reqHead += '\r\n'; - socket.on('open', async () => { - socket.write(reqHead); // Send headers - if ( requestBody ) { - socket.write(requestBody); // Send body if present - } - }); - const decoder = new TextDecoder(); - let responseHead = ''; - let dataOffset = -1; - const fullDataParts = []; - let responseReturned = false; - let contentLength = -1; - let ingestedContent = 0; - let chunkedTransfer = false; - let currentChunkLeft = -1; - let buffer = new Uint8Array(0); - - const outStream = new ReadableStream({ - start (controller) { - // This is annoyingly long - function parseIncomingChunk (data) { - // append new data to our rolling buffer - const tmp = new Uint8Array(buffer.length + data.length); - tmp.set(buffer, 0); - tmp.set(data, buffer.length); - buffer = tmp; - - // pull out as many complete chunks (or headers) as we can - while ( true ) { - if ( currentChunkLeft > 0 ) { - // we’re in the middle of reading a chunk body - // need size + 2 bytes (for trailing \r\n) - if ( buffer.length >= currentChunkLeft + 2 ) { - // full body + CRLF available - const chunk = buffer.slice(0, currentChunkLeft); - controller.enqueue(chunk); - - // strip body + CRLF and reset for next header - buffer = buffer.slice(currentChunkLeft + 2); - currentChunkLeft = 0; - } else { - // only a partial body available - controller.enqueue(buffer); - currentChunkLeft -= buffer.length; - buffer = new Uint8Array(0); - break; // wait for more data - } - } else { - // we need to parse the next size line - // find the first "\r\n" - let idx = -1; - for ( let i = 0; i + 1 < buffer.length; i++ ) { - if ( - buffer[i] === 0x0d && - buffer[i + 1] === 0x0a - ) { - idx = i; - break; - } - } - if ( idx < 0 ) { - // we don’t yet have a full size line - break; - } - - // decode just the size line as ASCII hex - const sizeText = decoder - .decode(buffer.slice(0, idx)) - .trim(); - currentChunkLeft = parseInt(sizeText, 16); - if ( isNaN(currentChunkLeft) ) { - controller.error('Invalid chunk length from server'); - } - // strip off the size line + CRLF - buffer = buffer.slice(idx + 2); - - // zero-length => end of stream - if ( currentChunkLeft === 0 ) { - responseReturned = true; - controller.close(); - return; - } - } - } - } - socket.on('data', (data) => { - // Dataoffset is set to another value once head is returned, its safe to assume all remaining data is body - if ( dataOffset !== -1 && !chunkedTransfer ) { - controller.enqueue(data); - ingestedContent += data.length; - } - - // We dont have the full responseHead yet - if ( dataOffset === -1 ) { - fullDataParts.push(data); - responseHead += decoder.decode(data, { stream: true }); - } - if ( chunkedTransfer ) { - parseIncomingChunk(data); - } + return String(error); +} - // See if we have the HEAD of an HTTP/1.1 yet - if ( responseHead.indexOf('\r\n\r\n') !== -1 ) { - dataOffset = responseHead.indexOf('\r\n\r\n'); - responseHead = responseHead.slice(0, dataOffset); - const parsedHead = parseHTTPHead(responseHead); - contentLength = Number(parsedHead.headers.get('content-length')); - chunkedTransfer = - parsedHead.headers.get('transfer-encoding') === - 'chunked'; +function getFetchLogParams (args) { + const [resource, init] = args; - // Log the response - if ( globalThis.puter?.apiCallLogger?.isEnabled() ) { - globalThis.puter.apiCallLogger.logRequest({ - service: 'network', - operation: 'pFetch', - params: { url: reqObj.url, method: reqObj.method }, - result: { status: parsedHead.status, statusText: parsedHead.statusText }, - }); - } + let url; + if ( typeof resource === 'string' ) { + url = resource; + } else if ( resource instanceof URL ) { + url = resource.toString(); + } else if ( resource && typeof resource.url === 'string' ) { + url = resource.url; + } - // Return initial response object - res(new Response(outStream, parsedHead)); + let method; + if ( init && typeof init.method === 'string' ) { + method = init.method; + } else if ( resource && typeof resource.method === 'string' ) { + method = resource.method; + } else { + method = 'GET'; + } - const residualBody = mergeUint8Arrays(...fullDataParts).slice(dataOffset + 4); - if ( ! chunkedTransfer ) { - // Add any content we have but isn't part of the head into the body stream - ingestedContent += residualBody.length; - controller.enqueue(residualBody); - } else { - parseIncomingChunk(residualBody); - } - } + return { + url, + method, + }; +} - if ( - contentLength !== -1 && - ingestedContent === contentLength && - !chunkedTransfer - ) { - // Work around for the close bug for compliant HTTP/1.1 servers - if ( ! responseReturned ) { - responseReturned = true; - controller.close(); - } - } - }); - socket.on('close', () => { - if ( ! responseReturned ) { - responseReturned = true; - controller.close(); - } - }); - socket.on('error', (reason) => { - // Log the error - if ( globalThis.puter?.apiCallLogger?.isEnabled() ) { - globalThis.puter.apiCallLogger.logRequest({ - service: 'network', - operation: 'pFetch', - params: { url: reqObj.url, method: reqObj.method }, - error: { message: `Socket errored with the following reason: ${ reason}` }, - }); - } - rej(`Socket errored with the following reason: ${ reason}`); - }); - }, - }); - } catch (e) { - // Log unexpected errors - if ( globalThis.puter?.apiCallLogger?.isEnabled() ) { - globalThis.puter.apiCallLogger.logRequest({ - service: 'network', - operation: 'pFetch', - params: { url: reqObj.url, method: reqObj.method }, - error: { message: e.message || e.toString(), stack: e.stack }, - }); - } - rej(e); +export async function pFetch (...args) { + const params = getFetchLogParams(args); + let usedEpoxyClient = false; + + try { + const client = await getEpoxyClient(); + usedEpoxyClient = true; + const response = await client.fetch(...args); + + logFetchResult({ + params, + result: { + status: response.status, + statusText: response.statusText, + }, + }); + + return response; + } catch ( error ) { + if ( usedEpoxyClient ) { + clearEpoxyClientCache(); } - }); + + logFetchResult({ + params, + error: { + message: normalizeErrorMessage(error), + stack: error?.stack, + }, + }); + throw error; + } }