From 1fe8ccd2f1c3cb93bfb8e116503988848ce31e82 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 3 Jun 2026 10:56:52 -0700 Subject: [PATCH 1/6] Call _prepare_outputs from the on_chain_start method, rename _prepare_outputs to _prepare_lc_payloads Signed-off-by: David Gardner --- .../integrations/langchain/_serialization.py | 67 ++++++++++--------- .../integrations/langchain/callbacks.py | 25 ++++--- .../integrations/langgraph/callbacks.py | 4 +- 3 files changed, 53 insertions(+), 43 deletions(-) diff --git a/python/nemo_relay/integrations/langchain/_serialization.py b/python/nemo_relay/integrations/langchain/_serialization.py index 56a51712..118307b7 100644 --- a/python/nemo_relay/integrations/langchain/_serialization.py +++ b/python/nemo_relay/integrations/langchain/_serialization.py @@ -304,50 +304,55 @@ def model_response_from_json(payload: Any, codec: Any) -> ModelResponse[Any]: raise TypeError(f"NeMo Relay model execution returned {type(decoded)!r}, expected ModelResponse") -def _prepare_outputs(outputs: Any) -> Any: - """Prepare a NeMo Relay scope output dict for returning to LangChain.""" - if isinstance(outputs, dict): - prepared_outputs = {} - for key, value in outputs.items(): - prepared_outputs[key] = _prepare_outputs(value) - elif isinstance(outputs, list | tuple): - prepared_outputs = [] - for value in outputs: - prepared_outputs.append(_prepare_outputs(value)) - elif isinstance(outputs, Command): - prepared_outputs = { +def _prepare_lc_payloads(pyaload: Any) -> Any: + """ + Convert a LangChain payload to a JSON-serializable structure + + Typically the entry point to this method is a LangChain dictionary containing LC message objects, and the returned + dictionary should contain the same structure, but the values are JSON serializable representations + """ + if isinstance(pyaload, dict): + prepared = {} + for key, value in pyaload.items(): + prepared[key] = _prepare_lc_payloads(value) + elif isinstance(pyaload, list | tuple): + prepared = [] + for value in pyaload: + prepared.append(_prepare_lc_payloads(value)) + elif isinstance(pyaload, Command): + prepared = { "type": "command", "command": { - "graph": _prepare_outputs(outputs.graph), - "update": _prepare_outputs(outputs.update), - "resume": _prepare_outputs(outputs.resume), - "goto": _prepare_outputs(outputs.goto), + "graph": _prepare_lc_payloads(pyaload.graph), + "update": _prepare_lc_payloads(pyaload.update), + "resume": _prepare_lc_payloads(pyaload.resume), + "goto": _prepare_lc_payloads(pyaload.goto), }, } - elif isinstance(outputs, Send): - prepared_outputs = { + elif isinstance(pyaload, Send): + prepared = { "type": "send", "send": { - "node": outputs.node, - "arg": _prepare_outputs(outputs.arg), + "node": pyaload.node, + "arg": _prepare_lc_payloads(pyaload.arg), }, } - elif isinstance(outputs, ToolMessage): - prepared_outputs = { + elif isinstance(pyaload, ToolMessage): + prepared = { "type": "tool_message", "tool_call": { - "name": outputs.name, - "id": outputs.id, - "tool_call_id": outputs.tool_call_id, - "content": outputs.content, + "name": pyaload.name, + "id": pyaload.id, + "tool_call_id": pyaload.tool_call_id, + "content": pyaload.content, }, } - elif isinstance(outputs, BaseMessage): - prepared_outputs = { + elif isinstance(pyaload, BaseMessage): + prepared = { "type": "message", - "message": messages_to_dict([outputs]), + "message": messages_to_dict([pyaload]), } else: - prepared_outputs = outputs + prepared = pyaload - return prepared_outputs + return prepared diff --git a/python/nemo_relay/integrations/langchain/callbacks.py b/python/nemo_relay/integrations/langchain/callbacks.py index d4780762..56e933da 100644 --- a/python/nemo_relay/integrations/langchain/callbacks.py +++ b/python/nemo_relay/integrations/langchain/callbacks.py @@ -11,7 +11,7 @@ from langchain_core.callbacks.base import BaseCallbackHandler import nemo_relay -from nemo_relay.integrations.langchain._serialization import _prepare_outputs +from nemo_relay.integrations.langchain._serialization import _prepare_lc_payloads if typing.TYPE_CHECKING: from uuid import UUID @@ -54,21 +54,23 @@ def on_chain_start( if name is None: name = "Unknown" - parent = self._scope_handles.get(parent_run_id) if parent_run_id else None + parent = None + if parent_run_id is not None: + parent = self._scope_handles.get(parent_run_id) scope_metadata = metadata.copy() if metadata else {} scope_metadata["langchain_run_id"] = str(run_id) + prepared_inputs = _prepare_lc_payloads(inputs) handle = nemo_relay.scope.push( name, nemo_relay.ScopeType.Agent, handle=parent, - input=inputs, + input=prepared_inputs, metadata=scope_metadata, ) self._scope_handles[run_id] = handle except Exception: - _logger.debug("NeMo Relay: on_chain_start failed", exc_info=True) - return None + _logger.error("NeMo Relay: on_chain_start failed", exc_info=True) def on_chain_end( self, @@ -80,7 +82,6 @@ def on_chain_end( ) -> typing.Any: """Pop the NeMo Relay scope associated with a LangChain chain run.""" self._pop_scope(run_id, output=outputs) - return None def on_chain_error( self, @@ -92,14 +93,18 @@ def on_chain_error( ) -> typing.Any: """Pop the NeMo Relay scope associated with a failed LangChain chain run.""" self._pop_scope(run_id, output={"error": repr(error)}) - return None - def _pop_scope(self, run_id: UUID, *, output: dict[str, typing.Any] | None = None) -> None: + def _pop_scope(self, + run_id: UUID, + *, + output: dict[str, typing.Any] | None = None) -> None: handle = self._scope_handles.pop(run_id, None) if handle is None: return + try: - prepared_outputs = _prepare_outputs(output) if output is not None else None + prepared_outputs = _prepare_lc_payloads( + output) if output is not None else None nemo_relay.scope.pop(handle, output=prepared_outputs) except Exception: - _logger.warning("NeMo Relay: scope.pop failed", exc_info=True) + _logger.error("NeMo Relay: scope.pop failed", exc_info=True) diff --git a/python/nemo_relay/integrations/langgraph/callbacks.py b/python/nemo_relay/integrations/langgraph/callbacks.py index 9429b8ab..dfc320d6 100644 --- a/python/nemo_relay/integrations/langgraph/callbacks.py +++ b/python/nemo_relay/integrations/langgraph/callbacks.py @@ -11,7 +11,7 @@ from langgraph.callbacks import GraphCallbackHandler, GraphInterruptEvent, GraphResumeEvent import nemo_relay -from nemo_relay.integrations.langchain._serialization import _prepare_outputs +from nemo_relay.integrations.langchain._serialization import _prepare_lc_payloads from nemo_relay.integrations.langchain.callbacks import NemoRelayCallbackHandler as LangChainNemoRelayCallbackHandler _logger = logging.getLogger(__name__) @@ -20,7 +20,7 @@ def _json_safe(value: Any) -> nemo_relay.Json: """Return a conservative JSON-compatible representation for mark payloads.""" try: - value = _prepare_outputs(value) + value = _prepare_lc_payloads(value) except Exception: pass From 718750cccfb0dc9bdd07607eeef0e9da812cb6d4 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 3 Jun 2026 13:05:05 -0700 Subject: [PATCH 2/6] Call subscribers.flush() from the plugin context manager, avoids what appears to be a gil deadlock when calling clear() Signed-off-by: David Gardner --- python/nemo_relay/plugin.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/nemo_relay/plugin.py b/python/nemo_relay/plugin.py index 8568addd..c0cc0ecf 100644 --- a/python/nemo_relay/plugin.py +++ b/python/nemo_relay/plugin.py @@ -15,6 +15,7 @@ from typing import TYPE_CHECKING, AsyncIterator, Callable, Literal, Protocol, TypedDict, cast from nemo_relay import ( + subscribers, Json, JsonObject, LlmConditionalExecutionGuardrail, @@ -348,6 +349,7 @@ async def plugin(config: PluginConfig | JsonObject) -> AsyncIterator[ConfigRepor try: yield report finally: + subscribers.flush() clear() From bb915e1ce3dd962424c0582832146268fe43c3b6 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 3 Jun 2026 13:09:00 -0700 Subject: [PATCH 3/6] Formatting fixes Signed-off-by: David Gardner --- python/nemo_relay/integrations/langchain/callbacks.py | 8 ++------ python/nemo_relay/plugin.py | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/python/nemo_relay/integrations/langchain/callbacks.py b/python/nemo_relay/integrations/langchain/callbacks.py index 56e933da..dd8f3d64 100644 --- a/python/nemo_relay/integrations/langchain/callbacks.py +++ b/python/nemo_relay/integrations/langchain/callbacks.py @@ -94,17 +94,13 @@ def on_chain_error( """Pop the NeMo Relay scope associated with a failed LangChain chain run.""" self._pop_scope(run_id, output={"error": repr(error)}) - def _pop_scope(self, - run_id: UUID, - *, - output: dict[str, typing.Any] | None = None) -> None: + def _pop_scope(self, run_id: UUID, *, output: dict[str, typing.Any] | None = None) -> None: handle = self._scope_handles.pop(run_id, None) if handle is None: return try: - prepared_outputs = _prepare_lc_payloads( - output) if output is not None else None + prepared_outputs = _prepare_lc_payloads(output) if output is not None else None nemo_relay.scope.pop(handle, output=prepared_outputs) except Exception: _logger.error("NeMo Relay: scope.pop failed", exc_info=True) diff --git a/python/nemo_relay/plugin.py b/python/nemo_relay/plugin.py index c0cc0ecf..3f35d2a4 100644 --- a/python/nemo_relay/plugin.py +++ b/python/nemo_relay/plugin.py @@ -15,7 +15,6 @@ from typing import TYPE_CHECKING, AsyncIterator, Callable, Literal, Protocol, TypedDict, cast from nemo_relay import ( - subscribers, Json, JsonObject, LlmConditionalExecutionGuardrail, @@ -29,6 +28,7 @@ ToolRequestIntercept, ToolSanitizeGuardrail, UnsupportedBehavior, + subscribers, ) from nemo_relay._native import ( active_plugin_report as _active_plugin_report, From a6cadfef6cf2af24edf4cc5cda28012d700c8d8a Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 3 Jun 2026 13:49:13 -0700 Subject: [PATCH 4/6] Optionally avoid calling clear on exit Signed-off-by: David Gardner --- python/nemo_relay/plugin.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/nemo_relay/plugin.py b/python/nemo_relay/plugin.py index 3f35d2a4..51989fef 100644 --- a/python/nemo_relay/plugin.py +++ b/python/nemo_relay/plugin.py @@ -333,11 +333,12 @@ def clear() -> None: @asynccontextmanager -async def plugin(config: PluginConfig | JsonObject) -> AsyncIterator[ConfigReport]: +async def plugin(config: PluginConfig | JsonObject, clear_on_exit: bool = True) -> AsyncIterator[ConfigReport]: """Context manager for plugin initialization and cleanup. Args: config: `PluginConfig` or an equivalent JSON object. + clear_on_exit: Whether to clear the plugin configuration on exit. Yields: The `ConfigReport` for the initialized configuration. @@ -350,7 +351,8 @@ async def plugin(config: PluginConfig | JsonObject) -> AsyncIterator[ConfigRepor yield report finally: subscribers.flush() - clear() + if clear_on_exit: + clear() def report() -> ConfigReport | None: From 79f3745c1c7ef93f99903fb6dcf34ef3bfb7ef80 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 3 Jun 2026 14:03:28 -0700 Subject: [PATCH 5/6] Fix spelling error Signed-off-by: David Gardner --- .../integrations/langchain/_serialization.py | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/python/nemo_relay/integrations/langchain/_serialization.py b/python/nemo_relay/integrations/langchain/_serialization.py index 118307b7..9ab3501d 100644 --- a/python/nemo_relay/integrations/langchain/_serialization.py +++ b/python/nemo_relay/integrations/langchain/_serialization.py @@ -304,55 +304,55 @@ def model_response_from_json(payload: Any, codec: Any) -> ModelResponse[Any]: raise TypeError(f"NeMo Relay model execution returned {type(decoded)!r}, expected ModelResponse") -def _prepare_lc_payloads(pyaload: Any) -> Any: +def _prepare_lc_payloads(payload: Any) -> Any: """ Convert a LangChain payload to a JSON-serializable structure Typically the entry point to this method is a LangChain dictionary containing LC message objects, and the returned dictionary should contain the same structure, but the values are JSON serializable representations """ - if isinstance(pyaload, dict): + if isinstance(payload, dict): prepared = {} - for key, value in pyaload.items(): + for key, value in payload.items(): prepared[key] = _prepare_lc_payloads(value) - elif isinstance(pyaload, list | tuple): + elif isinstance(payload, list | tuple): prepared = [] - for value in pyaload: + for value in payload: prepared.append(_prepare_lc_payloads(value)) - elif isinstance(pyaload, Command): + elif isinstance(payload, Command): prepared = { "type": "command", "command": { - "graph": _prepare_lc_payloads(pyaload.graph), - "update": _prepare_lc_payloads(pyaload.update), - "resume": _prepare_lc_payloads(pyaload.resume), - "goto": _prepare_lc_payloads(pyaload.goto), + "graph": _prepare_lc_payloads(payload.graph), + "update": _prepare_lc_payloads(payload.update), + "resume": _prepare_lc_payloads(payload.resume), + "goto": _prepare_lc_payloads(payload.goto), }, } - elif isinstance(pyaload, Send): + elif isinstance(payload, Send): prepared = { "type": "send", "send": { - "node": pyaload.node, - "arg": _prepare_lc_payloads(pyaload.arg), + "node": payload.node, + "arg": _prepare_lc_payloads(payload.arg), }, } - elif isinstance(pyaload, ToolMessage): + elif isinstance(payload, ToolMessage): prepared = { "type": "tool_message", "tool_call": { - "name": pyaload.name, - "id": pyaload.id, - "tool_call_id": pyaload.tool_call_id, - "content": pyaload.content, + "name": payload.name, + "id": payload.id, + "tool_call_id": payload.tool_call_id, + "content": payload.content, }, } - elif isinstance(pyaload, BaseMessage): + elif isinstance(payload, BaseMessage): prepared = { "type": "message", - "message": messages_to_dict([pyaload]), + "message": messages_to_dict([payload]), } else: - prepared = pyaload + prepared = payload return prepared From 37ccf0531a6919f0d33d48b0d635f2e6384b6057 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 3 Jun 2026 14:05:38 -0700 Subject: [PATCH 6/6] Rename variable 'report' to not clash with function of the same name, make clear_on_exit a keyword only argument Signed-off-by: David Gardner --- python/nemo_relay/plugin.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/nemo_relay/plugin.py b/python/nemo_relay/plugin.py index 51989fef..7e7b2f0f 100644 --- a/python/nemo_relay/plugin.py +++ b/python/nemo_relay/plugin.py @@ -333,7 +333,7 @@ def clear() -> None: @asynccontextmanager -async def plugin(config: PluginConfig | JsonObject, clear_on_exit: bool = True) -> AsyncIterator[ConfigReport]: +async def plugin(config: PluginConfig | JsonObject, *, clear_on_exit: bool = True) -> AsyncIterator[ConfigReport]: """Context manager for plugin initialization and cleanup. Args: @@ -346,9 +346,9 @@ async def plugin(config: PluginConfig | JsonObject, clear_on_exit: bool = True) Behavior: This context manager initializes the plugin configuration on entry and clears it on exit. """ - report = await initialize(config) + report_ = await initialize(config) try: - yield report + yield report_ finally: subscribers.flush() if clear_on_exit: