Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions cloudinit/distros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# This file is part of cloud-init. See LICENSE file for license information.

import abc
import functools
import logging
import os
import re
Expand Down Expand Up @@ -52,6 +53,11 @@
from cloudinit.distros.parsers import hosts
from cloudinit.features import ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES
from cloudinit.lifecycle import log_with_downgradable_level
from cloudinit.log.security_event_log import (
OWASPEventLevel,
OWASPEventType,
log_security_event,
)
from cloudinit.net import activators, dhcp, renderers
from cloudinit.net.netops import NetOps
from cloudinit.net.network_state import parse_net_config_data
Expand Down Expand Up @@ -123,6 +129,97 @@
]


def sec_log_user_created(func):
"""A decorator to log a user creation event and group attributes."""

@functools.wraps(func)
def decorator(
self, name: str, *args, groups: Optional[List[str]] = None, **kwargs
):
if not name:
raise RuntimeError(
"sec_log_user_created requires positional param name or kwarg"
)
params = [name]
groups_msg = ""
if groups is None:
groups = []
all_groups = groups + _get_elevated_roles(**kwargs)
if all_groups:
groups_suffix = ",".join(all_groups)
groups_msg = f" in groups: {groups_suffix}"
params.append(f"groups:{groups_suffix}")

response = func(self, name, groups=groups, *args, **kwargs)
log_security_event(
event_type=OWASPEventType.USER_CREATED,
level=OWASPEventLevel.INFO,
description=f"User '{name}' was created{groups_msg}",
event_params=params,
)
return response

return decorator


def sec_log_password_changed_batch(func):
@functools.wraps(func)
def decorator(self, plist_in: List[Tuple[str, str]], *args, **kwargs):
response = func(self, plist_in, *args, **kwargs)
for userid, _ in plist_in:
log_security_event(
event_type=OWASPEventType.AUTHN_PASSWORD_CHANGE,
level=OWASPEventLevel.INFO,
description=f"Password changed for user '{userid}'",
event_params=[userid],
)
return response

return decorator


def sec_log_password_changed(func):
"""A decorator logging a password change event."""

@functools.wraps(func)
def decorator(self, user: str, *args, **kwargs):
response = func(self, user, *args, **kwargs)
log_security_event(
event_type=OWASPEventType.AUTHN_PASSWORD_CHANGE,
level=OWASPEventLevel.INFO,
description=f"Password changed for user '{user}'",
event_params=[user],
)
return response

return decorator


def sec_log_system_shutdown(func):
"""A decorator logging a system shutdown event."""

@functools.wraps(func)
def decorator(cls, mode: str, delay: str, message):
if mode == "reboot":
event_type = OWASPEventType.SYS_RESTART
description = "System restart initiated"
else:
event_type = OWASPEventType.SYS_SHUTDOWN
description = "System shutdown initiated"
if message:
description += f": {message}"

log_security_event(
event_type=event_type,
level=OWASPEventLevel.INFO,
description=description,
additional_data={"delay": delay, "mode": mode},
)
return func(cls, mode=mode, delay=delay, message=message)

return decorator


class PackageInstallerError(Exception):
pass

Expand Down Expand Up @@ -1599,6 +1696,21 @@ def wait_for_network(self) -> None:
"""


def _get_elevated_roles(
*,
sudo: Optional[str] = None,
doas: Optional[List[str]] = None,
**kwargs,
) -> List[str]:
"""Return a list of elevated roles provided as keyword arguments."""
elevated_roles = []
if sudo:
elevated_roles.append("sudo")
if doas:
elevated_roles.append("doas")
return elevated_roles


def _apply_hostname_transformations_to_url(url: str, transformations: list):
"""
Apply transformations to a URL's hostname, return transformed URL.
Expand Down
77 changes: 75 additions & 2 deletions cloudinit/log/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import collections.abc # pylint: disable=import-error
import copy
import io
import json
import logging
import logging.config
import logging.handlers
Expand All @@ -22,6 +23,8 @@
from typing import DefaultDict

DEFAULT_LOG_FORMAT = "%(asctime)s - %(filename)s[%(levelname)s]: %(message)s"
SECURITY_LOG_FORMAT = "%(message)s"
SECURITY = logging.WARNING - 5
DEPRECATED = 35
TRACE = logging.DEBUG - 5

Expand All @@ -41,17 +44,79 @@ def trace(self, *args, **kwargs):
def deprecated(self, *args, **kwargs):
pass

def security(self, *args, **kwargs):
pass


SECURITY_LOG_FILE = "/var/log/cloud-init-security.log"


class SecurityOnlyFilter(logging.Filter):
"""Pass only SECURITY level records."""

def filter(self, record) -> bool:
return record.levelno == SECURITY


class NoSecurityFilter(logging.Filter):
"""Block SECURITY level records from non-security handlers."""

def filter(self, record) -> bool:
return record.levelno != SECURITY


class SecurityFormatter(logging.Formatter):
"""Inject a 'datetime' field (UTC ISO-8601) into SECURITY JSON messages."""

# Provide ISO-8601 millisecond details and TZ info in datetime value.
default_time_format = "%Y-%m-%dT%H:%M:%S"
default_msec_format = "%s.%03d+00:00"

def format(self, record: logging.LogRecord) -> str:
# Use record.msg instead of getMessage which formats dict to JSON
if not isinstance(record.msg, dict):
raise ValueError(
f"SECURITY logs expected dict but found: {record.msg}"
)
# Produce compressed JSON lines with separators to avoid whitespace.
return json.dumps(
{
**record.msg,
"datetime": self.formatTime(record),
},
separators=(",", ":"),
)


def setup_basic_logging(level=logging.DEBUG, formatter=None):
formatter = formatter or logging.Formatter(DEFAULT_LOG_FORMAT)
root = logging.getLogger()
console = logging.StreamHandler(sys.stderr)
console.setFormatter(formatter)
console.setLevel(level)
console.addFilter(NoSecurityFilter())
root.addHandler(console)
root.setLevel(level)


def setup_security_logging(
root: logging.Logger, log_file: str = SECURITY_LOG_FILE
) -> None:
"""Attach a FileHandler routing SECURITY records to log_file if absent."""
for h in root.handlers:
if getattr(h, "baseFilename", None) == log_file:
return # handler already attached

try:
handler = logging.FileHandler(log_file)
except OSError:
return
handler.setFormatter(SecurityFormatter())
handler.addFilter(SecurityOnlyFilter())
handler.setLevel(SECURITY)
root.addHandler(handler)


def flush_loggers(root):
if not root:
return
Expand All @@ -63,7 +128,7 @@ def flush_loggers(root):


def define_extra_loggers() -> None:
"""Add DEPRECATED and TRACE log levels to the logging module."""
"""Add SECURITY, DEPRECATED and TRACE log levels to the logging module."""

def new_logger(level):
def log_at_level(self, message, *args, **kwargs):
Expand All @@ -72,8 +137,10 @@ def log_at_level(self, message, *args, **kwargs):

return log_at_level

logging.addLevelName(SECURITY, "SECURITY")
logging.addLevelName(DEPRECATED, "DEPRECATED")
logging.addLevelName(TRACE, "TRACE")
setattr(logging.Logger, "security", new_logger(SECURITY))
setattr(logging.Logger, "deprecated", new_logger(DEPRECATED))
setattr(logging.Logger, "trace", new_logger(TRACE))

Expand Down Expand Up @@ -122,6 +189,9 @@ def setup_logging(cfg=None):

# Attempt to load its config.
logging.config.fileConfig(log_cfg)
for h in root_logger.handlers:
h.addFilter(NoSecurityFilter())
setup_security_logging(root_logger)

Comment thread
blackboxsw marked this conversation as resolved.
# Configure warning exporter after loading logging configuration
root_logger.addHandler(exporter)
Expand Down Expand Up @@ -216,7 +286,10 @@ def configure_root_logger():
# add handler only to the root logger
handler = LogExporter()
handler.setLevel(logging.WARN)
logging.getLogger().addHandler(handler)
handler.addFilter(NoSecurityFilter())
root_logger = logging.getLogger()
root_logger.addHandler(handler)
setup_security_logging(root_logger)

# LogRecord allows us to report more useful information than __init__.py
logging.setLogRecordFactory(CloudInitLogRecord)
90 changes: 90 additions & 0 deletions cloudinit/log/security_event_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# This file is part of cloud-init. See LICENSE file for license information.

"""
OWASP-formatted Security Event Logging for cloud-init.

This module provides security event logging following the OWASP Logging
Vocabulary Cheat Sheet:
https://github.com/OWASP/CheatSheetSeries/blob/master/cheatsheets/Logging_Vocabulary_Cheat_Sheet.md

Security events are logged in JSON Lines format with standardized fields:
- datetime: ISO 8601 timestamp with UTC offset
- appid: Application identifier (canonical.cloud-init)
- type: "security"
- event: Event type with optional parameters (e.g., user_created:root,ubuntu)
- level: INFO, WARN, or CRITICAL
- description: Human-readable summary
- hostname: System hostname
"""

import logging
from enum import Enum
from typing import Any, Dict, List, Optional

from cloudinit import util
from cloudinit.log import loggers

LOG = logging.getLogger(__name__)

# Hard-coded application identifier
APP_ID = "canonical.cloud-init"


class OWASPEventLevel(Enum):
"""OWASP log levels."""

INFO = "INFO"
WARN = "WARN"
CRITICAL = "CRITICAL"


class OWASPEventType(Enum):
"""OWASP security event types."""

# Authentication events [AUTHN]
AUTHN_PASSWORD_CHANGE = "authn_password_change"

# System events [SYS]
SYS_SHUTDOWN = "sys_shutdown"
SYS_RESTART = "sys_restart"

# User management events [USER]
USER_CREATED = "user_created"
# TODO(USER_UPDATED = "user_updated")


def log_security_event(
event_type: OWASPEventType,
level: OWASPEventLevel,
description: str,
event_params: Optional[List[str]] = None,
additional_data: Optional[Dict[str, Any]] = None,
) -> None:
"""
Log a security event in OWASP format.

:param event_type: Type of security event.
:param level: OWASP Log level (INFO, WARN, CRITICAL).
:param description: Human-readable description of the event.
:param event_params: Parameters to include in the event string.
:param additional_data: Additional context-specific data.
"""
# cloud-init is the default primary 'actor' for any system change operation
params = ["cloud-init"]
if event_params:
params.extend(event_params)
event_str = f"{event_type.value}:{','.join(params)}"
event = {
"appid": APP_ID,
"type": "security",
"event": event_str,
"level": str(level.value),
"description": description,
"hostname": util.get_hostname(),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the purpose of this is to identify the where the event occured if logs are streamed somewhere, this fails to solve the intended problem: hostname changes partway through boot.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hostname is set by cloud-init very early in boot - init-local calls update_hostname even before some platforms make an early dhcpcd request

The current set of logged operations do not happen until well after that:

I don't think this concern is applicable for the types of logs we are currently generating.

}
if additional_data:
# Merge additional non-empty data but don't overwrite core fields
event.update(
{k: v for k, v in additional_data.items() if v and k not in event}
)
LOG.log(loggers.SECURITY, event)
25 changes: 25 additions & 0 deletions tests/unittests/distros/test__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,28 @@ def test_dhcp_configuration(
distro = Distro("", {}, {})
distro._cfg = config
assert isinstance(distro.dhcp_client, chosen_client)


class TestGetElevatedRoles:
"""Tests for get_elevated_roles."""

@pytest.mark.parametrize(
"kwargs,expected",
[
pytest.param({}, [], id="no_kwargs_returns_empty"),
pytest.param({"sudo": True}, ["sudo"], id="sudo_only"),
pytest.param({"doas": True}, ["doas"], id="doas_only"),
pytest.param(
{"sudo": True, "doas": True},
["sudo", "doas"],
id="sudo_and_doas",
),
pytest.param({"sudo": False}, [], id="falsy_sudo_excluded"),
pytest.param({"doas": False}, [], id="falsy_doas_excluded"),
pytest.param(
{"shell": "/bin/bash"}, [], id="unrelated_kwargs_ignored"
),
],
)
def test_get_elevated_roles(self, kwargs, expected):
assert distros._get_elevated_roles(**kwargs) == expected
Loading
Loading