Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ def _build_response_api_params(self, system_role=None):
params = {}
params["stream"] = self.stream

user_text = normalize_input_to_text(self.input_data)
if self.chat_history:
user_text = normalize_input_to_text(self.input_data)
if user_text:
self.chat_history.append({"role": "user", "content": user_text})
params["input"] = self.chat_history
Expand All @@ -135,6 +135,8 @@ def _build_response_api_params(self, system_role=None):
{"role": "system", "content": self.custom_prompt or system_role},
{"role": "system", "content": self.context},
]
if user_text:
params["input"].append({"role": "user", "content": user_text})

# Add optional parameters only if configured
params.update(self.extra_params)
Expand Down
20 changes: 20 additions & 0 deletions backend/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
_settings.SERVICE_VARIANT = "lms"


from openedx_ai_extensions.processors.llm.providers import \
provider_supports # noqa: E402 pylint: disable=wrong-import-position
from openedx_ai_extensions.workflows.models import ( # noqa: E402 pylint: disable=wrong-import-position
AIWorkflowProfile,
AIWorkflowScope,
Expand Down Expand Up @@ -63,6 +65,24 @@ def skip_if_no_key(env_var: str) -> None:
pytest.skip(f"{env_var} not set — skipping live LLM test")


def skip_unless_capability(capability: str):
"""
Skip the test unless a provider supporting *capability* (per
_PROVIDER_CAPABILITIES) has its API key set, so capability/test
coverage stays driven by providers/__init__.py rather than duplicated
provider names in test code.
"""
env_vars = [
env_var for provider_slug, env_var in (p.values for p in PROVIDERS)
if provider_supports(provider_slug.removeprefix("test_"), capability)
]
has_key = any(os.environ.get(env_var) for env_var in env_vars)
return pytest.mark.skipif(
not has_key,
reason=f"No API key set for a provider supporting {capability!r} ({', '.join(env_vars)})",
)


def create_profile_and_scope( # pylint: disable=redefined-outer-name
provider_slug: str,
course_key,
Expand Down
19 changes: 19 additions & 0 deletions backend/tests/integration/sample_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,22 @@
"It emphasises code readability using significant indentation. "
"Python supports multiple programming paradigms and has a large standard library."
)

SHORT_CONTENT = "Python uses indentation to define code blocks."

LONG_SYSTEM_CONTEXT = (
"The history of computing spans several decades. "
"From vacuum tubes to transistors to integrated circuits, each era "
"brought dramatic improvements in speed, size, and cost. "
"ENIAC (1945) was the first general-purpose electronic computer, "
"weighing 30 tons and occupying an entire room. "
"The invention of the transistor in 1947 at Bell Labs was a watershed moment, "
"enabling miniaturisation that made personal computers possible. "
"Intel released the first commercial microprocessor, the 4004, in 1971. "
"The IBM PC in 1981 standardised the personal computer market. "
"Tim Berners-Lee invented the World Wide Web in 1989, transforming computing. "
"The rise of smartphones in the 2000s put computing in every pocket. "
"Cloud computing emerged in the 2010s, shifting workloads to remote data centres. "
"Today artificial intelligence, driven by GPUs and large language models, "
"represents the next major inflection point in the history of computing technology. "
) * 24 # Repeat to exceed Anthropic's cache minimum (4096 tokens for claude-haiku-4-5)
254 changes: 254 additions & 0 deletions backend/tests/integration/test_threading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
"""
Validates that stale / expired remote thread IDs are recovered from
without crashing, that the recovered conversation starts cleanly, that
multi-turn context persists across three turns, and that Anthropic
prompt caching fires (or at least does not crash) at various token sizes.

Every test in this file uses a real AIWorkflowSession DB row (via
create_live_session) rather than a mock, so session.save() exercises the
actual persistence layer.
"""

import pytest

from openedx_ai_extensions.processors.llm.llm_processor import LLMProcessor

from .conftest import PROVIDERS, create_live_session, skip_if_no_key, skip_unless_capability
from .sample_content import DUMMY_CONTENT, LONG_SYSTEM_CONTEXT, SHORT_CONTENT

ALREADY_EXPIRED_THREAD_ID = (
"resp_bGl0ZWxsbTpjdXN0b21fbGxtX3Byb3ZpZGVyOm9wZW5haTttb2RlbF9pZDpOb25lO3Jlc3BvbnNlX2lkOnJlc3BfMDI5MTVhYjk4Mjc4"
"ODVhMTAwNmEwZTNhMWQ1NjY0ODE5NWJmOTUyYWIxYTExYjE3ZmQ="
)

_OPENAI_CONFIG = {
"LLMProcessor": {
"provider": "test_openai",
"stream": False,
"function": "chat_with_context",
}
}


@pytest.mark.live_llm
@pytest.mark.django_db
@skip_unless_capability("server_side_thread_id")
def test_stale_thread_id_triggers_recovery(live_user, course_key):
"""
When session.remote_response_id points to a non-existent / expired
OpenAI thread, the processor must catch previous_response_not_found,
clear the stale ID, start a fresh thread, and return a valid response.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we start a completely fresh thread or prehaps re-load the conversation we already have in submissions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may leave this as a fresh thread and left an issue to solve apart with that new feature based on the fact of thread ids has expiration

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is doable. But please create an issue to keep track of that.

"""
session = create_live_session(
live_user, course_key,
remote_response_id=ALREADY_EXPIRED_THREAD_ID,
)

processor = LLMProcessor(config=_OPENAI_CONFIG, user_session=session)
result = processor.process(
context=DUMMY_CONTENT,
input_data="Hello, please introduce yourself briefly.",
)

assert result.get("status") == "success", f"Expected success after recovery, got: {result}"
assert result.get("response"), "Expected non-empty response after thread recovery"

session.refresh_from_db()
assert session.remote_response_id != ALREADY_EXPIRED_THREAD_ID, (
"remote_response_id was not updated after stale-thread recovery"
)


@pytest.mark.live_llm
@pytest.mark.django_db
@skip_unless_capability("server_side_thread_id")
def test_conversation_clean_after_stale_thread_recovery(live_user, course_key):
"""
After stale-thread recovery, a second call on the same session must
succeed and recall a fact planted in turn 1 — proving the recovered
thread actually carries turn-1 context forward, not just that turn 2
independently produces a plausible answer. The planted number is not in
DUMMY_CONTENT or inferable from general knowledge, so the model can only
recall it if turn 2 has real access to turn 1. Framed as a "lucky number"
rather than an ID/identifier to avoid PII-refusal false negatives.
"""
session = create_live_session(
live_user, course_key,
remote_response_id=ALREADY_EXPIRED_THREAD_ID,
)

# Turn 1 — recovery happens here
proc1 = LLMProcessor(config=_OPENAI_CONFIG, user_session=session)
result1 = proc1.process(
context=DUMMY_CONTENT,
input_data="My lucky number is 9142. Just say 'Got it'.",
)
assert result1.get("response"), "Turn 1 must produce a response for this test to be meaningful"

# Turn 2 — same session, recovered thread
session.refresh_from_db()
proc2 = LLMProcessor(config=_OPENAI_CONFIG, user_session=session)
result2 = proc2.process(
context=DUMMY_CONTENT,
input_data="What is my lucky number?",
)

assert result2.get("status") == "success", f"Turn 2 failed: {result2}"
response_text = result2.get("response") or ""
assert "9142" in response_text, (
f"Expected '9142' in turn-2 response (recalled from turn 1), got: {response_text}"
)


@pytest.mark.live_llm
@pytest.mark.django_db
@pytest.mark.parametrize("provider_slug,env_var", PROVIDERS)
def test_three_turn_context_chain(provider_slug, env_var, live_user, course_key):
"""
A fact planted in turn 1 must still be recalled in turn 3, even after
a neutral turn 2 that does not reference it. Multi-turn context retention
is a general guarantee of the processor — providers with server-side
threading (e.g. OpenAI) chain via previous_response_id, while others
(e.g. Anthropic) need the prior turns resent as chat_history — so this
runs against every configured provider rather than just OpenAI.

LLMProcessor itself never auto-reconstructs chat_history (that's the
caller's job — see ThreadedLLMResponse.run); since this test calls
LLMProcessor directly, it threads chat_history between calls itself so
non-OpenAI providers actually receive turn 1/2 on later calls instead of
relying solely on previous_response_id (OpenAI-only).
"""
skip_if_no_key(env_var)
config = {
"LLMProcessor": {
"provider": provider_slug,
"stream": False,
"function": "chat_with_context",
}
}

session = create_live_session(live_user, course_key)
chat_history = []

# Turn 0 — initialise the thread
r0 = LLMProcessor(config=config, user_session=session).process(
context=DUMMY_CONTENT, input_data="Start.", chat_history=chat_history
)
chat_history.append({"role": "user", "content": "Start."})
chat_history.append({"role": "assistant", "content": r0.get("response") or ""})
session.refresh_from_db()

# Turn 1 — plant memorable fact
proc1 = LLMProcessor(config=config, user_session=session)
r1 = proc1.process(
context=DUMMY_CONTENT,
input_data="My favourite colour is TURQUOISE. Just say 'Got it'.",
chat_history=chat_history,
)
assert r1.get("response"), "Turn 1 must return a response"
chat_history.append({"role": "assistant", "content": r1.get("response") or ""})

# Turn 2 — neutral noise turn
session.refresh_from_db()
proc2 = LLMProcessor(config=config, user_session=session)
r2 = proc2.process(
context=DUMMY_CONTENT,
input_data="Tell me one thing about Python in one sentence.",
chat_history=chat_history,
)
assert r2.get("response"), "Turn 2 must return a response"
chat_history.append({"role": "assistant", "content": r2.get("response") or ""})

# Turn 3 — recall the fact from turn 1
session.refresh_from_db()
proc3 = LLMProcessor(config=config, user_session=session)
r3 = proc3.process(
context=DUMMY_CONTENT,
input_data="What is my favourite colour?",
chat_history=chat_history,
)

assert r3.get("status") == "success", f"Turn 3 failed: {r3}"
response_text = (r3.get("response") or "").lower()
assert "turquoise" in response_text, (
f"Expected 'turquoise' in turn-3 response, got: {r3.get('response')}"
)


@pytest.mark.live_llm
@pytest.mark.django_db
@skip_unless_capability("multi_turn_cache")
def test_anthropic_cache_hit_on_second_call(live_user, course_key):
"""
When the same large system context is sent twice to Anthropic, the
second call's usage should report cache_read_input_tokens > 0,
confirming the cache_control prefix written by the first call was
reused. claude-haiku-4-5's cache minimum is 4096 tokens;
LONG_SYSTEM_CONTEXT comfortably exceeds it.

Uses a real AIWorkflowSession (like the other threading tests) rather
than a MagicMock, so any session.save() call this code path makes is
actually exercised instead of silently swallowed.
"""
config = {
"LLMProcessor": {
"provider": "test_anthropic",
"stream": False,
"function": "summarize_content",
}
}

session = create_live_session(live_user, course_key)

# First call — warms the cache
proc1 = LLMProcessor(config=config, user_session=session)
r1 = proc1.process(context=LONG_SYSTEM_CONTEXT, input_data="Summarize this in one sentence.")
assert r1.get("status") == "success", f"First call failed: {r1}"

# Second call — should hit the cache
session.refresh_from_db()
proc2 = LLMProcessor(config=config, user_session=session)
r2 = proc2.process(context=LONG_SYSTEM_CONTEXT, input_data="Summarize this in one sentence.")
assert r2.get("status") == "success", f"Second call failed: {r2}"

usage = proc2.get_usage()
Comment thread
Henrrypg marked this conversation as resolved.
assert usage is not None, "Expected usage to be populated on second call"
cache_tokens = getattr(usage, "cache_read_input_tokens", 0) or 0
assert cache_tokens > 0, (
f"Expected cache_read_input_tokens > 0 on second call. usage={usage}"
)


@pytest.mark.live_llm
@pytest.mark.django_db
@pytest.mark.parametrize("provider_slug,env_var", PROVIDERS)
def test_cache_short_prompt_no_crash(provider_slug, env_var, live_user, course_key):
"""
Setting cache=True must never crash, regardless of whether the provider
actually supports a caching feature. Providers without "multi_turn_cache"
in _PROVIDER_CAPABILITIES (e.g. OpenAI) should just ignore the flag;
Anthropic silently ignores cache_control for prompts below the model's
minimum (4096 tokens for claude-haiku-4-5) too. Either way, a valid
response is returned with no error, even if no cache tokens are reported.

Uses a real AIWorkflowSession so this exercises the same persistence
path as every other test in this file, regardless of provider.
"""
skip_if_no_key(env_var)
config = {
"LLMProcessor": {
"provider": provider_slug,
"stream": False,
"function": "summarize_content",
"cache": True,
}
}

session = create_live_session(live_user, course_key)
processor = LLMProcessor(config=config, user_session=session)
result = processor.process(context=SHORT_CONTENT, input_data="Summarize this.")

assert result.get("status") == "success", (
f"Short-prompt cache call failed: {result}"
)
assert result.get("response"), "Expected non-empty response"
Loading