diff --git a/backend/openedx_ai_extensions/processors/llm/llm_processor.py b/backend/openedx_ai_extensions/processors/llm/llm_processor.py index 628a901e..47362480 100644 --- a/backend/openedx_ai_extensions/processors/llm/llm_processor.py +++ b/backend/openedx_ai_extensions/processors/llm/llm_processor.py @@ -124,8 +124,8 @@ def _build_response_api_params(self, system_role=None): params = {} params["stream"] = self.stream + user_text = normalize_input_to_text(self.input_data) if self.chat_history: - user_text = normalize_input_to_text(self.input_data) if user_text: self.chat_history.append({"role": "user", "content": user_text}) params["input"] = self.chat_history @@ -135,6 +135,8 @@ def _build_response_api_params(self, system_role=None): {"role": "system", "content": self.custom_prompt or system_role}, {"role": "system", "content": self.context}, ] + if user_text: + params["input"].append({"role": "user", "content": user_text}) # Add optional parameters only if configured params.update(self.extra_params) diff --git a/backend/tests/integration/conftest.py b/backend/tests/integration/conftest.py index 7c75e8a4..18138390 100644 --- a/backend/tests/integration/conftest.py +++ b/backend/tests/integration/conftest.py @@ -26,6 +26,8 @@ _settings.SERVICE_VARIANT = "lms" +from openedx_ai_extensions.processors.llm.providers import \ + provider_supports # noqa: E402 pylint: disable=wrong-import-position from openedx_ai_extensions.workflows.models import ( # noqa: E402 pylint: disable=wrong-import-position AIWorkflowProfile, AIWorkflowScope, @@ -63,6 +65,24 @@ def skip_if_no_key(env_var: str) -> None: pytest.skip(f"{env_var} not set — skipping live LLM test") +def skip_unless_capability(capability: str): + """ + Skip the test unless a provider supporting *capability* (per + _PROVIDER_CAPABILITIES) has its API key set, so capability/test + coverage stays driven by providers/__init__.py rather than duplicated + provider names in test code. + """ + env_vars = [ + env_var for provider_slug, env_var in (p.values for p in PROVIDERS) + if provider_supports(provider_slug.removeprefix("test_"), capability) + ] + has_key = any(os.environ.get(env_var) for env_var in env_vars) + return pytest.mark.skipif( + not has_key, + reason=f"No API key set for a provider supporting {capability!r} ({', '.join(env_vars)})", + ) + + def create_profile_and_scope( # pylint: disable=redefined-outer-name provider_slug: str, course_key, diff --git a/backend/tests/integration/sample_content.py b/backend/tests/integration/sample_content.py index b56d11f4..104e65d2 100644 --- a/backend/tests/integration/sample_content.py +++ b/backend/tests/integration/sample_content.py @@ -34,3 +34,22 @@ "It emphasises code readability using significant indentation. " "Python supports multiple programming paradigms and has a large standard library." ) + +SHORT_CONTENT = "Python uses indentation to define code blocks." + +LONG_SYSTEM_CONTEXT = ( + "The history of computing spans several decades. " + "From vacuum tubes to transistors to integrated circuits, each era " + "brought dramatic improvements in speed, size, and cost. " + "ENIAC (1945) was the first general-purpose electronic computer, " + "weighing 30 tons and occupying an entire room. " + "The invention of the transistor in 1947 at Bell Labs was a watershed moment, " + "enabling miniaturisation that made personal computers possible. " + "Intel released the first commercial microprocessor, the 4004, in 1971. " + "The IBM PC in 1981 standardised the personal computer market. " + "Tim Berners-Lee invented the World Wide Web in 1989, transforming computing. " + "The rise of smartphones in the 2000s put computing in every pocket. " + "Cloud computing emerged in the 2010s, shifting workloads to remote data centres. " + "Today artificial intelligence, driven by GPUs and large language models, " + "represents the next major inflection point in the history of computing technology. " +) * 24 # Repeat to exceed Anthropic's cache minimum (4096 tokens for claude-haiku-4-5) diff --git a/backend/tests/integration/test_threading.py b/backend/tests/integration/test_threading.py new file mode 100644 index 00000000..e6e673b8 --- /dev/null +++ b/backend/tests/integration/test_threading.py @@ -0,0 +1,254 @@ +""" +Validates that stale / expired remote thread IDs are recovered from +without crashing, that the recovered conversation starts cleanly, that +multi-turn context persists across three turns, and that Anthropic +prompt caching fires (or at least does not crash) at various token sizes. + +Every test in this file uses a real AIWorkflowSession DB row (via +create_live_session) rather than a mock, so session.save() exercises the +actual persistence layer. +""" + +import pytest + +from openedx_ai_extensions.processors.llm.llm_processor import LLMProcessor + +from .conftest import PROVIDERS, create_live_session, skip_if_no_key, skip_unless_capability +from .sample_content import DUMMY_CONTENT, LONG_SYSTEM_CONTEXT, SHORT_CONTENT + +ALREADY_EXPIRED_THREAD_ID = ( + "resp_bGl0ZWxsbTpjdXN0b21fbGxtX3Byb3ZpZGVyOm9wZW5haTttb2RlbF9pZDpOb25lO3Jlc3BvbnNlX2lkOnJlc3BfMDI5MTVhYjk4Mjc4" + "ODVhMTAwNmEwZTNhMWQ1NjY0ODE5NWJmOTUyYWIxYTExYjE3ZmQ=" +) + +_OPENAI_CONFIG = { + "LLMProcessor": { + "provider": "test_openai", + "stream": False, + "function": "chat_with_context", + } +} + + +@pytest.mark.live_llm +@pytest.mark.django_db +@skip_unless_capability("server_side_thread_id") +def test_stale_thread_id_triggers_recovery(live_user, course_key): + """ + When session.remote_response_id points to a non-existent / expired + OpenAI thread, the processor must catch previous_response_not_found, + clear the stale ID, start a fresh thread, and return a valid response. + """ + session = create_live_session( + live_user, course_key, + remote_response_id=ALREADY_EXPIRED_THREAD_ID, + ) + + processor = LLMProcessor(config=_OPENAI_CONFIG, user_session=session) + result = processor.process( + context=DUMMY_CONTENT, + input_data="Hello, please introduce yourself briefly.", + ) + + assert result.get("status") == "success", f"Expected success after recovery, got: {result}" + assert result.get("response"), "Expected non-empty response after thread recovery" + + session.refresh_from_db() + assert session.remote_response_id != ALREADY_EXPIRED_THREAD_ID, ( + "remote_response_id was not updated after stale-thread recovery" + ) + + +@pytest.mark.live_llm +@pytest.mark.django_db +@skip_unless_capability("server_side_thread_id") +def test_conversation_clean_after_stale_thread_recovery(live_user, course_key): + """ + After stale-thread recovery, a second call on the same session must + succeed and recall a fact planted in turn 1 — proving the recovered + thread actually carries turn-1 context forward, not just that turn 2 + independently produces a plausible answer. The planted number is not in + DUMMY_CONTENT or inferable from general knowledge, so the model can only + recall it if turn 2 has real access to turn 1. Framed as a "lucky number" + rather than an ID/identifier to avoid PII-refusal false negatives. + """ + session = create_live_session( + live_user, course_key, + remote_response_id=ALREADY_EXPIRED_THREAD_ID, + ) + + # Turn 1 — recovery happens here + proc1 = LLMProcessor(config=_OPENAI_CONFIG, user_session=session) + result1 = proc1.process( + context=DUMMY_CONTENT, + input_data="My lucky number is 9142. Just say 'Got it'.", + ) + assert result1.get("response"), "Turn 1 must produce a response for this test to be meaningful" + + # Turn 2 — same session, recovered thread + session.refresh_from_db() + proc2 = LLMProcessor(config=_OPENAI_CONFIG, user_session=session) + result2 = proc2.process( + context=DUMMY_CONTENT, + input_data="What is my lucky number?", + ) + + assert result2.get("status") == "success", f"Turn 2 failed: {result2}" + response_text = result2.get("response") or "" + assert "9142" in response_text, ( + f"Expected '9142' in turn-2 response (recalled from turn 1), got: {response_text}" + ) + + +@pytest.mark.live_llm +@pytest.mark.django_db +@pytest.mark.parametrize("provider_slug,env_var", PROVIDERS) +def test_three_turn_context_chain(provider_slug, env_var, live_user, course_key): + """ + A fact planted in turn 1 must still be recalled in turn 3, even after + a neutral turn 2 that does not reference it. Multi-turn context retention + is a general guarantee of the processor — providers with server-side + threading (e.g. OpenAI) chain via previous_response_id, while others + (e.g. Anthropic) need the prior turns resent as chat_history — so this + runs against every configured provider rather than just OpenAI. + + LLMProcessor itself never auto-reconstructs chat_history (that's the + caller's job — see ThreadedLLMResponse.run); since this test calls + LLMProcessor directly, it threads chat_history between calls itself so + non-OpenAI providers actually receive turn 1/2 on later calls instead of + relying solely on previous_response_id (OpenAI-only). + """ + skip_if_no_key(env_var) + config = { + "LLMProcessor": { + "provider": provider_slug, + "stream": False, + "function": "chat_with_context", + } + } + + session = create_live_session(live_user, course_key) + chat_history = [] + + # Turn 0 — initialise the thread + r0 = LLMProcessor(config=config, user_session=session).process( + context=DUMMY_CONTENT, input_data="Start.", chat_history=chat_history + ) + chat_history.append({"role": "user", "content": "Start."}) + chat_history.append({"role": "assistant", "content": r0.get("response") or ""}) + session.refresh_from_db() + + # Turn 1 — plant memorable fact + proc1 = LLMProcessor(config=config, user_session=session) + r1 = proc1.process( + context=DUMMY_CONTENT, + input_data="My favourite colour is TURQUOISE. Just say 'Got it'.", + chat_history=chat_history, + ) + assert r1.get("response"), "Turn 1 must return a response" + chat_history.append({"role": "assistant", "content": r1.get("response") or ""}) + + # Turn 2 — neutral noise turn + session.refresh_from_db() + proc2 = LLMProcessor(config=config, user_session=session) + r2 = proc2.process( + context=DUMMY_CONTENT, + input_data="Tell me one thing about Python in one sentence.", + chat_history=chat_history, + ) + assert r2.get("response"), "Turn 2 must return a response" + chat_history.append({"role": "assistant", "content": r2.get("response") or ""}) + + # Turn 3 — recall the fact from turn 1 + session.refresh_from_db() + proc3 = LLMProcessor(config=config, user_session=session) + r3 = proc3.process( + context=DUMMY_CONTENT, + input_data="What is my favourite colour?", + chat_history=chat_history, + ) + + assert r3.get("status") == "success", f"Turn 3 failed: {r3}" + response_text = (r3.get("response") or "").lower() + assert "turquoise" in response_text, ( + f"Expected 'turquoise' in turn-3 response, got: {r3.get('response')}" + ) + + +@pytest.mark.live_llm +@pytest.mark.django_db +@skip_unless_capability("multi_turn_cache") +def test_anthropic_cache_hit_on_second_call(live_user, course_key): + """ + When the same large system context is sent twice to Anthropic, the + second call's usage should report cache_read_input_tokens > 0, + confirming the cache_control prefix written by the first call was + reused. claude-haiku-4-5's cache minimum is 4096 tokens; + LONG_SYSTEM_CONTEXT comfortably exceeds it. + + Uses a real AIWorkflowSession (like the other threading tests) rather + than a MagicMock, so any session.save() call this code path makes is + actually exercised instead of silently swallowed. + """ + config = { + "LLMProcessor": { + "provider": "test_anthropic", + "stream": False, + "function": "summarize_content", + } + } + + session = create_live_session(live_user, course_key) + + # First call — warms the cache + proc1 = LLMProcessor(config=config, user_session=session) + r1 = proc1.process(context=LONG_SYSTEM_CONTEXT, input_data="Summarize this in one sentence.") + assert r1.get("status") == "success", f"First call failed: {r1}" + + # Second call — should hit the cache + session.refresh_from_db() + proc2 = LLMProcessor(config=config, user_session=session) + r2 = proc2.process(context=LONG_SYSTEM_CONTEXT, input_data="Summarize this in one sentence.") + assert r2.get("status") == "success", f"Second call failed: {r2}" + + usage = proc2.get_usage() + assert usage is not None, "Expected usage to be populated on second call" + cache_tokens = getattr(usage, "cache_read_input_tokens", 0) or 0 + assert cache_tokens > 0, ( + f"Expected cache_read_input_tokens > 0 on second call. usage={usage}" + ) + + +@pytest.mark.live_llm +@pytest.mark.django_db +@pytest.mark.parametrize("provider_slug,env_var", PROVIDERS) +def test_cache_short_prompt_no_crash(provider_slug, env_var, live_user, course_key): + """ + Setting cache=True must never crash, regardless of whether the provider + actually supports a caching feature. Providers without "multi_turn_cache" + in _PROVIDER_CAPABILITIES (e.g. OpenAI) should just ignore the flag; + Anthropic silently ignores cache_control for prompts below the model's + minimum (4096 tokens for claude-haiku-4-5) too. Either way, a valid + response is returned with no error, even if no cache tokens are reported. + + Uses a real AIWorkflowSession so this exercises the same persistence + path as every other test in this file, regardless of provider. + """ + skip_if_no_key(env_var) + config = { + "LLMProcessor": { + "provider": provider_slug, + "stream": False, + "function": "summarize_content", + "cache": True, + } + } + + session = create_live_session(live_user, course_key) + processor = LLMProcessor(config=config, user_session=session) + result = processor.process(context=SHORT_CONTENT, input_data="Summarize this.") + + assert result.get("status") == "success", ( + f"Short-prompt cache call failed: {result}" + ) + assert result.get("response"), "Expected non-empty response"