Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 294 additions & 7 deletions hubble_audit2policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from __future__ import annotations

__version__ = "0.7.5"
__version__ = "0.9.0"
__author__ = "noexecstack"
__license__ = "Apache-2.0"

Expand Down Expand Up @@ -1407,6 +1407,53 @@ def _draw_header(
pass


def _draw_loki_header(
stdscr: Any,
width: int,
*,
loki_url: str,
loki_query: str,
since: str,
until: str,
flow_count: int,
is_selecting: bool,
selected_count: int,
) -> None:
"""Render the fixed header rows (0-2) for Loki watch mode TUI."""
# Row 0: source + timestamp
now_str = time.strftime("%Y-%m-%d %H:%M:%S")
left = f"Loki: {loki_url} query={loki_query}"
if len(left) > width - len(now_str) - 2:
left = left[: width - len(now_str) - 5] + "..."
padding = max(1, width - len(left) - len(now_str))
try:
stdscr.addnstr(0, 0, f"{left}{' ' * padding}{now_str}", width - 1)
except curses.error:
pass

# Row 1: time range + flow count + status badge
prefix = f"Range: since={since} until={until} | Flows: {flow_count} "
badge = "* Loaded"
badge_attr = curses.color_pair(1) if curses.has_colors() else curses.A_NORMAL
try:
stdscr.addnstr(1, 0, prefix, width - 1)
stdscr.addnstr(1, len(prefix), badge, width - 1 - len(prefix), badge_attr)
except curses.error:
pass

# Row 2: selection hint or blank
if is_selecting:
sel_hint = (
f"SELECT Space toggle Enter generate ({selected_count} selected)"
" j/k move s/Esc exit"
)
sel_attr = curses.color_pair(2) | curses.A_BOLD if curses.has_colors() else curses.A_BOLD
try:
stdscr.addnstr(2, 0, sel_hint, width - 1, sel_attr)
except curses.error:
pass


def _draw_content(
stdscr: Any,
width: int,
Expand Down Expand Up @@ -1457,6 +1504,242 @@ def _draw_content(
pass


def _loki_watch_mode(args: argparse.Namespace) -> None:
"""Interactive TUI for Loki flows with workload selection.

Fetches historical flows from a Loki instance, then presents the same
curses TUI as live watch mode so the user can browse, scroll, and
interactively select workloads to generate policies for.

Keys are the same as live watch mode (j/k scroll, s select, Space
toggle, Enter generate, q quit) except that pause/resume is not
applicable since flows are pre-loaded.
"""
label_keys: list[str] = args.label_keys or DEFAULT_LABEL_KEYS
verdicts: set[str] = {v.upper() for v in args.verdict} if args.verdict else set()
namespaces: set[str] = set(args.namespaces or [])
interval: float = args.interval

# Validate Loki arguments.
if not args.loki_url:
LOG.error("--loki-url is required when using --from loki")
sys.exit(EXIT_ERROR)
if args.loki_token and args.loki_user:
LOG.error("--loki-token and --loki-user are mutually exclusive")
sys.exit(EXIT_ERROR)
if args.loki_password and not args.loki_user:
LOG.error("--loki-password requires --loki-user")
sys.exit(EXIT_ERROR)

since_sec = _parse_duration(args.since)
until_sec = _parse_duration(args.until)

print(
f"Querying Loki at {args.loki_url} "
f"(query={args.loki_query!r}, since={args.since}, until={args.until}) ...",
file=sys.stderr,
)

loki_flows: list[dict[str, Any]] = []
for _, flow in _read_flows_loki(
args.loki_url,
args.loki_query,
since_sec,
until_sec,
args.loki_limit,
loki_user=args.loki_user,
loki_password=args.loki_password,
loki_token=args.loki_token,
loki_tls_ca=args.loki_tls_ca,
):
loki_flows.append(flow)

print(f"Loaded {len(loki_flows)} flows from Loki.", file=sys.stderr)

if not loki_flows:
LOG.warning("No flows returned from Loki query")
sys.exit(EXIT_NO_POLICIES)

# Header layout matches live watch mode.
HEADER_LINES = 4
DATA_ROW_OFFSET = 6

# Shared state between _run() and the outer scope.
final_content: list[str] = []
generate_flag: bool = False
selected_keys_final: set[FlowKey] = set()

def _run(stdscr: curses.window) -> None: # type: ignore[name-defined]
nonlocal final_content, generate_flag
if curses.has_colors():
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_GREEN, -1)
curses.init_pair(2, curses.COLOR_YELLOW, -1)
curses.init_pair(3, curses.COLOR_RED, -1)
curses.curs_set(0)
stdscr.nodelay(True)
stdscr.keypad(True)

scroll_offset = 0
is_following = True
is_paused = False
is_selecting = False
cursor_flow_idx = 0
selected_keys: set[FlowKey] = set()

ordered_keys: list[FlowKey] = []
key_map: dict[int, FlowKey] = {}
content_lines: list[str] = []
last_refresh = 0.0

while True:
key = stdscr.getch()
height, width = stdscr.getmaxyx()
half_page = max(1, (height - HEADER_LINES) // 2)

if key != -1:
(
quit_sig,
is_paused,
is_selecting,
cursor_flow_idx,
scroll_offset,
is_following,
gen,
) = _handle_key(
key,
is_paused=is_paused,
is_selecting=is_selecting,
cursor_flow_idx=cursor_flow_idx,
ordered_keys=ordered_keys,
selected_keys=selected_keys,
scroll_offset=scroll_offset,
is_following=is_following,
half_page=half_page,
content_lines_len=len(content_lines),
)
if quit_sig:
if gen:
generate_flag = True
selected_keys_final.update(selected_keys)
break
if key == 27:
selected_keys.clear()

now_mono = time.monotonic()

# Refresh the report periodically.
if now_mono - last_refresh >= interval:
last_refresh = now_mono

_, flow_counts, total, matched, _ = _parse_flow_list(
loki_flows, label_keys, verdicts, namespaces
)

buf = io.StringIO()
ordered_keys = _print_report(
flow_counts, total, matched, file=buf, term_width=width
)
key_map = {DATA_ROW_OFFSET + i: fk for i, fk in enumerate(ordered_keys)}

unknown_keys = _find_unknown_flows(flow_counts)
if unknown_keys:
_print_unknown_warnings(unknown_keys, flow_counts, file=buf)

buf.write(
"\nj/k line | d/u half-page | g/G top/bottom | s select | q quit\n"
)
content_lines = buf.getvalue().splitlines()
final_content = content_lines

if ordered_keys:
cursor_flow_idx = min(cursor_flow_idx, len(ordered_keys) - 1)
else:
cursor_flow_idx = 0

# Clamp scroll / auto-follow.
content_viewport = max(1, height - HEADER_LINES)
max_scroll = max(0, len(content_lines) - content_viewport)

if is_following:
scroll_offset = 0
else:
scroll_offset = min(scroll_offset, max_scroll)
if scroll_offset <= 0:
is_following = True

if is_selecting and ordered_keys:
cursor_line = DATA_ROW_OFFSET + cursor_flow_idx
if not is_following:
if cursor_line < scroll_offset:
scroll_offset = cursor_line
elif cursor_line >= scroll_offset + content_viewport:
scroll_offset = min(cursor_line - content_viewport + 1, max_scroll)

# Draw.
stdscr.erase()
_draw_loki_header(
stdscr,
width,
loki_url=args.loki_url,
loki_query=args.loki_query,
since=args.since,
until=args.until,
flow_count=len(loki_flows),
is_selecting=is_selecting,
selected_count=len(selected_keys),
)
_draw_content(
stdscr,
width,
height,
content_lines=content_lines,
scroll_offset=scroll_offset,
is_selecting=is_selecting,
is_following=is_following,
cursor_flow_idx=cursor_flow_idx,
key_map=key_map,
selected_keys=selected_keys,
header_lines=HEADER_LINES,
data_row_offset=DATA_ROW_OFFSET,
)
stdscr.refresh()
time.sleep(0.05)

try:
curses.wrapper(_run)
except KeyboardInterrupt:
pass

# Print the last snapshot so the user is not left with a blank screen.
if final_content:
print()
for line in final_content:
print(line)

# Generate policies from selected flows (triggered by Enter in select mode).
if generate_flag and selected_keys_final:
policies = _build_policies_from_flow_keys(selected_keys_final)
sorted_policies = [(ns, app, rules) for (ns, app), rules in sorted(policies.items())]
n_pol = len(sorted_policies)
print(
f"\nGenerating {n_pol} {'policy' if n_pol == 1 else 'policies'} "
f"from {len(selected_keys_final)} selected flows...",
file=sys.stderr,
)
if args.dry_run:
_write_multi_doc_yaml(sorted_policies, sys.stdout)
else:
written = _write_policy_dir(sorted_policies, args.output_dir)
print(
f"Wrote {written} {'policy' if written == 1 else 'policies'} "
f"to {os.path.realpath(args.output_dir)}",
file=sys.stderr,
)

print("\nLoki watch mode stopped.", file=sys.stderr)


def _watch_mode(args: argparse.Namespace) -> None:
"""Live monitoring mode: spawn ``hubble observe`` and refresh the report.

Expand Down Expand Up @@ -1870,10 +2153,11 @@ def _build_parser() -> argparse.ArgumentParser:
"--watch",
action="store_true",
help=(
"Live monitoring mode: spawn hubble observe internally and "
"continuously refresh the flow-frequency report on screen. "
"Replaces running 'watch -n1 hubble-audit2policy --report-only' "
"and 'hubble observe' in separate terminals."
"Interactive TUI mode with flow-frequency report, scrolling, "
"and workload selection for policy generation. "
"In live mode (default): spawns hubble observe and continuously "
"refreshes. With --from loki: fetches historical flows from "
"Loki and presents them in the same interactive TUI."
),
)
parser.add_argument(
Expand Down Expand Up @@ -2013,9 +2297,12 @@ def main() -> None:
level=logging.DEBUG if args.verbose else logging.WARNING,
)

# Live watch mode: spawn hubble observe and refresh the report on screen.
# Interactive watch mode with TUI.
if args.watch:
_watch_mode(args)
if args.source == "loki":
_loki_watch_mode(args)
else:
_watch_mode(args)
return

verdicts: set[str] = {v.upper() for v in args.verdict} if args.verdict else set()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "hubble-audit2policy"
version = "0.7.6"
version = "0.9.0"
description = "Generate least-privilege CiliumNetworkPolicy YAML from Hubble flow logs."
readme = "README.md"
license = "Apache-2.0"
Expand Down
Loading