From 36a7d52dc6626dcfbb19168446619f831f8dc4c8 Mon Sep 17 00:00:00 2001 From: Danny Burrow Date: Sat, 28 Mar 2026 18:30:09 +0100 Subject: [PATCH 1/3] fix: address code quality issues and repo hygiene gaps, bump 0.7.1 - Use LOG.debug instead of logging.debug in _detect_hubble_cmd - Replace assert with early-return guards in reader threads - Guard cursor_flow_idx when ordered_keys is empty in watch mode - Close capture file handle on early exit paths to prevent leak - Add missing py.typed marker (referenced in CHANGELOG since v0.4.0) - Add types-PyYAML to dev dependencies for local mypy - Add .mypy_cache, .pytest_cache, .ruff_cache to .gitignore - Bump version to 0.7.1, update changelog --- .gitignore | 5 ++++- CHANGELOG.md | 12 ++++++++++++ hubble_audit2policy.py | 20 ++++++++++++++------ py.typed | 0 pyproject.toml | 4 ++-- 5 files changed, 32 insertions(+), 9 deletions(-) create mode 100644 py.typed diff --git a/.gitignore b/.gitignore index 9f6eccf..3ca49bd 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,7 @@ __pycache__/ dist/ build/ .venv/ -.vscode/ \ No newline at end of file +.vscode/ +.mypy_cache/ +.pytest_cache/ +.ruff_cache/ \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b62adf..27aa7e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.7.1] - 2026-03-28 + +### Fixed + +- Use module logger (`LOG.debug`) instead of root `logging.debug` in `_detect_hubble_cmd`. +- Replace `assert` guards in reader threads with early-return checks (assertions are stripped under `python -O`). +- Guard `cursor_flow_idx` when `ordered_keys` is empty in watch select mode. +- Close capture file handle on early exit paths to prevent resource leak. 
+- Add missing `py.typed` marker file (referenced in CHANGELOG since v0.4.0 but never created). +- Add `types-PyYAML` to dev dependencies so local `mypy` works without manual installs. +- Add `.mypy_cache/`, `.pytest_cache/`, `.ruff_cache/` to `.gitignore`. + ## [0.7.0] - 2026-03-27 ### Added diff --git a/hubble_audit2policy.py b/hubble_audit2policy.py index 74f46ba..75fac24 100755 --- a/hubble_audit2policy.py +++ b/hubble_audit2policy.py @@ -8,7 +8,7 @@ from __future__ import annotations -__version__ = "0.7.0" +__version__ = "0.7.1" __author__ = "noexecstack" __license__ = "Apache-2.0" @@ -1013,7 +1013,7 @@ def _detect_hubble_cmd() -> tuple[list[str], bool]: only when appropriate. """ if shutil.which("hubble"): - logging.debug("hubble: using PATH binary with -P (auto port-forward)") + LOG.debug("hubble: using PATH binary with -P (auto port-forward)") return ["hubble", "observe"], True # Fall back to kubectl exec into a Cilium pod. if shutil.which("kubectl"): @@ -1021,7 +1021,7 @@ def _detect_hubble_cmd() -> tuple[list[str], bool]: cilium_node_map = _build_cilium_node_map() if cilium_node_map: cilium_pod = next(iter(sorted(cilium_node_map.values()))) - logging.debug("hubble: using kubectl exec into %s", cilium_pod) + LOG.debug("hubble: using kubectl exec into %s", cilium_pod) return [ "kubectl", "exec", @@ -1075,7 +1075,8 @@ def _hubble_reader_thread( stop_event: threading.Event, ) -> None: """Read JSON flow lines from hubble stdout and feed them into *store*.""" - assert proc.stdout is not None + if proc.stdout is None: + return for line in proc.stdout: if stop_event.is_set(): break @@ -1100,7 +1101,8 @@ def _hubble_stderr_thread( stop_event: threading.Event, ) -> None: """Drain hubble stderr and record the last meaningful line in *store*.""" - assert proc.stderr is not None + if proc.stderr is None: + return for line in proc.stderr: if stop_event.is_set(): break @@ -1218,6 +1220,8 @@ def _watch_mode(args: argparse.Namespace) -> None: try: hubble_cmd = 
_build_hubble_observe_cmd(args) except RuntimeError as exc: + if capture_fh: + capture_fh.close() LOG.error("%s", exc) sys.exit(EXIT_ERROR) @@ -1230,6 +1234,8 @@ def _watch_mode(args: argparse.Namespace) -> None: try: proc_holder = [_launch_hubble(hubble_cmd, store, stop_event)] except FileNotFoundError: + if capture_fh: + capture_fh.close() LOG.error("Command not found: %s", hubble_cmd[0]) sys.exit(EXIT_ERROR) @@ -1297,9 +1303,11 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] is_paused = not is_paused elif key in (ord("s"), ord("S")): # toggle select mode + if not is_selecting and not ordered_keys: + continue # nothing to select is_selecting = not is_selecting if is_selecting: - cursor_flow_idx = min(cursor_flow_idx, max(0, len(ordered_keys) - 1)) + cursor_flow_idx = min(cursor_flow_idx, len(ordered_keys) - 1) elif key == 27: # Esc – exit select + clear is_selecting = False diff --git a/py.typed b/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml index 91f7457..7315788 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hubble-audit2policy" -version = "0.7.0" +version = "0.7.1" description = "Generate least-privilege CiliumNetworkPolicy YAML from Hubble flow logs." 
readme = "README.md" license = "Apache-2.0" @@ -26,7 +26,7 @@ classifiers = [ dependencies = ["pyyaml>=6.0"] [project.optional-dependencies] -dev = ["pytest>=7.0", "ruff>=0.4", "mypy>=1.10"] +dev = ["pytest>=7.0", "ruff>=0.4", "mypy>=1.10", "types-PyYAML>=6.0"] [project.scripts] hubble-audit2policy = "hubble_audit2policy:main" From 28ddad9e52cb98525594c1a7a0917e6671654445 Mon Sep 17 00:00:00 2001 From: Danny Burrow Date: Sat, 28 Mar 2026 18:47:15 +0100 Subject: [PATCH 2/3] refactor: extract watch-mode helpers, replace unicode, add tests, bump 0.7.2 - Extract _handle_key, _draw_header, _draw_content from 271-line nested _run - Replace reconnect_state dict with _ReconnectState dataclass - Replace all Unicode symbols with ASCII equivalents in TUI and CLI output - Add 39 new tests for output helpers, key handling, and error paths (120 total) - Bump version to 0.7.2, update changelog --- CHANGELOG.md | 13 + hubble_audit2policy.py | 559 ++++++++++++++++++++++++++--------------- pyproject.toml | 2 +- tests/test_output.py | 348 +++++++++++++++++++++++++ 4 files changed, 725 insertions(+), 197 deletions(-) create mode 100644 tests/test_output.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 27aa7e4..4369b2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.7.2] - 2026-03-28 + +### Changed + +- Refactor watch mode: extract `_handle_key`, `_draw_header`, `_draw_content` helpers from 271-line nested `_run` function. +- Replace `reconnect_state` dict with `_ReconnectState` dataclass for type-safe reconnection state. +- Replace all Unicode symbols with ASCII equivalents throughout TUI and CLI output. 
+ +### Added + +- Tests for `_dump_yaml`, `_write_multi_doc_yaml`, `_find_unknown_flows`, `_print_unknown_warnings`, `_print_summary`, `_trunc`, `_print_report`, `_ReconnectState`, and `_handle_key` (39 new tests, 120 total). +- Error-path tests for `_parse_duration` and `_read_flows`. + ## [0.7.1] - 2026-03-28 ### Fixed diff --git a/hubble_audit2policy.py b/hubble_audit2policy.py index 75fac24..4bfd6a3 100755 --- a/hubble_audit2policy.py +++ b/hubble_audit2policy.py @@ -8,13 +8,14 @@ from __future__ import annotations -__version__ = "0.7.1" +__version__ = "0.7.2" __author__ = "noexecstack" __license__ = "Apache-2.0" import argparse import base64 import curses +import dataclasses import io import json import logging @@ -862,8 +863,8 @@ def _print_summary(total: int, matched: int, policy_count: int, file: IO[str] = def _trunc(s: str, width: int) -> str: - """Truncate *s* to *width* chars, appending '…' when cut.""" - return (s[: width - 1] + "\u2026") if len(s) > width else s + """Truncate *s* to *width* chars, appending ``...`` when cut.""" + return (s[: width - 3] + "...") if len(s) > width else s def _print_report( @@ -1167,12 +1168,298 @@ def _is_workload(ns: str, app: str) -> bool: return dict(policies) +@dataclasses.dataclass +class _ReconnectState: + """Mutable state for exponential-backoff reconnection in watch mode.""" + + delay: float = 2.0 + at: float = 0.0 + _DELAY_INIT: float = dataclasses.field(default=2.0, repr=False) + _DELAY_MAX: float = dataclasses.field(default=60.0, repr=False) + + def reset(self) -> None: + self.delay = self._DELAY_INIT + + def backoff(self, now: float) -> None: + self.at = now + self.delay + self.delay = min(self.delay * 2, self._DELAY_MAX) + + +def _handle_key( + key: int, + *, + is_paused: bool, + is_selecting: bool, + cursor_flow_idx: int, + ordered_keys: list[FlowKey], + selected_keys: set[FlowKey], + scroll_offset: int, + is_following: bool, + half_page: int, + content_lines_len: int, +) -> tuple[bool | None, bool, bool, 
int, int, bool, bool]: + """Process a single curses key press and return updated TUI state. + + Returns ``(quit_signal, is_paused, is_selecting, cursor_flow_idx, + scroll_offset, is_following, generate_flag)``. + + *quit_signal*: ``True`` = quit, ``None`` = key not handled (no-op), + ``False`` = handled, continue. + """ + if key in (ord("q"), ord("Q"), 3): # quit + return (True, is_paused, is_selecting, cursor_flow_idx, scroll_offset, is_following, False) + + if key == ord(" "): + if is_selecting: # toggle selection + if 0 <= cursor_flow_idx < len(ordered_keys): + fk = ordered_keys[cursor_flow_idx] + if fk in selected_keys: + selected_keys.discard(fk) + else: + selected_keys.add(fk) + else: # pause / resume + is_paused = not is_paused + return (False, is_paused, is_selecting, cursor_flow_idx, scroll_offset, is_following, False) + + if key in (ord("s"), ord("S")): # toggle select mode + if not is_selecting and not ordered_keys: + return ( + None, + is_paused, + is_selecting, + cursor_flow_idx, + scroll_offset, + is_following, + False, + ) + is_selecting = not is_selecting + if is_selecting: + cursor_flow_idx = min(cursor_flow_idx, len(ordered_keys) - 1) + return (False, is_paused, is_selecting, cursor_flow_idx, scroll_offset, is_following, False) + + if key == 27: # Esc -- exit select + clear + return False, is_paused, False, 0, scroll_offset, is_following, False + + if key in (10, 13, curses.KEY_ENTER): # Enter -- generate + quit + if is_selecting and selected_keys: + return ( + True, + is_paused, + is_selecting, + cursor_flow_idx, + scroll_offset, + is_following, + True, + ) + return ( + None, + is_paused, + is_selecting, + cursor_flow_idx, + scroll_offset, + is_following, + False, + ) + + # -- navigation -------------------------------------------------------- + if key in (ord("j"), curses.KEY_DOWN): + if is_selecting and ordered_keys: + cursor_flow_idx = min(cursor_flow_idx + 1, len(ordered_keys) - 1) + else: + scroll_offset += 1 + is_following = False + + elif 
key in (ord("k"), curses.KEY_UP): + if is_selecting and ordered_keys: + cursor_flow_idx = max(0, cursor_flow_idx - 1) + else: + scroll_offset = max(0, scroll_offset - 1) + + elif key in (ord("d"), curses.KEY_NPAGE): + if is_selecting and ordered_keys: + cursor_flow_idx = min(cursor_flow_idx + half_page, len(ordered_keys) - 1) + else: + scroll_offset += half_page + is_following = False + + elif key in (ord("u"), curses.KEY_PPAGE): + if is_selecting and ordered_keys: + cursor_flow_idx = max(0, cursor_flow_idx - half_page) + else: + scroll_offset = max(0, scroll_offset - half_page) + + elif key in (ord("g"), curses.KEY_HOME): + if is_selecting: + cursor_flow_idx = 0 + scroll_offset = 0 + is_following = True + + elif key in (ord("G"), curses.KEY_END): + if is_selecting and ordered_keys: + cursor_flow_idx = len(ordered_keys) - 1 + scroll_offset = content_lines_len # clamped by caller + is_following = False + + else: + return ( + None, + is_paused, + is_selecting, + cursor_flow_idx, + scroll_offset, + is_following, + False, + ) + + return ( + False, + is_paused, + is_selecting, + cursor_flow_idx, + scroll_offset, + is_following, + False, + ) + + +def _draw_header( + stdscr: Any, + width: int, + *, + cmd_display: str, + interval: float, + store: LiveFlowStore, + capture_file: str | None, + is_paused: bool, + is_selecting: bool, + selected_count: int, + reconn: _ReconnectState, + now_mono: float, +) -> None: + """Render the fixed header rows (0-2) of the watch mode TUI.""" + # Row 0: command + timestamp + now_str = time.strftime("%Y-%m-%d %H:%M:%S") + left = f"Every {interval:.1f}s: {cmd_display}" + if len(left) > width - len(now_str) - 2: + left = left[: width - len(now_str) - 5] + "..." 
+ padding = max(1, width - len(left) - len(now_str)) + try: + stdscr.addnstr(0, 0, f"{left}{' ' * padding}{now_str}", width - 1) + except curses.error: + pass + + # Row 1: window info + status badge + win_label = ( + f"Window: last {store.window_seconds:.0f}s" + if store.window_seconds > 0 + else "Window: all flows" + ) + cap_note = f" | Capturing -> {capture_file}" if capture_file else "" + prefix = ( + f"{win_label} | In window: {store.count}" + f" | Total received: {store.total_received}{cap_note} " + ) + if is_paused: + badge, badge_attr = ( + "|| PAUSED", + curses.color_pair(2) | curses.A_BOLD if curses.has_colors() else curses.A_BOLD, + ) + elif store.connected: + badge, badge_attr = ( + "* Live", + curses.color_pair(1) if curses.has_colors() else curses.A_NORMAL, + ) + elif now_mono < reconn.at: + secs_left = int(reconn.at - now_mono) + badge = f"! Disconnected - reconnecting in {secs_left}s" + badge_attr = curses.color_pair(2) if curses.has_colors() else curses.A_NORMAL + else: + badge, badge_attr = ( + "! 
Reconnecting...", + curses.color_pair(2) if curses.has_colors() else curses.A_NORMAL, + ) + try: + stdscr.addnstr(1, 0, prefix, width - 1) + stdscr.addnstr(1, len(prefix), badge, width - 1 - len(prefix), badge_attr) + except curses.error: + pass + + # Row 2: selection hint (select mode) OR last hubble error OR blank + if is_selecting: + sel_hint = ( + f"SELECT Space toggle Enter generate ({selected_count} selected)" + " j/k move s/Esc exit" + ) + sel_attr = curses.color_pair(2) | curses.A_BOLD if curses.has_colors() else curses.A_BOLD + try: + stdscr.addnstr(2, 0, sel_hint, width - 1, sel_attr) + except curses.error: + pass + elif not store.connected and store.last_error: + err_text = f"Last error: {_trunc(store.last_error, width - 13)}" + err_attr = curses.color_pair(3) if curses.has_colors() else curses.A_NORMAL + try: + stdscr.addnstr(2, 0, err_text, width - 1, err_attr) + except curses.error: + pass + + +def _draw_content( + stdscr: Any, + width: int, + height: int, + *, + content_lines: list[str], + scroll_offset: int, + is_selecting: bool, + is_following: bool, + cursor_flow_idx: int, + key_map: dict[int, FlowKey], + selected_keys: set[FlowKey], + header_lines: int, + data_row_offset: int, +) -> None: + """Render the scrollable content area and corner scroll indicator.""" + content_viewport = max(1, height - header_lines) + max_scroll = max(0, len(content_lines) - content_viewport) + + cursor_line = data_row_offset + cursor_flow_idx if is_selecting else -1 + for i, line in enumerate(content_lines[scroll_offset : scroll_offset + content_viewport]): + abs_idx = scroll_offset + i + attr = curses.A_NORMAL + if is_selecting and abs_idx in key_map: + fk = key_map[abs_idx] + if fk in selected_keys: + attr |= ( + curses.color_pair(1) | curses.A_BOLD if curses.has_colors() else curses.A_BOLD + ) + if abs_idx == cursor_line: + attr |= curses.A_REVERSE + try: + stdscr.addnstr(header_lines + i, 0, line, width - 1, attr) + except curses.error: + pass + + # Corner scroll 
indicator + if len(content_lines) > content_viewport: + if is_following: + indicator = " TOP j/d v " + else: + pct = int(100 * scroll_offset / max(1, max_scroll)) + indicator = f" {pct}% k/u ^ j/d v g# " + try: + col = max(0, width - len(indicator) - 1) + stdscr.addnstr(height - 1, col, indicator, width - 1, curses.A_REVERSE) + except curses.error: + pass + + def _watch_mode(args: argparse.Namespace) -> None: """Live monitoring mode: spawn ``hubble observe`` and refresh the report. Keys (Mac-primary, all modes): - j / ↓ scroll down one line - k / ↑ scroll up one line + j / down scroll down one line + k / up scroll up one line d scroll down half a page u scroll up half a page g jump to top / re-enable auto-follow @@ -1182,7 +1469,7 @@ def _watch_mode(args: argparse.Namespace) -> None: s enter / exit flow-selection mode Enter generate policies from selected flows and quit (select mode) Esc exit select mode and clear selections - q / ^C quit — last report is printed to the terminal + q / ^C quit -- last report is printed to the terminal PgUp / PgDn / Home / End also work for non-Mac keyboards. @@ -1239,9 +1526,7 @@ def _watch_mode(args: argparse.Namespace) -> None: LOG.error("Command not found: %s", hubble_cmd[0]) sys.exit(EXIT_ERROR) - _RECONNECT_DELAY_INIT = 2.0 - _RECONNECT_DELAY_MAX = 60.0 - reconnect_state = {"delay": _RECONNECT_DELAY_INIT, "at": 0.0} + reconn = _ReconnectState() # Header layout (screen rows): # 0 command + timestamp @@ -1252,7 +1537,7 @@ def _watch_mode(args: argparse.Namespace) -> None: HEADER_LINES = 4 # _print_report always writes these many lines before the first data row: - # "" (leading \n), sep, "FLOW REPORT...", sep, header-row, dash → 6 lines + # "" (leading \n), sep, "FLOW REPORT...", sep, header-row, dash = 6 lines DATA_ROW_OFFSET = 6 # Shared state between _run() and the outer scope. 
@@ -1260,6 +1545,8 @@ def _watch_mode(args: argparse.Namespace) -> None: generate_flag: bool = False selected_keys_final: set[FlowKey] = set() + capture_file_name: str | None = args.capture_file if capture_fh else None + def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] nonlocal final_content, generate_flag if curses.has_colors(): @@ -1279,7 +1566,7 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] selected_keys: set[FlowKey] = set() ordered_keys: list[FlowKey] = [] # flow keys in display order (rebuilt each refresh) - key_map: dict[int, FlowKey] = {} # content_line_index → FlowKey + key_map: dict[int, FlowKey] = {} # content_line_index -> FlowKey content_lines: list[str] = [] last_refresh = 0.0 @@ -1288,76 +1575,37 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] height, width = stdscr.getmaxyx() half_page = max(1, (height - HEADER_LINES) // 2) - if key in (ord("q"), ord("Q"), 3): # quit - break - - elif key == ord(" "): - if is_selecting: # toggle selection - if 0 <= cursor_flow_idx < len(ordered_keys): - fk = ordered_keys[cursor_flow_idx] - if fk in selected_keys: - selected_keys.discard(fk) - else: - selected_keys.add(fk) - else: # pause / resume - is_paused = not is_paused - - elif key in (ord("s"), ord("S")): # toggle select mode - if not is_selecting and not ordered_keys: - continue # nothing to select - is_selecting = not is_selecting - if is_selecting: - cursor_flow_idx = min(cursor_flow_idx, len(ordered_keys) - 1) - - elif key == 27: # Esc – exit select + clear - is_selecting = False - selected_keys.clear() - cursor_flow_idx = 0 - - elif key in (10, 13, curses.KEY_ENTER): # Enter – generate + quit - if is_selecting and selected_keys: - generate_flag = True - selected_keys_final.update(selected_keys) + if key != -1: + ( + quit_sig, + is_paused, + is_selecting, + cursor_flow_idx, + scroll_offset, + is_following, + gen, + ) = _handle_key( + key, + is_paused=is_paused, + 
is_selecting=is_selecting, + cursor_flow_idx=cursor_flow_idx, + ordered_keys=ordered_keys, + selected_keys=selected_keys, + scroll_offset=scroll_offset, + is_following=is_following, + half_page=half_page, + content_lines_len=len(content_lines), + ) + if quit_sig: + if gen: + generate_flag = True + selected_keys_final.update(selected_keys) break - - # ---- navigation -------------------------------------------------- - elif key in (ord("j"), curses.KEY_DOWN): - if is_selecting and ordered_keys: - cursor_flow_idx = min(cursor_flow_idx + 1, len(ordered_keys) - 1) - else: - scroll_offset += 1 - is_following = False - - elif key in (ord("k"), curses.KEY_UP): - if is_selecting and ordered_keys: - cursor_flow_idx = max(0, cursor_flow_idx - 1) - else: - scroll_offset = max(0, scroll_offset - 1) - - elif key in (ord("d"), curses.KEY_NPAGE): - if is_selecting and ordered_keys: - cursor_flow_idx = min(cursor_flow_idx + half_page, len(ordered_keys) - 1) - else: - scroll_offset += half_page - is_following = False - - elif key in (ord("u"), curses.KEY_PPAGE): - if is_selecting and ordered_keys: - cursor_flow_idx = max(0, cursor_flow_idx - half_page) - else: - scroll_offset = max(0, scroll_offset - half_page) - - elif key in (ord("g"), curses.KEY_HOME): - if is_selecting: - cursor_flow_idx = 0 - scroll_offset = 0 - is_following = True - - elif key in (ord("G"), curses.KEY_END): - if is_selecting and ordered_keys: - cursor_flow_idx = len(ordered_keys) - 1 - scroll_offset = len(content_lines) # clamped below - is_following = False + # Esc clears selections inside _handle_key via the returned + # is_selecting=False, but the set is mutated in-place only for + # toggle; for Esc we need to clear here when select was exited. 
+ if key == 27: + selected_keys.clear() now_mono = time.monotonic() @@ -1367,15 +1615,12 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] # Auto-reconnect if not store.connected and proc_holder[0].poll() is not None: - if now_mono >= reconnect_state["at"]: + if now_mono >= reconn.at: try: proc_holder[0] = _launch_hubble(hubble_cmd, store, stop_event) - reconnect_state["delay"] = _RECONNECT_DELAY_INIT + reconn.reset() except FileNotFoundError: - reconnect_state["at"] = now_mono + reconnect_state["delay"] - reconnect_state["delay"] = min( - reconnect_state["delay"] * 2, _RECONNECT_DELAY_MAX - ) + reconn.backoff(now_mono) flows = store.snapshot() _, flow_counts, total, matched, _ = _parse_flow_list( @@ -1393,8 +1638,8 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] _print_unknown_warnings(unknown_keys, flow_counts, file=buf) buf.write( - "\nSpace pause \u2022 j/k line \u2022 d/u half-page" - " \u2022 g/G top/bottom \u2022 s select \u2022 q quit\n" + "\nSpace pause | j/k line | d/u half-page" + " | g/G top/bottom | s select | q quit\n" ) content_lines = buf.getvalue().splitlines() final_content = content_lines @@ -1414,7 +1659,7 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] else: scroll_offset = min(scroll_offset, max_scroll) if scroll_offset <= 0: - is_following = True # scrolled back to top → re-enable + is_following = True # scrolled back to top -- re-enable # In selection mode keep the cursor row visible. 
if is_selecting and ordered_keys: @@ -1425,115 +1670,37 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] elif cursor_line >= scroll_offset + content_viewport: scroll_offset = min(cursor_line - content_viewport + 1, max_scroll) - # ---- Draw header ------------------------------------------------- + # ---- Draw -------------------------------------------------------- stdscr.erase() - - # Row 0: command + timestamp - now_str = time.strftime("%Y-%m-%d %H:%M:%S") - left = f"Every {interval:.1f}s: {cmd_display}" - if len(left) > width - len(now_str) - 2: - left = left[: width - len(now_str) - 5] + "\u2026" - padding = max(1, width - len(left) - len(now_str)) - try: - stdscr.addnstr(0, 0, f"{left}{' ' * padding}{now_str}", width - 1) - except curses.error: - pass - - # Row 1: window info + status badge - win_label = f"Window: last {window:.0f}s" if window > 0 else "Window: all flows" - cap_note = f" • Capturing → {args.capture_file}" if capture_fh else "" - prefix = ( - f"{win_label} \u2022 In window: {store.count}" - f" \u2022 Total received: {store.total_received}{cap_note} " + _draw_header( + stdscr, + width, + cmd_display=cmd_display, + interval=interval, + store=store, + capture_file=capture_file_name, + is_paused=is_paused, + is_selecting=is_selecting, + selected_count=len(selected_keys), + reconn=reconn, + now_mono=now_mono, + ) + _draw_content( + stdscr, + width, + height, + content_lines=content_lines, + scroll_offset=scroll_offset, + is_selecting=is_selecting, + is_following=is_following, + cursor_flow_idx=cursor_flow_idx, + key_map=key_map, + selected_keys=selected_keys, + header_lines=HEADER_LINES, + data_row_offset=DATA_ROW_OFFSET, ) - if is_paused: - badge, badge_attr = ( - "\u23f8 PAUSED", - ( - curses.color_pair(2) | curses.A_BOLD - if curses.has_colors() - else curses.A_BOLD - ), - ) - elif store.connected: - badge, badge_attr = ( - "\u25cf Live", - (curses.color_pair(1) if curses.has_colors() else curses.A_NORMAL), - ) - elif 
now_mono < reconnect_state["at"]: - secs_left = int(reconnect_state["at"] - now_mono) - badge = f"\u26a0 Disconnected \u2013 reconnecting in {secs_left}s" - badge_attr = curses.color_pair(2) if curses.has_colors() else curses.A_NORMAL - else: - badge, badge_attr = ( - "\u26a0 Reconnecting\u2026", - (curses.color_pair(2) if curses.has_colors() else curses.A_NORMAL), - ) - try: - stdscr.addnstr(1, 0, prefix, width - 1) - stdscr.addnstr(1, len(prefix), badge, width - 1 - len(prefix), badge_attr) - except curses.error: - pass - - # Row 2: selection hint (select mode) OR last hubble error OR blank - if is_selecting: - n_sel = len(selected_keys) - sel_hint = ( - f"SELECT Space toggle Enter generate ({n_sel} selected) j/k move s/Esc exit" - ) - sel_attr = ( - curses.color_pair(2) | curses.A_BOLD if curses.has_colors() else curses.A_BOLD - ) - try: - stdscr.addnstr(2, 0, sel_hint, width - 1, sel_attr) - except curses.error: - pass - elif not store.connected and store.last_error: - err_text = f"Last error: {_trunc(store.last_error, width - 13)}" - err_attr = curses.color_pair(3) if curses.has_colors() else curses.A_NORMAL - try: - stdscr.addnstr(2, 0, err_text, width - 1, err_attr) - except curses.error: - pass - # Row 3 is blank (implicit from erase) - - # ---- Draw scrollable content ------------------------------------- - cursor_line = DATA_ROW_OFFSET + cursor_flow_idx if is_selecting else -1 - for i, line in enumerate( - content_lines[scroll_offset : scroll_offset + content_viewport] - ): - abs_idx = scroll_offset + i - attr = curses.A_NORMAL - if is_selecting and abs_idx in key_map: - fk = key_map[abs_idx] - if fk in selected_keys: - attr |= ( - curses.color_pair(1) | curses.A_BOLD - if curses.has_colors() - else curses.A_BOLD - ) - if abs_idx == cursor_line: - attr |= curses.A_REVERSE - try: - stdscr.addnstr(HEADER_LINES + i, 0, line, width - 1, attr) - except curses.error: - pass - - # ---- Corner indicator -------------------------------------------- - if 
len(content_lines) > content_viewport: - if is_following: - indicator = " TOP j/d \u2193 " - else: - pct = int(100 * scroll_offset / max(1, max_scroll)) - indicator = f" {pct}% k/u \u2191 j/d \u2193 g\u21a5 " - try: - col = max(0, width - len(indicator) - 1) - stdscr.addnstr(height - 1, col, indicator, width - 1, curses.A_REVERSE) - except curses.error: - pass - stdscr.refresh() - time.sleep(0.05) # ~20 fps – keeps key input responsive + time.sleep(0.05) # ~20 fps -- keeps key input responsive try: curses.wrapper(_run) @@ -1562,7 +1729,7 @@ def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] n_pol = len(sorted_policies) print( f"\nGenerating {n_pol} {'policy' if n_pol == 1 else 'policies'} " - f"from {len(selected_keys_final)} selected flows\u2026", + f"from {len(selected_keys_final)} selected flows...", file=sys.stderr, ) if args.dry_run: diff --git a/pyproject.toml b/pyproject.toml index 7315788..2021f93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hubble-audit2policy" -version = "0.7.1" +version = "0.7.2" description = "Generate least-privilege CiliumNetworkPolicy YAML from Hubble flow logs." 
readme = "README.md" license = "Apache-2.0" diff --git a/tests/test_output.py b/tests/test_output.py new file mode 100644 index 0000000..1a340e9 --- /dev/null +++ b/tests/test_output.py @@ -0,0 +1,348 @@ +"""Tests for output helpers, display functions, watch-mode key handling, and error paths.""" + +from __future__ import annotations + +import argparse +import io +from collections import Counter + +import pytest +import yaml + +import hubble_audit2policy as h + +# --------------------------------------------------------------------------- +# _dump_yaml / _write_multi_doc_yaml +# --------------------------------------------------------------------------- + + +class TestDumpYaml: + def test_basic_roundtrip(self) -> None: + policy = h.build_policy( + "default", "web", {"egress": {("default", "api", 8080, "TCP")}, "ingress": set()} + ) + buf = io.StringIO() + h._dump_yaml(policy, buf) + loaded = yaml.safe_load(buf.getvalue()) + assert loaded["metadata"]["name"] == "allow-web" + assert loaded["metadata"]["namespace"] == "default" + + def test_preserves_key_order(self) -> None: + policy = h.build_policy( + "ns", "app", {"egress": {("ns", "peer", 443, "TCP")}, "ingress": set()} + ) + buf = io.StringIO() + h._dump_yaml(policy, buf) + text = buf.getvalue() + api_pos = text.index("apiVersion") + kind_pos = text.index("kind") + meta_pos = text.index("metadata") + assert api_pos < kind_pos < meta_pos + + +class TestWriteMultiDocYaml: + def test_two_policies_separated_by_doc_marker(self) -> None: + policies = [ + ("ns1", "app1", {"egress": {("ns1", "peer", 80, "TCP")}, "ingress": set()}), + ("ns2", "app2", {"egress": set(), "ingress": {("ns2", "src", 443, "TCP")}}), + ] + buf = io.StringIO() + h._write_multi_doc_yaml(policies, buf) + text = buf.getvalue() + docs = list(yaml.safe_load_all(text)) + assert len(docs) == 2 + assert docs[0]["metadata"]["name"] == "allow-app1" + assert docs[1]["metadata"]["name"] == "allow-app2" + + def test_single_policy_no_separator(self) -> None: + 
policies = [ + ("ns", "app", {"egress": {("ns", "peer", 80, "TCP")}, "ingress": set()}), + ] + buf = io.StringIO() + h._write_multi_doc_yaml(policies, buf) + assert "---" not in buf.getvalue() + + +# --------------------------------------------------------------------------- +# _find_unknown_flows +# --------------------------------------------------------------------------- + + +class TestFindUnknownFlows: + def test_detects_unknown_source(self) -> None: + counts: Counter[h.FlowKey] = Counter() + key: h.FlowKey = ("ns", "unknown", "ns", "app", 80, "TCP") + counts[key] = 3 + result = h._find_unknown_flows(counts) + assert key in result + + def test_detects_unknown_destination(self) -> None: + counts: Counter[h.FlowKey] = Counter() + key: h.FlowKey = ("ns", "app", "ns", "unknown", 80, "TCP") + counts[key] = 1 + result = h._find_unknown_flows(counts) + assert key in result + + def test_ignores_known_flows(self) -> None: + counts: Counter[h.FlowKey] = Counter() + key: h.FlowKey = ("ns", "web", "ns", "api", 80, "TCP") + counts[key] = 5 + assert h._find_unknown_flows(counts) == [] + + +# --------------------------------------------------------------------------- +# _print_unknown_warnings +# --------------------------------------------------------------------------- + + +class TestPrintUnknownWarnings: + def test_contains_warning_and_hint(self) -> None: + counts: Counter[h.FlowKey] = Counter() + key: h.FlowKey = ("ns", "unknown", "ns", "api", 80, "TCP") + counts[key] = 2 + buf = io.StringIO() + h._print_unknown_warnings([key], counts, file=buf) + text = buf.getvalue() + assert "WARNING" in text + assert "--label-key" in text + assert "x2" in text + + +# --------------------------------------------------------------------------- +# _print_summary +# --------------------------------------------------------------------------- + + +class TestPrintSummary: + def test_singular_policy(self) -> None: + buf = io.StringIO() + h._print_summary(10, 5, 1, file=buf) + text = 
buf.getvalue() + assert "1 policy" in text + assert "policies" not in text + + def test_plural_policies(self) -> None: + buf = io.StringIO() + h._print_summary(100, 50, 3, file=buf) + assert "3 policies" in buf.getvalue() + + def test_counts_present(self) -> None: + buf = io.StringIO() + h._print_summary(42, 17, 5, file=buf) + text = buf.getvalue() + assert "42" in text + assert "17" in text + + +# --------------------------------------------------------------------------- +# _trunc +# --------------------------------------------------------------------------- + + +class TestTrunc: + def test_short_string_unchanged(self) -> None: + assert h._trunc("hello", 10) == "hello" + + def test_exact_width_unchanged(self) -> None: + assert h._trunc("12345", 5) == "12345" + + def test_long_string_truncated(self) -> None: + result = h._trunc("abcdefghij", 7) + assert result == "abcd..." + assert len(result) == 7 + + def test_uses_ascii_ellipsis(self) -> None: + result = h._trunc("a" * 20, 10) + assert result.endswith("...") + assert "\u2026" not in result + + +# --------------------------------------------------------------------------- +# _print_report +# --------------------------------------------------------------------------- + + +class TestPrintReport: + def _make_counts(self) -> Counter[h.FlowKey]: + counts: Counter[h.FlowKey] = Counter() + counts[("ns", "web", "ns", "api", 8080, "TCP")] = 10 + counts[("ns", "api", "ns", "db", 5432, "TCP")] = 5 + return counts + + def test_returns_keys_in_frequency_order(self) -> None: + counts = self._make_counts() + buf = io.StringIO() + keys = h._print_report(counts, 15, 15, file=buf, term_width=120) + assert len(keys) == 2 + # Most common first + assert keys[0] == ("ns", "web", "ns", "api", 8080, "TCP") + assert keys[1] == ("ns", "api", "ns", "db", 5432, "TCP") + + def test_output_contains_header_and_data(self) -> None: + counts = self._make_counts() + buf = io.StringIO() + h._print_report(counts, 15, 15, file=buf, term_width=120) + 
text = buf.getvalue() + assert "FLOW REPORT" in text + assert "COUNT" in text + assert "SOURCE" in text + assert "DESTINATION" in text + assert "8080" in text + assert "5432" in text + + def test_empty_counts(self) -> None: + counts: Counter[h.FlowKey] = Counter() + buf = io.StringIO() + keys = h._print_report(counts, 0, 0, file=buf, term_width=80) + assert keys == [] + assert "FLOW REPORT" in buf.getvalue() + + def test_narrow_terminal_does_not_crash(self) -> None: + counts = self._make_counts() + buf = io.StringIO() + keys = h._print_report(counts, 15, 15, file=buf, term_width=40) + assert len(keys) == 2 + + +# --------------------------------------------------------------------------- +# _ReconnectState +# --------------------------------------------------------------------------- + + +class TestReconnectState: + def test_initial_values(self) -> None: + rs = h._ReconnectState() + assert rs.delay == 2.0 + assert rs.at == 0.0 + + def test_reset(self) -> None: + rs = h._ReconnectState() + rs.delay = 32.0 + rs.reset() + assert rs.delay == 2.0 + + def test_backoff_doubles(self) -> None: + rs = h._ReconnectState() + rs.backoff(100.0) + assert rs.at == 102.0 + assert rs.delay == 4.0 + + def test_backoff_caps_at_max(self) -> None: + rs = h._ReconnectState() + for _ in range(20): + rs.backoff(0.0) + assert rs.delay == 60.0 + + +# --------------------------------------------------------------------------- +# _handle_key +# --------------------------------------------------------------------------- + + +_DEFAULT_KEY_ARGS = { + "is_paused": False, + "is_selecting": False, + "cursor_flow_idx": 0, + "ordered_keys": [], + "selected_keys": set(), + "scroll_offset": 0, + "is_following": True, + "half_page": 10, + "content_lines_len": 50, +} + + +class TestHandleKey: + def _call(self, key: int, **overrides: object) -> tuple[object, ...]: + kw = {**_DEFAULT_KEY_ARGS, **overrides} + return h._handle_key(key, **kw) # type: ignore[arg-type] + + def test_quit_q(self) -> None: + result 
= self._call(ord("q")) + assert result[0] is True # quit signal + + def test_quit_ctrl_c(self) -> None: + result = self._call(3) + assert result[0] is True + + def test_space_toggles_pause(self) -> None: + result = self._call(ord(" "), is_paused=False) + assert result[1] is True # is_paused + + def test_space_unpauses(self) -> None: + result = self._call(ord(" "), is_paused=True) + assert result[1] is False + + def test_j_scrolls_down(self) -> None: + result = self._call(ord("j"), scroll_offset=5) + assert result[4] == 6 # scroll_offset + + def test_k_scrolls_up(self) -> None: + result = self._call(ord("k"), scroll_offset=5, is_following=False) + assert result[4] == 4 + + def test_g_jumps_to_top(self) -> None: + result = self._call(ord("g"), scroll_offset=20, is_following=False) + assert result[4] == 0 # scroll_offset + assert result[5] is True # is_following + + def test_s_enters_select_mode(self) -> None: + keys: list[h.FlowKey] = [("ns", "a", "ns", "b", 80, "TCP")] + result = self._call(ord("s"), ordered_keys=keys) + assert result[2] is True # is_selecting + + def test_s_noop_when_no_keys(self) -> None: + result = self._call(ord("s"), ordered_keys=[]) + assert result[0] is None # no-op + assert result[2] is False # still not selecting + + def test_enter_generates_when_selecting(self) -> None: + keys: list[h.FlowKey] = [("ns", "a", "ns", "b", 80, "TCP")] + selected: set[h.FlowKey] = {keys[0]} + result = self._call(ord("\r"), is_selecting=True, ordered_keys=keys, selected_keys=selected) + assert result[0] is True # quit + assert result[6] is True # generate_flag + + def test_unknown_key_returns_none(self) -> None: + result = self._call(ord("z")) + assert result[0] is None # no-op + + +# --------------------------------------------------------------------------- +# Error path tests +# --------------------------------------------------------------------------- + + +class TestParseDurationErrors: + def test_invalid_unit(self) -> None: + with 
pytest.raises(argparse.ArgumentTypeError): + h._parse_duration("5x") + + def test_empty_string(self) -> None: + with pytest.raises(argparse.ArgumentTypeError): + h._parse_duration("") + + def test_letters_only(self) -> None: + with pytest.raises(argparse.ArgumentTypeError): + h._parse_duration("abc") + + +class TestReadFlowsErrors: + def test_nonexistent_file(self) -> None: + with pytest.raises(FileNotFoundError): + list(h._read_flows("/tmp/nonexistent_hubble_flows_12345.jsonl")) + + def test_completely_malformed_file(self) -> None: + import tempfile + + with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f: + f.write("not json at all\n") + f.write("{totally broken\n") + path = f.name + try: + result = list(h._read_flows(path)) + assert result == [] + finally: + import os + + os.unlink(path) From d0a8941616fc9b3bcec91a1392045e65b233da19 Mon Sep 17 00:00:00 2001 From: Danny Burrow Date: Sat, 28 Mar 2026 18:58:27 +0100 Subject: [PATCH 3/3] docs: add missing Loki auth flags to README, fix py.typed packaging, bump 0.7.3 - Add --loki-user, --loki-password, --loki-token, --loki-tls-ca to README usage signature and flag reference table - Declare py.typed in pyproject.toml package-data so the marker is included in built distributions - Bump version to 0.7.3, update changelog --- CHANGELOG.md | 7 +++++++ README.md | 6 ++++++ hubble_audit2policy.py | 2 +- pyproject.toml | 5 ++++- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4369b2d..defaa53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [0.7.3] - 2026-03-28 + +### Fixed + +- Add missing Loki auth flags (`--loki-user`, `--loki-password`, `--loki-token`, `--loki-tls-ca`) to README usage signature and flag reference table. +- Declare `py.typed` in `pyproject.toml` `[tool.setuptools.package-data]` so the marker is included in built distributions. + ## [0.7.2] - 2026-03-28 ### Changed diff --git a/README.md b/README.md index 100e162..c73b6e5 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,8 @@ hubble-audit2policy [-h] [-o OUTPUT_DIR] [-n NAMESPACE] [--from {file,loki}] [--loki-url URL] [--loki-query LOGQL] [--since DURATION] [--until DURATION] [--loki-limit N] + [--loki-user USER] [--loki-password PASSWORD] + [--loki-token TOKEN] [--loki-tls-ca PATH] [-v] [-V] [flows_file] ``` @@ -181,6 +183,10 @@ hubble-audit2policy [-h] [-o OUTPUT_DIR] [-n NAMESPACE] | `--since DURATION` | How far back to query, e.g. `30m`, `2h`, `1d` (default: `1h`) | | `--until DURATION` | End of query window as duration before now (default: `0s` = now) | | `--loki-limit N` | Max entries per Loki request batch (default: `5000`) | +| `--loki-user USER` | Username for Loki HTTP Basic authentication | +| `--loki-password PASSWORD` | Password for Loki HTTP Basic authentication (used with `--loki-user`) | +| `--loki-token TOKEN` | Bearer token for Loki, sent in the `Authorization: Bearer ...` header | +| `--loki-tls-ca PATH` | Path to a PEM CA certificate for verifying the Loki server (self-signed certs) | | `-v, --verbose` | Enable verbose logging | | `-V, --version` | Show version and exit | diff --git a/hubble_audit2policy.py b/hubble_audit2policy.py index 4bfd6a3..0e35bc2 100755 --- a/hubble_audit2policy.py +++ b/hubble_audit2policy.py @@ -8,7 +8,7 @@ from __future__ import annotations -__version__ = "0.7.2" +__version__ = "0.7.3" __author__ = "noexecstack" __license__ = "Apache-2.0" diff --git a/pyproject.toml b/pyproject.toml index 2021f93..c71bb22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 
"setuptools.build_meta" [project] name = "hubble-audit2policy" -version = "0.7.2" +version = "0.7.3" description = "Generate least-privilege CiliumNetworkPolicy YAML from Hubble flow logs." readme = "README.md" license = "Apache-2.0" @@ -35,6 +35,9 @@ hubble-audit2policy = "hubble_audit2policy:main" Homepage = "https://github.com/noexecstack/hubble-audit2policy" Issues = "https://github.com/noexecstack/hubble-audit2policy/issues" +[tool.setuptools.package-data] +hubble_audit2policy = ["py.typed"] + [tool.ruff] target-version = "py310" line-length = 100