Warning
This Python package is currently in beta and will likely change. It is not yet ready for production use.
In addition to supporting OpenAI API-compatible agents, Sentient Chat supports a custom, open source event system for agent responses. These events can be rendered in Sentient Chat to provide a richer user experience. This is particularly useful for streaming responses from an AI agent, where you might want to show the agent's work while the response is being generated rather than making the user wait for the final response. This Python package provides an agent framework that can be used to build agents that serve Sentient Chat events.
Examples of agents that use this framework can be found here.
Install the package with pip:

```bash
pip install sentient-agent-framework
```

A `ResponseHandler` is initialized with an agent's `Identity` and a `Hook`. A new `ResponseHandler` is created for every agent query:
```python
from sentient_agent_framework import DefaultHook, DefaultResponseHandler, Identity

response_handler = DefaultResponseHandler(self._identity, DefaultHook(self._response_queue))
```

Once initialized, the `ResponseHandler` is used to create events that are emitted using the `Hook`.
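The `self._identity` and `self._response_queue` used above are created by the agent itself. As a rough sketch (the exact `Identity` fields and the queue type are assumptions, not confirmed API), they might be set up in the agent's constructor like this:

```python
import asyncio

from sentient_agent_framework import Identity


class ExampleAgent:
    def __init__(self):
        # Identity field names are an assumption; check the package source for the real signature.
        self._identity = Identity(id="example-agent-id", name="Example Agent")
        # Queue that the DefaultHook pushes events onto; asyncio.Queue is an assumption.
        self._response_queue = asyncio.Queue()
```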
Text events are used to send single, complete messages to the client:
```python
await response_handler.emit_text_block(
    "PLAN", "Rephrasing user query..."
)
```

JSON events are used to send JSON objects to the client:
```python
await response_handler.emit_json(
    "SOURCES", {"results": search_results["results"]}
)
```

Error events are used to send error messages to the client:
```python
await response_handler.emit_error(
    "ERROR", {"message": "An error occurred"}
)
```

At the end of a response, `response_handler.complete()` is called to signal the end of the response (this will emit a `DoneEvent` using the `Hook`):
```python
await response_handler.complete()
```

To stream a longer response one chunk at a time, use the `response_handler.create_text_stream` method. This returns a `StreamEventEmitter` that can be used to stream text to the client using the `emit_chunk` method:
```python
final_response_stream = response_handler.create_text_stream(
    "FINAL_RESPONSE"
)

for chunk in self.__process_search_results(search_results["results"]):
    await final_response_stream.emit_chunk(chunk)
```

At the end of the stream, `final_response_stream.complete()` is called to signal the end of the stream (this will emit a `TextChunkEvent` with `is_complete=True`):
```python
await final_response_stream.complete()
```
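Putting it all together, a full agent response might look roughly like the sketch below. The `AbstractAgent` base class, the `assist` signature, the constructor argument, and `DefaultServer` are assumptions rather than confirmed API, and the search results and streamed chunks are placeholder data.

```python
from sentient_agent_framework import (
    AbstractAgent,    # assumed base class name
    DefaultServer,    # assumed server wrapper
    Session,
    Query,
    ResponseHandler,
)


class ExampleAgent(AbstractAgent):
    """Hypothetical agent that emits the events described above for each query."""

    async def assist(self, session: Session, query: Query, response_handler: ResponseHandler):
        # Tell the client what the agent is about to do.
        await response_handler.emit_text_block("PLAN", "Looking up sources...")

        # Stand-in for real retrieval logic.
        search_results = {"results": [{"title": "Example source", "url": "https://example.com"}]}
        await response_handler.emit_json("SOURCES", search_results)

        # Stream the final answer one chunk at a time.
        final_response_stream = response_handler.create_text_stream("FINAL_RESPONSE")
        for chunk in ["Here is ", "a streamed ", "answer."]:
            await final_response_stream.emit_chunk(chunk)
        await final_response_stream.complete()

        # Signal that the whole response is finished.
        await response_handler.complete()


if __name__ == "__main__":
    # The constructor argument and DefaultServer.run() are assumptions; see the linked examples.
    server = DefaultServer(ExampleAgent(name="Example Agent"))
    server.run()
```

In a setup like this, each incoming query would receive its own `ResponseHandler`, consistent with the note above that a new `ResponseHandler` is created for every agent query.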