diff --git a/pdm.lock b/pdm.lock index 361cb50c..00dfa448 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "all", "android", "chat", "dev", "pynput", "web"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:84cdd0968bd718fef2370839d9b3112bf519d420fefceb61e82b8f0150a8df13" +content_hash = "sha256:1d2d76cd5a60e5e1bd0b28e095e97c88afd9ac61ba00468b91168df6455b7494" [[metadata.targets]] requires_python = ">=3.10" @@ -84,6 +84,21 @@ files = [ {file = "argcomplete-3.6.2.tar.gz", hash = "sha256:d0519b1bc867f5f4f4713c41ad0aba73a4a5f007449716b16f385f2166dc6adf"}, ] +[[package]] +name = "asgi-correlation-id" +version = "4.3.4" +requires_python = "<4.0,>=3.8" +summary = "Middleware correlating project logs to individual requests" +groups = ["all", "chat"] +dependencies = [ + "packaging", + "starlette>=0.18", +] +files = [ + {file = "asgi_correlation_id-4.3.4-py3-none-any.whl", hash = "sha256:36ce69b06c7d96b4acb89c7556a4c4f01a972463d3d49c675026cbbd08e9a0a2"}, + {file = "asgi_correlation_id-4.3.4.tar.gz", hash = "sha256:ea6bc310380373cb9f731dc2e8b2b6fb978a76afe33f7a2384f697b8d6cd811d"}, +] + [[package]] name = "asyncer" version = "0.0.8" @@ -2188,7 +2203,7 @@ name = "packaging" version = "25.0" requires_python = ">=3.8" summary = "Core utilities for Python packages" -groups = ["default", "dev"] +groups = ["default", "all", "chat", "dev"] files = [ {file = "packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484"}, {file = "packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f"}, @@ -2425,6 +2440,32 @@ files = [ {file = "pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3"}, ] +[[package]] +name = "prometheus-client" +version = "0.23.1" +requires_python = ">=3.9" +summary = "Python client for the Prometheus monitoring system." +groups = ["all", "chat"] +files = [ + {file = "prometheus_client-0.23.1-py3-none-any.whl", hash = "sha256:dd1913e6e76b59cfe44e7a4b83e01afc9873c1bdfd2ed8739f1e76aeca115f99"}, + {file = "prometheus_client-0.23.1.tar.gz", hash = "sha256:6ae8f9081eaaaf153a2e959d2e6c4f4fb57b12ef76c8c7980202f1e57b48b2ce"}, +] + +[[package]] +name = "prometheus-fastapi-instrumentator" +version = "7.1.0" +requires_python = ">=3.8" +summary = "Instrument your FastAPI app with Prometheus metrics" +groups = ["all", "chat"] +dependencies = [ + "prometheus-client<1.0.0,>=0.8.0", + "starlette<1.0.0,>=0.30.0", +] +files = [ + {file = "prometheus_fastapi_instrumentator-7.1.0-py3-none-any.whl", hash = "sha256:978130f3c0bb7b8ebcc90d35516a6fe13e02d2eb358c8f83887cdef7020c31e9"}, + {file = "prometheus_fastapi_instrumentator-7.1.0.tar.gz", hash = "sha256:be7cd61eeea4e5912aeccb4261c6631b3f227d8924542d79eaf5af3f439cbe5e"}, +] + [[package]] name = "protobuf" version = "6.32.1" @@ -3394,7 +3435,7 @@ name = "starlette" version = "0.48.0" requires_python = ">=3.9" summary = "The little ASGI library that shines." -groups = ["default"] +groups = ["default", "all", "chat"] dependencies = [ "anyio<5,>=3.6.2", "typing-extensions>=4.10.0; python_version < \"3.13\"", @@ -3404,6 +3445,34 @@ files = [ {file = "starlette-0.48.0.tar.gz", hash = "sha256:7e8cee469a8ab2352911528110ce9088fdc6a37d9876926e73da7ce4aa4c7a46"}, ] +[[package]] +name = "starlette-context" +version = "0.4.0" +requires_python = "<4.0,>=3.9" +summary = "Middleware for Starlette that allows you to store and access the context data of a request. Can be used with logging so logs automatically use request headers such as x-request-id or x-correlation-id." +groups = ["all", "chat"] +dependencies = [ + "starlette>=0.27.0", +] +files = [ + {file = "starlette_context-0.4.0-py3-none-any.whl", hash = "sha256:dbcc11006587f901edd3d0a989a69a628fccf9d00c1ca3c28fab23ab88bd0093"}, + {file = "starlette_context-0.4.0.tar.gz", hash = "sha256:3242417c9354c067a4ac5009aff762dc0b322074216f664825d5d127108553be"}, +] + +[[package]] +name = "structlog" +version = "25.4.0" +requires_python = ">=3.8" +summary = "Structured Logging for Python" +groups = ["all", "chat"] +dependencies = [ + "typing-extensions; python_version < \"3.11\"", +] +files = [ + {file = "structlog-25.4.0-py3-none-any.whl", hash = "sha256:fe809ff5c27e557d14e613f45ca441aabda051d119ee5a0102aaba6ce40eed2c"}, + {file = "structlog-25.4.0.tar.gz", hash = "sha256:186cd1b0a8ae762e29417095664adf1d6a31702160a46dacb7796ea82f7409e4"}, +] + [[package]] name = "sympy" version = "1.14.0" diff --git a/pyproject.toml b/pyproject.toml index b1f1f6e2..5bf44deb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,7 +65,7 @@ lint = "ruff check src tests" "lint:fix" = "ruff check --fix src tests" typecheck = "mypy" "typecheck:all" = "mypy ." -"chat:api" = "uvicorn askui.chat.api.app:app --reload --port 9261" +"chat:api" = "python -m askui.chat" "qa:fix" = { composite = [ "typecheck:all", "format", @@ -222,6 +222,10 @@ chat = [ "askui[android,web]", "uvicorn>=0.34.3", "anyio>=4.10.0", + "structlog>=25.4.0", + "asgi-correlation-id>=4.3.4", + "prometheus-fastapi-instrumentator>=7.1.0", + "starlette-context>=0.4.0", ] pynput = [ "mss>=10.0.0", diff --git a/src/askui/chat/__main__.py b/src/askui/chat/__main__.py index c03101f7..0e7d7772 100644 --- a/src/askui/chat/__main__.py +++ b/src/askui/chat/__main__.py @@ -2,14 +2,16 @@ from askui.chat.api.app import app from askui.chat.api.dependencies import get_settings +from askui.chat.api.telemetry.integrations.fastapi import instrument if __name__ == "__main__": settings = get_settings() + instrument(app, settings.telemetry) uvicorn.run( app, host=settings.host, port=settings.port, - log_level=settings.log_level, reload=False, workers=1, + log_config=None, ) diff --git a/src/askui/chat/api/settings.py b/src/askui/chat/api/settings.py index 5abc8a36..0a1a7b0e 100644 --- a/src/askui/chat/api/settings.py +++ b/src/askui/chat/api/settings.py @@ -5,6 +5,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict from askui.chat.api.mcp_configs.models import McpConfig +from askui.chat.api.telemetry.integrations.fastapi.settings import TelemetrySettings from askui.utils.datetime_utils import now @@ -49,10 +50,6 @@ class Settings(BaseSettings): default="127.0.0.1", description="Host for the chat API", ) - log_level: str | int = Field( - default="info", - description="Log level for the chat API", - ) port: int = Field( default=9261, description="Port for the chat API", @@ -68,3 +65,6 @@ class Settings(BaseSettings): "connect to MCP servers shared across all workspaces." ), ) + telemetry: TelemetrySettings = Field( + default_factory=TelemetrySettings, + ) diff --git a/src/askui/chat/api/telemetry/__init__.py b/src/askui/chat/api/telemetry/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/askui/chat/api/telemetry/integrations/__init__.py b/src/askui/chat/api/telemetry/integrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/askui/chat/api/telemetry/integrations/fastapi/__init__.py b/src/askui/chat/api/telemetry/integrations/fastapi/__init__.py new file mode 100644 index 00000000..34dd835e --- /dev/null +++ b/src/askui/chat/api/telemetry/integrations/fastapi/__init__.py @@ -0,0 +1,35 @@ +from asgi_correlation_id import CorrelationIdMiddleware +from fastapi import FastAPI +from prometheus_fastapi_instrumentator import Instrumentator +from starlette_context.middleware import RawContextMiddleware + +from askui.chat.api.telemetry.integrations.fastapi.settings import TelemetrySettings +from askui.chat.api.telemetry.logs import propagate_logs_up, setup_logging, silence_logs + +from .fastapi_middleware import ( + AccessLoggingMiddleware, + ExceptionHandlingMiddleware, + ProcessTimingMiddleware, + TracingMiddleware, +) +from .structlog_processors import merge_starlette_contextvars + + +def instrument( + app: FastAPI, + settings: TelemetrySettings | None = None, +) -> None: + _settings = settings or TelemetrySettings() + setup_logging( + _settings.log, + pre_processors=[merge_starlette_contextvars], + ) + silence_logs(["uvicorn.access"]) + propagate_logs_up(["uvicorn", "uvicorn.error"]) + app.add_middleware(ExceptionHandlingMiddleware) + app.add_middleware(TracingMiddleware) + app.add_middleware(ProcessTimingMiddleware) + app.add_middleware(AccessLoggingMiddleware) + app.add_middleware(CorrelationIdMiddleware) + app.add_middleware(RawContextMiddleware) + Instrumentator().instrument(app).expose(app) diff --git a/src/askui/chat/api/telemetry/integrations/fastapi/fastapi_middleware.py b/src/askui/chat/api/telemetry/integrations/fastapi/fastapi_middleware.py new file mode 100644 index 00000000..825cf082 --- /dev/null +++ b/src/askui/chat/api/telemetry/integrations/fastapi/fastapi_middleware.py @@ -0,0 +1,82 @@ +import logging +from typing import Awaitable, Callable + +import structlog +from asgi_correlation_id.context import correlation_id +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import Response +from starlette.types import ASGIApp + +from askui.chat.api.telemetry.integrations.fastapi.models import AccessLogLine, TimeSpan + +from . import structlog_context +from .utils import compact + +access_logger = structlog.stdlib.get_logger("api.access") +error_logger = structlog.stdlib.get_logger("api.error") + + +EVENT = "API Accessed" + + +class ExceptionHandlingMiddleware(BaseHTTPMiddleware): + async def dispatch( + self, request: Request, call_next: Callable[[Request], Awaitable[Response]] + ) -> Response: + try: + return await call_next(request) + except Exception: # noqa: BLE001 + error_message = "Uncaught exception raised handling request" + error_logger.exception(error_message) + return Response("Internal Server Error", status_code=500) + + +class TracingMiddleware(BaseHTTPMiddleware): + async def dispatch( + self, request: Request, call_next: Callable[[Request], Awaitable[Response]] + ) -> Response: + request_id = correlation_id.get() + structlog_context.bind(request_id=request_id) + return await call_next(request) + + +class ProcessTimingMiddleware(BaseHTTPMiddleware): + async def dispatch( + self, request: Request, call_next: Callable[[Request], Awaitable[Response]] + ) -> Response: + time_span = TimeSpan() + + response = await call_next(request) + time_span.end() + response.headers.append("x-process-time", str(time_span.in_s)) + structlog_context.bind(time_ms=time_span.in_ms) + return response + + +class AccessLoggingMiddleware(BaseHTTPMiddleware): + def __init__(self, app: ASGIApp): + super().__init__(app) + + def determine_log_level(self, request: Request) -> int: # noqa: ARG002 + return logging.INFO + + async def dispatch( + self, request: Request, call_next: Callable[[Request], Awaitable[Response]] + ) -> Response: + response = await call_next(request) + access_log_line = AccessLogLine( + level=self.determine_log_level(request), + event=EVENT, + method=request.method, + path=request.url.path, + query=request.url.query, + status=response.status_code, + http_version=request.scope["http_version"], + ip=request.client.host if request.client else None, + port=request.client.port if request.client else None, + ) + await access_logger.alog( + **compact({**access_log_line, **structlog_context.get()}) + ) + return response diff --git a/src/askui/chat/api/telemetry/integrations/fastapi/models.py b/src/askui/chat/api/telemetry/integrations/fastapi/models.py new file mode 100644 index 00000000..003615cd --- /dev/null +++ b/src/askui/chat/api/telemetry/integrations/fastapi/models.py @@ -0,0 +1,43 @@ +import time +from typing import Optional, TypedDict + + +class AccessLogLine(TypedDict): + level: int + event: str + method: str + path: str + query: Optional[str] + status: int + http_version: str + ip: Optional[str] + port: Optional[int] + + +class TimeSpanData(TypedDict, total=False): + started_at: int + ended_at: Optional[int] + + +class TimeSpan: + def __init__(self) -> None: + self.started_at: int = time.perf_counter_ns() + self.ended_at: Optional[int] = None + + def end(self) -> None: + self.ended_at = time.perf_counter_ns() + + @property + def in_ns(self) -> Optional[int]: + if self.ended_at is None: + return None + + return self.ended_at - self.started_at + + @property + def in_ms(self) -> Optional[float]: + return self.in_ns / 10**6 if self.in_ns is not None else None + + @property + def in_s(self) -> Optional[float]: + return self.in_ns / 10**9 if self.in_ns is not None else None diff --git a/src/askui/chat/api/telemetry/integrations/fastapi/settings.py b/src/askui/chat/api/telemetry/integrations/fastapi/settings.py new file mode 100644 index 00000000..7ac3e27d --- /dev/null +++ b/src/askui/chat/api/telemetry/integrations/fastapi/settings.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel, Field + +from askui.chat.api.telemetry.logs.settings import LogSettings + + +class TelemetrySettings(BaseModel): + log: LogSettings = Field(default_factory=LogSettings) diff --git a/src/askui/chat/api/telemetry/integrations/fastapi/structlog_context.py b/src/askui/chat/api/telemetry/integrations/fastapi/structlog_context.py new file mode 100644 index 00000000..c33a9a08 --- /dev/null +++ b/src/askui/chat/api/telemetry/integrations/fastapi/structlog_context.py @@ -0,0 +1,24 @@ +from copy import deepcopy +from typing import Any + +from starlette_context import context + +STRUCTLOG_REQUEST_CONTEXT_KEY = "structlog_context" + + +def is_available() -> bool: + return context.exists() + + +def get() -> dict[str, Any]: + return deepcopy(context.get(STRUCTLOG_REQUEST_CONTEXT_KEY, {})) + + +def bind(**kw: Any) -> None: + new_context = get() + new_context.update(kw) + context[STRUCTLOG_REQUEST_CONTEXT_KEY] = new_context + + +def reset() -> None: + context[STRUCTLOG_REQUEST_CONTEXT_KEY] = {} diff --git a/src/askui/chat/api/telemetry/integrations/fastapi/structlog_processors.py b/src/askui/chat/api/telemetry/integrations/fastapi/structlog_processors.py new file mode 100644 index 00000000..d4212aaf --- /dev/null +++ b/src/askui/chat/api/telemetry/integrations/fastapi/structlog_processors.py @@ -0,0 +1,22 @@ +import logging + +from structlog.types import EventDict + +from . import structlog_context + + +def merge_starlette_contextvars( + logger: logging.Logger, # noqa: ARG001 + method_name: str, # noqa: ARG001 + event_dict: EventDict, +) -> EventDict: + """ + Merges the starlette contextvars into the structlog contextvars. + """ + + if structlog_context.is_available(): + return { + **event_dict, + **structlog_context.get(), + } + return event_dict diff --git a/src/askui/chat/api/telemetry/integrations/fastapi/utils.py b/src/askui/chat/api/telemetry/integrations/fastapi/utils.py new file mode 100644 index 00000000..2dba0c91 --- /dev/null +++ b/src/askui/chat/api/telemetry/integrations/fastapi/utils.py @@ -0,0 +1,11 @@ +from typing import Any + + +def compact(d: dict[Any, Any]) -> dict[Any, Any]: + result = {} + for k, v in d.items(): + if isinstance(v, dict): + v = compact(v) + if not (not v and type(v) not in (bool, int, float, complex)): + result[k] = v + return result diff --git a/src/askui/chat/api/telemetry/logs/__init__.py b/src/askui/chat/api/telemetry/logs/__init__.py new file mode 100644 index 00000000..d572fd8c --- /dev/null +++ b/src/askui/chat/api/telemetry/logs/__init__.py @@ -0,0 +1,54 @@ +import logging as logging_stdlib +import sys +from types import TracebackType + +from structlog import types as structlog_types + +from .settings import LogSettings +from .structlog import setup_structlog + + +def setup_uncaught_exception_logging(logger: logging_stdlib.Logger) -> None: + def handle_uncaught_exception( + exc_type: type[BaseException], + exc_value: BaseException, + exc_traceback: TracebackType | None, + ) -> None: + """ + Log any uncaught exception instead of letting it be printed by Python + (but leave KeyboardInterrupt untouched to allow users to Ctrl+C to stop) + See https://stackoverflow.com/a/16993115/3641865 + """ + if issubclass(exc_type, KeyboardInterrupt): + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + + logger.error( + "Uncaught exception raised", exc_info=(exc_type, exc_value, exc_traceback) + ) + + sys.excepthook = handle_uncaught_exception + + +def propagate_logs_up(loggers: list[str]) -> None: + for logger_name in loggers: + logger = logging_stdlib.getLogger(logger_name) + logger.handlers.clear() + logger.propagate = True + + +def silence_logs(loggers: list[str]) -> None: + for logger_name in loggers: + logger = logging_stdlib.getLogger(logger_name) + logger.handlers.clear() + logger.propagate = False + + +def setup_logging( + settings: LogSettings, + pre_processors: list[structlog_types.Processor] | None = None, +) -> None: + logging_stdlib.captureWarnings(True) + root_logger = logging_stdlib.getLogger() + setup_structlog(root_logger, settings, pre_processors) + setup_uncaught_exception_logging(root_logger) diff --git a/src/askui/chat/api/telemetry/logs/settings.py b/src/askui/chat/api/telemetry/logs/settings.py new file mode 100644 index 00000000..356b8773 --- /dev/null +++ b/src/askui/chat/api/telemetry/logs/settings.py @@ -0,0 +1,21 @@ +import enum + +from pydantic import BaseModel + + +class LogLevel(str, enum.Enum): + CRITICAL = "CRITICAL" + ERROR = "ERROR" + WARNING = "WARNING" + INFO = "INFO" + DEBUG = "DEBUG" + + +class LogFormat(str, enum.Enum): + JSON = "json" + LOGFMT = "logfmt" + + +class LogSettings(BaseModel): + format: LogFormat = LogFormat.LOGFMT + level: LogLevel = LogLevel.INFO diff --git a/src/askui/chat/api/telemetry/logs/structlog.py b/src/askui/chat/api/telemetry/logs/structlog.py new file mode 100644 index 00000000..c26bd1c6 --- /dev/null +++ b/src/askui/chat/api/telemetry/logs/structlog.py @@ -0,0 +1,78 @@ +import logging + +import structlog + +from .settings import LogFormat, LogLevel, LogSettings +from .structlog_processors import ( + drop_color_message_key_processor, + flatten_dict_processor, +) + + +def setup_structlog( + root_logger: logging.Logger, + settings: LogSettings, + pre_processors: list[structlog.types.Processor] | None = None, +) -> None: + shared_processors = (pre_processors or []) + get_shared_processors(settings) + structlog.configure( + processors=shared_processors + + [structlog.stdlib.ProcessorFormatter.wrap_for_formatter], + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, + ) + formatter = structlog.stdlib.ProcessorFormatter( + foreign_pre_chain=shared_processors, + processors=[ + structlog.stdlib.ProcessorFormatter.remove_processors_meta, + get_renderer(settings.format), + ], + ) + configure_stdlib_logger(root_logger, settings.level, formatter) + + +def configure_stdlib_logger( + logger: logging.Logger, log_level: LogLevel, formatter: logging.Formatter +) -> None: + handler = logging.StreamHandler() + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(log_level.value) + + +EVENT_KEY = "message" + + +def get_shared_processors(settings: LogSettings) -> list[structlog.types.Processor]: + """Returns a list of processors, i.e., a processor chain, that can be shared between + structlog and stdlib loggers so that their content is consistent.""" + format_dependent_processors = get_format_dependent_processors(settings.format) + return [ + structlog.contextvars.merge_contextvars, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.stdlib.ExtraAdder(), + drop_color_message_key_processor, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + *format_dependent_processors, + structlog.processors.EventRenamer(EVENT_KEY), + ] + + +def get_format_dependent_processors( + log_format: LogFormat, +) -> list[structlog.types.Processor]: + if log_format == LogFormat.JSON: + return [structlog.processors.format_exc_info] + return [ + structlog.dev.set_exc_info, + flatten_dict_processor, + ] + + +def get_renderer(log_format: LogFormat) -> structlog.types.Processor: + if log_format == LogFormat.JSON: + return structlog.processors.JSONRenderer() + return structlog.dev.ConsoleRenderer(event_key=EVENT_KEY) diff --git a/src/askui/chat/api/telemetry/logs/structlog_processors.py b/src/askui/chat/api/telemetry/logs/structlog_processors.py new file mode 100644 index 00000000..8fb785f9 --- /dev/null +++ b/src/askui/chat/api/telemetry/logs/structlog_processors.py @@ -0,0 +1,29 @@ +import logging + +from structlog.types import EventDict + +from .utils import flatten_dict + + +def flatten_dict_processor( + logger: logging.Logger, # noqa: ARG001 + method_name: str, # noqa: ARG001 + event_dict: EventDict, +) -> EventDict: + """ + Flattens a nested event dictionary deeply. Nested keys are concatenated with dot notation. + """ + return flatten_dict(event_dict) + + +def drop_color_message_key_processor( + logger: logging.Logger, # noqa: ARG001 + method_name: str, # noqa: ARG001 + event_dict: EventDict, +) -> EventDict: + """ + Uvicorn logs the message a second time in the extra `color_message`, but we don't + need it. This processor drops the key from the event dict if it exists. + """ + event_dict.pop("color_message", None) + return event_dict diff --git a/src/askui/chat/api/telemetry/logs/utils.py b/src/askui/chat/api/telemetry/logs/utils.py new file mode 100644 index 00000000..c978902a --- /dev/null +++ b/src/askui/chat/api/telemetry/logs/utils.py @@ -0,0 +1,16 @@ +from collections.abc import MutableMapping +from typing import Any + + +def flatten_dict( + d: MutableMapping[Any, Any], parent_key: str = "", sep: str = "." +) -> MutableMapping[str, Any]: + result: list[tuple[str, Any]] = [] + for k, v in d.items(): + k = str(k) + new_key = parent_key + sep + k if parent_key else k + if isinstance(v, MutableMapping): + result.extend(flatten_dict(v, new_key, sep=sep).items()) + else: + result.append((new_key, v)) + return dict(result)