Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 63 additions & 48 deletions services/database/facades/jargon_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,54 +670,69 @@ async def save_or_update_jargon(
jargon_data,
)

async with self.get_session() as session:

stmt = select(Jargon).where(and_(
Jargon.chat_id == chat_id,
Jargon.content == content,
))
result = await session.execute(stmt)
record = result.scalars().first()

now_ts = self._coerce_jargon_timestamp()

if record:
# 更新已有记录
if 'meaning' in jargon_data:
record.meaning = jargon_data['meaning']
if 'raw_content' in jargon_data:
record.raw_content = jargon_data['raw_content']
if 'is_jargon' in jargon_data:
record.is_jargon = jargon_data['is_jargon']
if 'count' in jargon_data:
record.count = jargon_data['count']
if 'last_inference_count' in jargon_data:
record.last_inference_count = jargon_data['last_inference_count']
if 'is_complete' in jargon_data:
record.is_complete = jargon_data['is_complete']
if 'is_global' in jargon_data:
record.is_global = jargon_data['is_global']
record.updated_at = now_ts

await session.commit()
self._logger.debug(
f"[JargonFacade] 更新黑话: content='{content}', chat_id={chat_id}, "
f"id={record.id}"
)
return record.id
else:
# 插入新记录
new_record = Jargon(
**self._jargon_insert_values(chat_id, content, jargon_data, now_ts)
)
session.add(new_record)
await session.commit()
await session.refresh(new_record)
self._logger.debug(
f"[JargonFacade] 插入黑话: content='{content}', chat_id={chat_id}, "
f"id={new_record.id}"
)
return new_record.id
# SQLite 路径:使用 SELECT→INSERT/UPDATE 模式,
# 并在 INSERT 因并发竞争触发 UNIQUE 约束时自动重试一次,
# 使重试的 SELECT 能命中另一请求刚提交的记录,走 UPDATE 分支。
max_attempts = 2
for attempt in range(1, max_attempts + 1):
try:
async with self.get_session() as session:

stmt = select(Jargon).where(and_(
Jargon.chat_id == chat_id,
Jargon.content == content,
))
result = await session.execute(stmt)
record = result.scalars().first()

now_ts = self._coerce_jargon_timestamp()

if record:
# 更新已有记录
if 'meaning' in jargon_data:
record.meaning = jargon_data['meaning']
if 'raw_content' in jargon_data:
record.raw_content = jargon_data['raw_content']
if 'is_jargon' in jargon_data:
record.is_jargon = jargon_data['is_jargon']
if 'count' in jargon_data:
record.count = jargon_data['count']
if 'last_inference_count' in jargon_data:
record.last_inference_count = jargon_data['last_inference_count']
if 'is_complete' in jargon_data:
record.is_complete = jargon_data['is_complete']
if 'is_global' in jargon_data:
record.is_global = jargon_data['is_global']
record.updated_at = now_ts

await session.commit()
self._logger.debug(
f"[JargonFacade] 更新黑话: content='{content}', chat_id={chat_id}, "
f"id={record.id}"
)
return record.id
else:
# 插入新记录
new_record = Jargon(
**self._jargon_insert_values(chat_id, content, jargon_data, now_ts)
)
session.add(new_record)
await session.commit()
await session.refresh(new_record)
self._logger.debug(
f"[JargonFacade] 插入黑话: content='{content}', chat_id={chat_id}, "
f"id={new_record.id}"
)
return new_record.id
except IntegrityError:
if attempt < max_attempts:
self._logger.debug(
f"[JargonFacade] 并发竞争导致 UNIQUE 冲突,重试 "
f"(content='{content}', chat_id={chat_id}, attempt={attempt})"
)
continue
# 最后一次尝试也失败,向上抛出
raise

except Exception as e:
self._logger.error(
Expand Down
Loading