diff --git a/hubble_audit2policy.py b/hubble_audit2policy.py index 2aa5599..addabd3 100755 --- a/hubble_audit2policy.py +++ b/hubble_audit2policy.py @@ -8,7 +8,7 @@ from __future__ import annotations -__version__ = "0.7.5" +__version__ = "0.9.0" __author__ = "noexecstack" __license__ = "Apache-2.0" @@ -1407,6 +1407,53 @@ def _draw_header( pass +def _draw_loki_header( + stdscr: Any, + width: int, + *, + loki_url: str, + loki_query: str, + since: str, + until: str, + flow_count: int, + is_selecting: bool, + selected_count: int, +) -> None: + """Render the fixed header rows (0-2) for Loki watch mode TUI.""" + # Row 0: source + timestamp + now_str = time.strftime("%Y-%m-%d %H:%M:%S") + left = f"Loki: {loki_url} query={loki_query}" + if len(left) > width - len(now_str) - 2: + left = left[: width - len(now_str) - 5] + "..." + padding = max(1, width - len(left) - len(now_str)) + try: + stdscr.addnstr(0, 0, f"{left}{' ' * padding}{now_str}", width - 1) + except curses.error: + pass + + # Row 1: time range + flow count + status badge + prefix = f"Range: since={since} until={until} | Flows: {flow_count} " + badge = "* Loaded" + badge_attr = curses.color_pair(1) if curses.has_colors() else curses.A_NORMAL + try: + stdscr.addnstr(1, 0, prefix, width - 1) + stdscr.addnstr(1, len(prefix), badge, width - 1 - len(prefix), badge_attr) + except curses.error: + pass + + # Row 2: selection hint or blank + if is_selecting: + sel_hint = ( + f"SELECT Space toggle Enter generate ({selected_count} selected)" + " j/k move s/Esc exit" + ) + sel_attr = curses.color_pair(2) | curses.A_BOLD if curses.has_colors() else curses.A_BOLD + try: + stdscr.addnstr(2, 0, sel_hint, width - 1, sel_attr) + except curses.error: + pass + + def _draw_content( stdscr: Any, width: int, @@ -1457,6 +1504,242 @@ def _draw_content( pass +def _loki_watch_mode(args: argparse.Namespace) -> None: + """Interactive TUI for Loki flows with workload selection. + + Fetches historical flows from a Loki instance, then presents the same + curses TUI as live watch mode so the user can browse, scroll, and + interactively select workloads to generate policies for. + + Keys are the same as live watch mode (j/k scroll, s select, Space + toggle, Enter generate, q quit) except that pause/resume is not + applicable since flows are pre-loaded. + """ + label_keys: list[str] = args.label_keys or DEFAULT_LABEL_KEYS + verdicts: set[str] = {v.upper() for v in args.verdict} if args.verdict else set() + namespaces: set[str] = set(args.namespaces or []) + interval: float = args.interval + + # Validate Loki arguments. + if not args.loki_url: + LOG.error("--loki-url is required when using --from loki") + sys.exit(EXIT_ERROR) + if args.loki_token and args.loki_user: + LOG.error("--loki-token and --loki-user are mutually exclusive") + sys.exit(EXIT_ERROR) + if args.loki_password and not args.loki_user: + LOG.error("--loki-password requires --loki-user") + sys.exit(EXIT_ERROR) + + since_sec = _parse_duration(args.since) + until_sec = _parse_duration(args.until) + + print( + f"Querying Loki at {args.loki_url} " + f"(query={args.loki_query!r}, since={args.since}, until={args.until}) ...", + file=sys.stderr, + ) + + loki_flows: list[dict[str, Any]] = [] + for _, flow in _read_flows_loki( + args.loki_url, + args.loki_query, + since_sec, + until_sec, + args.loki_limit, + loki_user=args.loki_user, + loki_password=args.loki_password, + loki_token=args.loki_token, + loki_tls_ca=args.loki_tls_ca, + ): + loki_flows.append(flow) + + print(f"Loaded {len(loki_flows)} flows from Loki.", file=sys.stderr) + + if not loki_flows: + LOG.warning("No flows returned from Loki query") + sys.exit(EXIT_NO_POLICIES) + + # Header layout matches live watch mode. + HEADER_LINES = 4 + DATA_ROW_OFFSET = 6 + + # Shared state between _run() and the outer scope. + final_content: list[str] = [] + generate_flag: bool = False + selected_keys_final: set[FlowKey] = set() + + def _run(stdscr: curses.window) -> None: # type: ignore[name-defined] + nonlocal final_content, generate_flag + if curses.has_colors(): + curses.use_default_colors() + curses.init_pair(1, curses.COLOR_GREEN, -1) + curses.init_pair(2, curses.COLOR_YELLOW, -1) + curses.init_pair(3, curses.COLOR_RED, -1) + curses.curs_set(0) + stdscr.nodelay(True) + stdscr.keypad(True) + + scroll_offset = 0 + is_following = True + is_paused = False + is_selecting = False + cursor_flow_idx = 0 + selected_keys: set[FlowKey] = set() + + ordered_keys: list[FlowKey] = [] + key_map: dict[int, FlowKey] = {} + content_lines: list[str] = [] + last_refresh = 0.0 + + while True: + key = stdscr.getch() + height, width = stdscr.getmaxyx() + half_page = max(1, (height - HEADER_LINES) // 2) + + if key != -1: + ( + quit_sig, + is_paused, + is_selecting, + cursor_flow_idx, + scroll_offset, + is_following, + gen, + ) = _handle_key( + key, + is_paused=is_paused, + is_selecting=is_selecting, + cursor_flow_idx=cursor_flow_idx, + ordered_keys=ordered_keys, + selected_keys=selected_keys, + scroll_offset=scroll_offset, + is_following=is_following, + half_page=half_page, + content_lines_len=len(content_lines), + ) + if quit_sig: + if gen: + generate_flag = True + selected_keys_final.update(selected_keys) + break + if key == 27: + selected_keys.clear() + + now_mono = time.monotonic() + + # Refresh the report periodically. + if now_mono - last_refresh >= interval: + last_refresh = now_mono + + _, flow_counts, total, matched, _ = _parse_flow_list( + loki_flows, label_keys, verdicts, namespaces + ) + + buf = io.StringIO() + ordered_keys = _print_report( + flow_counts, total, matched, file=buf, term_width=width + ) + key_map = {DATA_ROW_OFFSET + i: fk for i, fk in enumerate(ordered_keys)} + + unknown_keys = _find_unknown_flows(flow_counts) + if unknown_keys: + _print_unknown_warnings(unknown_keys, flow_counts, file=buf) + + buf.write( + "\nj/k line | d/u half-page | g/G top/bottom | s select | q quit\n" + ) + content_lines = buf.getvalue().splitlines() + final_content = content_lines + + if ordered_keys: + cursor_flow_idx = min(cursor_flow_idx, len(ordered_keys) - 1) + else: + cursor_flow_idx = 0 + + # Clamp scroll / auto-follow. + content_viewport = max(1, height - HEADER_LINES) + max_scroll = max(0, len(content_lines) - content_viewport) + + if is_following: + scroll_offset = 0 + else: + scroll_offset = min(scroll_offset, max_scroll) + if scroll_offset <= 0: + is_following = True + + if is_selecting and ordered_keys: + cursor_line = DATA_ROW_OFFSET + cursor_flow_idx + if not is_following: + if cursor_line < scroll_offset: + scroll_offset = cursor_line + elif cursor_line >= scroll_offset + content_viewport: + scroll_offset = min(cursor_line - content_viewport + 1, max_scroll) + + # Draw. + stdscr.erase() + _draw_loki_header( + stdscr, + width, + loki_url=args.loki_url, + loki_query=args.loki_query, + since=args.since, + until=args.until, + flow_count=len(loki_flows), + is_selecting=is_selecting, + selected_count=len(selected_keys), + ) + _draw_content( + stdscr, + width, + height, + content_lines=content_lines, + scroll_offset=scroll_offset, + is_selecting=is_selecting, + is_following=is_following, + cursor_flow_idx=cursor_flow_idx, + key_map=key_map, + selected_keys=selected_keys, + header_lines=HEADER_LINES, + data_row_offset=DATA_ROW_OFFSET, + ) + stdscr.refresh() + time.sleep(0.05) + + try: + curses.wrapper(_run) + except KeyboardInterrupt: + pass + + # Print the last snapshot so the user is not left with a blank screen. + if final_content: + print() + for line in final_content: + print(line) + + # Generate policies from selected flows (triggered by Enter in select mode). + if generate_flag and selected_keys_final: + policies = _build_policies_from_flow_keys(selected_keys_final) + sorted_policies = [(ns, app, rules) for (ns, app), rules in sorted(policies.items())] + n_pol = len(sorted_policies) + print( + f"\nGenerating {n_pol} {'policy' if n_pol == 1 else 'policies'} " + f"from {len(selected_keys_final)} selected flows...", + file=sys.stderr, + ) + if args.dry_run: + _write_multi_doc_yaml(sorted_policies, sys.stdout) + else: + written = _write_policy_dir(sorted_policies, args.output_dir) + print( + f"Wrote {written} {'policy' if written == 1 else 'policies'} " + f"to {os.path.realpath(args.output_dir)}", + file=sys.stderr, + ) + + print("\nLoki watch mode stopped.", file=sys.stderr) + + def _watch_mode(args: argparse.Namespace) -> None: """Live monitoring mode: spawn ``hubble observe`` and refresh the report. @@ -1870,10 +2153,11 @@ def _build_parser() -> argparse.ArgumentParser: "--watch", action="store_true", help=( - "Live monitoring mode: spawn hubble observe internally and " - "continuously refresh the flow-frequency report on screen. " - "Replaces running 'watch -n1 hubble-audit2policy --report-only' " - "and 'hubble observe' in separate terminals." + "Interactive TUI mode with flow-frequency report, scrolling, " + "and workload selection for policy generation. " + "In live mode (default): spawns hubble observe and continuously " + "refreshes. With --from loki: fetches historical flows from " + "Loki and presents them in the same interactive TUI." ), ) parser.add_argument( @@ -2013,9 +2297,12 @@ def main() -> None: level=logging.DEBUG if args.verbose else logging.WARNING, ) - # Live watch mode: spawn hubble observe and refresh the report on screen. + # Interactive watch mode with TUI. if args.watch: - _watch_mode(args) + if args.source == "loki": + _loki_watch_mode(args) + else: + _watch_mode(args) return verdicts: set[str] = {v.upper() for v in args.verdict} if args.verdict else set() diff --git a/pyproject.toml b/pyproject.toml index 1e01e51..00067e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hubble-audit2policy" -version = "0.7.6" +version = "0.9.0" description = "Generate least-privilege CiliumNetworkPolicy YAML from Hubble flow logs." readme = "README.md" license = "Apache-2.0"