diff --git a/src/core/upload/newapi_upload.py b/src/core/upload/newapi_upload.py
index 3e94ee3..09c1b31 100644
--- a/src/core/upload/newapi_upload.py
+++ b/src/core/upload/newapi_upload.py
@@ -23,9 +23,42 @@ def _normalize_base(api_url: str) -> str:
return (api_url or "").strip().rstrip("/")
+def normalize_authorization_token(header_value: str, header_name: str = "Authorization Token") -> str:
+ normalized_value = (header_value or "").strip()
+ if not normalized_value:
+ raise ValueError(f"{header_name} 不能为空")
+ try:
+ normalized_value.encode("ascii")
+ except UnicodeEncodeError as exc:
+ raise ValueError(f"{header_name} 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明") from exc
+ if any(ord(ch) < 32 or ord(ch) == 127 for ch in normalized_value):
+ raise ValueError(f"{header_name} 包含非法控制字符")
+ return normalized_value
+
+
+def _mask_header_value(header_value: str, keep: int = 4) -> str:
+ """
+ Mask a sensitive header value for safe logging.
+
+ The strategy is:
+ - If the value is empty, return an empty string.
+ - If the length is <= keep, fully mask it (no characters revealed).
+ - Otherwise, reveal only the last `keep` characters and mask the rest.
+ """
+ if not header_value:
+ return ""
+
+ length = len(header_value)
+ if length <= keep:
+ return "*" * length
+
+ masked_prefix = "*" * (length - keep)
+ visible_suffix = header_value[-keep:]
+ return masked_prefix + visible_suffix
def _build_headers(api_key: str) -> dict:
+ safe_api_key = normalize_authorization_token(api_key)
return {
- "Authorization": f"Bearer {api_key}",
+ "Authorization": f"Bearer {safe_api_key}",
"New-Api-User": "1",
"Content-Type": "application/json",
}
@@ -68,7 +101,7 @@ def upload_to_newapi(
"auto_ban": 1,
"name": account.email or "",
"type": resolved_channel_type,
- "key": json.dumps({"access_token": account.access_token or "", "account_id": account_name}, ensure_ascii=False),
+ "key": json.dumps({"access_token": account.access_token or "", "account_id": account_name}, ensure_ascii=True),
"base_url": resolved_channel_base_url,
"models": resolved_channel_models,
"multi_key_mode": "random",
@@ -79,10 +112,20 @@ def upload_to_newapi(
}
try:
+ payload = json.dumps({"mode": "single", "channel": channel}, ensure_ascii=True)
+ headers = _build_headers(api_key)
+ headers["Content-Type"] = "application/json; charset=utf-8"
+
+ logger.info("NEWAPI 上传 URL: %s", url)
+ safe_headers = dict(headers)
+ if "Authorization" in safe_headers:
+ safe_headers["Authorization"] = "REDACTED"
+ logger.debug("NEWAPI 请求头: %s", safe_headers)
+
resp = cffi_requests.post(
url,
- headers=_build_headers(api_key),
- json={"mode": "single", "channel": channel},
+ headers=headers,
+ data=payload.encode("utf-8"),
proxies=None,
timeout=30,
impersonate="chrome110",
diff --git a/src/web/routes/upload/newapi_services.py b/src/web/routes/upload/newapi_services.py
index a36510f..17bfdbb 100644
--- a/src/web/routes/upload/newapi_services.py
+++ b/src/web/routes/upload/newapi_services.py
@@ -8,6 +8,7 @@
from ....database import crud
from ....database.session import get_db
+from ....core.upload.newapi_upload import normalize_authorization_token
router = APIRouter()
@@ -67,6 +68,13 @@ def _to_response(svc) -> NewapiServiceResponse:
)
+def _validated_newapi_api_key(api_key: str) -> str:
+ try:
+ return normalize_authorization_token(api_key, header_name="Root Token / API Key")
+ except ValueError as exc:
+ raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
@router.get("", response_model=List[NewapiServiceResponse])
async def list_newapi_services(enabled: Optional[bool] = None):
with get_db() as db:
@@ -81,7 +89,7 @@ async def create_newapi_service(request: NewapiServiceCreate):
db,
name=request.name,
api_url=request.api_url,
- api_key=request.api_key,
+ api_key=_validated_newapi_api_key(request.api_key),
channel_type=request.channel_type,
channel_base_url=request.channel_base_url,
channel_models=request.channel_models,
@@ -113,7 +121,7 @@ async def update_newapi_service(service_id: int, request: NewapiServiceUpdate):
if request.api_url is not None:
update_data["api_url"] = request.api_url
if request.api_key:
- update_data["api_key"] = request.api_key
+ update_data["api_key"] = _validated_newapi_api_key(request.api_key)
if request.enabled is not None:
update_data["enabled"] = request.enabled
if request.priority is not None:
diff --git a/static/js/settings.js b/static/js/settings.js
index d3546eb..7fd55ed 100644
--- a/static/js/settings.js
+++ b/static/js/settings.js
@@ -1296,6 +1296,28 @@ function closeNewapiServiceModal() {
elements.newapiServiceEditModal.classList.remove('active');
}
+function validateNewapiApiKeyInput(apiKey, { required = false } = {}) {
+ const normalizedApiKey = String(apiKey || '').trim();
+ if (!normalizedApiKey) {
+ if (required) {
+ return '新增服务时 Root Token / API Key 不能为空';
+ }
+ return '';
+ }
+
+ for (const char of normalizedApiKey) {
+ const code = char.charCodeAt(0);
+ if (code > 127) {
+ return 'Root Token / API Key 只能包含 ASCII 字符,请粘贴实际令牌,不要填写中文说明';
+ }
+ if (code < 32 || code === 127) {
+ return 'Root Token / API Key 包含非法控制字符';
+ }
+ }
+
+ return '';
+}
+
async function editNewapiService(id) {
try {
const service = await api.get(`/newapi-services/${id}`);
@@ -1321,8 +1343,9 @@ async function handleSaveNewapiService(e) {
toast.error('名称和 API URL 不能为空');
return;
}
- if (!id && !apiKey) {
- toast.error('新增服务时 Root Token / API Key 不能为空');
+ const apiKeyValidationError = validateNewapiApiKeyInput(apiKey, { required: !id });
+ if (apiKeyValidationError) {
+ toast.error(apiKeyValidationError);
return;
}
diff --git a/templates/settings.html b/templates/settings.html
index 1d1dc65..5638a29 100644
--- a/templates/settings.html
+++ b/templates/settings.html
@@ -472,6 +472,7 @@
添加 NEWAPI 服务
diff --git a/tests/test_newapi_service_routes.py b/tests/test_newapi_service_routes.py
new file mode 100644
index 0000000..67c3727
--- /dev/null
+++ b/tests/test_newapi_service_routes.py
@@ -0,0 +1,67 @@
+import asyncio
+from contextlib import contextmanager
+
+import pytest
+from fastapi import HTTPException
+
+import src.web.routes.upload.newapi_services as newapi_routes
+from src.database.session import DatabaseSessionManager
+from src.web.routes.upload.newapi_services import NewapiServiceCreate, NewapiServiceUpdate
+
+
+def _build_fake_get_db(manager):
+ @contextmanager
+ def fake_get_db():
+ with manager.session_scope() as session:
+ yield session
+
+ return fake_get_db
+
+
+def test_create_newapi_service_rejects_non_ascii_api_key(tmp_path, monkeypatch):
+ manager = DatabaseSessionManager(f"sqlite:///{tmp_path}/newapi-create.db")
+ manager.create_tables()
+ manager.migrate_tables()
+ monkeypatch.setattr(newapi_routes, "get_db", _build_fake_get_db(manager))
+
+ with pytest.raises(HTTPException) as exc_info:
+ asyncio.run(
+ newapi_routes.create_newapi_service(
+ NewapiServiceCreate(
+ name="bad-token",
+ api_url="https://newapi.example.com",
+ api_key="系统访问令牌 (System Access Token)",
+ )
+ )
+ )
+
+ assert exc_info.value.status_code == 400
+ assert exc_info.value.detail == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
+
+
+def test_update_newapi_service_rejects_non_ascii_api_key(tmp_path, monkeypatch):
+ manager = DatabaseSessionManager(f"sqlite:///{tmp_path}/newapi-update.db")
+ manager.create_tables()
+ manager.migrate_tables()
+ monkeypatch.setattr(newapi_routes, "get_db", _build_fake_get_db(manager))
+
+ created = asyncio.run(
+ newapi_routes.create_newapi_service(
+ NewapiServiceCreate(
+ name="good-token",
+ api_url="https://newapi.example.com",
+ api_key="token-123",
+ )
+ )
+ )
+
+ with pytest.raises(HTTPException) as exc_info:
+ asyncio.run(
+ newapi_routes.update_newapi_service(
+ created.id,
+ NewapiServiceUpdate(api_key="系统访问令牌 (System Access Token)"),
+ )
+ )
+
+ assert exc_info.value.status_code == 400
+ assert exc_info.value.detail == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
diff --git a/tests/test_newapi_upload.py b/tests/test_newapi_upload.py
new file mode 100644
index 0000000..4e25c52
--- /dev/null
+++ b/tests/test_newapi_upload.py
@@ -0,0 +1,58 @@
+from types import SimpleNamespace
+
+from src.core.upload import newapi_upload
+
+
+class FakeResponse:
+ def __init__(self, status_code=200, payload=None, text=""):
+ self.status_code = status_code
+ self._payload = payload
+ self.text = text
+
+ def json(self):
+ if self._payload is None:
+ raise ValueError("no json payload")
+ return self._payload
+
+
+def test_build_headers_rejects_non_ascii_api_key():
+ try:
+ newapi_upload._build_headers("系统访问令牌 (System Access Token)")
+ except ValueError as exc:
+ assert str(exc) == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
+ else:
+ raise AssertionError("expected ValueError")
+
+
+def test_upload_to_newapi_uses_ascii_authorization_header(monkeypatch):
+ calls = []
+
+ def fake_post(url, **kwargs):
+ calls.append({"url": url, "kwargs": kwargs})
+ return FakeResponse(status_code=201)
+
+ monkeypatch.setattr(newapi_upload.cffi_requests, "post", fake_post)
+
+ success, message = newapi_upload.upload_to_newapi(
+ account=SimpleNamespace(email="tester@example.com", access_token="access-token"),
+ api_url="https://newapi.example.com/",
+ api_key="token-123",
+ )
+
+ assert success is True
+ assert message == "上传成功"
+ assert calls[0]["url"] == "https://newapi.example.com/api/channel/"
+ assert calls[0]["kwargs"]["headers"]["Authorization"] == "Bearer token-123"
+ assert calls[0]["kwargs"]["headers"]["Content-Type"] == "application/json; charset=utf-8"
+ assert calls[0]["kwargs"]["data"].startswith(b"{")
+
+
+def test_upload_to_newapi_returns_clear_error_for_non_ascii_api_key():
+ success, message = newapi_upload.upload_to_newapi(
+ account=SimpleNamespace(email="tester@example.com", access_token="access-token"),
+ api_url="https://newapi.example.com/",
+ api_key="系统访问令牌 (System Access Token)",
+ )
+
+ assert success is False
+ assert message == "上传异常: Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
diff --git a/tests/test_settings_newapi_validation.cjs b/tests/test_settings_newapi_validation.cjs
new file mode 100644
index 0000000..6f7552d
--- /dev/null
+++ b/tests/test_settings_newapi_validation.cjs
@@ -0,0 +1,109 @@
+const test = require('node:test');
+const assert = require('node:assert/strict');
+const fs = require('node:fs');
+const path = require('node:path');
+const vm = require('node:vm');
+
+const SETTINGS_JS_PATH = path.join(__dirname, '..', 'static', 'js', 'settings.js');
+
+function createClassList() {
+ const values = new Set();
+ return {
+ add(...items) {
+ items.forEach((item) => values.add(item));
+ },
+ remove(...items) {
+ items.forEach((item) => values.delete(item));
+ },
+ contains(item) {
+ return values.has(item);
+ },
+ };
+}
+
+function createElementStub(overrides = {}) {
+ return {
+ value: '',
+ checked: false,
+ innerHTML: '',
+ textContent: '',
+ style: {},
+ dataset: {},
+ classList: createClassList(),
+ addEventListener() {},
+ removeEventListener() {},
+ querySelectorAll() {
+ return [];
+ },
+ querySelector() {
+ return null;
+ },
+ ...overrides,
+ };
+}
+
+function createSandbox() {
+ const elements = new Map();
+
+ function getElement(id) {
+ if (!elements.has(id)) {
+ elements.set(id, createElementStub({ id }));
+ }
+ return elements.get(id);
+ }
+
+ const sandbox = {
+ console,
+ setTimeout,
+ clearTimeout,
+ document: {
+ getElementById(id) {
+ return getElement(id);
+ },
+ querySelectorAll() {
+ return [];
+ },
+ addEventListener() {},
+ },
+ window: null,
+ api: {
+ get: async () => [],
+ post: async () => ({ success: true }),
+ patch: async () => ({ success: true }),
+ delete: async () => ({ success: true }),
+ },
+ toast: {
+ success() {},
+ error() {},
+ },
+ confirm: async () => true,
+ escapeHtml(value) {
+ return String(value ?? '');
+ },
+ };
+
+ sandbox.window = sandbox;
+ vm.createContext(sandbox);
+ vm.runInContext(fs.readFileSync(SETTINGS_JS_PATH, 'utf8'), sandbox, { filename: 'settings.js' });
+ return sandbox;
+}
+
+test('validateNewapiApiKeyInput rejects non-ascii text', () => {
+ const sandbox = createSandbox();
+ const message = vm.runInContext(
+ "validateNewapiApiKeyInput('系统访问令牌 (System Access Token)', { required: true })",
+ sandbox,
+ );
+
+ assert.equal(message, 'Root Token / API Key 只能包含 ASCII 字符,请粘贴实际令牌,不要填写中文说明');
+});
+
+test('validateNewapiApiKeyInput allows ascii token', () => {
+ const sandbox = createSandbox();
+ const message = vm.runInContext(
+ "validateNewapiApiKeyInput('token-123', { required: true })",
+ sandbox,
+ );
+
+ assert.equal(message, '');
+});