Skip to content
Merged
13 changes: 10 additions & 3 deletions backend/apps/agent_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,22 @@

# Define API route
@agent_runtime_router.post("/run")
async def agent_run_api(agent_request: AgentRequest, http_request: Request, authorization: str = Header(None)):
async def agent_run_api(
agent_request: AgentRequest,
http_request: Request,
authorization: str = Header(None),

Check warning on line 58 in backend/apps/agent_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "Annotated" type hints for FastAPI dependency injection

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ8S7jGXTAi7HPSCwNL7&open=AZ8S7jGXTAi7HPSCwNL7&pullRequest=3331
resume: bool = Query(False, description="Resume an existing streaming conversation"),

Check warning on line 59 in backend/apps/agent_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "Annotated" type hints for FastAPI dependency injection

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ8S7jGXTAi7HPSCwNL8&open=AZ8S7jGXTAi7HPSCwNL8&pullRequest=3331
):
"""
Agent execution API endpoint
Agent execution API endpoint.
If resume=true, attempts to continue streaming from where it left off after a tab switch.
"""
try:
return await run_agent_stream(
agent_request=agent_request,
http_request=http_request,
authorization=authorization
authorization=authorization,
resume=resume,
)
except Exception as e:
logger.error(f"Agent run error: {str(e)}")
Expand Down
3 changes: 3 additions & 0 deletions backend/consts/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,6 @@ def _parse_otlp_headers(headers_str: str) -> dict:
"tool",
"execution_logs",
])

# SSE streaming event type for status messages
STREAM_STATUS_EVENT = "event: stream_status\n"
237 changes: 234 additions & 3 deletions backend/database/conversation_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def create_conversation(conversation_title: str, user_id: Optional[str] = None)
return result_dict


def create_conversation_message(message_data: Dict[str, Any], user_id: Optional[str] = None) -> int:
def create_conversation_message(message_data: Dict[str, Any], user_id: Optional[str] = None,
status: str = 'completed') -> int:
"""
Create a conversation message record

Expand All @@ -102,6 +103,7 @@ def create_conversation_message(message_data: Dict[str, Any], user_id: Optional[
- content: Message content
- minio_files: JSON string of attachment information
user_id: Reserved parameter for created_by and updated_by fields
status: Lifecycle status (pending / streaming / completed / failed / stopped)

Returns:
int: Newly created message ID (auto-increment ID)
Expand All @@ -121,7 +123,7 @@ def create_conversation_message(message_data: Dict[str, Any], user_id: Optional[
# Prepare data dictionary
data = {"conversation_id": conversation_id, "message_index": message_idx, "message_role": message_data['role'],
"message_content": message_data['content'], "minio_files": minio_files, "opinion_flag": None,
"delete_flag": 'N'}
"delete_flag": 'N', "status": status}
if user_id:
data = add_creation_tracking(data, user_id)

Expand Down Expand Up @@ -184,6 +186,153 @@ def create_message_units(message_units: List[Dict[str, Any]], message_id: int, c
return unit_ids


def create_message_unit(message_id: int, conversation_id: int, unit_index: int,
unit_type: str, unit_content: str,
user_id: Optional[str] = None,
unit_status: str = 'completed') -> int:
"""
Insert a single ConversationMessageUnit row.

Args:
message_id: Message ID (integer)
conversation_id: Conversation ID (integer)
unit_index: Sequence number for frontend display sorting
unit_type: Type of the unit (e.g. "model_output_code", "final_answer")
unit_content: Complete content of the unit
user_id: Reserved parameter for created_by and updated_by fields
unit_status: Lifecycle status (streaming / completed)

Returns:
int: Newly created unit ID (auto-increment ID)
"""
with get_db_session() as session:
message_id = int(message_id)
conversation_id = int(conversation_id)
unit_index = int(unit_index)

row_data = {
"message_id": message_id,
"conversation_id": conversation_id,
"unit_index": unit_index,
"unit_type": unit_type,
"unit_content": unit_content,
"unit_status": unit_status,
"delete_flag": 'N',
}
if user_id:
row_data["created_by"] = user_id
row_data["updated_by"] = user_id

stmt = insert(ConversationMessageUnit).values(
**row_data).returning(ConversationMessageUnit.unit_id)
result = session.execute(stmt)
return result.scalar_one()


def update_conversation_message_status(message_id: int, status: str,
user_id: Optional[str] = None) -> None:
"""
Update the lifecycle status of a conversation message.

Args:
message_id: Message ID (integer)
status: New status (pending / streaming / completed / failed / stopped)
user_id: Reserved parameter for updated_by field
"""
with get_db_session() as session:
message_id = int(message_id)
update_data = {
"status": status,
"update_time": func.current_timestamp(),
}
if user_id:
update_data = add_update_tracking(update_data, user_id)
session.execute(
update(ConversationMessage)
.where(ConversationMessage.message_id == message_id,
ConversationMessage.delete_flag == 'N')
.values(update_data)
)


def update_conversation_message_content(message_id: int, content: str,
user_id: Optional[str] = None) -> None:
"""
Update the message_content field of a conversation message.

Args:
message_id: Message ID (integer)
content: New content text
user_id: Reserved parameter for updated_by field
"""
with get_db_session() as session:
message_id = int(message_id)
update_data = {
"message_content": content,
"update_time": func.current_timestamp(),
}
if user_id:
update_data = add_update_tracking(update_data, user_id)
session.execute(
update(ConversationMessage)
.where(ConversationMessage.message_id == message_id,
ConversationMessage.delete_flag == 'N')
.values(update_data)
)


def update_message_unit_status(unit_id: int, status: str,
user_id: Optional[str] = None) -> None:
"""
Update the unit_status field of a message unit.

Args:
unit_id: Unit ID (integer)
status: New status (streaming / completed)
user_id: Reserved parameter for updated_by field
"""
with get_db_session() as session:
unit_id = int(unit_id)
update_data = {
"unit_status": status,
"update_time": func.current_timestamp(),
}
if user_id:
update_data = add_update_tracking(update_data, user_id)
session.execute(
update(ConversationMessageUnit)
.where(ConversationMessageUnit.unit_id == unit_id,
ConversationMessageUnit.delete_flag == 'N')
.values(update_data)
)


def update_message_unit_content(unit_id: int, content: str,
user_id: Optional[str] = None) -> None:
"""
Update the unit_content field of a message unit.

Args:
unit_id: Unit ID (integer)
content: New content text
user_id: Reserved parameter for updated_by field
"""
with get_db_session() as session:
unit_id = int(unit_id)
update_data = {
"unit_content": content,
"update_time": func.current_timestamp(),
}
if user_id:
update_data = add_update_tracking(update_data, user_id)
session.execute(
update(ConversationMessageUnit)
.where(ConversationMessageUnit.unit_id == unit_id,
ConversationMessageUnit.delete_flag == 'N')
.values(update_data)
)


def get_conversation(conversation_id: int, user_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""
Get conversation details
Expand Down Expand Up @@ -552,7 +701,9 @@ def get_conversation_history(conversation_id: int, user_id: Optional[str] = None
func.json_build_object(
'unit_id', ConversationMessageUnit.unit_id,
'unit_type', ConversationMessageUnit.unit_type,
'unit_content', ConversationMessageUnit.unit_content
'unit_content', ConversationMessageUnit.unit_content,
'unit_status', ConversationMessageUnit.unit_status,
'unit_index', ConversationMessageUnit.unit_index
)
)
).select_from(
Expand All @@ -568,6 +719,7 @@ def get_conversation_history(conversation_id: int, user_id: Optional[str] = None
ConversationMessage.message_index,
ConversationMessage.message_role.label('role'),
ConversationMessage.message_content,
ConversationMessage.status,
ConversationMessage.minio_files,
ConversationMessage.opinion_flag,
subquery.label('units')
Expand Down Expand Up @@ -1062,6 +1214,85 @@ def get_latest_assistant_message_id(conversation_id: int, user_id: Optional[str]
return result


def get_latest_assistant_message(conversation_id: int, user_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""
Get the latest assistant message for a conversation, including its status field.
Used for streaming recovery to check if a stream is still in progress.

Args:
conversation_id: Conversation ID
user_id: Optional user ID for ownership check

Returns:
Optional[Dict]: Contains message_id, status, message_content, or None if not found
"""
with get_db_session() as session:
conversation_id = int(conversation_id)

stmt = select(
ConversationMessage.message_id,
ConversationMessage.status,
ConversationMessage.message_content,
).where(
ConversationMessage.conversation_id == conversation_id,
ConversationMessage.delete_flag == 'N',
ConversationMessage.message_role == 'assistant'
).order_by(desc(ConversationMessage.message_index)).limit(1)

if user_id:
stmt = stmt.join(
ConversationRecord,
ConversationMessage.conversation_id == ConversationRecord.conversation_id
).where(ConversationRecord.created_by == user_id)

result = session.execute(stmt).first()
if result:
return {
'message_id': result.message_id,
'status': result.status,
'message_content': result.message_content,
}
return None


def get_last_unit_for_message(message_id: int) -> Optional[Dict[str, Any]]:
"""
Get the last unit (highest unit_index) for a message.
Used for streaming recovery to determine the resume position.

Args:
message_id: Message ID

Returns:
Optional[Dict]: Contains unit_id, unit_index, unit_type, unit_content, unit_status,
or None if no units exist
"""
with get_db_session() as session:
message_id = int(message_id)

stmt = select(
ConversationMessageUnit.unit_id,
ConversationMessageUnit.unit_index,
ConversationMessageUnit.unit_type,
ConversationMessageUnit.unit_content,
ConversationMessageUnit.unit_status,
).where(
ConversationMessageUnit.message_id == message_id,
ConversationMessageUnit.delete_flag == 'N'
).order_by(desc(ConversationMessageUnit.unit_index)).limit(1)

result = session.execute(stmt).first()
if result:
return {
'unit_id': result.unit_id,
'unit_index': result.unit_index,
'unit_type': result.unit_type,
'unit_content': result.unit_content,
'unit_status': result.unit_status,
}
return None


def update_message_minio_files(message_id: int, skill_file_uploads: List[Dict[str, Any]]) -> bool:
"""
Merge skill file uploads into an existing message's minio_files field.
Expand Down
6 changes: 6 additions & 0 deletions backend/database/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class ConversationMessage(TableBase):
String, doc="Images or documents uploaded by the user on the chat page, stored as a list")
opinion_flag = Column(String(
1), doc="User evaluation of the conversation. Enumeration value \"Y\" represents a positive review, \"N\" represents a negative review")
status = Column(
String(30), default='completed',
doc="Lifecycle status: pending / streaming / completed / failed / stopped")


class ConversationMessageUnit(TableBase):
Expand All @@ -85,6 +88,9 @@ class ConversationMessageUnit(TableBase):
unit_type = Column(String(100), doc="Type of the smallest answer unit")
unit_content = Column(
String, doc="Complete content of the smallest reply unit")
unit_status = Column(
String(30), default='completed',
doc="Lifecycle status: streaming (still aggregating) or completed (fully persisted)")


class ConversationSourceImage(TableBase):
Expand Down
Loading
Loading