Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,32 @@ class ColoredFormatter(logging.Formatter):
}

def __init__(self, fmt=None, datefmt=None, style="%", validate=True):
"""Initialize the formatter with a fixed delegate formatter for timestamps.

Args:
fmt: Log format string (unused; delegate formatter controls layout).
datefmt: Date format string (unused; delegate uses %H:%M:%S).
style: Format style character (default: "%").
validate: Whether to validate the format string (default: True).
"""
super().__init__(fmt, datefmt, style, validate)
self.delegate_formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(message)s", datefmt="%H:%M:%S"
)

def format(self, record):
"""Format a log record, injecting ANSI color codes around the level name.

Temporarily replaces ``record.levelname`` with a color-padded version,
delegates to the internal formatter, then restores the original level
name so the record is not mutated for downstream handlers.

Args:
record: The :class:`logging.LogRecord` to format.

Returns:
Formatted log string with color codes applied to the level name.
"""
original_levelname = record.levelname
color = self.LEVEL_COLORS.get(record.levelno, Colors.ENDC)
padded_level = f"{original_levelname:<8}"
Expand Down Expand Up @@ -346,10 +366,30 @@ def print_summary_table(results: List[Dict[str, Any]], dry_run: bool) -> None:
}

def _print_separator(left, mid, right):
"""Print a horizontal table separator using box-drawing characters.

Args:
left: Key for the left-corner character in ``chars``.
mid: Key for the junction character between column segments.
right: Key for the right-corner character in ``chars``.
"""
segments = [chars["h"] * (width + 2) for width in col_widths.values()]
print(f"{chars[left]}{chars[mid].join(segments)}{chars[right]}")

def _print_row(profile, folders, rules, duration, status, is_header=False):
"""Print a single data row with column values padded to fixed widths.

Applies ANSI bold formatting to every cell when ``is_header`` is True
and colors are enabled.

Args:
profile: Profile ID string for the first column.
folders: Folder count value for the second column.
rules: Rule count value for the third column.
duration: Elapsed-time string for the fourth column.
status: Status label for the fifth column.
is_header: When True, wraps all cells in bold ANSI codes.
"""
v = chars["v"]

# 1. Pad raw strings first (so padding is calculated on visible chars)
Expand Down Expand Up @@ -631,6 +671,15 @@ def get_password(
# 2. Clients
# --------------------------------------------------------------------------- #
def _api_client() -> httpx.Client:
"""Create and return an authenticated Control D API HTTP client.

Configures standard JSON ``Accept`` and ``Authorization: Bearer`` headers
using the module-level ``TOKEN`` constant, sets a 30-second timeout, and
disables automatic redirect following to avoid silent data loss.

Returns:
A configured :class:`httpx.Client` ready for Control D API requests.
"""
return httpx.Client(
headers={
"Accept": "application/json",
Expand Down Expand Up @@ -1121,27 +1170,90 @@ def validate_folder_data(data: Dict[str, Any], url: str) -> bool:


def _api_get(client: httpx.Client, url: str) -> httpx.Response:
"""Send an authenticated GET request to the Control D API with retry logic.

Increments the ``control_d_api_calls`` counter under a thread-safe lock
before delegating to :func:`_retry_request`.

Args:
client: Authenticated :class:`httpx.Client` created by :func:`_api_client`.
url: Full URL of the Control D API endpoint to call.

Returns:
The :class:`httpx.Response` from the API.

Raises:
httpx.HTTPError: After all retry attempts are exhausted.
"""
global _api_stats
with _api_stats_lock:
_api_stats["control_d_api_calls"] += 1
return _retry_request(lambda: client.get(url))


def _api_delete(client: httpx.Client, url: str) -> httpx.Response:
"""Send an authenticated DELETE request to the Control D API with retry logic.

Increments the ``control_d_api_calls`` counter under a thread-safe lock
before delegating to :func:`_retry_request`.

Args:
client: Authenticated :class:`httpx.Client` created by :func:`_api_client`.
url: Full URL of the Control D API endpoint to call.

Returns:
The :class:`httpx.Response` from the API.

Raises:
httpx.HTTPError: After all retry attempts are exhausted.
"""
global _api_stats
with _api_stats_lock:
_api_stats["control_d_api_calls"] += 1
return _retry_request(lambda: client.delete(url))


def _api_post(client: httpx.Client, url: str, data: Dict) -> httpx.Response:
"""Send an authenticated POST request with JSON body to the Control D API.

Increments the ``control_d_api_calls`` counter under a thread-safe lock
before delegating to :func:`_retry_request`.

Args:
client: Authenticated :class:`httpx.Client` created by :func:`_api_client`.
url: Full URL of the Control D API endpoint to call.
data: Dictionary payload to encode as the request body.

Returns:
The :class:`httpx.Response` from the API.

Raises:
httpx.HTTPError: After all retry attempts are exhausted.
"""
global _api_stats
with _api_stats_lock:
_api_stats["control_d_api_calls"] += 1
return _retry_request(lambda: client.post(url, data=data))


def _api_post_form(client: httpx.Client, url: str, data: Dict) -> httpx.Response:
"""Send an authenticated form-encoded POST request to the Control D API.

Sets ``Content-Type: application/x-www-form-urlencoded`` explicitly and
increments the ``control_d_api_calls`` counter under a thread-safe lock
before delegating to :func:`_retry_request`.

Args:
client: Authenticated :class:`httpx.Client` created by :func:`_api_client`.
url: Full URL of the Control D API endpoint to call.
data: Dictionary payload to URL-encode as the request body.

Returns:
The :class:`httpx.Response` from the API.

Raises:
httpx.HTTPError: After all retry attempts are exhausted.
"""
global _api_stats
with _api_stats_lock:
_api_stats["control_d_api_calls"] += 1
Expand Down Expand Up @@ -1666,6 +1778,18 @@ def get_all_existing_rules(
all_rules = set()

def _fetch_folder_rules(folder_id: str) -> List[str]:
"""Fetch all rule primary keys for a single folder from the API.

Silently returns an empty list on HTTP or other errors so that
a single folder failure does not abort the entire sync.

Args:
folder_id: The Control D folder identifier to query.

Returns:
List of rule PK strings contained in the folder, or ``[]`` on
any error.
"""
try:
data = _api_get(client, f"{API_BASE}/{profile_id}/rules/{folder_id}").json()
folder_rules = data.get("body", {}).get("rules", [])
Expand Down Expand Up @@ -1758,6 +1882,18 @@ def warm_up_cache(urls: Sequence[str]) -> None:
# OPTIMIZATION: Combine validation (DNS) and fetching (HTTP) in one task
# to allow validation latency to be parallelized.
def _validate_and_fetch(url: str):
"""Validate a folder URL and fetch its content in a single step.

Combines DNS/URL validation with HTTP fetching so that both
operations can be parallelized across multiple URLs in a thread pool.

Args:
url: Remote URL to validate and fetch.

Returns:
Parsed JSON dict returned by :func:`_gh_get`, or ``None`` if
validation fails.
"""
if validate_folder_url(url):
return _gh_get(url)
return None
Expand Down Expand Up @@ -2094,6 +2230,28 @@ def _process_single_folder(
client: httpx.Client,
batch_executor: Optional[concurrent.futures.Executor] = None,
) -> bool:
"""Create a Control D folder and push all its rules to the given profile.

Reads folder metadata and action settings from ``folder_data``, creates the
folder via the API, then pushes rules either from nested ``rule_groups`` or
directly from the top-level ``rules`` list. An optional shared
``batch_executor`` may be passed in to parallelize rule-push batches across
folders.

Args:
folder_data: Parsed JSON dict describing one folder (group name,
action flags, and rule list).
profile_id: Control D profile identifier to create the folder in.
existing_rules: Set of rule PKs already present in the profile, used
to skip duplicate pushes.
client: Authenticated :class:`httpx.Client` for all API calls.
batch_executor: Optional thread-pool executor shared across folder
workers; when ``None`` a temporary executor is created internally.

Returns:
``True`` if the folder and all its rules were processed successfully,
``False`` if folder creation or any rule-push batch failed.
"""
grp = folder_data["group"]
name = grp["group"].strip()

Expand Down Expand Up @@ -2171,6 +2329,18 @@ def sync_profile(
# OPTIMIZATION: Move validation inside the thread pool to parallelize DNS lookups.
# Previously, sequential validation blocked the main thread.
def _fetch_if_valid(url: str):
"""Return folder data from cache or fetch it after URL validation.

Checks the in-memory cache first to avoid redundant HTTP requests
when :func:`warm_up_cache` has already fetched the content. Falls
back to validating the URL then calling :func:`fetch_folder_data`.

Args:
url: Remote folder JSON URL to retrieve.

Returns:
Parsed folder data dict, or ``None`` if URL validation fails.
"""
# Optimization: If we already have the content in cache, return it directly.
# The content was validated at the time of fetch (warm_up_cache).
# Read directly from cache to avoid calling fetch_folder_data while holding lock.
Expand Down Expand Up @@ -2358,6 +2528,21 @@ def _fetch_if_valid(url: str):
def print_summary_table(
sync_results: List[Dict[str, Any]], success_count: int, total: int, dry_run: bool
) -> None:
"""Print a formatted summary table of sync results for all profiles.

Renders either a Unicode box-drawing table (when colors are enabled) or a
plain ASCII table (fallback). Includes per-profile rows with folder/rule
counts and durations, plus a totals footer row.

Args:
sync_results: List of per-profile result dicts, each containing at
least ``profile``, ``folders``, ``rules``, ``duration``,
``status_label``, and ``success`` keys.
success_count: Number of profiles that completed without errors.
total: Total number of profiles that were processed.
dry_run: When ``True``, labels the table header as "DRY RUN SUMMARY"
and uses a cyan title color instead of the default header color.
"""
# 1. Setup Data
max_p = max((len(r["profile"]) for r in sync_results), default=25)
w = [max(25, max_p), 10, 12, 10, 15]
Expand Down Expand Up @@ -2619,6 +2804,20 @@ def validate_profile_input(value: str) -> bool:
w_status = 15

def make_col_separator(left, mid, right, horiz):
"""Build a full-width horizontal separator string for the results table.

Constructs a separator by joining fixed-width column segments with a
junction character and capping both ends with corner characters.

Args:
left: Left-corner box-drawing character (e.g. ``Box.TL``).
mid: Junction character between column segments (e.g. ``Box.T``).
right: Right-corner box-drawing character (e.g. ``Box.TR``).
horiz: Horizontal fill character repeated per column (e.g. ``Box.H``).

Returns:
Complete separator string ready to print.
"""
parts = [
horiz * (w_profile + 2),
horiz * (w_folders + 2),
Expand Down
24 changes: 24 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading