Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Compilation/RuntimeEmitter.MessageChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public partial class RuntimeEmitter
private FieldBuilder _messagePortStartedField = null!;
private FieldBuilder _messagePortClosedField = null!;
private FieldBuilder _messagePortRefedField = null!;
private FieldBuilder _messagePortOnEnqueueField = null!;
private ConstructorBuilder _messagePortCtor = null!;
private MethodBuilder _messagePortDrain = null!;
private MethodBuilder _messagePortStart = null!;
Expand Down Expand Up @@ -59,6 +60,11 @@ private void EmitMessagePortClass(ModuleBuilder moduleBuilder, EmittedRuntime ru
_messagePortStartedField = typeBuilder.DefineField("_started", _types.Boolean, FieldAttributes.Assembly);
_messagePortClosedField = typeBuilder.DefineField("_closed", _types.Boolean, FieldAttributes.Assembly);
_messagePortRefedField = typeBuilder.DefineField("_refed", _types.Boolean, FieldAttributes.Assembly);
// Optional on-enqueue notification. Null for ordinary in-process ports; set
// (reflectively) by CompiledMessagePortBridge when this port has been
// transferred to an interpreter worker, so a parent post wakes the worker loop
// to drain _pending event-driven instead of the worker polling (#465).
_messagePortOnEnqueueField = typeBuilder.DefineField("_onEnqueue", typeof(Action), FieldAttributes.Assembly);

EmitMessagePortConstructorIl(typeBuilder, runtime);
EmitMessagePortDrain(typeBuilder, runtime);
Expand Down Expand Up @@ -286,6 +292,24 @@ private void EmitMessagePortPostMessage(TypeBuilder typeBuilder, EmittedRuntime
il.Emit(OpCodes.Ldloc, clonedLocal);
il.Emit(OpCodes.Callvirt, _types.ConcurrentQueueOfObject.GetMethod("Enqueue", [_types.Object])!);

// var cb = partner._onEnqueue; if (cb != null) cb();
// Wakes a bridge-driven partner (an interpreter worker that adopted this port
// via CompiledMessagePortBridge) so it drains _pending event-driven rather than
// polling (#465). The volatile read pairs with the Thread.MemoryBarrier the
// bridge issues after installing the callback, so a cross-thread post on the
// parent loop reliably observes it. Null (skipped) for ordinary in-process ports.
var onEnqueueLocal = il.DeclareLocal(typeof(Action));
var afterOnEnqueue = il.DefineLabel();
il.Emit(OpCodes.Ldloc, partnerLocal);
il.Emit(OpCodes.Volatile);
il.Emit(OpCodes.Ldfld, _messagePortOnEnqueueField);
il.Emit(OpCodes.Stloc, onEnqueueLocal);
il.Emit(OpCodes.Ldloc, onEnqueueLocal);
il.Emit(OpCodes.Brfalse, afterOnEnqueue);
il.Emit(OpCodes.Ldloc, onEnqueueLocal);
il.Emit(OpCodes.Callvirt, typeof(Action).GetMethod("Invoke", Type.EmptyTypes)!);
il.MarkLabel(afterOnEnqueue);

// if (partner._started) $EventLoop.GetInstance().Schedule(new Action(partner.Drain))
il.Emit(OpCodes.Ldloc, partnerLocal);
il.Emit(OpCodes.Ldfld, _messagePortStartedField);
Expand Down
41 changes: 39 additions & 2 deletions Execution/Interpreter.Properties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,14 @@ public partial class Interpreter
if (klass is ISharpTSCallable callable && klass is not SharpTSClass && klass is not BoundFunction)
{
List<object?> ctorArgs = await ctx.EvaluateAllAsync(newExpr.Arguments);
return callable.Call(this, ctorArgs);
try
{
return callable.Call(this, ctorArgs);
}
catch (Exception ex) when (IsNativeConstructorFailure(ex))
{
throw new ThrowException(new SharpTSError(ex.Message));
}
}

// Bound functions cannot be used as constructors (JS spec compliance)
Expand Down Expand Up @@ -375,7 +382,14 @@ private RuntimeValue EvaluateNew(Expr.New newExpr)
{
ctorArgs.Add(Evaluate(arg));
}
return RuntimeValue.FromBoxed(callable.Call(this, ctorArgs));
try
{
return RuntimeValue.FromBoxed(callable.Call(this, ctorArgs));
}
catch (Exception ex) when (IsNativeConstructorFailure(ex))
{
throw new ThrowException(new SharpTSError(ex.Message));
}
}

// Bound functions cannot be used as constructors (JS spec compliance)
Expand Down Expand Up @@ -406,6 +420,29 @@ private RuntimeValue EvaluateNew(Expr.New newExpr)
return sharpClass.CallRV(this, arguments);
}

/// <summary>
/// True when a host exception escaping a native built-in constructor (a
/// <c>new</c> on an <see cref="ISharpTSCallable"/> that is not a user class)
/// should be re-surfaced to guest code as a real <see cref="SharpTSError"/>
/// instead of the bare message string the host-exception boundary would
/// otherwise bind to the catch variable (#464).
/// </summary>
/// <remarks>
/// Native constructors such as <c>Worker</c> and <c>MessageChannel</c> validate
/// their input by throwing a plain <see cref="Exception"/> (e.g. <c>new Worker(…)</c>
/// failing the <c>workerData</c> structured-clone). In interpreter mode a guest
/// <c>try/catch</c> previously bound that exception's message <em>string</em>, so
/// <c>e.message</c> was <c>undefined</c> and <c>e instanceof Error</c> was false;
/// compiled mode already surfaces a proper <c>Error</c>. Two kinds pass through
/// unwrapped: a <see cref="ThrowException"/> (a guest throw already carrying its
/// value, e.g. routed out through a constructor that consumed a guest iterable)
/// and any <see cref="Diagnostics.Exceptions.SharpTSException"/> (interpreter/
/// runtime errors deliberately kept as message strings per the
/// <see cref="ThrowException.FromResult"/> backward-compat contract).
/// </remarks>
private static bool IsNativeConstructorFailure(Exception ex) =>
ex is not ThrowException && ex is not Diagnostics.Exceptions.SharpTSException;

/// <summary>
/// Evaluates a <c>this</c> expression, returning the current instance.
/// </summary>
Expand Down
27 changes: 27 additions & 0 deletions Execution/Interpreter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,22 @@ internal Runtime.Types.SharpTSObject GetRegExpPrototype()
/// </summary>
public string? EntryModulePath { get; set; }

/// <summary>
/// Worker-thread bindings for the <c>worker_threads</c> built-in module. Non-null
/// only on the isolated interpreter that runs a Worker's script. It makes
/// <c>import { workerData, parentPort, threadId, isMainThread } from "worker_threads"</c>
/// resolve to this worker's live values instead of the main-thread <c>null</c>
/// placeholders, so a worker can read its inputs via the canonical import form and
/// not only the bare worker-context globals (#410). <see cref="WorkerThreadsBindings.ParentPort"/>
/// is the same port instance bound as the bare <c>parentPort</c> global, so a
/// <c>message</c> listener attached through the import receives the messages the
/// worker's message loop delivers to that instance.
/// </summary>
internal WorkerThreadsBindings? WorkerThreadsContext { get; set; }

/// <summary>Live <c>worker_threads</c> values for the running Worker (see <see cref="WorkerThreadsContext"/>).</summary>
internal sealed record WorkerThreadsBindings(object? WorkerData, object? ParentPort, double ThreadId);

// Flag to indicate interpreter has been disposed - timer callbacks should not execute
private volatile bool _isDisposed;

Expand Down Expand Up @@ -1447,6 +1463,17 @@ private void ExecuteModule(ParsedModule module)
if (moduleName != null && BuiltInModuleValues.HasInterpreterSupport(moduleName))
{
var exports = BuiltInModuleValues.GetModuleExports(moduleName);
// On a worker thread, rebind the worker_threads identity exports to this
// worker's live values so `import { workerData, parentPort } from
// "worker_threads"` sees the same inputs as the bare worker-context
// globals instead of the main-thread null placeholders (#410).
if (moduleName == "worker_threads" && WorkerThreadsContext is { } wtc)
{
exports["workerData"] = wtc.WorkerData;
exports["parentPort"] = wtc.ParentPort;
exports["threadId"] = wtc.ThreadId;
exports["isMainThread"] = false;
}
foreach (var (name, value) in exports)
{
moduleInstance.SetExport(name, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,17 @@ public static class WorkerThreadsModuleInterpreter
// BroadcastChannel constructor
["BroadcastChannel"] = new BroadcastChannelConstructor(),

// Synchronous message receive
// Synchronous message receive. Accepts a native MessagePort or a
// transferred compiled port adopted by this worker (#465).
["receiveMessageOnPort"] = BuiltInMethod.CreateV2("receiveMessageOnPort", 1, (interp, recv, args) =>
{
if (args.Length == 0 || args[0].ToObject() is not SharpTSMessagePort port)
throw new Exception("receiveMessageOnPort requires a MessagePort argument");
return RuntimeValue.FromBoxed(port.ReceiveMessageSync());
object? result = (args.Length == 0 ? null : args[0].ToObject()) switch
{
SharpTSMessagePort port => port.ReceiveMessageSync(),
CompiledMessagePortBridge bridge => bridge.ReceiveMessageSync(),
_ => throw new Exception("receiveMessageOnPort requires a MessagePort argument"),
};
return RuntimeValue.FromBoxed(result);
}),

// SHARE_ENV constant (placeholder - we don't support env sharing)
Expand Down
110 changes: 74 additions & 36 deletions Runtime/Types/CompiledMessagePortBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,33 @@ namespace SharpTS.Runtime.Types;
/// reusing its enqueue-to-partner + schedule-drain-on-<c>$EventLoop</c> logic so the
/// parent's compiled listeners run on the parent loop thread.</item>
/// <item><b>receive</b> — the compiled partner's posts are always enqueued to the
/// transferred port's <c>_pending</c> queue (the drain is only <i>scheduled</i> when
/// the port has started, which this transferred port never does on the compiled
/// side). A recurring timer on the WORKER interpreter's loop drains that queue and
/// emits <c>'message'</c> to the worker's listeners on the worker thread. The
/// interval timer also keeps the worker loop alive while the port is open, matching
/// Node's "a listening port is ref'd" semantics (#406, same liveness class as #329).</item>
/// transferred port's <c>_pending</c> queue (the compiled <c>Drain</c> is only
/// scheduled when the port has started, which this transferred port never does on the
/// compiled side). The bridge instead installs an on-enqueue callback on the compiled
/// port (<c>_onEnqueue</c>): a parent post invokes it right after enqueuing, and it
/// marshals a drain onto the WORKER loop via the thread-safe <c>EnqueueCallback</c>,
/// which emits <c>'message'</c> to the worker's listeners on the worker thread. This
/// is event-driven — an idle bridged port no longer wakes the worker loop on a timer
/// (#465). A keep-alive <c>Ref</c> on the worker loop holds it open while the port is
/// open, matching Node's "a listening port is ref'd" semantics (#406, same liveness
/// class as #329).</item>
/// </list>
///
/// All access to the emitted type is via reflection cached at adoption time, since
/// this class lives in SharpTS.dll while the compiled port lives in the output
/// assembly. The field/method names mirrored here (<c>PostMessage</c>, <c>_pending</c>)
/// MUST stay in sync with <c>RuntimeEmitter.MessageChannel.cs</c>.
/// assembly. The field/method names mirrored here (<c>PostMessage</c>, <c>_pending</c>,
/// <c>_onEnqueue</c>) MUST stay in sync with <c>RuntimeEmitter.MessageChannel.cs</c>.
/// </remarks>
public sealed class CompiledMessagePortBridge : SharpTSEventEmitter
{
// The poll cadence for draining the compiled port's incoming queue onto the
// worker loop. Matches WorkerMessageHandler's 10ms parent→worker poll so the
// two worker-bound delivery paths feel identical.
private const int PollIntervalMs = 10;

private readonly object _compiledPort; // transferred emitted $MessagePort
private readonly MethodInfo _postMessageMethod; // $MessagePort.PostMessage(object)
private readonly ConcurrentQueue<object> _incoming; // $MessagePort._pending
private readonly FieldInfo _onEnqueueField; // $MessagePort._onEnqueue (Action wake hook)

// The worker interpreter that owns delivery. Captured the first time the worker
// attaches a 'message' listener / starts the port; null until then.
private Interp? _owner;
private Interp.VirtualTimer? _pollTimer;
private bool _started;
private bool _closed;
private bool _loopRefed;
Expand All @@ -63,11 +62,13 @@ public sealed class CompiledMessagePortBridge : SharpTSEventEmitter
// returns Unknown for subclasses, routing member access through the per-type
// registration in BuiltInRegistry, which reaches this class's GetMember.

private CompiledMessagePortBridge(object compiledPort, MethodInfo postMessageMethod, ConcurrentQueue<object> incoming)
private CompiledMessagePortBridge(object compiledPort, MethodInfo postMessageMethod,
ConcurrentQueue<object> incoming, FieldInfo onEnqueueField)
{
_compiledPort = compiledPort;
_postMessageMethod = postMessageMethod;
_incoming = incoming;
_onEnqueueField = onEnqueueField;
}

/// <summary>
Expand Down Expand Up @@ -99,7 +100,11 @@ public static CompiledMessagePortBridge Adopt(object compiledPort)
throw new StructuredClone.DataCloneError(
"Cannot transfer $MessagePort: _pending is not a ConcurrentQueue<object>");

return new CompiledMessagePortBridge(compiledPort, postMessage, incoming);
var onEnqueueField = type.GetField("_onEnqueue", BindingFlags.NonPublic | BindingFlags.Instance)
?? throw new StructuredClone.DataCloneError(
"Cannot transfer $MessagePort: no _onEnqueue field (emitted shape changed?)");

return new CompiledMessagePortBridge(compiledPort, postMessage, incoming, onEnqueueField);
}

/// <summary>
Expand All @@ -122,8 +127,9 @@ private void PostMessage(object? message)
}

/// <summary>
/// Begins delivery: schedules a recurring drain of the compiled port's incoming
/// queue onto the worker loop. Idempotent. No-op until an owner interpreter has
/// Begins delivery: installs an on-enqueue wake callback on the compiled port so a
/// parent post drains the incoming queue onto the worker loop event-driven, and
/// drains anything already queued. Idempotent. No-op until an owner interpreter has
/// been captured (via the first <c>on('message')</c>/<c>start()</c>).
/// </summary>
private void Start()
Expand All @@ -133,23 +139,55 @@ private void Start()

_started = true;

// Keep the worker loop alive while the port is open. RunEventLoop's exit
// check counts active handles and queued callbacks but NOT scheduled timers,
// so the poll interval alone would not hold the loop open — without this Ref
// the worker can quiesce and exit before the parent's message is ever polled
// (Node: a listening port is ref'd; #406, same liveness class as #329).
// Keep the worker loop alive while the port is open. RunEventLoop's exit check
// counts active handles and queued callbacks but NOT a port that is merely
// waiting for a message, so without this Ref the worker could quiesce and exit
// before the parent ever posts (Node: a listening port is ref'd; #406, same
// liveness class as #329).
if (!_loopRefed)
{
_loopRefed = true;
_owner.Ref();
}

// The interval timer drains _incoming on the worker loop thread. Delay 0 so
// anything the parent already posted before the worker started is delivered
// on the next tick; the partner's posts land in _incoming asynchronously, so
// the loop must poll (the compiled enqueue cannot wake this interpreter).
// An event-driven alternative is tracked as a follow-up (#465).
_pollTimer = _owner.ScheduleTimer(0, PollIntervalMs, Pump, isInterval: true);
// Event-driven receive (#465): a parent post enqueues to _incoming on the
// parent loop thread and then invokes this callback, which marshals a drain
// onto the worker loop via the thread-safe EnqueueCallback (it wakes the
// worker's blocking wait). This replaces a 10ms poll, so an idle bridged port
// no longer wakes the worker loop 100×/second. The MemoryBarrier pairs with the
// volatile read the emitted PostMessage does, so the parent reliably observes
// the installed callback once set.
_onEnqueueField.SetValue(_compiledPort, (Action)OnPartnerEnqueued);
Thread.MemoryBarrier();

// Drain anything the parent posted before the callback was installed.
_owner.EnqueueCallback(Pump);
}

/// <summary>
/// On-enqueue hook invoked by the compiled partner's <c>PostMessage</c> (on the
/// parent loop thread) right after it enqueues to <c>_incoming</c>. Marshals a
/// drain onto the worker loop; <see cref="Interp.EnqueueCallback"/> is thread-safe
/// and wakes the worker's event loop.
/// </summary>
private void OnPartnerEnqueued() => _owner?.EnqueueCallback(Pump);

/// <summary>
/// Synchronously dequeues one message from the compiled partner's incoming queue,
/// for <c>worker_threads.receiveMessageOnPort(port)</c> on a transferred compiled
/// port (#465). Returns <c>{ message }</c> (the payload is already an independent
/// structured clone) or <c>null</c> when the queue is empty or the port is closed,
/// matching <see cref="SharpTSMessagePort.ReceiveMessageSync"/>.
/// </summary>
internal object? ReceiveMessageSync()
{
if (_closed || !_incoming.TryDequeue(out var message))
return null;

return new SharpTSObject(new Dictionary<string, object?>
{
["message"] = message
});
}

/// <summary>
Expand All @@ -171,8 +209,9 @@ private void Pump()
}

/// <summary>
/// Stops worker-side delivery and cancels the poll timer so the worker loop can
/// quiesce and the worker can exit. Emits 'close' to the worker's listeners.
/// Stops worker-side delivery: clears the wake callback and releases the keep-alive
/// Ref so the worker loop can quiesce and the worker can exit. Emits 'close' to the
/// worker's listeners.
/// </summary>
private void Close()
{
Expand All @@ -181,11 +220,10 @@ private void Close()

_closed = true;

if (_pollTimer != null)
{
_pollTimer.IsCancelled = true;
_pollTimer = null;
}
// Stop the parent from waking this (now closed) bridge. A post that already
// loaded the old callback is still harmless — Pump short-circuits on _closed.
_onEnqueueField.SetValue(_compiledPort, null);
Thread.MemoryBarrier();

// Release the keep-alive Ref so the worker loop can quiesce and the worker
// thread can exit.
Expand Down
Loading
Loading