5 changes: 3 additions & 2 deletions src/workerd/jsg/promise.c++
@@ -153,12 +153,13 @@ void UnhandledRejectionHandler::onMicrotasksCompleted(v8::Isolate* isolate, void
KJ_DEFER(isolate->RemoveMicrotasksCompletedCallback(
&UnhandledRejectionHandler::onMicrotasksCompleted, data));
auto& js = Lock::from(isolate);
-  KJ_IF_SOME(exception, kj::runCatchingExceptions([&]() {
+  KJ_TRY {
handler->processWarnings(js);

// Ensure microtasks scheduled by unhandledrejection handlers run promptly.
js.requestExtraMicrotaskCheckpoint();
-  })) {
+  }
+  KJ_CATCH(exception) {
handler->scheduled = false;
KJ_LOG(ERROR, "uncaught exception while processing unhandled rejections", exception);
}
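For readers unfamiliar with the macros this hunk introduces: KJ_TRY/KJ_CATCH replace the kj::runCatchingExceptions callback pattern with block syntax that binds the caught exception to a name. A minimal stand-alone sketch of the equivalent control flow in plain C++ (the function name is hypothetical, and std::runtime_error stands in for kj::Exception):

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// processWithCatch is hypothetical; std::runtime_error stands in for
// kj::Exception. The point is the shape: the happy path and the catch arm
// are sibling blocks, and the caught exception arrives as a named value
// rather than needing a separate kj::getCaughtExceptionAsKj() call.
std::string processWithCatch(bool shouldThrow) {
  try {
    if (shouldThrow) throw std::runtime_error("unhandled rejection");
    return "ok";
  } catch (const std::exception& exception) {
    // Equivalent of the KJ_CATCH(exception) arm: reset state and log.
    return std::string("caught: ") + exception.what();
  }
}
```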
255 changes: 246 additions & 9 deletions src/workerd/server/fallback-service.c++
@@ -40,7 +40,7 @@ ModuleOrRedirect handleReturnPayload(
// The response from the fallback service must be a valid JSON serialization
// of the workerd module configuration. If it is not, or if there is any other
// error when processing here, we'll log the exception and return nothing.
-  try {
+  KJ_TRY {
capnp::MallocMessageBuilder moduleMessage;
capnp::JsonCodec json;
json.handleByAnnotation<server::config::Worker::Module>();
@@ -63,8 +63,8 @@

kj::Own<server::config::Worker::Module::Reader> ret = capnp::clone(moduleBuilder.asReader());
return ModuleOrRedirect(kj::mv(ret));
-  } catch (...) {
-    auto exception = kj::getCaughtExceptionAsKj();
+  }
+  KJ_CATCH(exception) {
KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, specifier);
return kj::none;
}
@@ -119,7 +119,7 @@ ModuleOrRedirect tryResolveV1(ImportType type,
// so that the destructor joins the current thread (blocking it). The thread will
// either set the jsonPayload variable or not.
kj::Thread loaderThread([&spec, address, &jsonPayload, &redirect, type]() mutable {
-    try {
+    KJ_TRY {
kj::AsyncIoContext io = kj::setupAsyncIo();
kj::HttpHeaderTable::Builder builder;
kj::HttpHeaderId kMethod = builder.add("x-resolve-method");
@@ -153,8 +153,8 @@ ModuleOrRedirect tryResolveV1(ImportType type,
} else {
jsonPayload = resp.body->readAllText().wait(io.waitScope);
}
-    } catch (...) {
-      auto exception = kj::getCaughtExceptionAsKj();
+    }
+    KJ_CATCH(exception) {
KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, spec);
}
});
@@ -199,7 +199,7 @@ ModuleOrRedirect tryResolveV2(ImportType type,
{
kj::Thread loaderThread(
[&address, &jsonPayload, payload = kj::mv(payload), &redirect, &specifier]() mutable {
-    try {
+    KJ_TRY {
kj::AsyncIoContext io = kj::setupAsyncIo();
kj::HttpHeaderTable::Builder builder;
auto headerTable = builder.build();
@@ -235,8 +235,8 @@ ModuleOrRedirect tryResolveV2(ImportType type,
} else {
jsonPayload = resp.body->readAllText().wait(io.waitScope);
}
-    } catch (...) {
-      auto exception = kj::getCaughtExceptionAsKj();
+    }
+    KJ_CATCH(exception) {
KJ_LOG(ERROR, "Fallback service failed to fetch module", exception);
}
});
@@ -258,4 +258,241 @@ ModuleOrRedirect tryResolve(Version version,
: tryResolveV2(type, address, specifier, rawSpecifier, referrer, attributes);
}

// ---- FallbackServiceClient implementation ----

FallbackServiceClient::FallbackServiceClient(kj::String address)
: ownedAddress(kj::mv(address)),
thread([this]() { threadMain(); }) {}

FallbackServiceClient::~FallbackServiceClient() noexcept(false) {
// Signal the background thread to exit. The kj::Thread destructor
// (which runs after this body) will join the thread.
auto lock = state.lockExclusive();
lock->shutdown = true;
}

ModuleOrRedirect FallbackServiceClient::tryResolve(Version version,
ImportType type,
kj::StringPtr specifier,
kj::StringPtr rawSpecifier,
kj::StringPtr referrer,
const kj::HashMap<kj::StringPtr, kj::StringPtr>& attributes) {
// Submit request to background thread.
{
auto lock = state.lockExclusive();
KJ_REQUIRE(!lock->shutdown, "FallbackServiceClient has been shut down");
lock->version = version;
Comment on lines +283 to +284 (Collaborator):

[HIGH] No guard against concurrent callers: if two threads call tryResolve() simultaneously, the second caller will overwrite the first's request fields. The background thread then processes the wrong request and the first caller silently gets the second caller's response.

I understand module resolution is currently single-threaded per isolate/registry, so this is safe in practice. But the class API doesn't enforce the single-caller invariant. Consider adding an assertion here so misuse crashes immediately rather than producing wrong results:

Suggested change:
    KJ_REQUIRE(!lock->shutdown, "FallbackServiceClient has been shut down");
+   KJ_ASSERT(!lock->hasRequest, "FallbackServiceClient does not support concurrent requests");
    lock->version = version;

Also consider documenting the single-caller invariant in the class comment in the header.

lock->type = type;
lock->specifier = specifier;
lock->rawSpecifier = rawSpecifier;
lock->referrer = referrer;
lock->attributes = &attributes;
lock->hasRequest = true;
}

// Block until the background thread has processed our request.
return state.when([](const SharedState& s) { return s.responseReady || s.shutdown; },
Collaborator comment: I'll need to experiment with it a bit to verify everything, but this is good. One thing we might consider, however, is having a small pool of these clients so that multiple concurrent loads aren't blocking each other and bottlenecking things quite as much.

[](SharedState& s) -> ModuleOrRedirect {
if (!s.responseReady) {
// Background thread shut down without producing a response.
return kj::none;
}
auto result = kj::mv(s.response);
s.responseReady = false;
return result;
});
}
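The request/response handoff above (publish the request fields under the lock, then block in state.when until the worker flips responseReady) can be sketched with std::mutex and std::condition_variable standing in for kj::MutexGuarded and its predicate-based when(). All names here are hypothetical, and a string concatenation stands in for the real HTTP round trip:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

// Minimal sketch of the single-slot handoff, assuming one caller at a time
// (the same invariant the real class relies on).
struct Handoff {
  std::mutex m;
  std::condition_variable cv;
  std::string request, response;
  bool hasRequest = false, responseReady = false, shutdown = false;

  // Caller side: publish the request, then block until the worker responds.
  std::string resolve(std::string spec) {
    {
      std::lock_guard<std::mutex> lk(m);
      request = std::move(spec);
      hasRequest = true;
    }
    cv.notify_all();
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [this] { return responseReady || shutdown; });
    responseReady = false;
    return std::move(response);
  }

  // Worker side: handle one request at a time until told to shut down.
  void workerLoop() {
    std::unique_lock<std::mutex> lk(m);
    while (true) {
      cv.wait(lk, [this] { return hasRequest || shutdown; });
      if (shutdown) return;
      hasRequest = false;
      response = "resolved:" + request;  // stand-in for the HTTP round trip
      responseReady = true;
      cv.notify_all();
    }
  }

  void stop() {
    {
      std::lock_guard<std::mutex> lk(m);
      shutdown = true;
    }
    cv.notify_all();
  }
};
```

kj::MutexGuarded::when folds the predicate wait and the locked callback into one call, so the real code has no explicit condition variable; the shape of the state machine is the same.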

void FallbackServiceClient::threadMain() {
KJ_TRY {
// Set up the async I/O context, DNS resolution, and HTTP client once.
// These are reused for all subsequent requests.
kj::AsyncIoContext io = kj::setupAsyncIo();
kj::HttpHeaderTable::Builder builder;
kj::HttpHeaderId kMethod = builder.add("x-resolve-method");
auto headerTable = builder.build();

auto addr = io.provider->getNetwork().parseAddress(ownedAddress, 80).wait(io.waitScope);
auto client = kj::newHttpClient(io.provider->getTimer(), *headerTable, *addr, {});

while (true) {
// Wait for a request or shutdown signal.
Version version;
ImportType type;
kj::StringPtr specifier;
kj::StringPtr rawSpecifier;
kj::StringPtr referrer;
const kj::HashMap<kj::StringPtr, kj::StringPtr>* attributes;
bool shouldExit = state.when([](const SharedState& s) { return s.hasRequest || s.shutdown; },
[&](SharedState& s) -> bool {
if (s.shutdown) return true;
version = s.version;
type = s.type;
specifier = s.specifier;
rawSpecifier = s.rawSpecifier;
referrer = s.referrer;
attributes = s.attributes;
s.hasRequest = false;
return false;
});
if (shouldExit) return;

// Process the request using the shared HTTP client.
ModuleOrRedirect result = kj::none;

if (version == Version::V1) {
// === V1: GET request with query parameters ===
kj::Maybe<kj::String> jsonPayload;
bool redirect = false;
bool prefixed = false;
kj::Url url;
kj::StringPtr actualSpecifier = nullptr;

KJ_IF_SOME(pos, specifier.findLast('/')) {
auto segment = specifier.slice(pos + 1);
if (segment.startsWith("node:") || segment.startsWith("cloudflare:") ||
segment.startsWith("workerd:")) {
actualSpecifier = segment;
url.query.add(
kj::Url::QueryParam{.name = kj::str("specifier"), .value = kj::str(segment)});
prefixed = true;
}
}
if (!prefixed) {
actualSpecifier = specifier;
if (actualSpecifier.startsWith("/")) {
actualSpecifier = specifier.slice(1);
}
url.query.add(kj::Url::QueryParam{kj::str("specifier"), kj::str(specifier)});
}
url.query.add(kj::Url::QueryParam{kj::str("referrer"), kj::str(referrer)});
url.query.add(kj::Url::QueryParam{kj::str("rawSpecifier"), kj::str(rawSpecifier)});

auto spec = url.toString(kj::Url::HTTP_REQUEST);

// Retry once on disconnect (stale pooled connection).
for (int attempt = 0; attempt < 2; attempt++) {
KJ_TRY {
kj::HttpHeaders headers(*headerTable);
headers.setPtr(kMethod, getMethodFromType(type));
headers.setPtr(kj::HttpHeaderId::HOST, "localhost"_kj);

auto request = client->request(kj::HttpMethod::GET, spec, headers, kj::none);
kj::HttpClient::Response resp = request.response.wait(io.waitScope);

if (resp.statusCode == 301) {
KJ_IF_SOME(loc, resp.headers->get(kj::HttpHeaderId::LOCATION)) {
redirect = true;
jsonPayload = kj::str(loc);
} else {
KJ_LOG(ERROR, "Fallback service returned a redirect with no location", spec);
}
// Drain the response body to allow HTTP/1.1 connection reuse.
resp.body->readAllBytes().wait(io.waitScope);
} else if (resp.statusCode != 200) {
auto payload = resp.body->readAllText().wait(io.waitScope);
KJ_LOG(ERROR, "Fallback service failed to fetch module", payload, spec);
} else {
jsonPayload = resp.body->readAllText().wait(io.waitScope);
}
break; // Success, no retry needed.
}
KJ_CATCH(exception) {
if (attempt == 0 && exception.getType() == kj::Exception::Type::DISCONNECTED) {
// Stale pooled connection; retry with a fresh one.
continue;
}
KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, spec);
}
}

result = handleReturnPayload(kj::mv(jsonPayload), redirect, actualSpecifier);

} else {
// === V2: POST request with JSON body ===
capnp::JsonCodec json;
capnp::MallocMessageBuilder moduleMessage;
auto requestMsg = moduleMessage.initRoot<server::config::FallbackServiceRequest>();
requestMsg.setType(getMethodFromType(type));
requestMsg.setSpecifier(specifier);
requestMsg.setReferrer(referrer);

if (rawSpecifier != nullptr) {
requestMsg.setRawSpecifier(rawSpecifier);
}

KJ_ASSERT(attributes != nullptr);
if (attributes->size() > 0) {
auto attrs = requestMsg.initAttributes(attributes->size());
size_t n = 0;
for (auto& attr: *attributes) {
attrs[n].setName(attr.key);
attrs[n].setValue(attr.value);
n++;
}
}

auto payload = json.encode(requestMsg);

kj::Maybe<kj::String> jsonPayload;
bool redirect = false;

// Retry once on disconnect (stale pooled connection).
for (int attempt = 0; attempt < 2; attempt++) {
KJ_TRY {
kj::HttpHeaders headers(*headerTable);
headers.setPtr(kj::HttpHeaderId::HOST, "localhost");

auto request = client->request(kj::HttpMethod::POST, "/", headers, payload.size());
request.body->write(payload.asPtr().asBytes()).wait(io.waitScope);

kj::HttpClient::Response resp = request.response.wait(io.waitScope);

if (resp.statusCode == 301) {
KJ_IF_SOME(loc, resp.headers->get(kj::HttpHeaderId::LOCATION)) {
redirect = true;
jsonPayload = kj::str(loc);
} else {
KJ_LOG(ERROR, "Fallback service returned a redirect with no location", specifier);
}
// Drain the response body to allow HTTP/1.1 connection reuse.
resp.body->readAllBytes().wait(io.waitScope);
} else if (resp.statusCode != 200) {
auto body = resp.body->readAllText().wait(io.waitScope);
KJ_LOG(ERROR, "Fallback service failed to fetch module", body, specifier);
} else {
jsonPayload = resp.body->readAllText().wait(io.waitScope);
}
break; // Success, no retry needed.
}
KJ_CATCH(exception) {
if (attempt == 0 && exception.getType() == kj::Exception::Type::DISCONNECTED) {
// Stale pooled connection; retry with a fresh one.
continue;
}
KJ_LOG(ERROR, "Fallback service failed to fetch module", exception);
}
}

result = handleReturnPayload(kj::mv(jsonPayload), redirect, specifier);
}

// Deliver the result to the calling thread.
{
auto lock = state.lockExclusive();
lock->response = kj::mv(result);
lock->responseReady = true;
}
}
}
KJ_CATCH(exception) {
KJ_LOG(ERROR, "Fallback service background thread failed", exception);
Comment on lines +488 to +489 (Collaborator):

[MEDIUM] If the background thread catches any exception during setup (e.g., DNS resolution failure, kj::setupAsyncIo() failure), it sets shutdown = true and exits permanently. All future tryResolve() calls will throw KJ_REQUIRE failures.

This is acceptable for a local-dev-only feature, but the log message could be more descriptive to help users diagnose the issue. Consider something like:

Suggested change:
    KJ_CATCH(exception) {
-     KJ_LOG(ERROR, "Fallback service background thread failed", exception);
+     KJ_LOG(ERROR, "Fallback service background thread exiting permanently; "
+         "module resolution via fallback will no longer work", exception);

Separately, @jasnell's suggestion of a small pool of clients would also mitigate this: a single transient failure wouldn't take down the whole fallback mechanism.

// Signal any waiting caller and prevent future requests.
auto lock = state.lockExclusive();
lock->response = kj::none;
lock->responseReady = true;
lock->shutdown = true;
}
}

} // namespace workerd::fallback
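The retry-once-on-disconnect loops in both the V1 and V2 paths above share one shape: at most two attempts, retrying only when the first attempt fails with a disconnect (a stale pooled connection), and giving up on any other failure or on a second disconnect. A self-contained sketch of that shape, where DisconnectedError and fetchOnce are hypothetical stand-ins for kj::Exception::Type::DISCONNECTED and the HTTP request:

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// DisconnectedError stands in for kj::Exception::Type::DISCONNECTED.
struct DisconnectedError: std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Hypothetical request function: fails with a disconnect on the first call,
// simulating a stale pooled connection, then succeeds.
std::string fetchOnce(int& calls) {
  if (++calls == 1) throw DisconnectedError("stale pooled connection");
  return "payload";
}

// The retry shape used in the PR: at most two attempts, retrying only
// when the first attempt fails with a disconnect.
std::string fetchWithRetry(int& calls) {
  for (int attempt = 0; attempt < 2; attempt++) {
    try {
      return fetchOnce(calls);
    } catch (const DisconnectedError&) {
      if (attempt == 0) continue;  // stale connection; retry with a fresh one
      throw;                       // second failure: give up
    }
  }
  return "";  // not reached: the loop always returns or rethrows
}
```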
51 changes: 51 additions & 0 deletions src/workerd/server/fallback-service.h
@@ -4,8 +4,10 @@

#include <kj/common.h>
#include <kj/map.h>
#include <kj/mutex.h>
#include <kj/one-of.h>
#include <kj/string.h>
#include <kj/thread.h>

namespace workerd::fallback {

@@ -63,4 +65,53 @@ ModuleOrRedirect tryResolve(Version version,
kj::StringPtr referrer,
const kj::HashMap<kj::StringPtr, kj::StringPtr>& attributes);
Collaborator comment:

[MEDIUM] Both call sites (the V1 path in server.c++ and the V2 path in workerd-api.c++) now use FallbackServiceClient::tryResolve(). This free tryResolve() function and its tryResolveV1/tryResolveV2 helpers appear to be dead code.

Consider removing them in this PR (or a follow-up) to reduce the ~170 lines of duplication in the implementation file. If they're kept intentionally as a fallback, a comment explaining why would be helpful.


// A persistent client for the fallback service that uses a single background
// thread with a long-lived HTTP client for all module resolution requests.
// This avoids spawning a new OS thread, performing a DNS lookup, and opening
// a new TCP connection for each request, which can exhaust ephemeral ports
// when many modules are resolved concurrently (e.g. running many test files
// with vitest-pool-workers).
class FallbackServiceClient {
public:
explicit FallbackServiceClient(kj::String address);
~FallbackServiceClient() noexcept(false);

KJ_DISALLOW_COPY_AND_MOVE(FallbackServiceClient);

ModuleOrRedirect tryResolve(Version version,
ImportType type,
kj::StringPtr specifier,
kj::StringPtr rawSpecifier,
kj::StringPtr referrer,
const kj::HashMap<kj::StringPtr, kj::StringPtr>& attributes);

private:
// Shared state between the calling thread and the background thread.
// Access is serialized through kj::MutexGuarded.
struct SharedState {
// Request fields - valid only when hasRequest is true.
// These contain non-owning references into the caller's stack frame,
// which is safe because the caller blocks until responseReady is set.
Version version = Version::V1;
ImportType type = ImportType::IMPORT;
kj::StringPtr specifier;
kj::StringPtr rawSpecifier;
kj::StringPtr referrer;
const kj::HashMap<kj::StringPtr, kj::StringPtr>* attributes = nullptr;
bool hasRequest = false;
Comment on lines +91 to +100 (Collaborator):

[HIGH] These kj::StringPtr fields store non-owning views into the caller's stack frame. The comment correctly notes this is safe because the caller blocks until responseReady is set.

However, in threadMain(), the background thread copies these StringPtr values into local StringPtr variables (lines 330-332) and then clears hasRequest (line 334), but those local variables are still non-owning views into the caller's stack. This is currently safe because the caller blocks on responseReady (not hasRequest), but it's fragile. If someone later changes the caller to unblock on hasRequest being cleared (e.g., to implement a request queue), those become dangling pointers.

Consider copying to owned kj::String locals in the background thread's when callback. The cost of a few string copies per module resolution is negligible and makes the lifetime relationship explicit and robust against future changes.
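A std:: sketch of the distinction the reviewer describes, with std::string_view standing in for kj::StringPtr (both are non-owning views; all names here are hypothetical):

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Request mirrors the SharedState request fields; std::string_view stands
// in for kj::StringPtr.
struct Request {
  std::string_view specifier;
};

// Copying the view out of shared state keeps it pointing at the caller's
// buffer: it dangles if the caller's string is destroyed first.
std::string_view snapshotView(const Request& req) {
  return req.specifier;  // still non-owning
}

// Copying into std::string takes ownership, so the snapshot stays valid
// regardless of when the caller's buffer goes away.
std::string snapshotOwned(const Request& req) {
  return std::string(req.specifier);
}
```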


// Response field - valid only when responseReady is true.
ModuleOrRedirect response;
bool responseReady = false;

// Set to true to signal the background thread to exit.
bool shutdown = false;
};

kj::String ownedAddress;
kj::MutexGuarded<SharedState> state;
kj::Thread thread;

void threadMain();
};

} // namespace workerd::fallback