Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ wd_cc_library(

wd_cc_library(
name = "limit-enforcer",
hdrs = ["limit-enforcer.h"],
hdrs = [
"limit-enforcer.h",
"wasm-shutdown-signal.h",
],
visibility = ["//visibility:public"],
deps = [
":outcome_capnp",
Expand Down
52 changes: 52 additions & 0 deletions src/workerd/io/limit-enforcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@
#pragma once

#include <workerd/io/outcome.capnp.h>
#include <workerd/io/wasm-shutdown-signal.h>

#include <v8-array-buffer.h>
#include <v8-isolate.h>

#include <kj/async.h> // For Promise
#include <kj/debug.h> // For KJ_REQUIRE
#include <kj/memory.h> // for Own
#include <kj/one-of.h> // for OneOf
#include <kj/time.h> // for Duration

#include <memory> // for std::shared_ptr

namespace workerd {
class IsolateObserver;
class RequestObserver;
Expand Down Expand Up @@ -98,6 +103,43 @@ class IsolateLimitEnforcer: public kj::Refcounted {

virtual bool hasExcessivelyExceededHeapLimit() const = 0;

// Registers a WASM module to receive the "shut down" signal when CPU time is nearly
// exhausted. The signal handler stores 1 (as a uint32) into the module's linear memory at
// `signalOffset` bytes into `backingStore`; the module reports that it has exited by writing
// a non-zero uint32 at `terminatedOffset`, which the runtime polls in a GC prologue.
//
// Must be called with the isolate lock held.
void registerWasmShutdownSignal(std::shared_ptr<v8::BackingStore> backingStore,
    uint32_t signalOffset,
    uint32_t terminatedOffset) const {
  // Validate both fields fit inside the module's linear memory before registering.
  const size_t memoryBytes = backingStore->ByteLength();
  KJ_REQUIRE(memoryBytes >= static_cast<size_t>(signalOffset) + WASM_SIGNAL_FIELD_BYTES,
      "__signal_address offset is out of bounds: need ", WASM_SIGNAL_FIELD_BYTES,
      " bytes but memory is too small");
  KJ_REQUIRE(memoryBytes >= static_cast<size_t>(terminatedOffset) + WASM_SIGNAL_FIELD_BYTES,
      "__terminated_address offset is out of bounds: need ", WASM_SIGNAL_FIELD_BYTES,
      " bytes but memory is too small");
  wasmShutdownSignals.pushFront(
      WasmShutdownSignal{kj::mv(backingStore), signalOffset, terminatedOffset});
}

// Prunes entries whose module has exited (signalled by a non-zero value at the terminated
// address), allowing the module's linear memory to be reclaimed. Intended to be called from
// a GC prologue hook.
//
// Must be called with the isolate lock held.
void filterWasmShutdownSignals() const {
  auto stillListening = [](const WasmShutdownSignal& entry) {
    return entry.isModuleListening();
  };
  wasmShutdownSignals.filter(stillListening);
}

// Exposes the registered WASM shutdown signals. Traversing the returned list via iterate()
// is signal-safe, so a signal handler may walk the result directly.
const AtomicList<WasmShutdownSignal>& getWasmShutdownSignals() const {
  return wasmShutdownSignals;
}

// Inserts a custom mark event named `name` into this isolate's perf event data stream. At
// present, this is only implemented internally. Call this function from various APIs to be able
// to correlate perf event data with usage of those APIs.
Expand All @@ -107,6 +149,16 @@ class IsolateLimitEnforcer: public kj::Refcounted {
// coupled with our CPU time limiting system, so adding this function here is a path of least
// resistance.
virtual void markPerfEvent(kj::LiteralStringConst name) const {};

private:
// WASM modules that have opted into receiving the "shut down" signal by exporting i32 globals
// named "__signal_address" and "__terminated_address". When the CPU time limiter fires
// NEARLY_OUT_OF_TIME, the runtime writes 1 into each registered module's linear memory at its
// signal address; entries are pruned by filterWasmShutdownSignals() once a module reports exit.
//
// Marked mutable because registration happens through `const IsolateLimitEnforcer&` (the
// standard access pattern for this class). The AtomicList performs its mutations with atomic
// stores so that a signal handler on the same thread always observes a valid list.
mutable AtomicList<WasmShutdownSignal> wasmShutdownSignals;
};

// Abstract interface that enforces resource limits on a IoContext.
Expand Down
141 changes: 141 additions & 0 deletions src/workerd/io/wasm-shutdown-signal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#pragma once

#include <v8-array-buffer.h>

#include <kj/common.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <memory>

namespace workerd {

// Byte size of each signal field in WASM linear memory (a single uint32).
constexpr size_t WASM_SIGNAL_FIELD_BYTES = sizeof(uint32_t);

// Represents a single WASM module that has opted into receiving the "shut down" signal when CPU
// time is nearly exhausted. The module exports two i32 globals:
//
//   "__signal_address"     — address of a uint32 in linear memory. The runtime writes 1 here
//                            when CPU time is nearly exhausted.
//   "__terminated_address" — address of a uint32 in linear memory. The WASM module writes a
//                            non-zero value here when it has exited and is no longer listening.
//                            The runtime checks this in a GC prologue hook and removes entries
//                            where terminated is non-zero, allowing the linear memory to be
//                            reclaimed.
struct WasmShutdownSignal {
  // Shared (not weak) so the backing memory is guaranteed to still be mapped when the signal
  // handler writes into it. The reference is dropped in a V8 GC prologue hook, where the entry
  // is atomically unlinked from the signal list before the memory can be freed.
  std::shared_ptr<v8::BackingStore> backingStore;

  // Offset into `backingStore` of the uint32 the runtime writes 1 to (__signal_address).
  uint32_t signalByteOffset;

  // Offset into `backingStore` of the uint32 the module writes to (__terminated_address).
  uint32_t terminatedByteOffset;

  // Returns true while the module is still listening for signals (terminated flag reads 0).
  // Returns false once the module has flagged exit, meaning this entry should be removed.
  bool isModuleListening() const {
    // std::memcpy handles a potentially unaligned read from WASM linear memory.
    // (Previously relied on a transitive include for memcpy; <cstring> is now included.)
    uint32_t terminated;
    std::memcpy(&terminated,
        static_cast<kj::byte*>(backingStore->Data()) + terminatedByteOffset,
        sizeof(terminated));
    return terminated == 0;
  }
};

// A linked list type which is signal-safe (for reading), but not thread safe — it can handle
// same-thread concurrency ONLY. Mutations (pushFront, filter) are not signal safe, but every
// intermediate state they pass through is a valid list, so a signal handler that interrupts a
// mutation and reads the list (iterate) still sees a consistent chain.
//
// Node pointer validity during iterate() rests on the same-thread invariant: a signal handler
// runs to completion before the interrupted mutation resumes, so filter() can never delete a
// node while an iterate() holding a pointer to it is suspended. This would NOT be safe if
// mutations could run concurrently on another thread.
template <typename T>
class AtomicList {
 public:
  struct Node {
    T value;
    Node* next;
    template <typename... Args>
    explicit Node(Args&&... args): value(kj::fwd<Args>(args)...),
        next(nullptr) {}
  };

  AtomicList() {}

  // Frees all remaining nodes. Not signal safe; must not race with a signal handler.
  ~AtomicList() noexcept(false) {
    Node* node = __atomic_load_n(&head, __ATOMIC_RELAXED);
    while (node != nullptr) {
      Node* doomed = node;
      node = __atomic_load_n(&doomed->next, __ATOMIC_RELAXED);
      delete doomed;
    }
  }

  // Prepends a new node constructed from `args` at the front of the list. The node's `next`
  // pointer is written before `head` is published (release), so a signal handler can never
  // observe a half-initialized node.
  template <typename... Args>
  void pushFront(Args&&... args) {
    Node* node = new Node(kj::fwd<Args>(args)...);
    __atomic_store_n(&node->next, __atomic_load_n(&head, __ATOMIC_RELAXED), __ATOMIC_RELAXED);
    __atomic_store_n(&head, node, __ATOMIC_RELEASE);
  }

  // Removes all nodes for which `predicate(node.value)` returns false.
  template <typename Predicate>
  void filter(Predicate&& predicate) {
    Node** prev = &head;
    Node* current = __atomic_load_n(prev, __ATOMIC_RELAXED);

    while (current != nullptr) {
      Node* next = __atomic_load_n(&current->next, __ATOMIC_RELAXED);

      if (predicate(current->value)) {
        prev = &current->next;
      } else {
        // Splice out `current` by pointing its predecessor at `next`. Release ordering ensures a
        // signal handler that loads *prev with acquire sees a fully consistent successor chain.
        // Deleting `current` immediately afterward is safe only because a same-thread signal
        // handler cannot be holding a pointer to it across this statement.
        __atomic_store_n(prev, next, __ATOMIC_RELEASE);
        delete current;
      }

      current = next;
    }
  }

  // Returns true if the list is empty. Signal safe.
  bool isEmpty() const {
    return __atomic_load_n(&head, __ATOMIC_ACQUIRE) == nullptr;
  }

  // Traverses the list, calling `func(node.value)` for each node. Signal safe (same-thread
  // only; see the class comment for why observed nodes cannot be freed mid-traversal).
  template <typename Func>
  void iterate(Func&& func) const {
    Node* current = __atomic_load_n(&head, __ATOMIC_ACQUIRE);
    while (current != nullptr) {
      func(current->value);
      current = __atomic_load_n(&current->next, __ATOMIC_ACQUIRE);
    }
  }

 private:
  Node* head = nullptr;

  KJ_DISALLOW_COPY_AND_MOVE(AtomicList);
};

// Iterates a WasmShutdownSignal list and writes the shutdown signal (value 1) to each
// registered memory location. Signal-safe: iterate() is signal-safe and the loop body
// performs only plain stores into already-mapped memory.
inline void writeWasmShutdownSignals(const AtomicList<WasmShutdownSignal>& signals) {
  signals.iterate([](const WasmShutdownSignal& signal) {
    // NOTE(review): assumes BackingStore::Data() is a trivial getter (no allocation or
    // locking) — required for signal safety; confirm against the V8 version in use.
    // std::memcpy handles a potentially unaligned store into WASM linear memory.
    uint32_t value = 1;
    std::memcpy(static_cast<kj::byte*>(signal.backingStore->Data()) + signal.signalByteOffset,
        &value, sizeof(value));
  });
}

} // namespace workerd
Loading
Loading