Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions devservices/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ services:
restart: unless-stopped

snuba:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
ports:
- 127.0.0.1:1218:1218
- 127.0.0.1:1219:1219
command:
- devserver
- --${SNUBA_NO_WORKERS:+no-workers}
healthcheck:
test: curl -f http://localhost:1218/health_envoy
test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:1218/health_envoy')"]
interval: 5s
timeout: 5s
retries: 3
Expand Down Expand Up @@ -114,7 +114,7 @@ services:
- orchestrator=devservices
restart: unless-stopped
profiles-consumer:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
command: [
rust-consumer,
--storage=profiles,
Expand Down Expand Up @@ -143,7 +143,7 @@ services:
- orchestrator=devservices
restart: unless-stopped
profile-chunks-consumer:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
command: [
rust-consumer,
--storage=profile_chunks,
Expand Down Expand Up @@ -172,7 +172,7 @@ services:
- orchestrator=devservices
restart: unless-stopped
functions-consumer:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
command: [
rust-consumer,
--storage=functions_raw,
Expand Down Expand Up @@ -201,7 +201,7 @@ services:
- orchestrator=devservices
restart: unless-stopped
metrics-consumer:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
command: [
rust-consumer,
--storage=metrics_raw,
Expand Down Expand Up @@ -230,7 +230,7 @@ services:
- orchestrator=devservices
restart: unless-stopped
generic-metrics-distributions-consumer:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
command: [
rust-consumer,
--storage=generic_metrics_distributions_raw,
Expand Down Expand Up @@ -259,7 +259,7 @@ services:
- orchestrator=devservices
restart: unless-stopped
generic-metrics-sets-consumer:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
command: [
rust-consumer,
--storage=generic_metrics_sets_raw,
Expand Down Expand Up @@ -288,7 +288,7 @@ services:
- orchestrator=devservices
restart: unless-stopped
generic-metrics-counters-consumer:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
command: [
rust-consumer,
--storage=generic_metrics_counters_raw,
Expand Down Expand Up @@ -317,7 +317,7 @@ services:
- orchestrator=devservices
restart: unless-stopped
generic-metrics-gauges-consumer:
image: ghcr.io/getsentry/snuba:nightly
image: ghcr.io/getsentry/snuba:nightly-distroless
command: [
rust-consumer,
--storage=generic_metrics_gauges_raw,
Expand Down
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ snuba = "snuba.cli:main"
dev = [
"devservices>=1.2.1",
"freezegun>=1.5.5",
"honcho>=1.1.0",
"mypy>=1.1.1",
"pre-commit>=4.2.0",
"pytest>=8.3.3",
Expand Down Expand Up @@ -136,7 +135,6 @@ module = [
"fastjsonschema",
"fastjsonschema.exceptions",
"granian",
"honcho.manager",
"jsonschema",
"jsonschema.exceptions",
"jsonschema2md",
Expand Down
105 changes: 95 additions & 10 deletions snuba/cli/devserver.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
import os
import signal
import subprocess
import sys
from subprocess import call, list2cmdline
import threading
from subprocess import call

import click

from snuba import settings

# Match honcho: SIGTERM, then SIGKILL if children do not exit (avoids indefinite
# hang when a child ignores SIGTERM; see also PEP 475 / wait() retry after signals).
_SUBPROCESS_TERM_GRACE_SEC = 5.0


def _reap_after_terminate(proc: subprocess.Popen[bytes], grace_sec: float) -> None:
"""Wait for proc to exit after terminate(); kill -9 if still alive after grace_sec."""
try:
proc.wait(timeout=grace_sec)
except subprocess.TimeoutExpired:
if proc.poll() is None:
proc.kill()
proc.wait()


COMMON_RUST_CONSUMER_DEV_OPTIONS = [
"--use-rust-processor",
"--auto-offset-reset=latest",
Expand All @@ -21,8 +39,6 @@
def devserver(*, bootstrap: bool, workers: bool, log_level: str) -> None:
"Starts all Snuba processes for local development."

from honcho.manager import Manager

os.environ["PYTHONUNBUFFERED"] = "1"

if bootstrap:
Expand Down Expand Up @@ -518,13 +534,82 @@ def devserver(*, bootstrap: bool, workers: bool, log_level: str) -> None:
),
]

manager = Manager()
sys.exit(_run_daemons(daemons))


def _run_daemons(daemons: list[tuple[str, list[str]]]) -> int:
procs: dict[str, subprocess.Popen[bytes]] = {}
threads: list[threading.Thread] = []
first_failure: list[int] = []
done = threading.Event()
cleanup_started = threading.Event()
failure_lock = threading.Lock()
supervisor_signal: list[int] = []

def shutdown(signum: int, frame: object) -> None:
# Mark cleanup before terminate so stream threads do not treat SIGTERM as a
# natural crash (honcho parity when one daemon exits or user interrupts).
cleanup_started.set()
if not supervisor_signal:
supervisor_signal.append(signum)
for proc in procs.values():
if proc.poll() is None:
proc.terminate()
done.set()

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

def stream(name: str, proc: subprocess.Popen[bytes]) -> None:
try:
assert proc.stdout is not None
for line in proc.stdout:
sys.stdout.write(f"{name} | {line.decode(errors='replace')}")
sys.stdout.flush()
rc = proc.wait()
with failure_lock:
if rc != 0 and not cleanup_started.is_set():
if not first_failure:
first_failure.append(rc)
except BaseException:
with failure_lock:
if not cleanup_started.is_set() and not first_failure:
first_failure.append(1)
raise
finally:
# Always unblock the supervisor (e.g. BrokenPipe/EPIPE on stdout write).
done.set()

for name, cmd in daemons:
manager.add_process(
name,
list2cmdline(cmd),
quiet=False,
proc = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
procs[name] = proc
t = threading.Thread(target=stream, args=(name, proc), daemon=True)
t.start()
threads.append(t)

done.wait()
cleanup_started.set()
# Any daemon exit ends the supervisor; terminate the rest (honcho parity).
for proc in procs.values():
if proc.poll() is None:
proc.terminate()

for proc in procs.values():
if proc.poll() is None:
_reap_after_terminate(proc, _SUBPROCESS_TERM_GRACE_SEC)
else:
proc.wait()

for t in threads:
t.join()

manager.loop()
sys.exit(manager.returncode)
if first_failure:
return first_failure[0]
if supervisor_signal:
return 128 + supervisor_signal[0]
return 0
10 changes: 0 additions & 10 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading