diff --git a/app/api/routes/admin/users.py b/app/api/routes/admin/users.py
index 901fc39..e234ae6 100644
--- a/app/api/routes/admin/users.py
+++ b/app/api/routes/admin/users.py
@@ -17,10 +17,10 @@
from app.schemas.user import Language, SystemRole
from app.schemas.user_activity import ActivityType, ResourceType
from app.services.admin.user_service import (
+ change_password_admin_service,
delete_user_admin_service,
get_user_admin_service,
list_users_admin_service,
- reset_password_admin_service,
suspend_user_admin_service,
unsuspend_user_admin_service,
update_user_admin_service,
@@ -151,26 +151,27 @@ async def delete_user(
)
-@router.post("/{user_id}/reset-password", response_model=Message)
+@router.post("/{user_id}/change-password", response_model=Message)
@rate_limit_strict("5/minute")
@audit_unexpected_failure(
activity_type=ActivityType.UPDATE,
resource_type=ResourceType.AUTH,
- endpoint="/admin/users/{user_id}/reset-password",
+ endpoint="/admin/users/{user_id}/change-password",
)
-async def reset_user_password(
+async def change_user_password(
request: Request,
current_user: CurrentSuperUser,
session: SessionDep,
user_id: uuid.UUID,
lang: Language = Language.EN,
) -> Message:
- """Send a password-reset email to the target user.
+ """Force-rotate the target user's password to an unknown random value.
- The admin never sees or sets the password — the user completes the reset
- via the standard email-link flow.
+ The new password is never returned. The target user receives a neutral
+ notification email and must regain access via the standard 'Forgot
+ Password' flow on the login page.
"""
- return await reset_password_admin_service(
+ return await change_password_admin_service(
request=request,
session=session,
current_user=current_user,
diff --git a/app/core/messages/success_message.py b/app/core/messages/success_message.py
index 08910b8..fd9d3c3 100644
--- a/app/core/messages/success_message.py
+++ b/app/core/messages/success_message.py
@@ -22,4 +22,4 @@ class SuccessMessages:
ADMIN_USER_SUSPENDED = "success.admin.user_suspended"
ADMIN_USER_UNSUSPENDED = "success.admin.user_unsuspended"
ADMIN_USER_DELETED = "success.admin.user_deleted"
- ADMIN_PASSWORD_RESET_SENT = "success.admin.password_reset_sent"
+ ADMIN_PASSWORD_CHANGED = "success.admin.password_changed"
diff --git a/app/core/security.py b/app/core/security.py
index 570d728..744989e 100644
--- a/app/core/security.py
+++ b/app/core/security.py
@@ -1,3 +1,5 @@
+import secrets
+import string
import uuid
from datetime import datetime, timedelta
@@ -140,6 +142,26 @@ def get_password_hash(password: str) -> str:
return bcrypt.hashpw(password.encode("utf-8"), salt).decode("utf-8")
+def generate_secure_random_password(length: int = 32) -> str:
+ """
+ Generate a cryptographically-secure random password.
+
+ The result is never returned to the admin or the user — it is only used
+ to overwrite a user's bcrypt hash during an admin-forced rotation, after
+ which the user must recover access via the standard forgot-password
+ flow. ASCII letters + digits keep us well under bcrypt's 72-byte
+ truncation point at any reasonable length.
+
+ Args:
+ length: Number of characters in the returned password.
+
+ Returns:
+ Random ASCII string of the requested length.
+ """
+ alphabet = string.ascii_letters + string.digits
+ return "".join(secrets.choice(alphabet) for _ in range(length))
+
+
# ---------------------------------------------------------------------------
# Email verification tokens (password-reset & new-account)
# ---------------------------------------------------------------------------
diff --git a/app/services/admin/user_service.py b/app/services/admin/user_service.py
index b0c1ec4..6d5f2e8 100644
--- a/app/services/admin/user_service.py
+++ b/app/services/admin/user_service.py
@@ -7,7 +7,7 @@
from app.core.email import send_email
from app.core.messages.error_message import ErrorMessages
from app.core.messages.success_message import SuccessMessages
-from app.core.security import create_password_reset_token
+from app.core.security import generate_secure_random_password, get_password_hash
from app.models.user import User
from app.repositories.admin.user import (
is_last_active_admin,
@@ -29,7 +29,7 @@
from app.schemas.user import Language, SystemRole
from app.schemas.user_activity import ActivityType, ResourceType
from app.use_cases.log_activity import log_activity
-from app.utils.email_templates import generate_password_reset_email
+from app.utils.email_templates import generate_password_reset_notification_email
async def list_users_admin_service(
@@ -257,25 +257,27 @@ async def delete_user_admin_service(
return Message(success=True, message=SuccessMessages.ADMIN_USER_DELETED)
-async def reset_password_admin_service(
+async def change_password_admin_service(
request: Request,
session: AsyncSession,
current_user: User,
user_id: uuid.UUID,
lang: Language = Language.EN,
) -> Message:
- """Trigger a password-reset email on behalf of the target user.
+ """Force-rotate the target user's password to an unknown random value.
- The admin never sees or sets the new password; the user completes the reset
- via the standard email-link flow used by self-service password recovery.
+ The new password is never returned to the caller and never displayed; the
+ target user must regain access via the standard 'Forgot Password' flow on
+ the login page. A neutral notification email is sent so the recovery path
+ stays auditable and tied to a user-initiated reset request.
"""
target = await _load_target(session, user_id)
- token = create_password_reset_token(target.email)
- reset_url = f"{settings.FRONTEND_HOST}/reset-password?token={token}"
+ new_password = generate_secure_random_password()
+ hashed_password = get_password_hash(new_password)
+ await update_user(session, target, {"hashed_password": hashed_password})
- email_data = generate_password_reset_email(
- reset_link=reset_url,
+ email_data = generate_password_reset_notification_email(
project_name=settings.PROJECT_NAME,
lang=lang,
)
@@ -294,8 +296,8 @@ async def reset_password_admin_service(
activity_type=ActivityType.UPDATE,
resource_type=ResourceType.AUTH,
resource_id=target.id,
- details={"action": "admin_triggered_password_reset"},
+ details={"action": "admin_reset_user_password"},
request=request,
)
- return Message(success=True, message=SuccessMessages.ADMIN_PASSWORD_RESET_SENT)
+ return Message(success=True, message=SuccessMessages.ADMIN_PASSWORD_CHANGED)
diff --git a/app/tests/admin/test_users.py b/app/tests/admin/test_users.py
index 74e301f..fc9c8e6 100644
--- a/app/tests/admin/test_users.py
+++ b/app/tests/admin/test_users.py
@@ -1,7 +1,7 @@
"""End-to-end tests for /admin/users endpoints.
-Covers listing, detail, update, suspend/unsuspend, delete, password reset,
-self-protection, and the last-admin repository guard.
+Covers listing, detail, update, suspend/unsuspend, delete, admin password
+rotation, self-protection, and the last-admin repository guard.
"""
import pytest
@@ -314,17 +314,55 @@ async def test_delete_non_last_admin_succeeds(admin_client: AsyncClient):
@pytest.mark.asyncio
-async def test_reset_password_sends_email(admin_client: AsyncClient, mock_email_send):
- """Reset endpoint must hit the email service with the target user's address."""
- await register_and_verify(admin_client, "resetme@test.com")
- user_id = await get_user_id("resetme@test.com")
+async def test_change_password_rotates_hash_and_notifies(
+ admin_client: AsyncClient, mock_email_send
+):
+ """Admin change-password rotates the hash to an unknown value and notifies the user.
+
+ The new password must never appear in the response, the email must be
+ link-free, and an activity row must be written so the rotation is auditable.
+ """
+ from app.models.user_activity import UserActivity
+
+ await register_and_verify(admin_client, "rotateme@test.com")
+ user_id = await get_user_id("rotateme@test.com")
+
+ async with TestingSessionLocal() as session:
+ result = await session.execute(
+ select(User).where(User.email == "rotateme@test.com")
+ )
+ original_hash = result.scalars().one().hashed_password
mock_email_send.reset_mock()
- response = await admin_client.post(f"/admin/users/{user_id}/reset-password")
+ response = await admin_client.post(f"/admin/users/{user_id}/change-password")
assert response.status_code == 200
- assert response.json()["message"] == SuccessMessages.ADMIN_PASSWORD_RESET_SENT
+ body = response.json()
+ assert body["message"] == SuccessMessages.ADMIN_PASSWORD_CHANGED
+ # The new password must never leak to the caller, under any field name.
+ for leak_key in ("password", "new_password", "temporary_password"):
+ assert leak_key not in body
+
+ # Email reached the target — and is deliberately link-free.
mock_email_send.assert_awaited_once()
call_kwargs = mock_email_send.await_args.kwargs
- assert call_kwargs["to"] == "resetme@test.com"
+ assert call_kwargs["to"] == "rotateme@test.com"
+ assert "href=" not in call_kwargs["body"]
+ assert "http" not in call_kwargs["plain_text"].lower()
+
+ # Hash was rotated; the old hash no longer matches.
+ async with TestingSessionLocal() as session:
+ result = await session.execute(
+ select(User).where(User.email == "rotateme@test.com")
+ )
+ new_hash = result.scalars().one().hashed_password
+ assert new_hash != original_hash
+
+ # Audit trail recorded the rotation.
+ async with TestingSessionLocal() as session:
+ result = await session.execute(select(UserActivity))
+ actions = [
+ row.details.get("action") for row in result.scalars().all() if row.details
+ ]
+ assert "admin_reset_user_password" in actions
diff --git a/app/utils/email_templates.py b/app/utils/email_templates.py
index aa83084..56614ed 100644
--- a/app/utils/email_templates.py
+++ b/app/utils/email_templates.py
@@ -105,6 +105,102 @@ def generate_password_reset_email(
return {"subject": subject, "html": html, "plain_text": plain_text}
+def generate_password_reset_notification_email(
+ project_name: str, lang: str = Language.EN
+) -> dict[str, str]:
+ """
+ Generate subject, HTML, and plain text for a password-reset notification.
+
+ Sent when the user's password was rotated server-side (e.g. by an admin
+ forced reset). Deliberately link-free: the user must recover access via
+ the standard 'Forgot Password' flow on the login page so the recovery
+ path stays auditable and tied to a user-initiated request.
+ """
+ if lang == Language.TR:
+ subject = "Şifreniz sıfırlandı"
+ greeting = "Merhaba,"
+ message = "Hesabınızın şifresi sıfırlandı. Yeniden erişim için giriş sayfasındaki 'Şifremi Unuttum' bağlantısını kullanarak yeni bir şifre belirleyebilirsiniz."
+ disclaimer = (
+ "Bu işlemi beklemiyorsanız lütfen destek ile derhal iletişime geçin."
+ )
+ footer_text = f"© {project_name}. Tüm hakları saklıdır."
+ plain_text = (
+ "Hesabınızın şifresi sıfırlandı. Yeniden erişim için giriş "
+ "sayfasındaki 'Şifremi Unuttum' bağlantısını kullanın. "
+ "Bu işlemi beklemiyorsanız lütfen destek ile derhal iletişime geçin."
+ )
+ else:
+ subject = "Your password has been reset"
+ greeting = "Hi there,"
+ message = "Your account password has been reset. To regain access, please use the 'Forgot Password' link on the login page to set a new password."
+ disclaimer = "If you didn't expect this, please contact support immediately."
+ footer_text = f"© {project_name}. All rights reserved."
+ plain_text = (
+ "Your account password has been reset. To regain access, please "
+ "use the 'Forgot Password' link on the login page. "
+ "If you didn't expect this, please contact support immediately."
+ )
+
+ html = f"""
+
+
+
+
+
+ {subject}
+
+
+
+
+
+
+
+
+
+
+
+
+
+ |
+ {greeting}
+ {message}
+
+
+ |
+
+
+
+ |
+ {project_name}
+ {footer_text}
+ |
+
+
+ |
+
+
+
+"""
+
+ return {"subject": subject, "html": html, "plain_text": plain_text}
+
+
def generate_account_deactivation_email(
reactivate_link: str,
grace_days: int,