diff --git a/pkg/dind/attach.go b/pkg/dind/attach.go
new file mode 100644
index 0000000..d6ea263
--- /dev/null
+++ b/pkg/dind/attach.go
@@ -0,0 +1,286 @@
+package dind
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/containerd/containerd/v2/pkg/namespaces"
+)
+
+// handleContainerAttach implements Docker's POST /containers/{id}/attach.
+//
+// Docker CLI's `docker run <image>` (non-detached) issues a three-step dance:
+//
+//  1. POST /containers/create → returns container ID
+//  2. POST /containers/{id}/attach → HTTP/1.1 Upgrade hijacks the conn
+//  3. POST /containers/{id}/start → kicks off the task
+//
+// The attach step is the one that fails today with "unable to upgrade to tcp,
+// received 501" because we previously returned StatusNotImplemented for any
+// /attach call. This handler completes the hijack handshake, blocks until
+// handleContainerStart signals the task is running, then tails the container's
+// log file back through the upgraded connection until the task exits.
+//
+// Output framing follows Docker's stdcopy convention when the container was
+// not started with a TTY: every chunk is prefixed with an 8-byte header
+// (stream type + 4-byte big-endian size) so the client can demultiplex stdout
+// and stderr. With TTY=true the bytes stream raw. Because containerd's
+// cio.LogFile merges stdout and stderr into a single file, we report every
+// chunk on stream 1 (stdout) under non-TTY mode; perfect-fidelity stdout/
+// stderr split would require splitting the log files at task-create time and
+// isn't worth the surgery for the `docker run` flows we need to unblock.
+//
+// Stdin attach (POST with ?stdin=1) is accepted at the protocol level but
+// not piped into the container's task — Docker's stdin-attach semantics
+// require an IO mode set at task-create time, which dind doesn't currently
+// expose. Clients that care (`docker run -i`) will see a writable hijacked
+// conn but their bytes go nowhere. Out of scope for this fix; container
+// stdout/stderr capture is what blocks ephpm's `docker run alpine:3.20`.
+//
+// The containerd wait started for the tail loop is tied to a context that is
+// cancelled when this handler returns, so a client that disconnects before
+// the task exits does not leave a waiter goroutine behind.
+func (s *Server) handleContainerAttach(w http.ResponseWriter, r *http.Request, id string) {
+	s.mu.Lock()
+	entry, ok := s.containers[id]
+	s.mu.Unlock()
+	if !ok {
+		writeJSON(w, http.StatusNotFound, map[string]string{
+			"message": fmt.Sprintf("container %s not found", id),
+		})
+		return
+	}
+
+	upgradeHdr := r.Header.Get("Upgrade")
+	connHdr := strings.ToLower(r.Header.Get("Connection"))
+	wantHijack := upgradeHdr != "" && strings.Contains(connHdr, "upgrade")
+	if !wantHijack {
+		// Non-hijack attach (clients that send Accept: application/vnd.docker.raw-stream
+		// over plain HTTP). Less common; respond with 400 so the client picks
+		// a different code path. Docker daemon also accepts this with chunked
+		// streaming, but every CLI we care about does the upgrade.
+		writeJSON(w, http.StatusBadRequest, map[string]string{
+			"message": "attach requires HTTP/1.1 Upgrade: tcp (set Upgrade and Connection headers)",
+		})
+		return
+	}
+
+	// Drain the request body before hijacking; same rationale as the exec
+	// hijack path — the body bytes sit in the bufio.Reader and would corrupt
+	// the upgraded stream if left unread.
+	if _, err := io.Copy(io.Discard, r.Body); err != nil {
+		s.log.Debug("draining attach body", "container", id, "error", err)
+	}
+
+	hijacker, ok := w.(http.Hijacker)
+	if !ok {
+		writeJSON(w, http.StatusInternalServerError, map[string]string{
+			"message": "server does not support hijacking",
+		})
+		return
+	}
+	conn, _, err := hijacker.Hijack()
+	if err != nil {
+		writeJSON(w, http.StatusInternalServerError, map[string]string{
+			"message": fmt.Sprintf("hijack failed: %v", err),
+		})
+		return
+	}
+	defer func() {
+		if cerr := conn.Close(); cerr != nil {
+			s.log.Debug("closing attach conn", "container", id, "error", cerr)
+		}
+	}()
+
+	upgradeResp := "HTTP/1.1 101 UPGRADED\r\n" +
+		"Content-Type: application/vnd.docker.raw-stream\r\n" +
+		"Connection: Upgrade\r\n" +
+		"Upgrade: tcp\r\n\r\n"
+	if _, werr := conn.Write([]byte(upgradeResp)); werr != nil {
+		s.log.Debug("writing attach upgrade header", "container", id, "error", werr)
+		return
+	}
+
+	// Wait for handleContainerStart to signal the task is running (and
+	// LogPath is set). The client controls the timing here: it sends
+	// /start a few hundred ms after /attach, so 60s is generous. If the
+	// caller never sends /start, we bail when the deadline expires or the
+	// request context is cancelled, whichever comes first.
+	if !s.waitForStart(r.Context(), entry, 60*time.Second) {
+		// Either the deadline expired or the request was cancelled.
+		// Either way nothing useful to stream.
+		s.log.Debug("attach: task never started", "container", id)
+		return
+	}
+
+	// cio.LogFile merges stdout and stderr, so under non-TTY framing we
+	// emit everything on stream 1; with a TTY the bytes go out raw.
+	var sink io.Writer = conn
+	if !entry.Tty {
+		sink = &stdcopyWriter{mu: &sync.Mutex{}, w: conn, streamType: 1}
+	}
+
+	// Tie the task wait to this handler's lifetime: if the client goes away
+	// first, cancelling on return releases the waiter instead of parking it
+	// until the task eventually exits.
+	ctx, cancel := context.WithCancel(namespaces.WithNamespace(context.Background(), s.jobNamespace))
+	defer cancel()
+
+	// Tail the log file until the task exits or a write to the client fails.
+	s.tailLogToWriter(entry.LogPath, sink, s.attachTaskDone(ctx, entry, id))
+}
+
+// attachTaskDone returns a channel that is closed once entry's task has
+// exited. The channel is closed immediately when there is no task, or when
+// Task.Wait fails, so the tail loop never waits forever; the client then sees
+// whatever has already been written to the log. Cancelling ctx also closes
+// the channel, which lets the bridging goroutine exit.
+//
+// Bridging containerd's exit-status channel onto a plain done signal keeps
+// tailLogToWriter free of containerd types (and unit-testable).
+func (s *Server) attachTaskDone(ctx context.Context, entry *containerEntry, id string) <-chan struct{} {
+	done := make(chan struct{})
+	if entry.Task == nil {
+		close(done)
+		return done
+	}
+	ch, err := entry.Task.Wait(ctx)
+	if err != nil {
+		s.log.Debug("attach: task.Wait failed", "container", id, "error", err)
+		close(done)
+		return done
+	}
+	go func() {
+		defer close(done)
+		select {
+		case <-ch:
+		case <-ctx.Done():
+		}
+	}()
+	return done
+}
+
+// waitForStart blocks until entry.started is closed (i.e. handleContainerStart
+// finished task.Start successfully), returning true. If the request is
+// cancelled or the deadline expires first, returns false. An already-closed
+// channel always wins, so the call is idempotent and safe to repeat after
+// the container has started.
+func (s *Server) waitForStart(reqCtx context.Context, entry *containerEntry, deadline time.Duration) bool {
+	expired := time.NewTimer(deadline)
+	defer expired.Stop()
+
+	if entry.started != nil {
+		// Check the start signal on its own first: select picks randomly
+		// among ready cases, and a started container must not lose to an
+		// already-cancelled request or a zero deadline.
+		select {
+		case <-entry.started:
+			return true
+		default:
+		}
+		select {
+		case <-entry.started:
+			return true
+		case <-reqCtx.Done():
+			return false
+		case <-expired.C:
+			return false
+		}
+	}
+
+	// Defensive: a container created before the started field was wired in.
+	// We can still infer "started" from entry.Task being non-nil — fall back
+	// to a polling loop with the same deadline.
+	poll := time.NewTicker(50 * time.Millisecond)
+	defer poll.Stop()
+	for {
+		if entry.Task != nil && entry.LogPath != "" {
+			return true
+		}
+		select {
+		case <-reqCtx.Done():
+			return false
+		case <-expired.C:
+			return false
+		case <-poll.C:
+		}
+	}
+}
+
+// tailLogToWriter reads logPath as it grows and streams new bytes to dst
+// until either done is closed (task has exited or the caller gave up) or a
+// write to dst fails (client disconnected). At EOF the loop waits for the
+// next poll tick before retrying; while data is available it forwards as
+// fast as the source produces.
+//
+// dst is the framed writer (stdcopyWriter for non-TTY, raw conn for TTY).
+// The dst writer is the only signal we use for "client gone" — a failed
+// write returns and the caller closes the conn via defer.
+func (s *Server) tailLogToWriter(logPath string, dst io.Writer, done <-chan struct{}) {
+	// Poll interval between EOF retries. (No fsnotify; portable to
+	// Windows-host VM and Linux native without extra deps.)
+	const pollInterval = 100 * time.Millisecond
+
+	// Open the log file. handleContainerStart created it before task.Start
+	// returned, so by the time waitForStart unblocks, this open should
+	// succeed immediately.
+	f, err := os.Open(logPath)
+	if err != nil {
+		s.log.Debug("attach: opening log file", "path", logPath, "error", err)
+		return
+	}
+	defer func() {
+		if cerr := f.Close(); cerr != nil {
+			s.log.Debug("attach: closing log file", "path", logPath, "error", cerr)
+		}
+	}()
+
+	// One ticker for the whole tail instead of a fresh timer per iteration.
+	poll := time.NewTicker(pollInterval)
+	defer poll.Stop()
+
+	buf := make([]byte, 32*1024)
+	for {
+		// Drain whatever is currently available.
+		if !s.forwardUntilEOF(f, dst, buf) {
+			return
+		}
+		// At EOF — wait for either more data or task exit.
+		select {
+		case <-done:
+			// Task exited. Drain any final bytes before closing.
+			s.forwardUntilEOF(f, dst, buf)
+			return
+		case <-poll.C:
+		}
+	}
+}
+
+// forwardUntilEOF copies src to dst through buf until src reports io.EOF.
+// It returns true when EOF was reached cleanly, and false (after logging)
+// when a read fails or a write to dst fails, i.e. the client is gone or the
+// upgraded conn was closed.
+func (s *Server) forwardUntilEOF(src io.Reader, dst io.Writer, buf []byte) bool {
+	for {
+		n, rerr := src.Read(buf)
+		if n > 0 {
+			if _, werr := dst.Write(buf[:n]); werr != nil {
+				s.log.Debug("attach: writing to client", "error", werr)
+				return false
+			}
+		}
+		switch {
+		case rerr == io.EOF:
+			return true
+		case rerr != nil:
+			s.log.Debug("attach: reading log file", "error", rerr)
+			return false
+		}
+	}
+}
diff --git a/pkg/dind/attach_test.go b/pkg/dind/attach_test.go
new file mode 100644
index 0000000..74d9421
--- /dev/null
+++ b/pkg/dind/attach_test.go
@@ -0,0 +1,206 @@
+package dind
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"io"
+	"log/slog"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+	"time"
+)
+
+// TestHandleContainerAttach_404Unknown verifies the route returns 404 when the
+// container id isn't tracked, before any hijack attempt. Catches regressions
+// where the handler skips the lookup or panics on missing entries.
+func TestHandleContainerAttach_404Unknown(t *testing.T) {
+	s := &Server{
+		log:        slog.New(slog.NewTextHandler(io.Discard, nil)),
+		containers: map[string]*containerEntry{},
+	}
+	req := httptest.NewRequest(http.MethodPost, "/containers/missing/attach", nil)
+	w := httptest.NewRecorder()
+	s.handleContainerAttach(w, req, "missing")
+	if w.Code != http.StatusNotFound {
+		t.Errorf("status = %d, want 404", w.Code)
+	}
+}
+
+// TestHandleContainerAttach_400WithoutUpgrade verifies a plain POST (no
+// Upgrade/Connection headers) is rejected with 400 rather than dropping into
+// the hijack path (which would answer 500 since httptest.ResponseRecorder isn't a
+// Hijacker).
+// Docker daemon accepts that path too, but the CLIs we care about upgrade.
+func TestHandleContainerAttach_400WithoutUpgrade(t *testing.T) {
+	entries := map[string]*containerEntry{"abc": {ID: "abc", started: make(chan struct{})}}
+	srv := &Server{
+		log:        slog.New(slog.NewTextHandler(io.Discard, nil)),
+		containers: entries,
+	}
+
+	rec := httptest.NewRecorder()
+	plainPost := httptest.NewRequest(http.MethodPost, "/containers/abc/attach", nil)
+	srv.handleContainerAttach(rec, plainPost, "abc")
+	if got := rec.Code; got != http.StatusBadRequest {
+		t.Errorf("status = %d, want 400", got)
+	}
+}
+
+// TestWaitForStart_SignalUnblocks checks that closing the started channel
+// makes waitForStart return true well before its deadline.
+func TestWaitForStart_SignalUnblocks(t *testing.T) {
+	srv := &Server{log: slog.New(slog.NewTextHandler(io.Discard, nil))}
+	started := make(chan struct{})
+	ce := &containerEntry{started: started}
+	go func() {
+		time.Sleep(30 * time.Millisecond)
+		close(started)
+	}()
+
+	if !srv.waitForStart(context.Background(), ce, time.Second) {
+		t.Error("waitForStart returned false; expected true once started closed")
+	}
+}
+
+// TestWaitForStart_DeadlineExpires checks that waitForStart gives up with
+// false, and not early, when the started channel is never closed.
+func TestWaitForStart_DeadlineExpires(t *testing.T) {
+	const limit = 50 * time.Millisecond
+	srv := &Server{log: slog.New(slog.NewTextHandler(io.Discard, nil))}
+	ce := &containerEntry{started: make(chan struct{})}
+	began := time.Now()
+	if srv.waitForStart(context.Background(), ce, limit) {
+		t.Error("waitForStart returned true; expected deadline expiry")
+	}
+	if waited := time.Since(began); waited < limit {
+		t.Errorf("returned too early: %s", waited)
+	}
+}
+
+// TestWaitForStart_RequestCancelled checks that cancelling the request
+// context ends the wait early — the case where the Docker CLI disconnects
+// after sending /attach but before sending /start.
+func TestWaitForStart_RequestCancelled(t *testing.T) {
+	srv := &Server{log: slog.New(slog.NewTextHandler(io.Discard, nil))}
+	ce := &containerEntry{started: make(chan struct{})}
+
+	reqCtx, abort := context.WithCancel(context.Background())
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		abort()
+	}()
+
+	if srv.waitForStart(reqCtx, ce, time.Second) {
+		t.Error("waitForStart returned true; expected false on cancellation")
+	}
+}
+
+// TestTailLogToWriter_ForwardsThenExitsOnStatus seeds a log file, appends to
+// it while the tailer runs, then signals task exit and checks that every
+// byte was streamed and that the tailer returned promptly.
+func TestTailLogToWriter_ForwardsThenExitsOnStatus(t *testing.T) {
+	srv := &Server{log: slog.New(slog.NewTextHandler(io.Discard, nil))}
+	logPath := filepath.Join(t.TempDir(), "out.log")
+
+	// Seed the log before tailing starts (handleContainerStart's log file is
+	// already opened-for-write by cio.LogFile before we get here).
+	if err := os.WriteFile(logPath, []byte("hello world\n"), 0o644); err != nil {
+		t.Fatalf("seed log: %v", err)
+	}
+
+	var (
+		out   bytes.Buffer
+		outMu sync.Mutex
+	)
+	guarded := &lockedWriter{mu: &outMu, w: &out}
+
+	taskExited := make(chan struct{})
+	tailReturned := make(chan struct{})
+	go func() {
+		defer close(tailReturned)
+		srv.tailLogToWriter(logPath, guarded, taskExited)
+	}()
+
+	// Grow the file while the tailer is polling.
+	time.Sleep(30 * time.Millisecond)
+	appendTo, err := os.OpenFile(logPath, os.O_APPEND|os.O_WRONLY, 0o644)
+	if err != nil {
+		t.Fatalf("reopen log: %v", err)
+	}
+	if _, err := appendTo.WriteString("more bytes\n"); err != nil {
+		t.Fatalf("append: %v", err)
+	}
+	if err := appendTo.Close(); err != nil {
+		t.Fatalf("close append: %v", err)
+	}
+
+	// Report task exit, then wait for the final drain and return.
+	time.Sleep(150 * time.Millisecond)
+	close(taskExited)
+	select {
+	case <-tailReturned:
+	case <-time.After(2 * time.Second):
+		t.Fatal("tailLogToWriter did not return after task exit")
+	}
+
+	outMu.Lock()
+	got := out.String()
+	outMu.Unlock()
+	const want = "hello world\nmore bytes\n"
+	if got != want {
+		t.Errorf("tailed bytes = %q, want %q", got, want)
+	}
+}
+
+// TestStdcopyWriter_FrameLayout pins the stdcopy frame that the attach
+// handler emits for non-TTY containers: one stream-type byte, three NUL
+// padding bytes, a four-byte big-endian length, then the payload.
+//
+// stdcopyWriter is defined in exec.go and reused here; keeping the explicit
+// regression test next to the attach code makes the frame contract visible
+// for anyone tracing how docker run output reaches the CLI.
+func TestStdcopyWriter_FrameLayout(t *testing.T) {
+	const payload = "hi"
+	var wire bytes.Buffer
+	fw := &stdcopyWriter{mu: &sync.Mutex{}, w: &wire, streamType: 1}
+	if _, err := fw.Write([]byte(payload)); err != nil {
+		t.Fatalf("write: %v", err)
+	}
+
+	frame := wire.Bytes()
+	if len(frame) != 10 { // 8-byte header + 2-byte payload
+		t.Fatalf("frame size = %d, want 10", len(frame))
+	}
+	if frame[0] != 1 {
+		t.Errorf("stream type = %d, want 1 (stdout)", frame[0])
+	}
+	for i, b := range frame[1:4] {
+		if b != 0 {
+			t.Errorf("padding byte %d = %d, want 0", i+1, b)
+		}
+	}
+	if size := binary.BigEndian.Uint32(frame[4:8]); size != 2 {
+		t.Errorf("size field = %d, want 2", size)
+	}
+	if body := string(frame[8:]); body != payload {
+		t.Errorf("payload = %q, want %q", body, payload)
+	}
+}
+
+// lockedWriter wraps a writer with a mutex so the tailer goroutine and the
+// test body can touch one shared buffer without data races.
+type lockedWriter struct {
+	mu *sync.Mutex // held for the duration of every Write
+	w  io.Writer   // wrapped destination that receives the bytes
+}
+
+func (l *lockedWriter) Write(p []byte) (int, error) {
+	l.mu.Lock()
+	defer l.mu.Unlock() // deferred so the lock is released on every exit path
+	return l.w.Write(p)
+}
diff --git a/pkg/dind/containers.go b/pkg/dind/containers.go
index a0b2f93..db84e09 100644
--- a/pkg/dind/containers.go
+++ b/pkg/dind/containers.go
@@ -86,6 +86,12 @@ type containerEntry struct {
 	HostsPath    string   // host-side /etc/hosts file bind-mounted into the container
 	ExtraHosts   []string // user-provided "host:ip" entries (--add-host)
 	PortForwards []func() // stop functions for port-forward proxy goroutines
+
+	// started is closed by handleContainerStart once the task is created and
+	// running. handleContainerAttach blocks on it so the Docker CLI's "attach
+	// then start" sequence works correctly: attach hijacks the conn early,
+	// then waits here, and resumes once start has the task + LogPath set.
+	started chan struct{}
 }
 
 // createRequest is the subset of Docker's container create body we support.
@@ -167,6 +173,8 @@ func (s *Server) routeContainer(w http.ResponseWriter, r *http.Request, path str s.handleContainerInspect(w, r, id) case action == "logs" && r.Method == http.MethodGet: s.handleContainerLogs(w, r, id) + case action == "attach" && r.Method == http.MethodPost: + s.handleContainerAttach(w, r, id) case action == "exec" && r.Method == http.MethodPost: s.handleExecCreate(w, r, id) case action == "archive" && r.Method == http.MethodPut: @@ -522,6 +530,7 @@ func (s *Server) handleContainerCreate(w http.ResponseWriter, r *http.Request) { Tty: req.Tty, HostsPath: hostsPath, ExtraHosts: extraHosts, + started: make(chan struct{}), } s.mu.Lock() @@ -643,6 +652,14 @@ func (s *Server) handleContainerStart(w http.ResponseWriter, r *http.Request, id entry.Status = "running" s.log.Info("container started", "id", id, "ip", entry.IP) + // Unblock any /containers/{id}/attach handlers that hijacked the conn + // before start (Docker CLI's normal `docker run` sequence is attach → + // start → wait). They block on this channel and resume by tailing + // entry.LogPath once it's signaled. + if entry.started != nil { + close(entry.started) + } + // Install DNAT rules for any port bindings. KIND's `kind create cluster` // creates the kindest/node with -p 127.0.0.1::6443 and writes a // kubeconfig pointing at that 127.0.0.1:. Without DNAT in the