From a07a71de979a6334d2248ffed2652c3864d25458 Mon Sep 17 00:00:00 2001 From: Alexander Tarasov Date: Tue, 28 Apr 2026 14:17:08 +0200 Subject: [PATCH 1/7] replace honcho with subprocess --- pyproject.toml | 2 -- snuba/cli/devserver.py | 54 +++++++++++++++++++++++++++++++++--------- uv.lock | 10 -------- 3 files changed, 43 insertions(+), 23 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 59979a4617a..9b8a986e90a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,7 +78,6 @@ snuba = "snuba.cli:main" dev = [ "devservices>=1.2.1", "freezegun>=1.5.5", - "honcho>=1.1.0", "mypy>=1.1.1", "pre-commit>=4.2.0", "pytest>=8.3.3", @@ -136,7 +135,6 @@ module = [ "fastjsonschema", "fastjsonschema.exceptions", "granian", - "honcho.manager", "jsonschema", "jsonschema.exceptions", "jsonschema2md", diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index f7e1128828e..4fb34243d5b 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -1,6 +1,9 @@ import os +import signal +import subprocess import sys -from subprocess import call, list2cmdline +import threading +from subprocess import call import click @@ -21,8 +24,6 @@ def devserver(*, bootstrap: bool, workers: bool, log_level: str) -> None: "Starts all Snuba processes for local development." - from honcho.manager import Manager - os.environ["PYTHONUNBUFFERED"] = "1" if bootstrap: @@ -518,13 +519,44 @@ def devserver(*, bootstrap: bool, workers: bool, log_level: str) -> None: ), ] - manager = Manager() + sys.exit(_run_daemons(daemons)) + + +def _run_daemons(daemons: list[tuple[str, list[str]]]) -> int: + procs: dict[str, subprocess.Popen[bytes]] = {} + first_failure: list[int] = [] + done = threading.Event() + + def stream(name: str, proc: subprocess.Popen[bytes]) -> None: + assert proc.stdout is not None + for line in proc.stdout: + sys.stdout.write(f"{name} | {line.decode(errors='replace')}") + sys.stdout.flush() + rc = proc.wait() + if rc != 0 and not first_failure: + first_failure.append(rc) + done.set() + for name, cmd in daemons: - manager.add_process( - name, - list2cmdline(cmd), - quiet=False, - ) + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + procs[name] = proc + threading.Thread(target=stream, args=(name, proc), daemon=True).start() + + def shutdown(signum: int, frame: object) -> None: + for proc in procs.values(): + if proc.poll() is None: + proc.terminate() + + signal.signal(signal.SIGINT, shutdown) + signal.signal(signal.SIGTERM, shutdown) + + done.wait() + if first_failure: + for proc in procs.values(): + if proc.poll() is None: + proc.terminate() + + for proc in procs.values(): + proc.wait() - manager.loop() - sys.exit(manager.returncode) + return first_failure[0] if first_failure else 0 diff --git a/uv.lock b/uv.lock index 117e6add27d..d8ee6fb53cc 100644 --- a/uv.lock +++ b/uv.lock @@ -418,14 +418,6 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/grpcio-1.73.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0ab860d5bfa788c5a021fba264802e2593688cd965d1374d31d2b1a34cacd854" }, ] -[[package]] -name = "honcho" -version = "1.1.0" -source = { registry = "https://pypi.devinfra.sentry.io/simple" } -wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/honcho-1.1.0-py2.py3-none-any.whl", hash = "sha256:a4d6e3a88a7b51b66351ecfc6e9d79d8f4b87351db9ad7e923f5632cc498122f" }, -] - [[package]] name = "httplib2" version = "0.22.0" @@ -1106,7 +1098,6 @@ dependencies = [ dev = [ { name = "devservices", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "freezegun", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "honcho", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "mypy", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "pre-commit", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "pytest", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -1177,7 +1168,6 @@ requires-dist = [ dev = [ { name = "devservices", specifier = ">=1.2.1" }, { name = "freezegun", specifier = ">=1.5.5" }, - { name = "honcho", specifier = ">=1.1.0" }, { name = "mypy", specifier = ">=1.1.1" }, { name = "pre-commit", specifier = ">=4.2.0" }, { name = "pytest", specifier = ">=8.3.3" }, From 7ed5d0a03b8c7fee3950e9e3b2b4346d070de173 Mon Sep 17 00:00:00 2001 From: Alexander Tarasov Date: Tue, 28 Apr 2026 14:17:34 +0200 Subject: [PATCH 2/7] in devservices, use distroless image --- devservices/config.yml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/devservices/config.yml b/devservices/config.yml index 16aa137158b..1ddbd3131a2 100644 --- a/devservices/config.yml +++ b/devservices/config.yml @@ -75,7 +75,7 @@ services: restart: unless-stopped snuba: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless ports: - 127.0.0.1:1218:1218 - 127.0.0.1:1219:1219 @@ -83,7 +83,7 @@ services: - devserver - --${SNUBA_NO_WORKERS:+no-workers} healthcheck: - test: curl -f http://localhost:1218/health_envoy + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:1218/health_envoy')"] interval: 5s timeout: 5s retries: 3 @@ -114,7 +114,7 @@ services: - orchestrator=devservices restart: unless-stopped profiles-consumer: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless command: [ rust-consumer, --storage=profiles, @@ -143,7 +143,7 @@ services: - orchestrator=devservices restart: unless-stopped profile-chunks-consumer: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless command: [ rust-consumer, --storage=profile_chunks, @@ -172,7 +172,7 @@ services: - orchestrator=devservices restart: unless-stopped functions-consumer: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless command: [ rust-consumer, --storage=functions_raw, @@ -201,7 +201,7 @@ services: - orchestrator=devservices restart: unless-stopped metrics-consumer: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless command: [ rust-consumer, --storage=metrics_raw, @@ -230,7 +230,7 @@ services: - orchestrator=devservices restart: unless-stopped generic-metrics-distributions-consumer: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless command: [ rust-consumer, --storage=generic_metrics_distributions_raw, @@ -259,7 +259,7 @@ services: - orchestrator=devservices restart: unless-stopped generic-metrics-sets-consumer: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless command: [ rust-consumer, --storage=generic_metrics_sets_raw, @@ -288,7 +288,7 @@ services: - orchestrator=devservices restart: unless-stopped generic-metrics-counters-consumer: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless command: [ rust-consumer, --storage=generic_metrics_counters_raw, @@ -317,7 +317,7 @@ services: - orchestrator=devservices restart: unless-stopped generic-metrics-gauges-consumer: - image: ghcr.io/getsentry/snuba:nightly + image: ghcr.io/getsentry/snuba:nightly-distroless command: [ rust-consumer, --storage=generic_metrics_gauges_raw, From c25f84a678963aa21dde896062ced6647090f535 Mon Sep 17 00:00:00 2001 From: Alexander Tarasov Date: Tue, 28 Apr 2026 14:32:39 +0200 Subject: [PATCH 3/7] fix "Devserver hangs when any daemon exits cleanly" --- snuba/cli/devserver.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index 4fb34243d5b..41641b8b082 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -551,10 +551,10 @@ def shutdown(signum: int, frame: object) -> None: signal.signal(signal.SIGTERM, shutdown) done.wait() - if first_failure: - for proc in procs.values(): - if proc.poll() is None: - proc.terminate() + # Any daemon exit ends the supervisor; terminate the rest (honcho parity). + for proc in procs.values(): + if proc.poll() is None: + proc.terminate() for proc in procs.values(): proc.wait() From 596d7fc608ff9a0df50b264893920e705f7c00ff Mon Sep 17 00:00:00 2001 From: Alexander Tarasov Date: Tue, 28 Apr 2026 14:49:25 +0200 Subject: [PATCH 4/7] fix "Signal handlers registered after spawning child processes" --- snuba/cli/devserver.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index 41641b8b082..118c550bd85 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -527,6 +527,14 @@ def _run_daemons(daemons: list[tuple[str, list[str]]]) -> int: first_failure: list[int] = [] done = threading.Event() + def shutdown(signum: int, frame: object) -> None: + for proc in procs.values(): + if proc.poll() is None: + proc.terminate() + + signal.signal(signal.SIGINT, shutdown) + signal.signal(signal.SIGTERM, shutdown) + def stream(name: str, proc: subprocess.Popen[bytes]) -> None: assert proc.stdout is not None for line in proc.stdout: @@ -542,14 +550,6 @@ def stream(name: str, proc: subprocess.Popen[bytes]) -> None: procs[name] = proc threading.Thread(target=stream, args=(name, proc), daemon=True).start() - def shutdown(signum: int, frame: object) -> None: - for proc in procs.values(): - if proc.poll() is None: - proc.terminate() - - signal.signal(signal.SIGINT, shutdown) - signal.signal(signal.SIGTERM, shutdown) - done.wait() # Any daemon exit ends the supervisor; terminate the rest (honcho parity). for proc in procs.values(): From d4b0a7f21dcec4ccc2ea8ba0539705b79e72dc30 Mon Sep 17 00:00:00 2001 From: Alexander Tarasov Date: Tue, 28 Apr 2026 15:16:28 +0200 Subject: [PATCH 5/7] fix "Terminated processes exit codes contaminate return value" --- snuba/cli/devserver.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index 118c550bd85..e3f27611f6e 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -526,11 +526,20 @@ def _run_daemons(daemons: list[tuple[str, list[str]]]) -> int: procs: dict[str, subprocess.Popen[bytes]] = {} first_failure: list[int] = [] done = threading.Event() + cleanup_started = threading.Event() + failure_lock = threading.Lock() + supervisor_signal: list[int] = [] def shutdown(signum: int, frame: object) -> None: + # Mark cleanup before terminate so stream threads do not treat SIGTERM as a + # natural crash (honcho parity when one daemon exits or user interrupts). + cleanup_started.set() + if not supervisor_signal: + supervisor_signal.append(signum) for proc in procs.values(): if proc.poll() is None: proc.terminate() + done.set() signal.signal(signal.SIGINT, shutdown) signal.signal(signal.SIGTERM, shutdown) @@ -541,8 +550,10 @@ def stream(name: str, proc: subprocess.Popen[bytes]) -> None: sys.stdout.write(f"{name} | {line.decode(errors='replace')}") sys.stdout.flush() rc = proc.wait() - if rc != 0 and not first_failure: - first_failure.append(rc) + with failure_lock: + if rc != 0 and not cleanup_started.is_set(): + if not first_failure: + first_failure.append(rc) done.set() for name, cmd in daemons: @@ -551,6 +562,7 @@ def stream(name: str, proc: subprocess.Popen[bytes]) -> None: threading.Thread(target=stream, args=(name, proc), daemon=True).start() done.wait() + cleanup_started.set() # Any daemon exit ends the supervisor; terminate the rest (honcho parity). for proc in procs.values(): if proc.poll() is None: @@ -559,4 +571,8 @@ def stream(name: str, proc: subprocess.Popen[bytes]) -> None: for proc in procs.values(): proc.wait() - return first_failure[0] if first_failure else 0 + if first_failure: + return first_failure[0] + if supervisor_signal: + return 128 + supervisor_signal[0] + return 0 From 49fea6a935b08f5ca1d8bc46e410eecfb95cfe8c Mon Sep 17 00:00:00 2001 From: Alexander Tarasov Date: Wed, 29 Apr 2026 06:22:25 +0200 Subject: [PATCH 6/7] fix "Missing SIGKILL fallback causes unrecoverable devserver hang" --- snuba/cli/devserver.py | 48 ++++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index e3f27611f6e..90404c709f5 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -9,6 +9,21 @@ from snuba import settings +# Match honcho: SIGTERM, then SIGKILL if children do not exit (avoids indefinite +# hang when a child ignores SIGTERM; see also PEP 475 / wait() retry after signals). +_SUBPROCESS_TERM_GRACE_SEC = 5.0 + + +def _reap_after_terminate(proc: subprocess.Popen[bytes], grace_sec: float) -> None: + """Wait for proc to exit after terminate(); kill -9 if still alive after grace_sec.""" + try: + proc.wait(timeout=grace_sec) + except subprocess.TimeoutExpired: + if proc.poll() is None: + proc.kill() + proc.wait() + + COMMON_RUST_CONSUMER_DEV_OPTIONS = [ "--use-rust-processor", "--auto-offset-reset=latest", @@ -545,16 +560,24 @@ def shutdown(signum: int, frame: object) -> None: signal.signal(signal.SIGTERM, shutdown) def stream(name: str, proc: subprocess.Popen[bytes]) -> None: - assert proc.stdout is not None - for line in proc.stdout: - sys.stdout.write(f"{name} | {line.decode(errors='replace')}") - sys.stdout.flush() - rc = proc.wait() - with failure_lock: - if rc != 0 and not cleanup_started.is_set(): - if not first_failure: - first_failure.append(rc) - done.set() + try: + assert proc.stdout is not None + for line in proc.stdout: + sys.stdout.write(f"{name} | {line.decode(errors='replace')}") + sys.stdout.flush() + rc = proc.wait() + with failure_lock: + if rc != 0 and not cleanup_started.is_set(): + if not first_failure: + first_failure.append(rc) + except BaseException: + with failure_lock: + if not cleanup_started.is_set() and not first_failure: + first_failure.append(1) + raise + finally: + # Always unblock the supervisor (e.g. BrokenPipe/EPIPE on stdout write). + done.set() for name, cmd in daemons: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) @@ -569,7 +592,10 @@ def stream(name: str, proc: subprocess.Popen[bytes]) -> None: proc.terminate() for proc in procs.values(): - proc.wait() + if proc.poll() is None: + _reap_after_terminate(proc, _SUBPROCESS_TERM_GRACE_SEC) + else: + proc.wait() if first_failure: return first_failure[0] From f0cf499c2fd6320044d86ee7097713629ca71000 Mon Sep 17 00:00:00 2001 From: Alexander Tarasov Date: Wed, 29 Apr 2026 06:25:10 +0200 Subject: [PATCH 7/7] fix "Race on first_failure after proc.wait()" --- snuba/cli/devserver.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index 90404c709f5..744b1ccff29 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -539,6 +539,7 @@ def devserver(*, bootstrap: bool, workers: bool, log_level: str) -> None: def _run_daemons(daemons: list[tuple[str, list[str]]]) -> int: procs: dict[str, subprocess.Popen[bytes]] = {} + threads: list[threading.Thread] = [] first_failure: list[int] = [] done = threading.Event() cleanup_started = threading.Event() @@ -580,9 +581,16 @@ def stream(name: str, proc: subprocess.Popen[bytes]) -> None: done.set() for name, cmd in daemons: - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + proc = subprocess.Popen( + cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) procs[name] = proc - threading.Thread(target=stream, args=(name, proc), daemon=True).start() + t = threading.Thread(target=stream, args=(name, proc), daemon=True) + t.start() + threads.append(t) done.wait() cleanup_started.set() @@ -597,6 +605,9 @@ def stream(name: str, proc: subprocess.Popen[bytes]) -> None: else: proc.wait() + for t in threads: + t.join() + if first_failure: return first_failure[0] if supervisor_signal: