diff --git a/helm/templates/deployment.yaml b/helm/templates/deployment.yaml index 0fa4530..cab8c4c 100644 --- a/helm/templates/deployment.yaml +++ b/helm/templates/deployment.yaml @@ -21,6 +21,8 @@ spec: containers: - name: manager env: + - name: CLUSTER_DOMAIN + value: "{{ .Values.clusterDomain }}" - name: MANAGE_CLAIMS_INTERVAL value: "{{ .Values.manageClaimsInterval }}" - name: MANAGE_HANDLES_INTERVAL @@ -43,6 +45,10 @@ spec: value: {{ .Values.resourceWatchResources | toJson | quote }} {{- end }} {{- end }} + {{- if .Values.enablePrometheusMetrics}} + - name: METRICS_ENABLED + value: "true" + {{- end }} - name: RESOURCE_REFRESH_INTERVAL value: "{{ .Values.resourceRefreshInterval }}" image: "{{ include "poolboy.image" . }}" @@ -55,8 +61,10 @@ spec: port: 8080 timeoutSeconds: 1 ports: + - name: kopf + containerPort: 8080 - name: metrics - containerPort: 8000 + containerPort: 9091 {{- with .Values.imagePullSecrets }} imagePullSecrets: {{- toYaml . | nindent 8 }} diff --git a/helm/templates/service-monitor.yaml b/helm/templates/service-monitor.yaml new file mode 100644 index 0000000..5494984 --- /dev/null +++ b/helm/templates/service-monitor.yaml @@ -0,0 +1,20 @@ +{{- if .Values.enablePrometheusMetrics -}} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ include "poolboy.name" . }}-monitor + namespace: {{ include "poolboy.namespaceName" . }} + labels: + {{- include "poolboy.labels" . | nindent 4 }} +spec: + selector: + matchLabels: + {{- include "poolboy.selectorLabels" . | nindent 6 }} + namespaceSelector: + matchNames: + - {{ include "poolboy.namespaceName" . 
}} + endpoints: + - port: metrics + interval: "30s" + path: /metrics +{{- end }} diff --git a/helm/values.yaml b/helm/values.yaml index 7c0cff3..22982b3 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -72,12 +72,12 @@ serviceAccount: service: type: ClusterIP - port: 8000 ports: - name: metrics - port: 8000 + port: 9091 + prometheus: 9091 protocol: TCP - targetPort: 8000 + targetPort: 9091 resources: {} # We usually recommend not to specify default resources and to leave this as a conscious @@ -96,3 +96,6 @@ nodeSelector: {} tolerations: [] affinity: {} + +clusterDomain: unknown +enablePrometheusMetrics: true diff --git a/operator/kopfobject.py b/operator/kopfobject.py index c8628ae..1e042b6 100644 --- a/operator/kopfobject.py +++ b/operator/kopfobject.py @@ -1,14 +1,14 @@ import asyncio - -from datetime import datetime, timezone -from typing import List, Mapping, TypeVar +from datetime import datetime +from typing import List, Mapping import kopf import kubernetes_asyncio - +from metrics.timer_decorator import TimerDecoratorMeta from poolboy import Poolboy -class KopfObject: + +class KopfObject(metaclass=TimerDecoratorMeta): @classmethod def from_definition(cls, definition): return cls( diff --git a/operator/metrics/__init__.py b/operator/metrics/__init__.py new file mode 100644 index 0000000..53571c5 --- /dev/null +++ b/operator/metrics/__init__.py @@ -0,0 +1,11 @@ +from .app_metrics import AppMetrics +from .metrics_service import MetricsService +from .timer_decorator import TimerDecoratorMeta, async_timer, sync_timer + +__all__ = [ + "AppMetrics", + "MetricsService", + "TimerDecoratorMeta", + "async_timer", + "sync_timer", +] diff --git a/operator/metrics/app_metrics.py b/operator/metrics/app_metrics.py new file mode 100644 index 0000000..fb1e30b --- /dev/null +++ b/operator/metrics/app_metrics.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from aioprometheus import REGISTRY, Counter, Histogram + + +class AppMetrics: + registry = REGISTRY + + 
# Value of the ``cluster_domain`` metric label. Falls back to "unknown" (the
# default the Helm chart uses for ``clusterDomain``) so the label is never
# ``None`` when CLUSTER_DOMAIN is missing from the environment.
cluster_domain = os.environ.get('CLUSTER_DOMAIN', 'unknown')


def _observe(app, method, status, started):
    """Record one timing sample in ``AppMetrics.process_time``.

    Args:
        app: Value for the ``app`` label (the decorated class name when the
            timer is applied by ``TimerDecoratorMeta``).
        method: Value for the ``method`` label (the wrapped function's name).
        status: ``'success'`` or ``'error'``.
        started: ``time.perf_counter()`` reading taken just before the call.
    """
    labels = {
        'method': method,
        'status': status,
        'app': app,
        'cluster_domain': cluster_domain,
    }
    AppMetrics.process_time.observe(labels, time.perf_counter() - started)


def async_timer(app: str):
    """Return a decorator that times a coroutine function.

    Every call of the decorated coroutine adds one observation to
    ``AppMetrics.process_time`` labelled with the function name, ``app``,
    the cluster domain and a status. The wrapped call's return value and
    exceptions are passed through unchanged.

    Args:
        app: Value used for the ``app`` label.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            started = time.perf_counter()
            # Assume failure until the awaited call has actually returned.
            # This way every way out of the ``try`` other than a normal
            # return -- including asyncio.CancelledError, which is a
            # BaseException -- is recorded as 'error' without having to
            # catch and re-raise anything.
            status = 'error'
            try:
                result = await func(*args, **kwargs)
                status = 'success'
                return result
            finally:
                _observe(app, func.__name__, status, started)
        return wrapper
    return decorator


def sync_timer(app: str):
    """Return a decorator that times a regular (non-async) function.

    Behaves like ``async_timer`` but for plain callables: one observation
    per call, return value and exceptions passed through unchanged.

    Args:
        app: Value used for the ``app`` label.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            started = time.perf_counter()
            # Pre-set so ``finally`` always has a bound ``status``; an
            # unbound name here would raise UnboundLocalError and hide the
            # real exception (e.g. KeyboardInterrupt or SystemExit).
            status = 'error'
            try:
                result = func(*args, **kwargs)
                status = 'success'
                return result
            finally:
                _observe(app, func.__name__, status, started)
        return wrapper
    return decorator


def _timed(app, func):
    """Wrap ``func`` with the timer matching its kind (async or sync)."""
    timer = async_timer if inspect.iscoroutinefunction(func) else sync_timer
    return timer(app)(func)


class TimerDecoratorMeta(type):
    """Metaclass that instruments the methods defined in a class body.

    Each function found in the class namespace is replaced by a timed
    wrapper that uses the class name as the ``app`` label:

    * ``classmethod`` objects are unwrapped, timed and re-wrapped
      (regardless of their name, as before).
    * ``staticmethod`` objects are unwrapped, timed and re-wrapped so they
      stay static. They are callable since Python 3.10, so a bare
      ``callable()`` test would turn them into plain functions that receive
      ``self`` when called on an instance.
    * Plain functions are timed directly.
    * Names starting with ``__`` are left alone for static methods and plain
      functions, so dunder methods are never instrumented.
    * Anything else -- nested classes, properties, callable instances, data
      attributes -- is left untouched; wrapping those in a function would
      change what they are.

    Only attributes defined in the class body itself are processed;
    inherited attributes are not re-wrapped.
    """

    def __new__(cls, name, bases, dct, **kwargs):
        # Iterate over a snapshot because the namespace is updated in place.
        for attr_name, attr_value in list(dct.items()):
            if isinstance(attr_value, classmethod):
                dct[attr_name] = classmethod(_timed(name, attr_value.__func__))
            elif attr_name.startswith("__"):
                continue
            elif isinstance(attr_value, staticmethod):
                dct[attr_name] = staticmethod(_timed(name, attr_value.__func__))
            elif inspect.isfunction(attr_value):
                dct[attr_name] = _timed(name, attr_value)
        # Forward class keyword arguments (consumed by __init_subclass__).
        return super().__new__(cls, name, bases, dct, **kwargs)
Poolboy from resourceclaim import ResourceClaim from resourcehandle import ResourceHandle from resourcepool import ResourcePool @@ -49,6 +46,10 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, ** await Poolboy.on_startup(logger=logger) + if Poolboy.metrics_enabled: + # Start metrics service + await MetricsService.start(port=Poolboy.metrics_port) + # Preload configuration from ResourceProviders await ResourceProvider.preload(logger=logger) @@ -64,6 +65,7 @@ async def cleanup(logger: kopf.ObjectLogger, **_): ResourceHandle.stop_watch_other() await ResourceWatch.stop_all() await Poolboy.on_cleanup() + await MetricsService.stop() @kopf.on.event(Poolboy.operator_domain, Poolboy.operator_version, 'resourceproviders') async def resource_provider_event(event: Mapping, logger: kopf.ObjectLogger, **_) -> None: diff --git a/operator/poolboy.py b/operator/poolboy.py index 7c1004d..b1fddb3 100644 --- a/operator/poolboy.py +++ b/operator/poolboy.py @@ -1,13 +1,15 @@ import os -import yaml - from copy import deepcopy from uuid import UUID import kopf import kubernetes_asyncio +import yaml + class Poolboy(): + metrics_enabled = os.environ.get('METRICS_ENABLED', 'true').lower() == 'true' + metrics_port = int(os.environ.get('METRICS_PORT', 9091)) manage_claims_interval = int(os.environ.get('MANAGE_CLAIMS_INTERVAL', 60)) manage_handles_interval = int(os.environ.get('MANAGE_HANDLES_INTERVAL', 60)) manage_pools_interval = int(os.environ.get('MANAGE_POOLS_INTERVAL', 10)) diff --git a/operator/resourceprovider.py b/operator/resourceprovider.py index 1fabb1c..4bc9393 100644 --- a/operator/resourceprovider.py +++ b/operator/resourceprovider.py @@ -1,20 +1,19 @@ import asyncio -import jinja2 -import jsonpointer -import kopf -import pytimeparse import re - from copy import deepcopy from datetime import timedelta -from openapi_schema_validator import OAS30Validator -from openapi_schema_util import defaults_from_schema from typing import List, Mapping, TypeVar 
+import jinja2 +import jsonpointer +import kopf import poolboy_k8s - +import pytimeparse from deep_merge import deep_merge from jsonpatch_from_diff import jsonpatch_from_diff +from metrics.timer_decorator import TimerDecoratorMeta +from openapi_schema_util import defaults_from_schema +from openapi_schema_validator import OAS30Validator from poolboy import Poolboy from poolboy_templating import check_condition, recursive_process_template_strings @@ -144,7 +143,7 @@ class _ValidationException(Exception): pass -class ResourceProvider: +class ResourceProvider(metaclass=TimerDecoratorMeta): instances = {} lock = asyncio.Lock() @@ -167,11 +166,11 @@ def find_provider_by_template_match(cls, template: Mapping) -> ResourceProviderT if provider.is_match_for_template(template): provider_matches.append(provider) if len(provider_matches) == 0: - raise kopf.TemporaryError(f"Unable to match template to ResourceProvider", delay=60) + raise kopf.TemporaryError("Unable to match template to ResourceProvider", delay=60) elif len(provider_matches) == 1: return provider_matches[0] else: - raise kopf.TemporaryError(f"Resource template matches multiple ResourceProviders", delay=600) + raise kopf.TemporaryError("Resource template matches multiple ResourceProviders", delay=600) @classmethod async def get(cls, name: str) -> ResourceProviderT: @@ -736,9 +735,9 @@ async def resource_definition_from_template(self, if 'namespace' in resource_reference: resource_definition['metadata']['namespace'] = resource_reference['namespace'] if resource_definition['apiVersion'] != resource_reference['apiVersion']: - raise kopf.TemporaryError(f"Unable to change apiVersion for resource!", delay=600) + raise kopf.TemporaryError("Unable to change apiVersion for resource!", delay=600) if resource_definition['kind'] != resource_reference['kind']: - raise kopf.TemporaryError(f"Unable to change kind for resource!", delay=600) + raise kopf.TemporaryError("Unable to change kind for resource!", delay=600) if 
'annotations' not in resource_definition['metadata']: resource_definition['metadata']['annotations'] = {} diff --git a/requirements.txt b/requirements.txt index 389975b..6b992ea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +aioprometheus==23.12.0 inflection==0.5.1 Jinja2==3.1.5 jmespath-community==1.1.2