Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ kj::Array<kj::byte> emptyByteArray() {
return kj::heapArray<kj::byte>(0);
}

void requireValidLabels(jsg::Dict<kj::String>& labels) {
for (auto i: kj::indices(labels.fields)) {
auto& field = labels.fields[i];
JSG_REQUIRE(field.name.size() > 0, Error, "Label names cannot be empty");
for (auto c: field.name) {
JSG_REQUIRE(static_cast<kj::byte>(c) >= 0x20, Error,
"Label names cannot contain control characters (index ", i, ")");
}
for (auto c: field.value) {
JSG_REQUIRE(static_cast<kj::byte>(c) >= 0x20, Error,
"Label values cannot contain control characters (index ", i, ")");
}
}
}

capnp::ByteStream::Client makeExecPipe(
capnp::ByteStreamFactory& factory, kj::Own<kj::AsyncOutputStream> output) {
return factory.kjToCapnp(capnp::ExplicitEndOutputStream::wrap(kj::mv(output), []() {}));
Expand Down Expand Up @@ -237,18 +252,10 @@ void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions)
}

KJ_IF_SOME(labels, options.labels) {
requireValidLabels(labels);
auto list = req.initLabels(labels.fields.size());
for (auto i: kj::indices(labels.fields)) {
auto& field = labels.fields[i];
JSG_REQUIRE(field.name.size() > 0, Error, "Label names cannot be empty");
for (auto c: field.name) {
JSG_REQUIRE(static_cast<kj::byte>(c) >= 0x20, Error,
"Label names cannot contain control characters (index ", i, ")");
}
for (auto c: field.value) {
JSG_REQUIRE(static_cast<kj::byte>(c) >= 0x20, Error,
"Label values cannot contain control characters (index ", i, ")");
}
list[i].setName(field.name);
list[i].setValue(field.value);
}
Expand Down Expand Up @@ -284,6 +291,22 @@ void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions)
running = true;
}

jsg::Promise<void> Container::setLabels(jsg::Lock& js, jsg::Dict<kj::String> labels) {
JSG_REQUIRE(running, Error, "setLabels() cannot be called on a container that is not running.");

requireValidLabels(labels);

auto req = rpcClient->setLabelsRequest();
auto list = req.initLabels(labels.fields.size());
for (auto i: kj::indices(labels.fields)) {
auto& field = labels.fields[i];
list[i].setName(field.name);
list[i].setValue(field.value);
}

return IoContext::current().awaitIo(js, req.sendIgnoringResult());
}

jsg::Promise<kj::Maybe<Container::Info>> Container::inspect(jsg::Lock& js) {
return IoContext::current().awaitIo(js, rpcClient->inspectRequest().send(),
[](jsg::Lock& js,
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/api/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ class Container: public jsg::Object {

jsg::Promise<kj::Maybe<Info>> inspect(jsg::Lock& js);

jsg::Promise<void> setLabels(jsg::Lock& js, jsg::Dict<kj::String> labels);

// TODO(containers): listenTcp()

JSG_RESOURCE_TYPE(Container, CompatibilityFlags::Reader flags) {
Expand All @@ -278,6 +280,7 @@ class Container: public jsg::Object {
JSG_METHOD(exec);
JSG_METHOD(interceptOutboundTcp);
JSG_METHOD(inspect);
JSG_METHOD(setLabels);
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/workerd/io/container.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,16 @@ interface Container @0x9aaceefc06523bca {
inspect @14 () -> (info :InspectInfo);
# Returns information about the container, or `none` if the container has not been started.

setLabels @15 (labels :List(Label));
# Replaces the container's current label set with the provided list.

struct InspectInfo {
union {
none @0 :Void;
started :group {
labels @1 :List(Label);
# Echo of StartParams.labels. Empty list means start() was called with no labels.
# Current in-memory label set. Initially populated from StartParams.labels and replaced by
# setLabels(). Empty list means the current set is empty.
}
}
}
Expand Down
70 changes: 40 additions & 30 deletions src/workerd/server/container-client.c++
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ constexpr kj::StringPtr SNAPSHOT_CLONE_VOLUME_PREFIX = "workerd-snap-clone-"_kj;
constexpr kj::StringPtr CONTAINER_SNAPSHOT_IMAGE_PREFIX = "workerd-container-snap-"_kj;
constexpr kj::StringPtr SNAPSHOT_VOLUME_CREATED_AT_LABEL = "dev.workerd.snapshot-created-at"_kj;

// Prefix applied to user-supplied labels when writing them to the Docker container, and
// stripped back out when reading them via inspect(). Lets us distinguish labels the worker
// set via start() from labels that came from the image (via Dockerfile LABEL) or engine.
constexpr kj::StringPtr WORKERD_LABEL_PREFIX = "workerd-"_kj;
constexpr auto SNAPSHOT_STALE_AGE = 30 * kj::DAYS;

// Maximum size of a snapshot tar archive held in memory during snapshot create/restore.
Expand Down Expand Up @@ -1545,20 +1541,11 @@ kj::Promise<kj::Maybe<ContainerClient::InspectResponse>> ContainerClient::inspec
bool running = status == "running" || status == "restarting";

kj::Vector<Label> labels;
if (jsonRoot.hasConfig() && jsonRoot.getConfig().hasLabels()) {
auto labelsJson = jsonRoot.getConfig().getLabels();
if (labelsJson.isObject()) {
for (auto field: labelsJson.getObject()) {
kj::StringPtr name = field.getName();
if (!name.startsWith(WORKERD_LABEL_PREFIX)) continue;
auto value = field.getValue();
JSG_REQUIRE(value.isString(), Error, "Malformed ContainerInspect label value");
labels.add(Label{
.name = kj::str(name.slice(WORKERD_LABEL_PREFIX.size())),
.value = kj::str(value.getString()),
});
}
}
for (auto& entry: currentLabels) {
labels.add(Label{
.name = kj::str(entry.key),
.value = kj::str(entry.value),
});
}

co_return InspectResponse{.isRunning = running, .labels = labels.releaseAsArray()};
Expand Down Expand Up @@ -1686,17 +1673,6 @@ kj::Promise<void> ContainerClient::createContainer(kj::StringPtr effectiveImage,
jsonEnv.set(envSize + i, defaultEnv[i]);
}

// Pass user-supplied labels as Docker object labels, prefixed so we can distinguish
// them from image/engine labels when reading back via inspect().
if (params.hasLabels()) {
auto lbls = params.getLabels();
auto labelsObj = jsonRoot.initLabels().initObject(lbls.size());
for (auto i: kj::zeroTo(lbls.size())) {
labelsObj[i].setName(kj::str(WORKERD_LABEL_PREFIX, lbls[i].getName()));
labelsObj[i].initValue().setString(lbls[i].getValue());
}
}

auto hostConfig = jsonRoot.initHostConfig();
// We need to set a restart policy to avoid having ambiguous states
// where the container we're managing is stuck at "exited" state.
Expand Down Expand Up @@ -2190,6 +2166,9 @@ kj::Promise<void> ContainerClient::status(StatusContext context) {
isRunning = info.isRunning;
}
containerStarted.store(isRunning, std::memory_order_release);
if (!isRunning) {
currentLabels.clear();
}
containerSidecarStarted.store(false, std::memory_order_release);
this->sidecarIngressHostPort = kj::none;

Expand All @@ -2209,6 +2188,24 @@ kj::Promise<void> ContainerClient::status(StatusContext context) {
context.getResults().setRunning(isRunning);
}

kj::Promise<void> ContainerClient::setLabels(SetLabelsContext context) {
auto [ready, done] = getRpcTurn();
co_await ready;
KJ_DEFER(done->fulfill());

JSG_REQUIRE(containerStarted.load(std::memory_order_acquire), Error,
"setLabels() requires a running container.");

auto params = context.getParams();
auto newLabels = params.getLabels();
currentLabels.clear();
for (auto i: kj::zeroTo(newLabels.size())) {
currentLabels.insert(kj::str(newLabels[i].getName()), kj::str(newLabels[i].getValue()));
}

co_return;
}

kj::Promise<void> ContainerClient::inspect(InspectContext context) {
auto maybeResp = co_await inspectContainer();
auto info = context.getResults().initInfo();
Expand Down Expand Up @@ -2247,6 +2244,14 @@ kj::Promise<void> ContainerClient::start(StartContext context) {

internetEnabled = params.getEnableInternet();

currentLabels.clear();
if (params.hasLabels()) {
auto lbls = params.getLabels();
for (auto i: kj::zeroTo(lbls.size())) {
currentLabels.insert(kj::str(lbls[i].getName()), kj::str(lbls[i].getValue()));
}
}

kj::String effectiveImage = kj::str(imageName);
if (params.hasContainerSnapshotId()) {
auto snapshotId = parseSnapshotId(params.getContainerSnapshotId());
Expand Down Expand Up @@ -2328,7 +2333,10 @@ kj::Promise<void> ContainerClient::monitor(MonitorContext context) {
JSG_REQUIRE(containerStarted.load(std::memory_order_acquire), Error, "Container failed to start");

auto results = context.getResults();
KJ_DEFER(containerStarted.store(false, std::memory_order_release));
KJ_DEFER({
containerStarted.store(false, std::memory_order_release);
currentLabels.clear();
});

auto endpoint = kj::str("/containers/", containerName, "/wait");
auto response = co_await dockerApiRequest(
Expand Down Expand Up @@ -2357,6 +2365,8 @@ kj::Promise<void> ContainerClient::destroy(DestroyContext context) {
// in the destructor), or on the next start() if state is inconsistent (e.g. workerd
// restart left an orphaned sidecar; status() recovery handles that case).
co_await destroyContainer();
containerStarted.store(false, std::memory_order_release);
currentLabels.clear();
}

kj::Promise<void> ContainerClient::signal(SignalContext context) {
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/server/container-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
kj::Promise<void> snapshotDirectory(SnapshotDirectoryContext context) override;
kj::Promise<void> snapshotContainer(SnapshotContainerContext context) override;
kj::Promise<void> inspect(InspectContext context) override;
kj::Promise<void> setLabels(SetLabelsContext context) override;

kj::Own<ContainerClient> addRef();

Expand Down Expand Up @@ -246,6 +247,12 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
std::atomic_bool egressListenerStarted = false;
std::atomic_bool caCertInjected = false;

// Volatile in-memory label set for this container. Populated from StartParams.labels when
// start() runs, replaced wholesale by setLabels(), and cleared when the container stops.
// Labels are not recovered when a fresh ContainerClient reconnects to an already-running
// local Docker container.
kj::HashMap<kj::String, kj::String> currentLabels;

// Writable clone volumes currently owned by the app container, or by an in-flight start()
// that still needs failure cleanup.
kj::Vector<kj::String> snapshotClones;
Expand Down
Loading
Loading