Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- run: pip install ruff
- run: pip install -e ".[dev]"
- run: ruff check .
- run: ruff format --check .

Expand All @@ -28,7 +28,7 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- run: pip install mypy pyyaml types-PyYAML
- run: pip install -e ".[dev]"
- run: mypy hubble_audit2policy.py

test:
Expand Down
31 changes: 30 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,41 @@
# macOS
.DS_Store

# Python
__pycache__/
*.py[cod]
*.pyo
*.egg-info/
*.egg
dist/
build/
.venv/
venv/
env/
*.whl

# Editors / IDEs
.vscode/
.idea/
*.swp
*.swo
*~
\#*\#
.project
.settings/

# Tool caches
.mypy_cache/
.pytest_cache/
.ruff_cache/
.claude/
.tox/
.nox/
.coverage
htmlcov/

# Claude
.claude/

# OS artifacts
Thumbs.db
Desktop.ini
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.7.5] - 2026-03-28

### Fixed

- Fix `_ReconnectState` dataclass: promote `_DELAY_INIT` and `_DELAY_MAX` from instance fields to `ClassVar` so they no longer pollute `__init__`, `__eq__`, or `__repr__`.
- Fix thread-safety gap in `LiveFlowStore.suspend_capture()`: acquire lock when swapping `_capture_fh` to match lock discipline in `add()`.
- Fix `--capture-file` help text: replace incorrect "Append" wording with "Write" to match actual `"w"` (overwrite) behaviour.

### Changed

- CI lint and typecheck jobs now install from `.[dev]` instead of ad-hoc unpinned `pip install`, ensuring consistent tool versions across all CI jobs.
- Expand `.gitignore` with common Unix/Linux/IDE entries (vim swap files, `.idea/`, `.tox/`, `.coverage`, `Thumbs.db`, etc.).

### Added

- CLI integration tests (`tests/test_cli.py`): 14 new tests covering dry-run output, output-dir writes, single-file mode, report-only exit code, namespace/verdict filtering, empty/missing file exit codes, and all argument validation paths (134 total).

## [0.7.4] - 2026-03-28

### Fixed
Expand Down
21 changes: 12 additions & 9 deletions hubble_audit2policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from __future__ import annotations

__version__ = "0.7.4"
__version__ = "0.7.5"
__author__ = "noexecstack"
__license__ = "Apache-2.0"

Expand All @@ -34,7 +34,7 @@
from collections import Counter, defaultdict, deque
from collections.abc import Generator, Iterator
from contextlib import contextmanager
from typing import IO, Any, cast
from typing import IO, Any, ClassVar, cast

import yaml

Expand Down Expand Up @@ -990,12 +990,14 @@ def snapshot(self) -> list[dict[str, Any]]:
@contextmanager
def suspend_capture(self) -> Generator[None, None, None]:
"""Context manager that suppresses flow capture for its duration."""
saved = self._capture_fh
self._capture_fh = None
with self._lock:
saved = self._capture_fh
self._capture_fh = None
try:
yield
finally:
self._capture_fh = saved
with self._lock:
self._capture_fh = saved

@property
def count(self) -> int:
Expand Down Expand Up @@ -1172,10 +1174,11 @@ def _is_workload(ns: str, app: str) -> bool:
class _ReconnectState:
"""Mutable state for exponential-backoff reconnection in watch mode."""

_DELAY_INIT: ClassVar[float] = 2.0
_DELAY_MAX: ClassVar[float] = 60.0

delay: float = 2.0
at: float = 0.0
_DELAY_INIT: float = dataclasses.field(default=2.0, repr=False)
_DELAY_MAX: float = dataclasses.field(default=60.0, repr=False)

def reset(self) -> None:
self.delay = self._DELAY_INIT
Expand Down Expand Up @@ -1905,8 +1908,8 @@ def _build_parser() -> argparse.ArgumentParser:
"--capture-file",
metavar="FILE",
help=(
"Append all flows received during --watch mode to FILE as JSONL. "
"The file is overwritten at the start of each session. "
"Write all flows received during --watch mode to FILE as JSONL. "
"The file is created (or overwritten) at the start of each session. "
"Use the file later to generate policies: "
"%(prog)s FILE -o policies/"
),
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "hubble-audit2policy"
version = "0.7.4"
version = "0.7.5"
description = "Generate least-privilege CiliumNetworkPolicy YAML from Hubble flow logs."
readme = "README.md"
license = "Apache-2.0"
Expand Down
240 changes: 240 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
"""Tests for CLI entry point: argument validation, dry-run output, and exit codes."""

from __future__ import annotations

import json
import os
import tempfile
from typing import Any
from unittest import mock

import pytest
import yaml

import hubble_audit2policy as h


def _make_flow(
src_app: str = "frontend",
dst_app: str = "api",
port: int = 8080,
) -> dict[str, Any]:
"""Minimal Hubble flow dict for CLI tests."""
return {
"verdict": "AUDIT",
"source": {
"namespace": "default",
"labels": [f"k8s:app={src_app}"],
"pod_name": f"{src_app}-abc",
},
"destination": {
"namespace": "default",
"labels": [f"k8s:app={dst_app}"],
"pod_name": f"{dst_app}-xyz",
},
"l4": {"TCP": {"destination_port": port}},
}


def _write_flows_file(flows: list[dict[str, Any]]) -> str:
"""Write flows to a temp JSONL file and return the path."""
f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
for flow in flows:
f.write(json.dumps(flow) + "\n")
f.close()
return f.name


class TestCliFileMode:
def test_dry_run_prints_yaml(self) -> None:
path = _write_flows_file([_make_flow()])
try:
with mock.patch("sys.argv", ["hubble-audit2policy", path, "--dry-run", "--no-enrich"]):
buf = _capture_stdout(h.main)
docs = list(yaml.safe_load_all(buf))
assert len(docs) >= 1
assert docs[0]["kind"] == "CiliumNetworkPolicy"
finally:
os.unlink(path)

def test_output_dir_writes_files(self) -> None:
path = _write_flows_file([_make_flow()])
try:
with tempfile.TemporaryDirectory() as tmpdir:
with mock.patch(
"sys.argv",
["hubble-audit2policy", path, "-o", tmpdir, "--no-enrich"],
):
h.main()
files = os.listdir(tmpdir)
assert any(f.endswith(".yaml") for f in files)
finally:
os.unlink(path)

def test_single_file_mode(self) -> None:
path = _write_flows_file([_make_flow()])
try:
with tempfile.TemporaryDirectory() as tmpdir:
outfile = os.path.join(tmpdir, "all.yaml")
with mock.patch(
"sys.argv",
["hubble-audit2policy", path, "--single-file", outfile, "--no-enrich"],
):
h.main()
assert os.path.isfile(outfile)
with open(outfile) as f:
docs = list(yaml.safe_load_all(f))
assert len(docs) >= 1
finally:
os.unlink(path)

def test_report_only_exits_ok(self) -> None:
path = _write_flows_file([_make_flow()])
try:
with mock.patch(
"sys.argv",
["hubble-audit2policy", path, "--report-only", "--no-enrich"],
):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == h.EXIT_OK
finally:
os.unlink(path)

def test_namespace_filter(self) -> None:
path = _write_flows_file([_make_flow()])
try:
with mock.patch(
"sys.argv",
["hubble-audit2policy", path, "--dry-run", "--no-enrich", "-n", "nonexistent"],
):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == h.EXIT_NO_POLICIES
finally:
os.unlink(path)

def test_verdict_filter(self) -> None:
path = _write_flows_file([_make_flow()])
try:
with mock.patch(
"sys.argv",
[
"hubble-audit2policy",
path,
"--dry-run",
"--no-enrich",
"--verdict",
"FORWARDED",
],
):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == h.EXIT_NO_POLICIES
finally:
os.unlink(path)

def test_empty_file_exits_no_policies(self) -> None:
f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
f.close()
try:
with mock.patch(
"sys.argv",
["hubble-audit2policy", f.name, "--dry-run", "--no-enrich"],
):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == h.EXIT_NO_POLICIES
finally:
os.unlink(f.name)

def test_nonexistent_file_exits_error(self) -> None:
with mock.patch(
"sys.argv",
["hubble-audit2policy", "/tmp/no_such_hubble_file_12345.jsonl", "--no-enrich"],
):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == h.EXIT_ERROR


class TestCliArgValidation:
def test_missing_flows_file_errors(self) -> None:
with mock.patch("sys.argv", ["hubble-audit2policy"]):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == 2 # argparse error

def test_loki_without_url_errors(self) -> None:
with mock.patch("sys.argv", ["hubble-audit2policy", "--from", "loki"]):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == 2

def test_loki_token_and_user_mutual_exclusion(self) -> None:
with mock.patch(
"sys.argv",
[
"hubble-audit2policy",
"--from",
"loki",
"--loki-url",
"http://loki:3100",
"--loki-token",
"tok",
"--loki-user",
"admin",
],
):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == 2

def test_loki_password_without_user_errors(self) -> None:
with mock.patch(
"sys.argv",
[
"hubble-audit2policy",
"--from",
"loki",
"--loki-url",
"http://loki:3100",
"--loki-password",
"secret",
],
):
with pytest.raises(SystemExit) as exc_info:
h.main()
assert exc_info.value.code == 2


class TestCliBuildParser:
def test_version_flag(self) -> None:
parser = h._build_parser()
with pytest.raises(SystemExit) as exc_info:
parser.parse_args(["-V"])
assert exc_info.value.code == 0

def test_defaults(self) -> None:
parser = h._build_parser()
args = parser.parse_args(["flows.json"])
assert args.flows_file == "flows.json"
assert args.output_dir == "."
assert args.source == "file"
assert args.dry_run is False
assert args.watch is False
assert args.no_enrich is False
assert args.verbose is False


def _capture_stdout(func: object) -> str:
"""Call func and return everything it wrote to stdout."""
from io import StringIO

buf = StringIO()
with mock.patch("sys.stdout", buf):
try:
func() # type: ignore[operator]
except SystemExit:
pass
return buf.getvalue()
Loading