diff --git a/services/database/facades/jargon_facade.py b/services/database/facades/jargon_facade.py index f978fd59..dc133d04 100644 --- a/services/database/facades/jargon_facade.py +++ b/services/database/facades/jargon_facade.py @@ -670,54 +670,69 @@ async def save_or_update_jargon( jargon_data, ) - async with self.get_session() as session: - - stmt = select(Jargon).where(and_( - Jargon.chat_id == chat_id, - Jargon.content == content, - )) - result = await session.execute(stmt) - record = result.scalars().first() - - now_ts = self._coerce_jargon_timestamp() - - if record: - # 更新已有记录 - if 'meaning' in jargon_data: - record.meaning = jargon_data['meaning'] - if 'raw_content' in jargon_data: - record.raw_content = jargon_data['raw_content'] - if 'is_jargon' in jargon_data: - record.is_jargon = jargon_data['is_jargon'] - if 'count' in jargon_data: - record.count = jargon_data['count'] - if 'last_inference_count' in jargon_data: - record.last_inference_count = jargon_data['last_inference_count'] - if 'is_complete' in jargon_data: - record.is_complete = jargon_data['is_complete'] - if 'is_global' in jargon_data: - record.is_global = jargon_data['is_global'] - record.updated_at = now_ts - - await session.commit() - self._logger.debug( - f"[JargonFacade] 更新黑话: content='{content}', chat_id={chat_id}, " - f"id={record.id}" - ) - return record.id - else: - # 插入新记录 - new_record = Jargon( - **self._jargon_insert_values(chat_id, content, jargon_data, now_ts) - ) - session.add(new_record) - await session.commit() - await session.refresh(new_record) - self._logger.debug( - f"[JargonFacade] 插入黑话: content='{content}', chat_id={chat_id}, " - f"id={new_record.id}" - ) - return new_record.id + # SQLite 路径:使用 SELECT→INSERT/UPDATE 模式, + # 并在 INSERT 因并发竞争触发 UNIQUE 约束时自动重试一次, + # 使重试的 SELECT 能命中另一请求刚提交的记录,走 UPDATE 分支。 + max_attempts = 2 + for attempt in range(1, max_attempts + 1): + try: + async with self.get_session() as session: + + stmt = select(Jargon).where(and_( + Jargon.chat_id == chat_id, + Jargon.content == content, + )) + result = await session.execute(stmt) + record = result.scalars().first() + + now_ts = self._coerce_jargon_timestamp() + + if record: + # 更新已有记录 + if 'meaning' in jargon_data: + record.meaning = jargon_data['meaning'] + if 'raw_content' in jargon_data: + record.raw_content = jargon_data['raw_content'] + if 'is_jargon' in jargon_data: + record.is_jargon = jargon_data['is_jargon'] + if 'count' in jargon_data: + record.count = jargon_data['count'] + if 'last_inference_count' in jargon_data: + record.last_inference_count = jargon_data['last_inference_count'] + if 'is_complete' in jargon_data: + record.is_complete = jargon_data['is_complete'] + if 'is_global' in jargon_data: + record.is_global = jargon_data['is_global'] + record.updated_at = now_ts + + await session.commit() + self._logger.debug( + f"[JargonFacade] 更新黑话: content='{content}', chat_id={chat_id}, " + f"id={record.id}" + ) + return record.id + else: + # 插入新记录 + new_record = Jargon( + **self._jargon_insert_values(chat_id, content, jargon_data, now_ts) + ) + session.add(new_record) + await session.commit() + await session.refresh(new_record) + self._logger.debug( + f"[JargonFacade] 插入黑话: content='{content}', chat_id={chat_id}, " + f"id={new_record.id}" + ) + return new_record.id + except IntegrityError: + if attempt < max_attempts: + self._logger.debug( + f"[JargonFacade] 并发竞争导致 UNIQUE 冲突,重试 " + f"(content='{content}', chat_id={chat_id}, attempt={attempt})" + ) + continue + # 最后一次尝试也失败,向上抛出 + raise except Exception as e: self._logger.error(