Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions datashare-python/datashare_python/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import ClassVar
from typing import Annotated, Literal

from icij_common.es import ESClient
from icij_common.pydantic_utils import ICIJSettings
Expand All @@ -18,7 +18,6 @@
from .objects import BaseModel
from .task_client import DatashareTaskClient
from .types_ import TemporalClient
from .utils import LogWithWorkerIDMixin

_ALL_LOGGERS = [datashare_python.__name__]

Expand Down Expand Up @@ -76,11 +75,20 @@ async def to_client(self) -> TemporalClient:
return self._client


class WorkerConfig(ICIJSettings, LogWithWorkerIDMixin, BaseModel):
LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]


class LoggingConfig(BaseModel):
    """Logging settings of a worker: output format and per-logger levels."""

    # When true, worker log handlers are built with the JSON formatter instead
    # of the plain-text stream format.
    log_in_json: bool = False
    # Maps a logger name to the level name that logger (and its handlers)
    # should be set to.
    loggers: dict[str, LogLevel]


class WorkerConfig(ICIJSettings, BaseModel):
model_config = DS_WORKER_SETTINGS_CONFIG

loggers: ClassVar[list[str]] = Field(_ALL_LOGGERS, frozen=True)
log_level: str = Field(default="INFO")
logging: Annotated[LoggingConfig, Field(frozen=True)] = {
datashare_python.__name__: "INFO"
}

datashare: DatashareClientConfig = DatashareClientConfig()
elasticsearch: ESClientConfig = ESClientConfig()
Expand Down
22 changes: 11 additions & 11 deletions datashare-python/datashare_python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@

from datashare_python.config import (
DatashareClientConfig,
LoggingConfig,
TemporalClientConfig,
WorkerConfig,
)
from datashare_python.dependencies import (
lifespan_es_client,
lifespan_task_client,
set_es_client,
set_event_loop,
set_loggers,
set_task_client,
set_temporal_client,
with_dependencies,
)
from datashare_python.objects import Document, TaskState
Expand Down Expand Up @@ -78,13 +76,7 @@ async def run(self) -> None:

@pytest.fixture(scope="session")
def test_deps() -> list[ContextManagerFactory]:
return [
set_loggers,
set_event_loop,
set_es_client,
set_temporal_client,
set_task_client,
]
return [set_es_client, set_task_client]


@pytest.fixture(scope="session")
Expand All @@ -99,8 +91,16 @@ def event_loop(

@pytest.fixture(scope="session")
def test_worker_config() -> WorkerConfig:
logging_config = LoggingConfig(
log_in_json=False,
loggers={
"datashare_python": "DEBUG",
"icij_common": "DEBUG",
"worker_template": "DEBUG",
},
)
return WorkerConfig(
log_level="DEBUG",
logging=logging_config,
datashare=DatashareClientConfig(url="http://localhost:8080"),
temporal=TemporalClientConfig(host="localhost:7233"),
)
Expand Down
12 changes: 8 additions & 4 deletions datashare-python/datashare_python/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

from icij_common.es import ESClient

from .config import WorkerConfig
from .config import LogLevel, WorkerConfig
from .exceptions import DependencyInjectionError
from .logging_ import setup_worker_loggers
from .task_client import DatashareTaskClient
from .types_ import ContextManagerFactory, TemporalClient

Expand All @@ -35,10 +36,13 @@ def lifespan_event_loop() -> AbstractEventLoop:
raise DependencyInjectionError("event loop") from e


def set_loggers(worker_config: WorkerConfig, worker_id: str) -> None:
worker_config.setup_loggers(worker_id=worker_id)
def set_loggers(
worker_config: WorkerConfig, worker_id: str, loggers: dict[str, LogLevel]
) -> None:
setup_worker_loggers(
loggers=loggers, worker_id=worker_id, in_json=worker_config.logging.log_in_json
)
logger.info("worker loggers ready to log 💬")
logger.info("app config: %s", worker_config.model_dump_json(indent=2))


def set_worker_config(worker_config: WorkerConfig) -> None:
Expand Down
9 changes: 6 additions & 3 deletions datashare-python/datashare_python/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from importlib.metadata import entry_points

from .config import WorkerConfig
from .dependencies import set_worker_config
from .dependencies import set_loggers, set_worker_config
from .types_ import ContextManagerFactory
from .utils import ActivityWithProgress

Expand All @@ -29,6 +29,8 @@
type[WorkerConfig],
]

_MANDATORY_DEPS = [set_worker_config, set_loggers]


def discover(
wf_names: list[str] | None, *, act_names: list[str] | None, deps_name: str | None
Expand Down Expand Up @@ -68,8 +70,9 @@ def discover(
deps = []
if deps_name is not None:
deps = discover_dependencies(deps_name)
if set_worker_config not in deps:
deps.append(set_worker_config)
for mandatory in _MANDATORY_DEPS:
if mandatory not in deps:
deps.append(mandatory)
if deps:
n_deps = len(deps)
discovered += "\n"
Expand Down
87 changes: 87 additions & 0 deletions datashare-python/datashare_python/logging_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import logging
import sys
from copy import copy

from icij_common.logging_utils import (
DATE_FMT,
STREAM_HANDLER_FMT,
STREAM_HANDLER_FMT_WITH_WORKER_ID,
)
from pythonjsonlogger.core import RESERVED_ATTRS, BaseJsonFormatter
from pythonjsonlogger.orjson import OrjsonFormatter
from temporalio import activity, workflow

from .config import LogLevel

_ACT_LOGGER_ATTRS = [
"activity_type",
"activity_id",
"activity_run_id",
]

_WF_LOGGED_ATTRS = [
"workflow_type",
"workflow_id",
"workflow_run_id",
]
_LOGGED_ATTRIBUTES = (
copy(RESERVED_ATTRS) + _WF_LOGGED_ATTRS + _ACT_LOGGER_ATTRS + ["worker_id"]
)


def setup_worker_loggers(
    loggers: dict[str, LogLevel], *, worker_id: str | None, in_json: bool
) -> None:
    """Reset and configure every logger listed in ``loggers`` for a worker.

    For each ``name -> level`` entry the logger is set to that level, its
    existing handlers are discarded and replaced by the worker handlers, and
    a shared :class:`WorkerFilter` is installed so that records carry the
    worker id (plus workflow/activity details when available).

    Args:
        loggers: mapping of logger name to level name (e.g. ``"INFO"``).
        worker_id: identifier of the worker, or ``None`` when unknown.
        in_json: emit JSON formatted records instead of plain text.
    """
    worker_filter = WorkerFilter(worker_id)
    for logger_name, level_str in loggers.items():
        level = getattr(logging, level_str)
        logger = logging.getLogger(logger_name)
        logger.setLevel(level)
        # Drop handlers from any previous setup so output is not duplicated.
        logger.handlers = []
        for handler in _get_worker_handlers(level, worker_id, in_json=in_json):
            # The filter is attached to the handler rather than to the logger:
            # logger-level filters are skipped for records propagated from
            # descendant loggers (e.g. "pkg.module" -> "pkg"), which would then
            # reach the formatter without the worker attributes. Handlers are
            # rebuilt on every call, so filters do not pile up across setups.
            handler.addFilter(worker_filter)
            logger.addHandler(handler)


def _get_worker_handlers(
    level: int, worker_id: str | None, *, in_json: bool
) -> list[logging.Handler]:
    """Build the handlers used by worker loggers.

    Returns a single stderr stream handler set to ``level``. Its formatter is
    the JSON formatter when ``in_json`` is true, otherwise a plain-text
    formatter whose pattern depends on whether a ``worker_id`` is provided.
    """
    if in_json:
        formatter = _json_formatter(datefmt=DATE_FMT, worker_id=worker_id)
    else:
        pattern = (
            STREAM_HANDLER_FMT
            if worker_id is None
            else STREAM_HANDLER_FMT_WITH_WORKER_ID
        )
        formatter = logging.Formatter(pattern, DATE_FMT)
    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setFormatter(formatter)
    stderr_handler.setLevel(level)
    return [stderr_handler]


class WorkerFilter(logging.Filter):
def __init__(self, worker_id: str) -> None:
super().__init__()
self._worker_id = worker_id

def filter(self, record: logging.LogRecord) -> bool:
record.worker_id = self._worker_id
if workflow.in_workflow():
wf_info = workflow.info()
for attr in _WF_LOGGED_ATTRS:
setattr(record, attr, getattr(wf_info, attr))
if activity.in_activity():
act_info = activity.info()
for attr in _ACT_LOGGER_ATTRS:
setattr(record, attr, getattr(act_info, attr))
return True


def _json_formatter(datefmt: str, worker_id: str | None) -> BaseJsonFormatter:
    """Build the JSON formatter used by worker handlers.

    Args:
        datefmt: date format applied to record timestamps.
        worker_id: worker identifier added to every record, ``None`` when the
            worker has no id.

    Returns:
        An orjson-backed formatter logging the ``_LOGGED_ATTRIBUTES`` fields.
    """
    # orjson keeps JSON logging as fast as possible.
    # `static_fields` is the formatter keyword for constant per-record values;
    # it replaces the previous `extra=` keyword, which does not appear to be
    # accepted by the formatter constructor.
    return OrjsonFormatter(
        _LOGGED_ATTRIBUTES,
        static_fields={"worker_id": worker_id},
        datefmt=datefmt,
    )
22 changes: 22 additions & 0 deletions datashare-python/datashare_python/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from collections.abc import AsyncGenerator, Callable
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from copy import copy
from typing import Any

from temporalio.worker import PollerBehaviorSimpleMaximum, Worker
Expand Down Expand Up @@ -142,12 +143,27 @@ async def worker_context(
task_queue: str,
dependencies: list[ContextManagerFactory] | None = None,
) -> AsyncGenerator[DatashareWorker, None]:
discovered = []
if activities is not None:
discovered.extend(activities)
if workflows is not None:
discovered.extend(workflows)
if dependencies is not None:
discovered.extend(dependencies)
discovered.append(worker_config)
loggers = copy(worker_config.logging.loggers)
discovered_loggers = {_get_object_package(o).__name__ for o in discovered}
for logger in discovered_loggers:
if logger not in loggers:
# Log in info by default
loggers[logger] = "INFO"
deps_cm = (
with_dependencies(
dependencies,
worker_config=worker_config,
worker_id=worker_id,
event_loop=event_loop,
loggers=loggers,
)
if dependencies
else _do_nothing_cm()
Expand Down Expand Up @@ -181,3 +197,9 @@ def _get_class_from_method(method: Callable) -> type:
class_name = method.__qualname__.rsplit(".", 1)[0]
module = sys.modules[method.__module__]
return getattr(module, class_name)


def _get_object_package(obj: Any) -> Any:
    """Return the top-level package module in which ``obj`` is defined.

    The module of ``obj`` is resolved with ``inspect.getmodule``; the part of
    its dotted name before the first ``.`` is then looked up in
    ``sys.modules``.
    """
    module_name = inspect.getmodule(obj).__name__
    root_name = module_name.split(".", 1)[0]
    return sys.modules[root_name]
15 changes: 8 additions & 7 deletions datashare-python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "datashare-python"
version = "0.6.3"
version = "0.7.0"
description = "Manage Python tasks and local resources in Datashare"
authors = [
{ name = "Clément Doumouro", email = "cdoumouro@icij.org" },
Expand All @@ -10,16 +10,17 @@ authors = [
readme = "README.md"
requires-python = ">=3.11,<4"
dependencies = [
"alive-progress~=3.2.0",
"aiohttp~=3.11.9",
"alive-progress~=3.2",
"aiohttp~=3.11",
"icij-common[elasticsearch]~=0.8.2",
"python-json-logger~=4.0.0",
"nest-asyncio~=1.6.0",
"temporalio~=1.23.0",
"python-json-logger~=4.0",
"nest-asyncio~=1.6",
"temporalio~=1.23",
"typer~=0.15.4",
"tomlkit~=0.14.0",
"hatchling~=1.27.0",
"hatchling~=1.27",
"pyyaml~=6.0",
"orjson~=3.11",
]

[project.urls]
Expand Down
14 changes: 8 additions & 6 deletions datashare-python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion worker-template/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ authors = [
readme = "README.md"
requires-python = ">=3.11,<4"
dependencies = [
"datashare-python~=0.6.2",
"datashare-python~=0.7.0",
"pycountry~=26.2.16",
"temporalio~=1.23.0",
]
Expand Down
7 changes: 6 additions & 1 deletion worker-template/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from collections.abc import AsyncGenerator
from typing import Any

import datashare_python
import pytest
from datashare_python.config import (
DatashareClientConfig,
LoggingConfig,
TemporalClientConfig,
WorkerConfig,
)
Expand Down Expand Up @@ -49,8 +51,11 @@

@pytest.fixture(scope="session")
def test_worker_config() -> TranslateAndClassifyWorkerConfig:
logging_config = LoggingConfig(
loggers={datashare_python.__name__: "INFO", __name__: "INFO"}, log_in_json=False
)
return TranslateAndClassifyWorkerConfig(
log_level="DEBUG",
logging=logging_config,
datashare=DatashareClientConfig(url="http://localhost:8080"),
temporal=TemporalClientConfig(host="localhost:7233"),
)
Expand Down
6 changes: 3 additions & 3 deletions worker-template/uv.dist.lock
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ wheels = [

[[package]]
name = "datashare-python"
version = "0.6.2"
version = "0.6.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
Expand All @@ -330,9 +330,9 @@ dependencies = [
{ name = "tomlkit" },
{ name = "typer" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a0/20/095049cb543235e81c3d0fe9bce31666a3b799782df9fb88198abbea9757/datashare_python-0.6.2.tar.gz", hash = "sha256:2630969c77602a427cb0a784d0896c83e2a071729f440cee25b3945f7f9caba6", size = 299884, upload-time = "2026-04-21T11:58:44.616Z" }
sdist = { url = "https://files.pythonhosted.org/packages/30/88/9d2e2431cf332ed6c272fae5a2f00d298d26722af5d1a7204df90585fe51/datashare_python-0.6.3.tar.gz", hash = "sha256:97308d087c094cbe737127dde6abf0324a39715a91a77e75182c368f09182dbf", size = 300154, upload-time = "2026-04-21T14:25:57.651Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/fd/00cdbcdb2325b62aec3ec78199864531ff08b77ff1f3b33942a79f142908/datashare_python-0.6.2-py3-none-any.whl", hash = "sha256:5578c92b12c321ef1f0dc42473c106cbbe6068d134f866896a4236105a342d66", size = 304735, upload-time = "2026-04-21T11:58:43.504Z" },
{ url = "https://files.pythonhosted.org/packages/bb/ea/ba0f73cfaa3394b3e70e6bf2c67cdcef21e4829cba95b1cee7740482f1a9/datashare_python-0.6.3-py3-none-any.whl", hash = "sha256:d8ed28175fa8530b94f90f586faf9d84cf60e4701d966ba4f5bbe46fcd06d6f2", size = 304978, upload-time = "2026-04-21T14:25:55.929Z" },
]

[[package]]
Expand Down
Loading
Loading