-
Notifications
You must be signed in to change notification settings - Fork 539
fix: reuse HTTP connections in fallback service to prevent port exhaustion #6115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -40,7 +40,7 @@ ModuleOrRedirect handleReturnPayload( | |||||||||||
| // The response from the fallback service must be a valid JSON serialization | ||||||||||||
| // of the workerd module configuration. If it is not, or if there is any other | ||||||||||||
| // error when processing here, we'll log the exception and return nothing. | ||||||||||||
| try { | ||||||||||||
| KJ_TRY { | ||||||||||||
| capnp::MallocMessageBuilder moduleMessage; | ||||||||||||
| capnp::JsonCodec json; | ||||||||||||
| json.handleByAnnotation<server::config::Worker::Module>(); | ||||||||||||
|
|
@@ -63,8 +63,8 @@ ModuleOrRedirect handleReturnPayload( | |||||||||||
|
|
||||||||||||
| kj::Own<server::config::Worker::Module::Reader> ret = capnp::clone(moduleBuilder.asReader()); | ||||||||||||
| return ModuleOrRedirect(kj::mv(ret)); | ||||||||||||
| } catch (...) { | ||||||||||||
| auto exception = kj::getCaughtExceptionAsKj(); | ||||||||||||
| } | ||||||||||||
| KJ_CATCH(exception) { | ||||||||||||
| KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, specifier); | ||||||||||||
| return kj::none; | ||||||||||||
| } | ||||||||||||
|
|
@@ -119,7 +119,7 @@ ModuleOrRedirect tryResolveV1(ImportType type, | |||||||||||
| // so that the destructor joins the current thread (blocking it). The thread will | ||||||||||||
| // either set the jsonPayload variable or not. | ||||||||||||
| kj::Thread loaderThread([&spec, address, &jsonPayload, &redirect, type]() mutable { | ||||||||||||
| try { | ||||||||||||
| KJ_TRY { | ||||||||||||
| kj::AsyncIoContext io = kj::setupAsyncIo(); | ||||||||||||
| kj::HttpHeaderTable::Builder builder; | ||||||||||||
| kj::HttpHeaderId kMethod = builder.add("x-resolve-method"); | ||||||||||||
|
|
@@ -153,8 +153,8 @@ ModuleOrRedirect tryResolveV1(ImportType type, | |||||||||||
| } else { | ||||||||||||
| jsonPayload = resp.body->readAllText().wait(io.waitScope); | ||||||||||||
| } | ||||||||||||
| } catch (...) { | ||||||||||||
| auto exception = kj::getCaughtExceptionAsKj(); | ||||||||||||
| } | ||||||||||||
| KJ_CATCH(exception) { | ||||||||||||
| KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, spec); | ||||||||||||
| } | ||||||||||||
| }); | ||||||||||||
|
|
@@ -199,7 +199,7 @@ ModuleOrRedirect tryResolveV2(ImportType type, | |||||||||||
| { | ||||||||||||
| kj::Thread loaderThread( | ||||||||||||
| [&address, &jsonPayload, payload = kj::mv(payload), &redirect, &specifier]() mutable { | ||||||||||||
| try { | ||||||||||||
| KJ_TRY { | ||||||||||||
| kj::AsyncIoContext io = kj::setupAsyncIo(); | ||||||||||||
| kj::HttpHeaderTable::Builder builder; | ||||||||||||
| auto headerTable = builder.build(); | ||||||||||||
|
|
@@ -235,8 +235,8 @@ ModuleOrRedirect tryResolveV2(ImportType type, | |||||||||||
| } else { | ||||||||||||
| jsonPayload = resp.body->readAllText().wait(io.waitScope); | ||||||||||||
| } | ||||||||||||
| } catch (...) { | ||||||||||||
| auto exception = kj::getCaughtExceptionAsKj(); | ||||||||||||
| } | ||||||||||||
| KJ_CATCH(exception) { | ||||||||||||
| KJ_LOG(ERROR, "Fallback service failed to fetch module", exception); | ||||||||||||
| } | ||||||||||||
| }); | ||||||||||||
|
|
@@ -258,4 +258,241 @@ ModuleOrRedirect tryResolve(Version version, | |||||||||||
| : tryResolveV2(type, address, specifier, rawSpecifier, referrer, attributes); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // ---- FallbackServiceClient implementation ---- | ||||||||||||
|
|
||||||||||||
| FallbackServiceClient::FallbackServiceClient(kj::String address) | ||||||||||||
| : ownedAddress(kj::mv(address)), | ||||||||||||
| thread([this]() { threadMain(); }) {} | ||||||||||||
|
|
||||||||||||
| FallbackServiceClient::~FallbackServiceClient() noexcept(false) { | ||||||||||||
| // Signal the background thread to exit. The kj::Thread destructor | ||||||||||||
| // (which runs after this body) will join the thread. | ||||||||||||
| auto lock = state.lockExclusive(); | ||||||||||||
| lock->shutdown = true; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| ModuleOrRedirect FallbackServiceClient::tryResolve(Version version, | ||||||||||||
| ImportType type, | ||||||||||||
| kj::StringPtr specifier, | ||||||||||||
| kj::StringPtr rawSpecifier, | ||||||||||||
| kj::StringPtr referrer, | ||||||||||||
| const kj::HashMap<kj::StringPtr, kj::StringPtr>& attributes) { | ||||||||||||
| // Submit request to background thread. | ||||||||||||
| { | ||||||||||||
| auto lock = state.lockExclusive(); | ||||||||||||
| KJ_REQUIRE(!lock->shutdown, "FallbackServiceClient has been shut down"); | ||||||||||||
| lock->version = version; | ||||||||||||
| lock->type = type; | ||||||||||||
| lock->specifier = specifier; | ||||||||||||
| lock->rawSpecifier = rawSpecifier; | ||||||||||||
| lock->referrer = referrer; | ||||||||||||
| lock->attributes = &attributes; | ||||||||||||
| lock->hasRequest = true; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Block until the background thread has processed our request. | ||||||||||||
| return state.when([](const SharedState& s) { return s.responseReady || s.shutdown; }, | ||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'll need to experiment with it a bit to verify everything but this is good. One thing we might consider, however, is having a small pool of these clients so that multiple concurrent loads aren't blocking each other and bottlenecking things quite as much. |
||||||||||||
| [](SharedState& s) -> ModuleOrRedirect { | ||||||||||||
| if (!s.responseReady) { | ||||||||||||
| // Background thread shut down without producing a response. | ||||||||||||
| return kj::none; | ||||||||||||
| } | ||||||||||||
| auto result = kj::mv(s.response); | ||||||||||||
| s.responseReady = false; | ||||||||||||
| return result; | ||||||||||||
| }); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| void FallbackServiceClient::threadMain() { | ||||||||||||
| KJ_TRY { | ||||||||||||
| // Set up the async I/O context, DNS resolution, and HTTP client once. | ||||||||||||
| // These are reused for all subsequent requests. | ||||||||||||
| kj::AsyncIoContext io = kj::setupAsyncIo(); | ||||||||||||
| kj::HttpHeaderTable::Builder builder; | ||||||||||||
| kj::HttpHeaderId kMethod = builder.add("x-resolve-method"); | ||||||||||||
| auto headerTable = builder.build(); | ||||||||||||
|
|
||||||||||||
| auto addr = io.provider->getNetwork().parseAddress(ownedAddress, 80).wait(io.waitScope); | ||||||||||||
| auto client = kj::newHttpClient(io.provider->getTimer(), *headerTable, *addr, {}); | ||||||||||||
|
|
||||||||||||
| while (true) { | ||||||||||||
| // Wait for a request or shutdown signal. | ||||||||||||
| Version version; | ||||||||||||
| ImportType type; | ||||||||||||
| kj::StringPtr specifier; | ||||||||||||
| kj::StringPtr rawSpecifier; | ||||||||||||
| kj::StringPtr referrer; | ||||||||||||
| const kj::HashMap<kj::StringPtr, kj::StringPtr>* attributes; | ||||||||||||
| bool shouldExit = state.when([](const SharedState& s) { return s.hasRequest || s.shutdown; }, | ||||||||||||
| [&](SharedState& s) -> bool { | ||||||||||||
| if (s.shutdown) return true; | ||||||||||||
| version = s.version; | ||||||||||||
| type = s.type; | ||||||||||||
| specifier = s.specifier; | ||||||||||||
| rawSpecifier = s.rawSpecifier; | ||||||||||||
| referrer = s.referrer; | ||||||||||||
| attributes = s.attributes; | ||||||||||||
| s.hasRequest = false; | ||||||||||||
| return false; | ||||||||||||
| }); | ||||||||||||
| if (shouldExit) return; | ||||||||||||
|
|
||||||||||||
| // Process the request using the shared HTTP client. | ||||||||||||
| ModuleOrRedirect result = kj::none; | ||||||||||||
|
|
||||||||||||
| if (version == Version::V1) { | ||||||||||||
| // === V1: GET request with query parameters === | ||||||||||||
| kj::Maybe<kj::String> jsonPayload; | ||||||||||||
| bool redirect = false; | ||||||||||||
| bool prefixed = false; | ||||||||||||
| kj::Url url; | ||||||||||||
| kj::StringPtr actualSpecifier = nullptr; | ||||||||||||
|
|
||||||||||||
| KJ_IF_SOME(pos, specifier.findLast('/')) { | ||||||||||||
| auto segment = specifier.slice(pos + 1); | ||||||||||||
| if (segment.startsWith("node:") || segment.startsWith("cloudflare:") || | ||||||||||||
| segment.startsWith("workerd:")) { | ||||||||||||
| actualSpecifier = segment; | ||||||||||||
| url.query.add( | ||||||||||||
| kj::Url::QueryParam{.name = kj::str("specifier"), .value = kj::str(segment)}); | ||||||||||||
| prefixed = true; | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| if (!prefixed) { | ||||||||||||
| actualSpecifier = specifier; | ||||||||||||
| if (actualSpecifier.startsWith("/")) { | ||||||||||||
| actualSpecifier = specifier.slice(1); | ||||||||||||
| } | ||||||||||||
| url.query.add(kj::Url::QueryParam{kj::str("specifier"), kj::str(specifier)}); | ||||||||||||
| } | ||||||||||||
| url.query.add(kj::Url::QueryParam{kj::str("referrer"), kj::str(referrer)}); | ||||||||||||
| url.query.add(kj::Url::QueryParam{kj::str("rawSpecifier"), kj::str(rawSpecifier)}); | ||||||||||||
|
|
||||||||||||
| auto spec = url.toString(kj::Url::HTTP_REQUEST); | ||||||||||||
|
|
||||||||||||
| // Retry once on disconnect (stale pooled connection). | ||||||||||||
| for (int attempt = 0; attempt < 2; attempt++) { | ||||||||||||
| KJ_TRY { | ||||||||||||
| kj::HttpHeaders headers(*headerTable); | ||||||||||||
| headers.setPtr(kMethod, getMethodFromType(type)); | ||||||||||||
| headers.setPtr(kj::HttpHeaderId::HOST, "localhost"_kj); | ||||||||||||
|
|
||||||||||||
| auto request = client->request(kj::HttpMethod::GET, spec, headers, kj::none); | ||||||||||||
| kj::HttpClient::Response resp = request.response.wait(io.waitScope); | ||||||||||||
|
|
||||||||||||
| if (resp.statusCode == 301) { | ||||||||||||
| KJ_IF_SOME(loc, resp.headers->get(kj::HttpHeaderId::LOCATION)) { | ||||||||||||
| redirect = true; | ||||||||||||
| jsonPayload = kj::str(loc); | ||||||||||||
| } else { | ||||||||||||
| KJ_LOG(ERROR, "Fallback service returned a redirect with no location", spec); | ||||||||||||
| } | ||||||||||||
| // Drain the response body to allow HTTP/1.1 connection reuse. | ||||||||||||
| resp.body->readAllBytes().wait(io.waitScope); | ||||||||||||
| } else if (resp.statusCode != 200) { | ||||||||||||
| auto payload = resp.body->readAllText().wait(io.waitScope); | ||||||||||||
| KJ_LOG(ERROR, "Fallback service failed to fetch module", payload, spec); | ||||||||||||
| } else { | ||||||||||||
| jsonPayload = resp.body->readAllText().wait(io.waitScope); | ||||||||||||
| } | ||||||||||||
| break; // Success, no retry needed. | ||||||||||||
| } | ||||||||||||
| KJ_CATCH(exception) { | ||||||||||||
| if (attempt == 0 && exception.getType() == kj::Exception::Type::DISCONNECTED) { | ||||||||||||
| // Stale pooled connection; retry with a fresh one. | ||||||||||||
| continue; | ||||||||||||
| } | ||||||||||||
| KJ_LOG(ERROR, "Fallback service failed to fetch module", exception, spec); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| result = handleReturnPayload(kj::mv(jsonPayload), redirect, actualSpecifier); | ||||||||||||
|
|
||||||||||||
| } else { | ||||||||||||
| // === V2: POST request with JSON body === | ||||||||||||
| capnp::JsonCodec json; | ||||||||||||
| capnp::MallocMessageBuilder moduleMessage; | ||||||||||||
| auto requestMsg = moduleMessage.initRoot<server::config::FallbackServiceRequest>(); | ||||||||||||
| requestMsg.setType(getMethodFromType(type)); | ||||||||||||
| requestMsg.setSpecifier(specifier); | ||||||||||||
| requestMsg.setReferrer(referrer); | ||||||||||||
|
|
||||||||||||
| if (rawSpecifier != nullptr) { | ||||||||||||
| requestMsg.setRawSpecifier(rawSpecifier); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| KJ_ASSERT(attributes != nullptr); | ||||||||||||
| if (attributes->size() > 0) { | ||||||||||||
| auto attrs = requestMsg.initAttributes(attributes->size()); | ||||||||||||
| size_t n = 0; | ||||||||||||
| for (auto& attr: *attributes) { | ||||||||||||
| attrs[n].setName(attr.key); | ||||||||||||
| attrs[n].setValue(attr.value); | ||||||||||||
| n++; | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| auto payload = json.encode(requestMsg); | ||||||||||||
|
|
||||||||||||
| kj::Maybe<kj::String> jsonPayload; | ||||||||||||
| bool redirect = false; | ||||||||||||
|
|
||||||||||||
| // Retry once on disconnect (stale pooled connection). | ||||||||||||
| for (int attempt = 0; attempt < 2; attempt++) { | ||||||||||||
| KJ_TRY { | ||||||||||||
| kj::HttpHeaders headers(*headerTable); | ||||||||||||
| headers.setPtr(kj::HttpHeaderId::HOST, "localhost"); | ||||||||||||
|
|
||||||||||||
| auto request = client->request(kj::HttpMethod::POST, "/", headers, payload.size()); | ||||||||||||
| request.body->write(payload.asPtr().asBytes()).wait(io.waitScope); | ||||||||||||
|
|
||||||||||||
| kj::HttpClient::Response resp = request.response.wait(io.waitScope); | ||||||||||||
|
|
||||||||||||
| if (resp.statusCode == 301) { | ||||||||||||
| KJ_IF_SOME(loc, resp.headers->get(kj::HttpHeaderId::LOCATION)) { | ||||||||||||
| redirect = true; | ||||||||||||
| jsonPayload = kj::str(loc); | ||||||||||||
| } else { | ||||||||||||
| KJ_LOG(ERROR, "Fallback service returned a redirect with no location", specifier); | ||||||||||||
| } | ||||||||||||
| // Drain the response body to allow HTTP/1.1 connection reuse. | ||||||||||||
| resp.body->readAllBytes().wait(io.waitScope); | ||||||||||||
| } else if (resp.statusCode != 200) { | ||||||||||||
| auto body = resp.body->readAllText().wait(io.waitScope); | ||||||||||||
| KJ_LOG(ERROR, "Fallback service failed to fetch module", body, specifier); | ||||||||||||
| } else { | ||||||||||||
| jsonPayload = resp.body->readAllText().wait(io.waitScope); | ||||||||||||
| } | ||||||||||||
| break; // Success, no retry needed. | ||||||||||||
| } | ||||||||||||
| KJ_CATCH(exception) { | ||||||||||||
| if (attempt == 0 && exception.getType() == kj::Exception::Type::DISCONNECTED) { | ||||||||||||
| // Stale pooled connection; retry with a fresh one. | ||||||||||||
| continue; | ||||||||||||
| } | ||||||||||||
| KJ_LOG(ERROR, "Fallback service failed to fetch module", exception); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| result = handleReturnPayload(kj::mv(jsonPayload), redirect, specifier); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Deliver the result to the calling thread. | ||||||||||||
| { | ||||||||||||
| auto lock = state.lockExclusive(); | ||||||||||||
| lock->response = kj::mv(result); | ||||||||||||
| lock->responseReady = true; | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| KJ_CATCH(exception) { | ||||||||||||
| KJ_LOG(ERROR, "Fallback service background thread failed", exception); | ||||||||||||
|
Comment on lines
+488
to
+489
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [MEDIUM] If the background thread catches any exception during setup (e.g., DNS resolution failure), it sets `shutdown` and exits, so the fallback mechanism is permanently disabled for the process. This is acceptable for a local-dev-only feature, but the log message could be more descriptive to help users diagnose the issue. Consider something like:
Suggested change
Separately, @jasnell's suggestion of a small pool of clients would also mitigate this — a single transient failure wouldn't take down the whole fallback mechanism. |
||||||||||||
| // Signal any waiting caller and prevent future requests. | ||||||||||||
| auto lock = state.lockExclusive(); | ||||||||||||
| lock->response = kj::none; | ||||||||||||
| lock->responseReady = true; | ||||||||||||
| lock->shutdown = true; | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| } // namespace workerd::fallback | ||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,8 +4,10 @@ | |
|
|
||
| #include <kj/common.h> | ||
| #include <kj/map.h> | ||
| #include <kj/mutex.h> | ||
| #include <kj/one-of.h> | ||
| #include <kj/string.h> | ||
| #include <kj/thread.h> | ||
|
|
||
| namespace workerd::fallback { | ||
|
|
||
|
|
@@ -63,4 +65,53 @@ ModuleOrRedirect tryResolve(Version version, | |
| kj::StringPtr referrer, | ||
| const kj::HashMap<kj::StringPtr, kj::StringPtr>& attributes); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [MEDIUM] Both call sites appear to have moved to `FallbackServiceClient`, yet the old per-request paths (the V1 path in `tryResolveV1` and the V2 path in `tryResolveV2`) remain. Consider removing them in this PR (or a follow-up) to reduce the ~170 lines of duplication in the implementation file. If they're kept intentionally as a fallback, a comment explaining why would be helpful. |
||
|
|
||
| // A persistent client for the fallback service that uses a single background | ||
| // thread with a long-lived HTTP client for all module resolution requests. | ||
| // This avoids creating a new OS thread, DNS lookup, and TCP connection for | ||
| // each request, which can exhaust ephemeral ports when many modules are | ||
| // resolved concurrently (e.g. running many test files with vitest-pool-workers). | ||
| class FallbackServiceClient { | ||
| public: | ||
| explicit FallbackServiceClient(kj::String address); | ||
| ~FallbackServiceClient() noexcept(false); | ||
|
|
||
| KJ_DISALLOW_COPY_AND_MOVE(FallbackServiceClient); | ||
|
|
||
| ModuleOrRedirect tryResolve(Version version, | ||
| ImportType type, | ||
| kj::StringPtr specifier, | ||
| kj::StringPtr rawSpecifier, | ||
| kj::StringPtr referrer, | ||
| const kj::HashMap<kj::StringPtr, kj::StringPtr>& attributes); | ||
|
|
||
| private: | ||
| // Shared state between the calling thread and the background thread. | ||
| // Access is serialized through kj::MutexGuarded. | ||
| struct SharedState { | ||
| // Request fields - valid only when hasRequest is true. | ||
| // These contain non-owning references into the caller's stack frame, | ||
| // which is safe because the caller blocks until responseReady is set. | ||
| Version version = Version::V1; | ||
| ImportType type = ImportType::IMPORT; | ||
| kj::StringPtr specifier; | ||
| kj::StringPtr rawSpecifier; | ||
| kj::StringPtr referrer; | ||
| const kj::HashMap<kj::StringPtr, kj::StringPtr>* attributes = nullptr; | ||
| bool hasRequest = false; | ||
|
Comment on lines
+91
to
+100
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [HIGH] These `kj::StringPtr` fields are non-owning references into the caller's stack frame, which the class comment says is safe because the caller blocks until `responseReady`. However, in the shutdown/error paths the lifetime guarantee is less clear. Consider copying to owned `kj::String` fields instead. |
||
|
|
||
| // Response field - valid only when responseReady is true. | ||
| ModuleOrRedirect response; | ||
| bool responseReady = false; | ||
|
|
||
| // Set to true to signal the background thread to exit. | ||
| bool shutdown = false; | ||
| }; | ||
|
|
||
| kj::String ownedAddress; | ||
| kj::MutexGuarded<SharedState> state; | ||
| kj::Thread thread; | ||
|
|
||
| void threadMain(); | ||
| }; | ||
|
|
||
| } // namespace workerd::fallback | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[HIGH] No guard against concurrent callers — if two threads call
`tryResolve()` simultaneously, the second caller will overwrite the first's request fields. The background thread then processes the wrong request and the first caller gets the second caller's response — silently. I understand module resolution is currently single-threaded per isolate/registry, so this is safe in practice. But the class API doesn't enforce the single-caller invariant. Consider adding an assertion here so misuse crashes immediately rather than producing wrong results:
Also consider documenting the single-caller invariant in the class comment in the header.