Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions ssh_zone_handler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ def __init__(self, config: ZoneHandlerConf) -> None:
f"--user={self.service_user}",
)

def __zone_list(self, username: str) -> Sequence[str]:
user_zones: Sequence[str] = ()

try:
user_zones = tuple(self.config.users[username].zones)
except KeyError:
pass

return user_zones

@staticmethod
def __parse(
ssh_command: str,
Expand Down Expand Up @@ -141,8 +131,6 @@ def __logs(self, zones: list[str]) -> None:
failure = f"Failed to output log lines for the following zone(s): {zones_str}"
command = ("/usr/bin/sudo", f"--user={self.journal_user}") + self.journal_cmd

logging.info("Outputting logs for the following zone(s): %s", zones_str)

result: CompletedProcess[str] = self._runner(command, failure)
log_lines = result.stdout.split("\n")

Expand All @@ -164,8 +152,7 @@ def invoke(self, ssh_command: str, username: str) -> None:
:param username: Current user, executing the program
"""

user_zones: Sequence[str] = self.__zone_list(username)

user_zones: Sequence[str] = tuple(self.config.users[username].zones)
if not user_zones:
raise InvokeError(f'No zones configured for user "{username}"')

Expand All @@ -177,16 +164,32 @@ def invoke(self, ssh_command: str, username: str) -> None:
raise InvokeError('Invalid command, try "help"')

if command == "help":
logging.info("'%s' runs help command", username)
self.__usage()
elif command == "list":
uzn: str
logging.info("'%s' lists available zones", username)
for uzn in user_zones:
print(uzn)
elif not zones:
raise InvokeError("No valid zone provided")
elif command == "dump":
logging.info(
"'%s' requests dump of '%s' zone content",
username,
zones[0],
)
self._dump(zones[0])
elif command == "logs":
logging.info(
"'%s' requests log output for the following zone(s): %s",
username,
", ".join(zones),
)
self.__logs(zones)
elif command == "retransfer":
logging.info(
"'%s' requests '%s' AXFR zone retransfer",
username,
zones[0],
)
self._retransfer(zones[0])
5 changes: 0 additions & 5 deletions ssh_zone_handler/bind.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""BIND specific subclasses"""

import logging
import re
from collections.abc import Iterator
from subprocess import CompletedProcess
Expand Down Expand Up @@ -50,8 +49,6 @@ def __lookup(self, zone: str, failure: str) -> str | None:
return zone_file

def _dump(self, zone: str) -> None:
logging.info('Outputting "%s" zone content', zone)

lookup_failure = f'Failed to lookup zone file for zone "{zone}"'
zone_file: str | None = self.__lookup(zone, lookup_failure)
if not zone_file:
Expand Down Expand Up @@ -87,8 +84,6 @@ def _filter_logs(log_lines: list[str], zones: list[str]) -> Iterator[str]:
yield line

def _retransfer(self, zone: str) -> None:
logging.info('Triggering "%s" AXFR zone retransfer', zone)

failure = f'Failed to trigger retransfer of zone "{zone}"'
command = self.rndc_prefix + ("retransfer", zone)

Expand Down
5 changes: 0 additions & 5 deletions ssh_zone_handler/knot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Knot specific subclasses"""

import logging
from collections.abc import Iterator
from subprocess import CompletedProcess
from typing import Final
Expand Down Expand Up @@ -50,8 +49,6 @@ def __filter_dump(content: str, zone: str) -> str:
return "\n".join(filtered)

def _dump(self, zone: str) -> None:
logging.info('Outputting "%s" zone content', zone)

command = self.knotc_prefix + ("zone-read", zone)
run_failure = f'Failed to dump content of zone "{zone}"'

Expand All @@ -69,8 +66,6 @@ def _filter_logs(log_lines: list[str], zones: list[str]) -> Iterator[str]:
yield line

def _retransfer(self, zone: str) -> None:
logging.info('Triggering "%s" AXFR zone retransfer', zone)

failure = f'Failed to trigger retransfer of zone "{zone}"'
command = self.knotc_prefix + ("zone-retransfer", zone)

Expand Down
3 changes: 1 addition & 2 deletions tests/data/bind-alternative-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ system:
systemd_unit: bind9.service
users:
bob:
zones:
- example.org
zones: []
14 changes: 10 additions & 4 deletions tests/test_ssh_zone_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_cli_read_config():
"users": {
"bob": {
"ssh_keys": [],
"zones": ["example.org"],
"zones": [],
},
},
}
Expand Down Expand Up @@ -181,12 +181,18 @@ def test_cli_zone_wrapper(caplog, capsys, mocker):
assert captured_outdated == "Invalid server side config file\n"

caplog.clear()
mocker.patch("sys.argv", ["_", "mallory"])
mocker.patch("sys.argv", ["_", "bob"])
os.environ["SSH_ORIGINAL_COMMAND"] = "help"
with pytest.raises(SystemExit):
wrapper(Path("./tests/data/bind-alternative-config.yaml"))
captured_nozones_user = caplog.text
assert captured_nozones_user == 'No zones configured for user "bob"\n'

caplog.clear()
mocker.patch("sys.argv", ["_", "mallory"])
os.environ["SSH_ORIGINAL_COMMAND"] = "help"
with pytest.raises(KeyError):
wrapper(Path("./tests/data/bind-example-config.yaml"))
captured_unconf_user = caplog.text
assert captured_unconf_user == 'No zones configured for user "mallory"\n'


def test_bind_log_filtering():
Expand Down