diff --git a/config.env.example b/config.env.example index 49ac85c..72af3d6 100644 --- a/config.env.example +++ b/config.env.example @@ -32,6 +32,7 @@ ASSISTANT_NUMBER=5550000 # --------------------------------------------------------------------------- HOOK_SWITCH_PIN=17 +SD_AMP_PIN=22 PULSE_SWITCH_PIN=27 # --------------------------------------------------------------------------- @@ -40,7 +41,7 @@ PULSE_SWITCH_PIN=27 # ALSA device name passed to aplay -D. Use "plughw:MAX98357A" for the # I2S amplifier or "default" to route through PipeWire/PulseAudio. -ALSA_DEVICE=plughw:MAX98357A +ALSA_DEVICE=plughw:2,0 # Software volume multiplier (0.0–1.0). Lower if the amp clips or buzzes. # The MAX98357A with floating GAIN pin has 15 dB hardware gain, so 0.4 is diff --git a/config.env.local b/config.env.local index 3d0ea86..d4422e7 100644 --- a/config.env.local +++ b/config.env.local @@ -2,21 +2,6 @@ # Copy to /etc/hello-operator/config.env and fill in values before starting. # Lines beginning with # are comments and are ignored. -# --------------------------------------------------------------------------- -# Plex (required) -# --------------------------------------------------------------------------- - -# Full URL of your Plex Media Server -PLEX_URL=http://localhost:32400 - -# Plex authentication token -# Find yours at: https://support.plex.tv/articles/204059436 -PLEX_TOKEN= - -# Machine identifier of the Plex player to control -# Find yours in Plex Settings → Troubleshooting → "Download logs" or via the API -PLEX_PLAYER_IDENTIFIER= - # --------------------------------------------------------------------------- # Phone system (required) # --------------------------------------------------------------------------- diff --git a/docs/AMP_SETUP.md b/docs/AMP_SETUP.md index c80ef52..7e00dc5 100644 --- a/docs/AMP_SETUP.md +++ b/docs/AMP_SETUP.md @@ -165,21 +165,16 @@ Power-cycle the board after changing the gain wiring. ## Channel selection (SD pin) -For reference, the SD pin voltage ranges are: - -1. Wire MAX98357A **SD** to **GPIO 22** (physical pin 15) instead of Vin. -2. Set `AMP_SD_PIN=22` in `/etc/hello-operator/config.env`. +The breakout board's SD pin has a 1MΩ pull-up to Vin, which selects **stereo average** output `(L+R)/2` by default — appropriate for mono use with a stereo audio source. -hello-operator handles the rest automatically at startup. - -### SD pin voltage reference +For reference, the SD pin voltage ranges are: -| SD pin voltage | Outage | +| SD pin voltage | Output | |---|---| -| < 0.16 V | Shutdown | -| 0.16 V – 0.77 V | Stereo average (L+R)/2 | -| 0.77 V – 1.4 V | Right channel only | -| > 1.4 V | Left channel only | +| < 0.16V | Amplifier shutdown | +| 0.16V – 0.77V | Stereo average (L+R)/2 | +| 0.77V – 1.4V | Right channel only | +| > 1.4V | Left channel only | The SD pin is also used for instant audio cutoff — see Step 5 below. diff --git a/install.sh b/install.sh index eb29d5a..91b9a32 100644 --- a/install.sh +++ b/install.sh @@ -47,17 +47,17 @@ echo "" # System packages # --------------------------------------------------------------------------- -echo "==> Installing system packages..." -apt-get update -qq -apt-get install -y \ - python3 \ - python3-venv \ - python3-pip \ - alsa-utils \ - rtl-sdr \ - mpd \ - mpc \ - mopidy +# echo "==> Installing system packages..." +# apt-get update -qq +# apt-get install -y \ +# python3 \ +# python3-venv \ +# python3-pip \ +# alsa-utils \ +# rtl-sdr \ +# mpd \ +# mpc \ +# mopidy # --------------------------------------------------------------------------- # Config directory and files diff --git a/src/audio.py b/src/audio.py index f3f27e0..12da23f 100644 --- a/src/audio.py +++ b/src/audio.py @@ -17,6 +17,7 @@ """ import io +import logging import queue import subprocess import threading @@ -26,6 +27,8 @@ from src.interfaces import AudioInterface +log = logging.getLogger(__name__) + # Sample rate for the persistent aplay stream. All audio is converted to # this rate before being written. _SAMPLE_RATE = 44100 @@ -97,7 +100,8 @@ class SounddeviceAudio(AudioInterface): """ def __init__(self, sample_rate: int = _SAMPLE_RATE, device: str = "default", - volume: float = 1.0, _popen=None) -> None: + volume: float = 1.0, sd_pin: int = None, + _popen=None, _gpio_output=None) -> None: self._sample_rate = sample_rate self._device = device self._volume = max(0.0, min(1.0, volume)) @@ -107,20 +111,37 @@ def __init__(self, sample_rate: int = _SAMPLE_RATE, device: str = "default", self._queue: queue.Queue = queue.Queue() self._busy = False - self._proc = self._popen( - ['aplay', '-q', '-D', self._device, - '-f', 'S16_LE', '-r', str(self._sample_rate), '-c', '1'], - stdin=subprocess.PIPE, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - _warmup_frames = int(self._sample_rate * _WARMUP_MS / 1000) - _warmup_silence = np.zeros(_warmup_frames, dtype=np.int16).tobytes() - try: - self._proc.stdin.write(_warmup_silence) - except (BrokenPipeError, OSError): - pass + # SD pin: drive LOW to cut amp instantly; release to INPUT to re-enable. + # Both callables are None when sd_pin is not configured. + self._amp_off = None + self._amp_on = None + if sd_pin is not None: + if _gpio_output is not None: + self._amp_off = lambda: _gpio_output(sd_pin, False) + self._amp_on = lambda: _gpio_output(sd_pin, None) + else: + try: + import RPi.GPIO as GPIO # type: ignore[import] + _pin = sd_pin + GPIO.setmode(GPIO.BCM) + GPIO.setup(_pin, GPIO.IN) + + def _amp_off(_p=_pin, _g=GPIO): + log.info("SD pin %d → OUTPUT LOW (amp off)", _p) + _g.setup(_p, _g.OUT) + _g.output(_p, _g.LOW) + + def _amp_on(_p=_pin, _g=GPIO): + log.debug("SD pin %d → INPUT (amp on)", _p) + _g.setup(_p, _g.IN) + + self._amp_off = _amp_off + self._amp_on = _amp_on + log.info("SD amp pin configured: BCM %d", sd_pin) + except Exception as exc: + log.warning("SD amp pin setup failed (pin %d): %s", sd_pin, exc) + + self._proc = None self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True) self._worker_thread.start() @@ -157,6 +178,37 @@ def play_off_hook_tone(self) -> None: pcm = self._waveform_to_pcm(waveform) self._enqueue(lambda: self._write_pcm_loop(pcm)) + def amp_off(self) -> None: + """Cut the amp and terminate aplay. Call only from the hook watcher.""" + if self._amp_off: + self._amp_off() + self.stop() + proc, self._proc = self._proc, None + if proc is not None: + try: + proc.terminate() + except OSError: + pass + + def amp_on(self) -> None: + """Start aplay and enable the amp. Call only from the hook watcher.""" + if self._proc is None or self._proc.poll() is not None: + self._proc = self._popen( + ['aplay', '-q', '-D', self._device, + '-f', 'S16_LE', '-r', str(self._sample_rate), '-c', '1'], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + _warmup_frames = int(self._sample_rate * _WARMUP_MS / 1000) + _warmup_silence = np.zeros(_warmup_frames, dtype=np.int16).tobytes() + try: + self._proc.stdin.write(_warmup_silence) + except (BrokenPipeError, OSError): + pass + if self._amp_on: + self._amp_on() + def stop(self) -> None: """Stop current playback and clear all queued tasks. @@ -209,9 +261,12 @@ def _worker_loop(self) -> None: self._busy = False def _write_raw(self, pcm: bytes) -> None: - """Write raw PCM bytes to the persistent aplay stdin; swallow broken pipe.""" + """Write raw PCM bytes to aplay stdin; no-op if aplay is not running.""" + proc = self._proc + if proc is None: + return try: - self._proc.stdin.write(pcm) + proc.stdin.write(pcm) except (BrokenPipeError, OSError): pass diff --git a/src/constants.py b/src/constants.py index 0286768..4174c5d 100644 --- a/src/constants.py +++ b/src/constants.py @@ -20,7 +20,7 @@ # Audio output device (ALSA device name passed to aplay -D) # --------------------------------------------------------------------------- -ALSA_DEVICE = os.environ.get("ALSA_DEVICE", "plughw:MAX98357A") +ALSA_DEVICE = os.environ.get("ALSA_DEVICE", "hw:2,0") # Software volume multiplier applied to all audio output (0.0–1.0). # Reduce if the amp clips or buzzes; increase if output is too quiet. @@ -39,7 +39,7 @@ # Timing constants (in seconds unless noted) # --------------------------------------------------------------------------- -DIAL_TONE_TIMEOUT_IDLE = 5 # Silence before idle operator prompt +DIAL_TONE_TIMEOUT_IDLE = 3 # Silence before idle operator prompt DIAL_TONE_TIMEOUT_PLAYING = 2 # Silence before playing-state prompt INTER_DIGIT_TIMEOUT = 0.3 # Gap after last pulse → digit complete (300 ms) DIRECT_DIAL_DISAMBIGUATION_TIMEOUT = 1.5 # TODO: tune on hardware — wait after first digit before treating as single nav input @@ -84,6 +84,8 @@ # GPIO pin assignments (BCM numbering) — optional; defaults match recommended wiring docs HOOK_SWITCH_PIN = int(os.environ.get("HOOK_SWITCH_PIN", "17")) PULSE_SWITCH_PIN = int(os.environ.get("PULSE_SWITCH_PIN", "27")) +_sd_amp_pin_env = os.environ.get("SD_AMP_PIN") +SD_AMP_PIN: int | None = int(_sd_amp_pin_env) if _sd_amp_pin_env is not None else None # Radio configuration RADIO_CONFIG_PATH = "/etc/hello-operator/radio_stations.json" diff --git a/src/main.py b/src/main.py index 9bd5cb5..f93b09a 100644 --- a/src/main.py +++ b/src/main.py @@ -13,7 +13,7 @@ MEDIA_BACKEND, MPD_HOST, MPD_PORT, PIPER_BINARY, PIPER_MODEL, TTS_CACHE_DIR, - HOOK_SWITCH_PIN, PULSE_SWITCH_PIN, + HOOK_SWITCH_PIN, PULSE_SWITCH_PIN, SD_AMP_PIN, HOOK_DEBOUNCE, RADIO_CONFIG_PATH, ALSA_DEVICE, @@ -32,6 +32,7 @@ # Import all pre-renderable script strings from menu from src.menu import ( + Menu, SCRIPT_OPERATOR_OPENER, SCRIPT_GREETING, SCRIPT_EXTENSION_HINT, @@ -186,43 +187,40 @@ def pulse_reader() -> int: pulse_pin_reader=pulse_reader, ) +def _start_hook_watcher(hook_pin: int, audio, tts, menu, gpio) -> None: + """Spin a daemon thread that watches the hook pin at ~1 ms intervals. -def _setup_hook_interrupt(hook_pin: int, audio, tts) -> None: - """Register a GPIO edge-detect interrupt for immediate audio cutoff on hang-up. - - Supplements the polling loop so that audio.stop() and tts.abort() are called - the instant the hook switch fires, without waiting for the event loop to - unblock from any ongoing TTS synthesis (e.g. a blocking piper subprocess). - - The callback runs in RPi.GPIO's event thread. Both audio.stop() and - tts.abort() are thread-safe. If RPi.GPIO is unavailable (non-Pi - environment) this function does nothing. - - Parameters - ---------- - hook_pin: - BCM pin number for the hook switch (HIGH = on cradle). - audio: - AudioInterface implementation to stop immediately. - tts: - TTSInterface implementation to abort immediately. + Drives the amp and creates/closes Session the instant the pin changes + state, bypassing the polling loop's debounce delay. No-op on non-Pi hosts. """ - try: - import RPi.GPIO as GPIO - bouncetime_ms = max(1, int(HOOK_DEBOUNCE * 1000)) - - def _on_cradle(channel): - if GPIO.input(channel) == 1: # HIGH = handset on cradle - audio.stop() - tts.abort() - - GPIO.add_event_detect(hook_pin, GPIO.RISING, - callback=_on_cradle, - bouncetime=bouncetime_ms) - log.info("Hook-switch interrupt registered on BCM %d (bouncetime %d ms)", - hook_pin, bouncetime_ms) - except (ImportError, RuntimeError): - pass # Non-Pi environment — polling loop handles events alone + import threading + + def _watch(): + try: + import RPi.GPIO as GPIO + last = GPIO.input(hook_pin) + session = None + while True: + val = GPIO.input(hook_pin) + if val != last: + last = val + if val == 1: # HIGH = on cradle + audio.amp_off() + tts.abort() + if session is not None: + session.close() + session = None + else: # LOW = lifted + audio.amp_on() + session = Session(menu=menu, gpio=gpio) + session.start() + time.sleep(0.001) + except (ImportError, RuntimeError): + pass + + t = threading.Thread(target=_watch, daemon=True, name="hook-watcher") + t.start() + log.info("Hook watcher thread started on BCM %d", hook_pin) def run() -> None: @@ -247,7 +245,7 @@ def run() -> None: ) # Hardware interfaces - audio = SounddeviceAudio(device=ALSA_DEVICE, volume=AUDIO_VOLUME) + audio = SounddeviceAudio(device=ALSA_DEVICE, volume=AUDIO_VOLUME, sd_pin=SD_AMP_PIN) tts = PiperTTS( piper_binary=PIPER_BINARY, piper_model=PIPER_MODEL, @@ -269,15 +267,8 @@ def run() -> None: tts.prerender(_PRERENDER_SCRIPTS) log.info("Pre-render complete.") - # GPIO handler — track whether GPIO was successfully initialised so the - # finally block only calls _gpio_cleanup() when it is safe to do so. - _gpio_ready = False - gpio = build_gpio_handler() - _gpio_ready = True - _setup_hook_interrupt(HOOK_SWITCH_PIN, audio, tts) - - # Session - session = Session( + # Menu state machine — constructed once, reused across sessions + menu = Menu( audio=audio, tts=tts, media_client=media_client, @@ -287,17 +278,18 @@ def run() -> None: radio=radio, ) + # GPIO handler — track whether GPIO was successfully initialised so the + # finally block only calls _gpio_cleanup() when it is safe to do so. + _gpio_ready = False + gpio = build_gpio_handler() + _gpio_ready = True + _start_hook_watcher(HOOK_SWITCH_PIN, audio, tts, menu, gpio) + log.info("hello-operator ready — waiting for handset lift") - # Event loop try: while True: - now = time.monotonic() - event = gpio.poll(now=now) - if event is not None: - session.handle_event(event, now=now) - session.tick(now=now) - time.sleep(0.005) # ~200 Hz polling + time.sleep(1) except KeyboardInterrupt: log.info("Shutting down.") finally: diff --git a/src/menu.py b/src/menu.py index 8518bf4..228984a 100644 --- a/src/menu.py +++ b/src/menu.py @@ -303,7 +303,7 @@ def on_handset_lifted(self, now: Optional[float] = None) -> None: self._pending_digit = None self._failure_mode = None self._lift_playback = self._media_client.now_playing() - self._audio.play_tone(DIAL_TONE_FREQUENCIES, 500) + self._audio.play_tone(DIAL_TONE_FREQUENCIES, 2000) def on_handset_on_cradle(self) -> None: """Called when the handset is replaced.""" diff --git a/src/session.py b/src/session.py index 69dea09..8f6a5cb 100644 --- a/src/session.py +++ b/src/session.py @@ -1,100 +1,90 @@ """Session lifecycle for hello-operator. -Owns the application lifecycle for a single handset interaction. Listens for -GPIO events, routes them to the Menu state machine, and handles cleanup on -hang-up. Does not stop media playback on hang-up — music continues. - -Usage:: - - session = Session(audio, tts, media_client, media_store, phone_book, error_queue, radio) - # In an event loop: - event = gpio_handler.poll(now=time.monotonic()) - if event is not None: - session.handle_event(event, now=time.monotonic()) - session.tick(now=time.monotonic()) +Manages a single handset interaction. Created when the handset is lifted; +closed when it is replaced. Owns the GPIO digit-polling loop. + +Production usage (via hook watcher in main.py):: + + menu = Menu(...) + session = Session(menu=menu, gpio=gpio_handler) + session.start() # starts internal polling thread + # ... on hang-up: + session.close() + +Test usage (no background thread):: + + session = Session(menu=menu, now=0.0) + session.handle_event((GpioEvent.DIGIT_DIALED, 5), now=0.5) + session.tick(now=DIAL_TONE_TIMEOUT_IDLE + 1.0) + session.close() """ +import threading import time from typing import Optional, Tuple, Union from src.gpio_handler import GpioEvent -from src.interfaces import AudioInterface, TTSInterface, MediaClientInterface, ErrorQueueInterface from src.menu import Menu class Session: - """Ties GPIO events to the Menu state machine. + """Manages a single handset interaction. Parameters ---------- - audio : AudioInterface - tts : TTSInterface - media_client : MediaClientInterface - media_store : MediaStore or MockMediaStore - phone_book : PhoneBook - error_queue : ErrorQueueInterface - radio : RadioInterface + menu: + Pre-constructed Menu instance owned by the caller. + gpio: + GPIOHandler for the internal digit-polling loop. Omit in tests. + now: + Monotonic clock value for the handset-lifted event. Defaults to + ``time.monotonic()`` if not provided. """ def __init__( self, - audio: AudioInterface, - tts: TTSInterface, - media_client: MediaClientInterface, - media_store, - phone_book, - error_queue: ErrorQueueInterface, - radio=None, # RadioInterface + menu: Menu, + gpio=None, + now: Optional[float] = None, ) -> None: - self._menu = Menu( - audio=audio, - tts=tts, - media_client=media_client, - media_store=media_store, - phone_book=phone_book, - error_queue=error_queue, - radio=radio, - ) + if now is None: + now = time.monotonic() + self._menu = menu + self._gpio = gpio + self._stop_event = threading.Event() + self._menu.on_handset_lifted(now=now) - # ------------------------------------------------------------------ - # Public API - # ------------------------------------------------------------------ + def start(self) -> None: + """Start the internal GPIO digit-polling loop in a daemon thread. + + Call this in production after construction. Tests omit this and + drive the session directly via handle_event() and tick(). + """ + t = threading.Thread(target=self._run, daemon=True, name="session-poll") + t.start() + + def close(self) -> None: + """Stop polling and deliver the hang-up event to the menu.""" + self._stop_event.set() + self._menu.on_handset_on_cradle() def handle_event( self, event: Union[GpioEvent, Tuple[GpioEvent, int]], now: Optional[float] = None, ) -> None: - """Route a GPIO event to the menu. - - Parameters - ---------- - event: - A ``GpioEvent`` member or a ``(GpioEvent.DIGIT_DIALED, digit)`` - tuple as returned by ``GPIOHandler.poll()``. - now: - Monotonic clock value in seconds. Defaults to ``time.monotonic()``. + """Dispatch a digit event directly (used in tests). + + Only DIGIT_DIALED events are processed; hook events are ignored + since the hook watcher handles those in production. """ if now is None: now = time.monotonic() - - if isinstance(event, tuple): - gpio_event, digit = event - if gpio_event == GpioEvent.DIGIT_DIALED: - self._menu.on_digit(digit, now=now) - elif event == GpioEvent.HANDSET_LIFTED: - self._menu.on_handset_lifted(now=now) - elif event == GpioEvent.HANDSET_ON_CRADLE: - self._menu.on_handset_on_cradle() + if isinstance(event, tuple) and event[0] == GpioEvent.DIGIT_DIALED: + self._menu.on_digit(event[1], now=now) def tick(self, now: Optional[float] = None) -> None: - """Advance menu timeouts. Call from the polling loop. - - Parameters - ---------- - now: - Monotonic clock value in seconds. Defaults to ``time.monotonic()``. - """ + """Advance menu timeouts (used in tests).""" if now is None: now = time.monotonic() self._menu.tick(now=now) @@ -103,3 +93,16 @@ def tick(self, now: Optional[float] = None) -> None: def menu(self) -> Menu: """The underlying Menu instance (for state inspection in tests).""" return self._menu + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _run(self) -> None: + while not self._stop_event.is_set(): + now = time.monotonic() + event = self._gpio.poll(now=now) + if isinstance(event, tuple) and event[0] == GpioEvent.DIGIT_DIALED: + self._menu.on_digit(event[1], now=now) + self._menu.tick(now=now) + time.sleep(0.005) diff --git a/tests/test_audio.py b/tests/test_audio.py index ce52d56..c58f4d3 100644 --- a/tests/test_audio.py +++ b/tests/test_audio.py @@ -91,10 +91,11 @@ def reset_popen_mock(): @pytest.fixture def audio(): - """Fresh SounddeviceAudio with injected mock popen.""" + """Fresh SounddeviceAudio with injected mock popen, handset lifted.""" a = SounddeviceAudio(_popen=_popen_mock) + a.amp_on() yield a - a.stop() + a.amp_off() # --------------------------------------------------------------------------- @@ -126,13 +127,14 @@ def _wait_for(condition, timeout=2.0, interval=0.005): def _capture_pcm(audio_obj, method, *args, timeout=2.0): """Invoke audio.(*args) and return the non-silent PCM bytes written. - Waits until the first non-silent write arrives, then calls stop() and - returns all PCM captured so far (silence prefix stripped). + Waits for the task to complete naturally before collecting PCM so that the + full clip is captured regardless of how fast the mock writes execute. """ _proc.reset() getattr(audio_obj, method)(*args) assert _proc.write_called.wait(timeout=timeout), \ f"{method}(*{args}) never wrote non-silent PCM to aplay" + _wait_for(lambda: not audio_obj.is_playing(), timeout=timeout) audio_obj.stop() all_samples = np.frombuffer(_proc.written_pcm, dtype=np.int16) nonzero = np.nonzero(all_samples)[0] @@ -346,15 +348,17 @@ def test_volume_reduces_amplitude(self, audio, tmp_path): # Capture at full volume a_full = SounddeviceAudio(volume=1.0, _popen=_popen_mock) + a_full.amp_on() pcm_full = _capture_pcm(a_full, 'play_file', wav_path) - a_full.stop() + a_full.amp_off() peak_full = np.max(np.abs(_pcm_to_samples(pcm_full))) # Capture at half volume _proc.reset() a_half = SounddeviceAudio(volume=0.5, _popen=_popen_mock) + a_half.amp_on() pcm_half = _capture_pcm(a_half, 'play_file', wav_path) - a_half.stop() + a_half.amp_off() peak_half = np.max(np.abs(_pcm_to_samples(pcm_half))) assert peak_half < peak_full * 0.6, \ @@ -363,13 +367,15 @@ def test_volume_reduces_amplitude(self, audio, tmp_path): def test_tone_volume_applied(self, audio): """volume=0.5 → generated tone amplitude ≈ half of volume=1.0.""" a_full = SounddeviceAudio(volume=1.0, _popen=_popen_mock) + a_full.amp_on() pcm_full = _capture_pcm(a_full, 'play_tone', [440], 100) - a_full.stop() + a_full.amp_off() _proc.reset() a_half = SounddeviceAudio(volume=0.5, _popen=_popen_mock) + a_half.amp_on() pcm_half = _capture_pcm(a_half, 'play_tone', [440], 100) - a_half.stop() + a_half.amp_off() peak_full = np.max(np.abs(_pcm_to_samples(pcm_full))) peak_half = np.max(np.abs(_pcm_to_samples(pcm_half))) @@ -414,10 +420,21 @@ def test_play_off_hook_tone_is_nonblocking(self, audio): def test_is_playing_true_while_worker_busy(self, audio): """is_playing() returns True while the worker is executing a task.""" - _proc.reset() + gate = threading.Event() + writing_started = threading.Event() + original_write_raw = audio._write_raw + + def gated_write(pcm): + if np.any(np.frombuffer(bytes(pcm), dtype=np.int16) != 0): + writing_started.set() + gate.wait(timeout=5.0) + original_write_raw(pcm) + + audio._write_raw = gated_write audio.play_tone([440], 5000) - assert _proc.write_called.wait(timeout=2.0), "tone never started" + assert writing_started.wait(timeout=2.0), "tone never started" assert audio.is_playing() + gate.set() audio.stop() def test_is_playing_false_after_stop(self, audio): @@ -501,45 +518,43 @@ def test_silence_written_between_clips(self, audio): assert len(all_written) > 0, "Worker never wrote silence during idle period" assert np.all(all_written == 0), "Non-silent data written while idle" - def test_aplay_started_once(self): - """popen is called exactly once per SounddeviceAudio instance.""" + def test_aplay_started_once_per_amp_on(self): + """popen is called exactly once per amp_on() call.""" _popen_mock.reset_mock() _popen_mock.side_effect = lambda *a, **kw: FakeProcess() a = SounddeviceAudio(_popen=_popen_mock) + assert _popen_mock.call_count == 0, "aplay must not start in __init__" + a.amp_on() a.play_tone([440], 50) a.play_tone([350], 50) time.sleep(0.2) - a.stop() + a.amp_off() assert _popen_mock.call_count == 1, \ f"Expected 1 aplay invocation, got {_popen_mock.call_count}" - def test_warmup_silence_written_on_init(self): - """__init__ writes a silence burst synchronously before the worker thread starts. - - This ensures the MAX98357A amp initialises (and its startup transient - fires) at app launch rather than on the first real user-facing clip. - The write must happen in __init__ *before* self._worker_thread.start(), - so write() is called at least once by the time __init__ returns. - """ + def test_warmup_silence_written_on_amp_on(self): + """amp_on() writes warmup silence synchronously before returning.""" local_proc = FakeProcess() local_popen = MagicMock(side_effect=lambda *a, **kw: local_proc) a = SounddeviceAudio(_popen=local_popen) - # Warmup is synchronous in __init__ (before worker starts), so - # write() must already have been called when __init__ returns. + assert local_proc.stdin.write.call_count == 0, \ + "aplay must not start in __init__" + a.amp_on() assert local_proc.stdin.write.call_count >= 1, \ - "SounddeviceAudio.__init__ did not write warmup silence before returning" + "amp_on() must write warmup silence synchronously" warmup_bytes = local_proc.stdin.write.call_args_list[0][0][0] samples = np.frombuffer(warmup_bytes, dtype=np.int16) assert len(samples) > 0, "Warmup write was empty" assert np.all(samples == 0), "Warmup write contains non-silent samples" - a.stop() + a.amp_off() def test_aplay_invoked_with_pcm_format_flags(self): """aplay is started with raw PCM format flags (not WAV stdin).""" _popen_mock.reset_mock() _popen_mock.side_effect = lambda *a, **kw: FakeProcess() a = SounddeviceAudio(device="plughw:TEST", _popen=_popen_mock) - a.stop() + a.amp_on() + a.amp_off() cmd = _popen_mock.call_args[0][0] assert '-f' in cmd and 'S16_LE' in cmd, "Missing -f S16_LE flag" assert '-r' in cmd, "Missing -r (sample rate) flag" diff --git a/tests/test_session.py b/tests/test_session.py index 656e458..3611e86 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -18,15 +18,19 @@ # --------------------------------------------------------------------------- def make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, - mock_error_queue, tmp_path, mock_radio=None): - """Build a Session with all mocked dependencies.""" + mock_error_queue, tmp_path, mock_radio=None, now=0.0): + """Build a Session with all mocked dependencies. + + Handset lift is delivered via Session.__init__ at the given ``now`` timestamp. + """ from src.phone_book import PhoneBook from src.session import Session + from src.menu import Menu from src.radio import MockRadio db = str(tmp_path / "phone_book.db") phone_book = PhoneBook(db_path=db) radio = mock_radio if mock_radio is not None else MockRadio() - return Session( + menu = Menu( audio=mock_audio, tts=mock_tts, media_client=mock_media_client, @@ -35,6 +39,7 @@ def make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, error_queue=mock_error_queue, radio=radio, ) + return Session(menu=menu, now=now) def tts_calls(mock_tts): @@ -51,11 +56,9 @@ class TestSessionHandset: def test_handset_lifted_starts_dial_tone( self, mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path): - """HANDSET_LIFTED event → dial tone begins.""" + """Session construction (handset lift) → dial tone begins.""" session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) - # play_tone should have been called assert any(c[0] == 'play_tone' for c in mock_audio.calls) def test_dial_tone_timeout_idle( @@ -67,7 +70,6 @@ def test_dial_tone_timeout_idle( mock_media_store.set_genres([]) session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) session.tick(now=DIAL_TONE_TIMEOUT_IDLE + 1.0) texts = tts_calls(mock_tts) assert any(SCRIPT_GREETING in t for t in texts) @@ -79,44 +81,39 @@ def test_dial_tone_timeout_playing( mock_media_client.set_now_playing(PlaybackState(MediaItem("/pl/1", "Jazz", "playlist"), False)) session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) session.tick(now=DIAL_TONE_TIMEOUT_PLAYING + 1.0) texts = tts_calls(mock_tts) assert any("Jazz" in t for t in texts) or any(SCRIPT_PLAYING_MENU_DEFAULT in t for t in texts) def test_handset_on_cradle_stops_audio( self, mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path): - """HANDSET_ON_CRADLE → all audio stops.""" + """session.close() → all audio stops.""" session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) mock_audio.calls.clear() - session.handle_event(GpioEvent.HANDSET_ON_CRADLE, now=1.0) + session.close() assert any(c[0] == 'stop' for c in mock_audio.calls) def test_handset_on_cradle_does_not_stop_plex( self, mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path): - """HANDSET_ON_CRADLE while music playing → plex_client.stop() NOT called.""" + """session.close() while music playing → media_client.stop() NOT called.""" mock_media_client.set_now_playing(PlaybackState(MediaItem("/pl/1", "Jazz", "playlist"), False)) session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) mock_media_client.calls.clear() - session.handle_event(GpioEvent.HANDSET_ON_CRADLE, now=1.0) + session.close() stop_calls = [c for c in mock_media_client.calls if c[0] == 'stop'] assert len(stop_calls) == 0 def test_digit_after_hangup_ignored( self, mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path): - """Digit event after HANDSET_ON_CRADLE → ignored, no state change.""" + """Digit event after close() → ignored, no state change.""" session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) - session.handle_event(GpioEvent.HANDSET_ON_CRADLE, now=1.0) + session.close() mock_tts.calls.clear() mock_audio.calls.clear() session.handle_event((GpioEvent.DIGIT_DIALED, 5), now=2.0) - # No TTS or audio changes should happen assert len(tts_calls(mock_tts)) == 0 @@ -128,7 +125,6 @@ def test_direct_dial_during_dial_tone( """Two digits within disambiguation timeout during dial tone → DTMF tones played.""" session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) session.handle_event((GpioEvent.DIGIT_DIALED, 5), now=0.5) session.handle_event((GpioEvent.DIGIT_DIALED, 5), now=0.6) dtmf_calls = [c for c in mock_audio.calls if c[0] == 'play_dtmf'] @@ -143,7 +139,6 @@ def test_single_digit_during_dial_tone_treated_as_navigation( mock_media_store.set_genres([]) session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) session.tick(now=DIAL_TONE_TIMEOUT_IDLE + 1.0) # deliver idle menu mock_tts.calls.clear() session.handle_event((GpioEvent.DIGIT_DIALED, 1), now=10.0) @@ -165,7 +160,6 @@ def test_direct_dial_known_number( mock_media_store.set_genres([]) session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) session.tick(now=DIAL_TONE_TIMEOUT_IDLE + 1.0) mock_media_client.calls.clear() # Dial the number @@ -188,7 +182,6 @@ def test_direct_dial_unknown_number( mock_media_store.set_genres([]) session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) session.tick(now=DIAL_TONE_TIMEOUT_IDLE + 1.0) mock_tts.calls.clear() # Dial unknown number: 1234567 @@ -211,7 +204,6 @@ def test_direct_dial_ignores_digits_after_7( mock_media_store.set_genres([]) session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) session.tick(now=DIAL_TONE_TIMEOUT_IDLE + 1.0) # Dial 8 digits: 12345678 t = 10.0 @@ -235,7 +227,6 @@ def test_direct_dial_hangup_before_7_digits( mock_media_store.set_genres([]) session = make_session(mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path) - session.handle_event(GpioEvent.HANDSET_LIFTED, now=0.0) session.tick(now=DIAL_TONE_TIMEOUT_IDLE + 1.0) mock_media_client.calls.clear() mock_tts.calls.clear() @@ -245,7 +236,7 @@ def test_direct_dial_hangup_before_7_digits( session.handle_event((GpioEvent.DIGIT_DIALED, 2), now=t + 0.05) session.handle_event((GpioEvent.DIGIT_DIALED, 3), now=t + 0.1) # Hang up before completing - session.handle_event(GpioEvent.HANDSET_ON_CRADLE, now=t + 0.2) + session.close() # No lookup or not-in-service play_calls = [c for c in mock_media_client.calls if c[0] == 'play'] not_in_service = [txt for txt in tts_calls(mock_tts) if "not in service" in txt.lower()] @@ -274,12 +265,12 @@ def test_session_passes_radio_to_menu( # --------------------------------------------------------------------------- class TestSessionRequiredParameters: - """Session must reject construction when required dependencies are omitted. + """Menu must reject construction when required dependencies are omitted. media_client, media_store, phone_book, and error_queue are not truly - optional — the session cannot function without them. Removing their + optional — the menu cannot function without them. Removing their = None defaults causes Python to raise TypeError at the call site instead - of letting a broken Session reach runtime. + of letting a broken Menu reach runtime. radio is intentionally optional (hardware-dependent; all usages are guarded with ``if self._radio is not None``). @@ -291,9 +282,9 @@ def _make_phone_book(self, tmp_path): def test_missing_media_client_raises_type_error( self, mock_audio, mock_tts, mock_media_store, mock_error_queue, tmp_path): - from src.session import Session + from src.menu import Menu with pytest.raises(TypeError): - Session( + Menu( audio=mock_audio, tts=mock_tts, media_store=mock_media_store, @@ -303,9 +294,9 @@ def test_missing_media_client_raises_type_error( def test_missing_media_store_raises_type_error( self, mock_audio, mock_tts, mock_media_client, mock_error_queue, tmp_path): - from src.session import Session + from src.menu import Menu with pytest.raises(TypeError): - Session( + Menu( audio=mock_audio, tts=mock_tts, media_client=mock_media_client, @@ -315,9 +306,9 @@ def test_missing_media_store_raises_type_error( def test_missing_phone_book_raises_type_error( self, mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path): - from src.session import Session + from src.menu import Menu with pytest.raises(TypeError): - Session( + Menu( audio=mock_audio, tts=mock_tts, media_client=mock_media_client, @@ -327,9 +318,9 @@ def test_missing_phone_book_raises_type_error( def test_missing_error_queue_raises_type_error( self, mock_audio, mock_tts, mock_media_client, mock_media_store, tmp_path): - from src.session import Session + from src.menu import Menu with pytest.raises(TypeError): - Session( + Menu( audio=mock_audio, tts=mock_tts, media_client=mock_media_client, @@ -339,9 +330,10 @@ def test_missing_error_queue_raises_type_error( def test_radio_is_still_optional( self, mock_audio, mock_tts, mock_media_client, mock_media_store, mock_error_queue, tmp_path): - """radio has no = None default — confirm Session builds without it.""" + """radio has no = None default — confirm Menu+Session builds without it.""" + from src.menu import Menu from src.session import Session - session = Session( + menu = Menu( audio=mock_audio, tts=mock_tts, media_client=mock_media_client, @@ -349,4 +341,5 @@ def test_radio_is_still_optional( phone_book=self._make_phone_book(tmp_path), error_queue=mock_error_queue, ) + session = Session(menu=menu, now=0.0) assert session.menu._radio is None