Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions sdks/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ config = create_config(

# Create evaluator and run evaluation
evaluator = ConventionalityEvaluator(config)
result = evaluator.evaluate(
result = evaluator.evaluate_sync(
ConventionalityEvaluationInput(text="The cat's out of the bag now.", grade=5)
)

Expand Down Expand Up @@ -185,7 +185,7 @@ config = create_config(
)
evaluator = ConventionalityEvaluator(config)

result = evaluator.evaluate(
result = evaluator.evaluate_sync(
ConventionalityEvaluationInput(text="Your text here.", grade=5)
)

Expand Down Expand Up @@ -254,16 +254,17 @@ evaluator = ConventionalityEvaluator(
default_evaluation_settings=settings,
)

# Uses the instance default (a deep copy is taken inside evaluate)
result = evaluator.evaluate(input)
# Uses the instance default (a deep copy is taken inside evaluate / evaluate_sync)
result = evaluator.evaluate_sync(input)

# Per-call override still wins
result = evaluator.evaluate(input, evaluation_settings=other_settings)
result = evaluator.evaluate_sync(input, evaluation_settings=other_settings)
```

If you omit `default_evaluation_settings` at construction, attribute lookup uses the
subclass class attribute, same as before. Whenever you call `evaluate()` without
`evaluation_settings`, the SDK uses `model_copy(deep=True)` of the resolved default,
subclass class attribute, same as before. Whenever you call `evaluate_sync()` or
`await evaluator.evaluate(...)` without `evaluation_settings`, the SDK uses
`model_copy(deep=True)` of the resolved default,
so the object you keep on the instance is not mutated by a run.

### Logging
Expand Down Expand Up @@ -313,7 +314,7 @@ from learning_commons_evaluators import (
)

try:
result = evaluator.evaluate(input)
result = evaluator.evaluate_sync(input)
except ConfigurationError as e:
print(f"Config issue: {e}")
except ValidationError as e:
Expand All @@ -326,7 +327,7 @@ except APIError as e:

Failures inside LLM prompt steps are passed through `wrap_provider_error()` (see `learning_commons_evaluators.schemas.errors`) so you typically see `APIError` subclasses rather than raw LangChain or HTTP client exceptions. Use `EvaluatorTimeoutError` for timeouts (the package does not export a `TimeoutError` alias, to avoid shadowing the Python builtin).

On evaluation failure, `metadata.status` and `error_details` are set on the in-memory metadata object for the run and appear on the evaluation end log line; `BaseEvaluator.evaluate` still re-raises and does not return a result object.
On evaluation failure, `metadata.status` and `error_details` are set on the in-memory metadata object for the run and appear on the evaluation end log line; `BaseEvaluator.evaluate` and `evaluate_sync` still re-raise the exception and do not return a result object.
Comment thread
czi-fsisenda marked this conversation as resolved.

## Creating custom evaluators

Expand All @@ -350,14 +351,14 @@ class MyEvaluator(BaseEvaluator[MyInput, EvaluationResult, MySettings]):
)
default_evaluation_settings = MySettings(...)

def evaluate_impl(
async def evaluate_impl(
self,
input: MyInput,
evaluation_settings: MySettings,
evaluation_metadata: EvaluationMetadata,
) -> EvaluationResult:
# Use self.execute_prompt_chain_step() for LLM calls
output = self.execute_prompt_chain_step(
# Use await self.execute_prompt_chain_step(...) for LLM calls
output = await self.execute_prompt_chain_step(
step_name="main",
prompt_settings=evaluation_settings.prompt_settings,
evaluation_metadata=evaluation_metadata,
Expand Down
128 changes: 98 additions & 30 deletions sdks/python/src/learning_commons_evaluators/evaluators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from __future__ import annotations

import asyncio
import time
from abc import ABC, abstractmethod
from collections.abc import Callable
from collections.abc import Awaitable, Callable
from typing import Any, Generic, TypeVar, overload

from pydantic import BaseModel
Expand Down Expand Up @@ -48,7 +49,13 @@
class BaseEvaluator(ABC, Generic[InputT, OutputT, SettingsT]):
"""
Abstract base class for all evaluators.
Subclasses must set metadata, default_evaluation_settings, and implement evaluate_impl().
Subclasses must set ``metadata``, ``default_evaluation_settings``, and implement
:meth:`evaluate_impl`.

Callers run an evaluation with :meth:`evaluate` (async: ``await evaluator.evaluate(...)``)
or :meth:`evaluate_sync` from synchronous code (uses :func:`asyncio.run`). If a loop is
already running on this thread, :meth:`evaluate_sync` raises :exc:`RuntimeError`; use
``await`` :meth:`evaluate` in that case.

Pass ``default_evaluation_settings`` at construction to override the class-level
defaults for that instance (used when :meth:`evaluate` is called without
Expand All @@ -70,7 +77,7 @@ def __init__(
self.default_evaluation_settings = default_evaluation_settings
# TODO: validate config

def evaluate(
async def evaluate(
self,
input: InputT,
evaluation_settings: SettingsT | None = None,
Expand All @@ -79,10 +86,14 @@ def evaluate(

Validates the input, delegates to :meth:`evaluate_impl`, records timing
and status on the returned metadata, and logs start/end events via the
configured logger. If ``evaluation_settings`` is ``None``, a deep copy of
configured logger. If ``evaluation_settings`` is ``None``, a deep copy of
the instance's :attr:`default_evaluation_settings` is used (from the
constructor keyword when given, otherwise the subclass class attribute).

The ``finally`` block always runs so ``processing_time_ms`` and the end log
line are emitted even when validation or the implementation raises; telemetry
hooks remain TODOs here until wired.

Args:
input: Typed input for this evaluator.
evaluation_settings: Optional override for evaluation settings.
Expand Down Expand Up @@ -116,7 +127,7 @@ def evaluate(
)
try:
input.validate()
result = self.evaluate_impl(input, evaluation_settings, evaluation_metadata)
result = await self.evaluate_impl(input, evaluation_settings, evaluation_metadata)
evaluation_metadata.status = Status.succeeded
result.metadata = evaluation_metadata
return result
Expand All @@ -133,38 +144,94 @@ def evaluate(
# TODO: add full input to telemetry if enabled
# TODO: send_telemetry(evaluation_metadata)

def evaluate_sync(
    self,
    input: InputT,
    evaluation_settings: SettingsT | None = None,
) -> OutputT:
    """Blocking entry point that drives :meth:`evaluate` to completion.

    The coroutine returned by :meth:`evaluate` is executed with
    :func:`asyncio.run`, so this method is meant for synchronous callers
    (scripts, REPL sessions, tests) that are not already inside an asyncio
    event loop.

    Args:
        input: Forwarded unchanged to :meth:`evaluate`.
        evaluation_settings: Forwarded unchanged to :meth:`evaluate`.

    Returns:
        Whatever :meth:`evaluate` returns for the same arguments.

    Raises:
        RuntimeError: If an asyncio event loop is already running in the
            calling thread. In that situation use
            ``await evaluator.evaluate(...)`` instead.
        Exception: Anything raised by :meth:`evaluate` propagates unchanged.
    """
    # asyncio.get_running_loop() raises RuntimeError when no loop is running
    # in this thread; that is the only state in which asyncio.run() is usable.
    loop_active = True
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_active = False

    if loop_active:
        # Checked before the coroutine is created, so no un-awaited coroutine
        # object is left behind on this path.
        raise RuntimeError(
            "evaluate_sync() cannot be used while an asyncio event loop is running in "
            "this thread; use await evaluator.evaluate(...) from async code instead."
        ) from None

    pending = self.evaluate(input, evaluation_settings)
    return asyncio.run(pending)

@abstractmethod
def evaluate_impl(
async def evaluate_impl(
self,
input: InputT,
evaluation_settings: SettingsT,
evaluation_metadata: EvaluationMetadata,
) -> OutputT:
"""Implement the evaluation logic. Return a result; base assigns evaluation_metadata onto it."""
"""Implement the evaluator-specific logic for one run.

Subclasses perform prompt steps (often via :meth:`execute_prompt_chain_step`),
assemble the typed result, and return it. The base :meth:`evaluate` assigns
``evaluation_metadata`` onto ``result.metadata`` after a successful return.

Args:
input: Validated evaluation input.
evaluation_settings: Resolved settings for this run (already deep-copied
when the caller omitted overrides).
evaluation_metadata: Run metadata; populate ``step_details`` and related
fields as steps execute.

Returns:
A fully constructed result object (``metadata`` may still point at the
same ``evaluation_metadata`` instance; the base layer sets status and timing).
"""
...

def execute_step(
async def execute_step(
self,
step_name: str,
evaluation_metadata: EvaluationMetadata,
implementation_function: Callable[[], StepResultT],
implementation_function: Callable[[], Awaitable[StepResultT]],
*,
extras: dict[str, Any] | None = None,
) -> StepResultT:
"""Run ``implementation_function`` and record step metadata on ``evaluation_metadata``.
"""Await ``implementation_function`` and record step metadata on ``evaluation_metadata``.

``step_name`` is always the step id. Optional ``extras`` is copied into
:attr:`StepMetadata.extras` (merged with any updates made during the step, e.g. token usage).

The step may return any type (e.g. a Pydantic model, a plain ``str``, or ``None``); the same
type is returned to the caller.
``implementation_function`` must be a zero-argument callable that returns an
awaitable (typically an ``async def`` with no parameters, or a lambda that
returns one). The awaited value may be any type (e.g. a Pydantic model, a plain
``str``, or ``None``); the same type is returned to the caller.
"""
start = time.perf_counter()
step_extras = dict(extras) if extras is not None else {}
step_metadata = StepMetadata(step_id=step_name, extras=step_extras)
self.config.logger.info("step start", extra={"step_metadata": step_metadata})
try:
result = implementation_function()
result = await implementation_function()
step_metadata.status = Status.succeeded
return result
except Exception as e:
Expand All @@ -177,7 +244,7 @@ def execute_step(
evaluation_metadata.step_details[step_name] = step_metadata

@overload
def execute_prompt_chain_step(
async def execute_prompt_chain_step(
self,
step_name: str,
prompt_settings: PromptSettings,
Expand All @@ -188,7 +255,7 @@ def execute_prompt_chain_step(
) -> str: ...

@overload
def execute_prompt_chain_step(
async def execute_prompt_chain_step(
self,
step_name: str,
prompt_settings: PromptSettings,
Expand All @@ -199,7 +266,7 @@ def execute_prompt_chain_step(
json_dict_normalizer: Callable[[dict], dict] | None = None,
) -> ParsedT: ...

def execute_prompt_chain_step(
async def execute_prompt_chain_step(
self,
step_name: str,
prompt_settings: PromptSettings,
Expand All @@ -209,19 +276,20 @@ def execute_prompt_chain_step(
parser_output_type: type[BaseModel] | None = None,
json_dict_normalizer: Callable[[dict], dict] | None = None,
) -> BaseModel | str:
"""Run a prompt chain (template | LLM), record metadata, and return the result.
"""Run a prompt chain (template | LLM) with ``ainvoke``, record metadata, and return the result.

When ``parser_output_type`` is a Pydantic model class, the LLM response is
parsed as JSON and returned as an instance of that class. When it is
``None`` (the default), the raw response content is returned as a plain
``str`` (no JSON parser) — use that for steps that produce unstructured prose
(e.g. a background-knowledge assumption).
The LangChain runnable ``template | provider`` is invoked asynchronously via
:meth:`~langchain_core.runnables.base.Runnable.ainvoke`. When ``parser_output_type``
is a Pydantic model class, the LLM response is parsed as JSON and returned as an
instance of that class. When it is ``None`` (the default), the raw response content
is returned as a plain ``str`` (no JSON parser) — use that for steps that produce
unstructured prose (e.g. a background-knowledge assumption).

Provider config (e.g. API key) is resolved from ``self.config`` by
``prompt_settings.provider_type``.

Args:
step_name: Identifier for this step in evaluation_metadata.step_details.
step_name: Identifier for this step in ``evaluation_metadata.step_details``.
prompt_settings: Provider type, model, and temperature for the LLM call.
evaluation_metadata: Metadata for the full evaluation; step metadata and
token usage are updated in place.
Expand All @@ -241,7 +309,7 @@ def execute_prompt_chain_step(
``str`` when ``parser_output_type`` is omitted or ``None``.

Raises:
ConfigurationError: No provider config for prompt_settings.provider_type.
ConfigurationError: No provider config for ``prompt_settings.provider_type``.
EvaluatorError: SDK errors, including :func:`~learning_commons_evaluators.schemas.errors.wrap_provider_error` output for LangChain or HTTP failures (typically :class:`~learning_commons_evaluators.schemas.errors.APIError` subclasses). Pydantic :exc:`pydantic.ValidationError` from output parsing is re-raised unchanged.
ValueError: If ``json_dict_normalizer`` is set but ``parser_output_type`` is omitted.
"""
Expand All @@ -250,27 +318,27 @@ def execute_prompt_chain_step(
# Populated after a successful LLM invoke so we can attach usage even if parsing fails.
token_usage: TokenUsage | None = None

def _run_chain() -> BaseModel | str:
async def _run_chain() -> BaseModel | str:
nonlocal token_usage
try:
provider = create_provider(prompt_settings, self.config)
llm_chain: Any = template | provider
ai_message = llm_chain.invoke(chain_inputs)
ai_message = await llm_chain.ainvoke(chain_inputs)
token_usage = token_usage_from_aimessage(ai_message, prompt_settings)
if parser_output_type is None:
return str(ai_message.content)
from langchain_core.output_parsers.json import JsonOutputParser

if json_dict_normalizer is not None:
loose = JsonOutputParser()
parsed_dict = loose.invoke(ai_message)
parsed_dict = await loose.ainvoke(ai_message)
if not isinstance(parsed_dict, dict):
parsed_dict = dict(parsed_dict)
normalized = json_dict_normalizer(parsed_dict)
return parser_output_type.model_validate(normalized)

parser = JsonOutputParser(pydantic_object=parser_output_type)
raw = parser.invoke(ai_message)
raw = await parser.ainvoke(ai_message)
if isinstance(raw, parser_output_type):
return raw
return parser_output_type.model_validate(raw)
Expand All @@ -284,15 +352,15 @@ def _run_chain() -> BaseModel | str:
raise wrap_provider_error(e) from e

try:
return self.execute_step(
return await self.execute_step(
step_name,
evaluation_metadata,
_run_chain,
extras={
PROMPT_STEP_EXTRA_PROMPT_SETTINGS: prompt_settings_to_extras_value(
prompt_settings
),
},
implementation_function=_run_chain,
)
finally:
if token_usage is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ConventionalityEvaluator(
_CONVENTIONALITY_CONFIG.evaluation_settings
)

def evaluate_impl(
async def evaluate_impl(
self,
input: ConventionalityEvaluationInput,
evaluation_settings: ConventionalityEvaluationSettings,
Expand All @@ -93,7 +93,7 @@ def evaluate_impl(
("human", prompts["human_prompt"]),
]
).partial(format_instructions=parser.get_format_instructions())
conventionality_output = self.execute_prompt_chain_step(
conventionality_output = await self.execute_prompt_chain_step(
step_name="conventionality_evaluation",
prompt_settings=step_prompt_settings,
evaluation_metadata=evaluation_metadata,
Expand Down
Loading