diff --git a/src/workerd/jsg/promise.c++ b/src/workerd/jsg/promise.c++ index 2faa6732663..cb72ca99e4d 100644 --- a/src/workerd/jsg/promise.c++ +++ b/src/workerd/jsg/promise.c++ @@ -153,12 +153,13 @@ void UnhandledRejectionHandler::onMicrotasksCompleted(v8::Isolate* isolate, void KJ_DEFER(isolate->RemoveMicrotasksCompletedCallback( &UnhandledRejectionHandler::onMicrotasksCompleted, data)); auto& js = Lock::from(isolate); - KJ_IF_SOME(exception, kj::runCatchingExceptions([&]() { + KJ_TRY { handler->processWarnings(js); // Ensure microtasks scheduled by unhandledrejection handlers run promptly. js.requestExtraMicrotaskCheckpoint(); - })) { + } + KJ_CATCH(exception) { handler->scheduled = false; KJ_LOG(ERROR, "uncaught exception while processing unhandled rejections", exception); } diff --git a/src/workerd/server/fallback-service.c++ b/src/workerd/server/fallback-service.c++ index a4f0fba034f..e2567469633 100644 --- a/src/workerd/server/fallback-service.c++ +++ b/src/workerd/server/fallback-service.c++ @@ -40,7 +40,7 @@ ModuleOrRedirect handleReturnPayload( // The response from the fallback service must be a valid JSON serialization // of the workerd module configuration. If it is not, or if there is any other // error when processing here, we'll log the exception and return nothing. - try { + KJ_TRY { capnp::MallocMessageBuilder moduleMessage; capnp::JsonCodec json; json.handleByAnnotation(); @@ -63,8 +63,8 @@ ModuleOrRedirect handleReturnPayload( kj::Own ret = capnp::clone(moduleBuilder.asReader()); return ModuleOrRedirect(kj::mv(ret)); - } catch (...) { - auto exception = kj::getCaughtExceptionAsKj(); + } + KJ_CATCH(exception) { KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, specifier); return kj::none; } @@ -119,7 +119,7 @@ ModuleOrRedirect tryResolveV1(ImportType type, // so that the destructor joins the current thread (blocking it). The thread will // either set the jsonPayload variable or not. kj::Thread loaderThread([&spec, address, &jsonPayload, &redirect, type]() mutable { - try { + KJ_TRY { kj::AsyncIoContext io = kj::setupAsyncIo(); kj::HttpHeaderTable::Builder builder; kj::HttpHeaderId kMethod = builder.add("x-resolve-method"); @@ -153,8 +153,8 @@ ModuleOrRedirect tryResolveV1(ImportType type, } else { jsonPayload = resp.body->readAllText().wait(io.waitScope); } - } catch (...) { - auto exception = kj::getCaughtExceptionAsKj(); + } + KJ_CATCH(exception) { KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, spec); } }); @@ -199,7 +199,7 @@ ModuleOrRedirect tryResolveV2(ImportType type, { kj::Thread loaderThread( [&address, &jsonPayload, payload = kj::mv(payload), &redirect, &specifier]() mutable { - try { + KJ_TRY { kj::AsyncIoContext io = kj::setupAsyncIo(); kj::HttpHeaderTable::Builder builder; auto headerTable = builder.build(); @@ -235,8 +235,8 @@ ModuleOrRedirect tryResolveV2(ImportType type, } else { jsonPayload = resp.body->readAllText().wait(io.waitScope); } - } catch (...) { - auto exception = kj::getCaughtExceptionAsKj(); + } + KJ_CATCH(exception) { KJ_LOG(ERROR, "Fallback service failed to fetch module", exception); } }); @@ -258,4 +258,241 @@ ModuleOrRedirect tryResolve(Version version, : tryResolveV2(type, address, specifier, rawSpecifier, referrer, attributes); } +// ---- FallbackServiceClient implementation ---- + +FallbackServiceClient::FallbackServiceClient(kj::String address) + : ownedAddress(kj::mv(address)), + thread([this]() { threadMain(); }) {} + +FallbackServiceClient::~FallbackServiceClient() noexcept(false) { + // Signal the background thread to exit. The kj::Thread destructor + // (which runs after this body) will join the thread. + auto lock = state.lockExclusive(); + lock->shutdown = true; +} + +ModuleOrRedirect FallbackServiceClient::tryResolve(Version version, + ImportType type, + kj::StringPtr specifier, + kj::StringPtr rawSpecifier, + kj::StringPtr referrer, + const kj::HashMap& attributes) { + // Submit request to background thread. + { + auto lock = state.lockExclusive(); + KJ_REQUIRE(!lock->shutdown, "FallbackServiceClient has been shut down"); + lock->version = version; + lock->type = type; + lock->specifier = specifier; + lock->rawSpecifier = rawSpecifier; + lock->referrer = referrer; + lock->attributes = &attributes; + lock->hasRequest = true; + } + + // Block until the background thread has processed our request. + return state.when([](const SharedState& s) { return s.responseReady || s.shutdown; }, + [](SharedState& s) -> ModuleOrRedirect { + if (!s.responseReady) { + // Background thread shut down without producing a response. + return kj::none; + } + auto result = kj::mv(s.response); + s.responseReady = false; + return result; + }); +} + +void FallbackServiceClient::threadMain() { + KJ_TRY { + // Set up the async I/O context, DNS resolution, and HTTP client once. + // These are reused for all subsequent requests. + kj::AsyncIoContext io = kj::setupAsyncIo(); + kj::HttpHeaderTable::Builder builder; + kj::HttpHeaderId kMethod = builder.add("x-resolve-method"); + auto headerTable = builder.build(); + + auto addr = io.provider->getNetwork().parseAddress(ownedAddress, 80).wait(io.waitScope); + auto client = kj::newHttpClient(io.provider->getTimer(), *headerTable, *addr, {}); + + while (true) { + // Wait for a request or shutdown signal. + Version version; + ImportType type; + kj::StringPtr specifier; + kj::StringPtr rawSpecifier; + kj::StringPtr referrer; + const kj::HashMap* attributes; + bool shouldExit = state.when([](const SharedState& s) { return s.hasRequest || s.shutdown; }, + [&](SharedState& s) -> bool { + if (s.shutdown) return true; + version = s.version; + type = s.type; + specifier = s.specifier; + rawSpecifier = s.rawSpecifier; + referrer = s.referrer; + attributes = s.attributes; + s.hasRequest = false; + return false; + }); + if (shouldExit) return; + + // Process the request using the shared HTTP client. + ModuleOrRedirect result = kj::none; + + if (version == Version::V1) { + // === V1: GET request with query parameters === + kj::Maybe jsonPayload; + bool redirect = false; + bool prefixed = false; + kj::Url url; + kj::StringPtr actualSpecifier = nullptr; + + KJ_IF_SOME(pos, specifier.findLast('/')) { + auto segment = specifier.slice(pos + 1); + if (segment.startsWith("node:") || segment.startsWith("cloudflare:") || + segment.startsWith("workerd:")) { + actualSpecifier = segment; + url.query.add( + kj::Url::QueryParam{.name = kj::str("specifier"), .value = kj::str(segment)}); + prefixed = true; + } + } + if (!prefixed) { + actualSpecifier = specifier; + if (actualSpecifier.startsWith("/")) { + actualSpecifier = specifier.slice(1); + } + url.query.add(kj::Url::QueryParam{kj::str("specifier"), kj::str(specifier)}); + } + url.query.add(kj::Url::QueryParam{kj::str("referrer"), kj::str(referrer)}); + url.query.add(kj::Url::QueryParam{kj::str("rawSpecifier"), kj::str(rawSpecifier)}); + + auto spec = url.toString(kj::Url::HTTP_REQUEST); + + // Retry once on disconnect (stale pooled connection). + for (int attempt = 0; attempt < 2; attempt++) { + KJ_TRY { + kj::HttpHeaders headers(*headerTable); + headers.setPtr(kMethod, getMethodFromType(type)); + headers.setPtr(kj::HttpHeaderId::HOST, "localhost"_kj); + + auto request = client->request(kj::HttpMethod::GET, spec, headers, kj::none); + kj::HttpClient::Response resp = request.response.wait(io.waitScope); + + if (resp.statusCode == 301) { + KJ_IF_SOME(loc, resp.headers->get(kj::HttpHeaderId::LOCATION)) { + redirect = true; + jsonPayload = kj::str(loc); + } else { + KJ_LOG(ERROR, "Fallback service returned a redirect with no location", spec); + } + // Drain the response body to allow HTTP/1.1 connection reuse. + resp.body->readAllBytes().wait(io.waitScope); + } else if (resp.statusCode != 200) { + auto payload = resp.body->readAllText().wait(io.waitScope); + KJ_LOG(ERROR, "Fallback service failed to fetch module", payload, spec); + } else { + jsonPayload = resp.body->readAllText().wait(io.waitScope); + } + break; // Success, no retry needed. + } + KJ_CATCH(exception) { + if (attempt == 0 && exception.getType() == kj::Exception::Type::DISCONNECTED) { + // Stale pooled connection; retry with a fresh one. + continue; + } + KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, spec); + } + } + + result = handleReturnPayload(kj::mv(jsonPayload), redirect, actualSpecifier); + + } else { + // === V2: POST request with JSON body === + capnp::JsonCodec json; + capnp::MallocMessageBuilder moduleMessage; + auto requestMsg = moduleMessage.initRoot(); + requestMsg.setType(getMethodFromType(type)); + requestMsg.setSpecifier(specifier); + requestMsg.setReferrer(referrer); + + if (rawSpecifier != nullptr) { + requestMsg.setRawSpecifier(rawSpecifier); + } + + KJ_ASSERT(attributes != nullptr); + if (attributes->size() > 0) { + auto attrs = requestMsg.initAttributes(attributes->size()); + size_t n = 0; + for (auto& attr: *attributes) { + attrs[n].setName(attr.key); + attrs[n].setValue(attr.value); + n++; + } + } + + auto payload = json.encode(requestMsg); + + kj::Maybe jsonPayload; + bool redirect = false; + + // Retry once on disconnect (stale pooled connection). + for (int attempt = 0; attempt < 2; attempt++) { + KJ_TRY { + kj::HttpHeaders headers(*headerTable); + headers.setPtr(kj::HttpHeaderId::HOST, "localhost"); + + auto request = client->request(kj::HttpMethod::POST, "/", headers, payload.size()); + request.body->write(payload.asPtr().asBytes()).wait(io.waitScope); + + kj::HttpClient::Response resp = request.response.wait(io.waitScope); + + if (resp.statusCode == 301) { + KJ_IF_SOME(loc, resp.headers->get(kj::HttpHeaderId::LOCATION)) { + redirect = true; + jsonPayload = kj::str(loc); + } else { + KJ_LOG(ERROR, "Fallback service returned a redirect with no location", specifier); + } + // Drain the response body to allow HTTP/1.1 connection reuse. + resp.body->readAllBytes().wait(io.waitScope); + } else if (resp.statusCode != 200) { + auto body = resp.body->readAllText().wait(io.waitScope); + KJ_LOG(ERROR, "Fallback service failed to fetch module", body, specifier); + } else { + jsonPayload = resp.body->readAllText().wait(io.waitScope); + } + break; // Success, no retry needed. + } + KJ_CATCH(exception) { + if (attempt == 0 && exception.getType() == kj::Exception::Type::DISCONNECTED) { + // Stale pooled connection; retry with a fresh one. + continue; + } + KJ_LOG(ERROR, "Fallback service failed to fetch module", exception); + } + } + + result = handleReturnPayload(kj::mv(jsonPayload), redirect, specifier); + } + + // Deliver the result to the calling thread. + { + auto lock = state.lockExclusive(); + lock->response = kj::mv(result); + lock->responseReady = true; + } + } + } + KJ_CATCH(exception) { + KJ_LOG(ERROR, "Fallback service background thread failed", exception); + // Signal any waiting caller and prevent future requests. + auto lock = state.lockExclusive(); + lock->response = kj::none; + lock->responseReady = true; + lock->shutdown = true; + } +} + } // namespace workerd::fallback diff --git a/src/workerd/server/fallback-service.h b/src/workerd/server/fallback-service.h index 44b6dfcb646..4ab73562173 100644 --- a/src/workerd/server/fallback-service.h +++ b/src/workerd/server/fallback-service.h @@ -4,8 +4,10 @@ #include #include +#include #include #include +#include namespace workerd::fallback { @@ -63,4 +65,53 @@ ModuleOrRedirect tryResolve(Version version, kj::StringPtr referrer, const kj::HashMap& attributes); +// A persistent client for the fallback service that uses a single background +// thread with a long-lived HTTP client for all module resolution requests. +// This avoids creating a new OS thread, DNS lookup, and TCP connection for +// each request, which can exhaust ephemeral ports when many modules are +// resolved concurrently (e.g. running many test files with vitest-pool-workers). +class FallbackServiceClient { + public: + explicit FallbackServiceClient(kj::String address); + ~FallbackServiceClient() noexcept(false); + + KJ_DISALLOW_COPY_AND_MOVE(FallbackServiceClient); + + ModuleOrRedirect tryResolve(Version version, + ImportType type, + kj::StringPtr specifier, + kj::StringPtr rawSpecifier, + kj::StringPtr referrer, + const kj::HashMap& attributes); + + private: + // Shared state between the calling thread and the background thread. + // Access is serialized through kj::MutexGuarded. + struct SharedState { + // Request fields - valid only when hasRequest is true. + // These contain non-owning references into the caller's stack frame, + // which is safe because the caller blocks until responseReady is set. + Version version = Version::V1; + ImportType type = ImportType::IMPORT; + kj::StringPtr specifier; + kj::StringPtr rawSpecifier; + kj::StringPtr referrer; + const kj::HashMap* attributes = nullptr; + bool hasRequest = false; + + // Response field - valid only when responseReady is true. + ModuleOrRedirect response; + bool responseReady = false; + + // Set to true to signal the background thread to exit. + bool shutdown = false; + }; + + kj::String ownedAddress; + kj::MutexGuarded state; + kj::Thread thread; + + void threadMain(); +}; + } // namespace workerd::fallback diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 681f92cb14b..08a0ece6024 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -4432,20 +4432,22 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr // to load certain modules from a fallback service. This is generally intended for local // dev/testing purposes only. auto& apiIsolate = isolate->getApi(); + auto fallbackClient = + kj::heap(kj::str(moduleFallback)); apiIsolate.setModuleFallbackCallback( - [address = kj::str(moduleFallback), featureFlags = apiIsolate.getFeatureFlags()]( + [client = kj::mv(fallbackClient), featureFlags = apiIsolate.getFeatureFlags()]( jsg::Lock& js, kj::StringPtr specifier, kj::Maybe referrer, jsg::CompilationObserver& observer, jsg::ModuleRegistry::ResolveMethod method, kj::Maybe rawSpecifier) mutable -> kj::Maybe> { kj::HashMap attributes; KJ_IF_SOME(moduleOrRedirect, - workerd::fallback::tryResolve(workerd::fallback::Version::V1, + client->tryResolve(workerd::fallback::Version::V1, method == jsg::ModuleRegistry::ResolveMethod::IMPORT ? workerd::fallback::ImportType::IMPORT : workerd::fallback::ImportType::REQUIRE, - address, specifier, rawSpecifier.orDefault(nullptr), - referrer.orDefault(kj::String()), attributes)) { + specifier, rawSpecifier.orDefault(nullptr), referrer.orDefault(kj::String()), + attributes)) { KJ_SWITCH_ONEOF(moduleOrRedirect) { KJ_CASE_ONEOF(redirect, kj::String) { // If a string is returned, then the fallback service returned a 301 redirect. diff --git a/src/workerd/server/workerd-api.c++ b/src/workerd/server/workerd-api.c++ index 0256de34332..996d075de79 100644 --- a/src/workerd/server/workerd-api.c++ +++ b/src/workerd/server/workerd-api.c++ @@ -1035,15 +1035,17 @@ kj::Arc WorkerdApi::newWorkerdModuleRegistry( // configured to use the fallback will send a request to the fallback // service to try resolving. KJ_IF_SOME(fallbackService, maybeFallbackService) { + auto fallbackClient = + kj::heap(kj::str(fallbackService)); builder.add(jsg::modules::ModuleBundle::newFallbackBundle( - [fallbackService = kj::str(fallbackService), featureFlags]( - const jsg::modules::ResolveContext& context) - -> kj::Maybe>> { + [client = kj::mv(fallbackClient), featureFlags]( + const jsg::modules::ResolveContext& context) mutable + -> kj::Maybe>> { auto normalizedSpecifier = kj::str(context.normalizedSpecifier.getHref()); auto referrer = kj::str(context.referrerNormalizedSpecifier.getHref()); KJ_IF_SOME(resolved, - workerd::fallback::tryResolve(workerd::fallback::Version::V2, - workerd::fallback::ImportType::IMPORT, fallbackService, normalizedSpecifier, + client->tryResolve(workerd::fallback::Version::V2, + workerd::fallback::ImportType::IMPORT, normalizedSpecifier, context.rawSpecifier.orDefault(nullptr), referrer, context.attributes)) { KJ_SWITCH_ONEOF(resolved) { KJ_CASE_ONEOF(str, kj::String) {