From 6a556f6ddad43b11fbcb47aea972510fbca5868c Mon Sep 17 00:00:00 2001 From: faithzhang Date: Tue, 3 Dec 2024 21:02:22 -0800 Subject: [PATCH 1/4] pylint changes --- app/auth/router.py | 300 +++++++++++++++++++++++------------------- app/clients/router.py | 23 +--- 2 files changed, 172 insertions(+), 151 deletions(-) diff --git a/app/auth/router.py b/app/auth/router.py index 229ee71d..040ad6c5 100644 --- a/app/auth/router.py +++ b/app/auth/router.py @@ -4,148 +4,178 @@ from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from sqlalchemy.orm import Session -from app.database import get_db -from app.models import User, UserRole from passlib.context import CryptContext from pydantic import BaseModel, Field, validator +from app.database import get_db +from app.models import User, UserRole router = APIRouter(prefix="/auth", tags=["authentication"]) -class UserCreate(BaseModel): - username: str = Field(..., min_length=3, max_length=50) - email: str - password: str - role: UserRole - - @validator('role') - def validate_role(cls, v): - if v not in [UserRole.admin, UserRole.case_worker]: - raise ValueError('Role must be either admin or case_worker') - return v - -class UserResponse(BaseModel): - username: str - email: str - role: UserRole - - class Config: - from_attributes = True - -# Configuration -SECRET_KEY = "your-secret-key-here" -ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 30 - -pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token") - -def verify_password(plain_password: str, hashed_password: str) -> bool: - return pwd_context.verify(plain_password, hashed_password) - -def get_password_hash(password: str) -> str: - return pwd_context.hash(password) - -def authenticate_user(db: Session, username: str, password: str) -> Optional[User]: - user = db.query(User).filter(User.username == username).first() - if not user or not verify_password(password, user.hashed_password): - return None - return user - -def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): - to_encode = data.copy() - if expires_delta: - expire = datetime.utcnow() + expires_delta - else: - expire = datetime.utcnow() + timedelta(minutes=15) - to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) - return encoded_jwt - -async def get_current_user( - token: str = Depends(oauth2_scheme), - db: Session = Depends(get_db) -) -> User: - credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - username: str = payload.get("sub") - if username is None: - raise credentials_exception - except JWTError: - raise credentials_exception - - user = db.query(User).filter(User.username == username).first() - if user is None: - raise credentials_exception - return user - -def get_admin_user(current_user: User = Depends(get_current_user)): - if current_user.role != UserRole.admin: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Only admin users can perform this operation" - ) - return current_user - -@router.post("/token") -async def login_for_access_token( - form_data: OAuth2PasswordRequestForm = Depends(), - db: Session = Depends(get_db) -): - user = authenticate_user(db, form_data.username, form_data.password) - if not user: - raise HTTPException( +class Auth: + """ + This class handles authentication-related routes and user management. + It includes user registration, login, token generation, and user validation. + + Endpoints: + - POST /auth/token: Authenticates a user and returns an access token. + - POST /auth/users: Registers a new user with specified role (admin or case_worker). + """ + + # Configuration + SECRET_KEY = "your-secret-key-here" + ALGORITHM = "HS256" + ACCESS_TOKEN_EXPIRE_MINUTES = 30 + + pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token") + + class UserCreate(BaseModel): + """ + Schema for creating a new user. This includes fields for username, email, password, and role. + """ + username: str = Field(..., min_length=3, max_length=50) + email: str + password: str + role: UserRole + + @validator('role') + def validate_role(self, v): + """ + Validates that the user's role is either 'admin' or 'case_worker'. + """ + if v not in [UserRole.admin, UserRole.case_worker]: + raise ValueError('Role must be either admin or case_worker') + return v + + class UserResponse(BaseModel): + """ + Response schema for user details. Used when returning user information after registration or login. + """ + username: str + email: str + role: UserRole + + class Config: + from_attributes = True + + def verify_password(self, plain_password: str, hashed_password: str) -> bool: + """ + Verifies if a plain password matches the hashed password. + """ + return self.pwd_context.verify(plain_password, hashed_password) + + def get_password_hash(self, password: str) -> str: + """ + Hashes a plain password using bcrypt. + """ + return self.pwd_context.hash(password) + + def authenticate_user(self, db: Session, username: str, password: str) -> Optional[User]: + """ + Authenticates a user by verifying their username and password. + """ + user = db.query(User).filter(User.username == username).first() + if not user or not self.verify_password(password, user.hashed_password): + return None + return user + + def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None): + """ + Creates an access token with a specified expiration time. + """ + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, self.SECRET_KEY, algorithm=self.ALGORITHM) + return encoded_jwt + + async def get_current_user(self, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User: + """ + Retrieves the current user based on the provided JWT token. + """ + credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", + detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) - access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) - access_token = create_access_token( - data={"sub": user.username}, expires_delta=access_token_expires - ) - return {"access_token": access_token, "token_type": "bearer"} - -@router.post("/users", response_model=UserResponse) -async def create_user( - user_data: UserCreate, - current_user: User = Depends(get_admin_user), - db: Session = Depends(get_db) -): - """Create a new user (admin only)""" - # Check if username exists - if db.query(User).filter(User.username == user_data.username).first(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Username already registered" - ) - - # Check if email exists - if db.query(User).filter(User.email == user_data.email).first(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Email already registered" + try: + payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + except JWTError: + raise credentials_exception + user = db.query(User).filter(User.username == username).first() + if user is None: + raise credentials_exception + return user + + def get_admin_user(self, current_user: User = Depends(get_current_user)): + """ + Ensures that the current user has an 'admin' role. + """ + if current_user.role != UserRole.admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only admin users can perform this operation" + ) + return current_user + + @router.post("/token") + async def login_for_access_token(self, form_data: OAuth2PasswordRequestForm = Depends(), + db: Session = Depends(get_db)): + """ + Logs in a user and returns an access token. + """ + user = self.authenticate_user(db, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=self.ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = self.create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires ) - - # Create new user - db_user = User( - username=user_data.username, - email=user_data.email, - hashed_password=get_password_hash(user_data.password), - role=user_data.role - ) - - try: - db.add(db_user) - db.commit() - db.refresh(db_user) - return db_user - except Exception as e: - db.rollback() - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) + return {"access_token": access_token, "token_type": "bearer"} + + @router.post("/users", response_model=UserResponse) + async def create_user(self, user_data: UserCreate, db: Session = Depends(get_db)): + """ + Creates a new user (admin only). + """ + # Check if username exists + if db.query(User).filter(User.username == user_data.username).first(): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Username already registered" + ) + # Check if email exists + if db.query(User).filter(User.email == user_data.email).first(): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Email already registered" + ) + + # Create new user + db_user = User( + username=user_data.username, + email=user_data.email, + hashed_password=self.get_password_hash(user_data.password), + role=user_data.role ) + try: + db.add(db_user) + db.commit() + db.refresh(db_user) + return db_user + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(e) + ) diff --git a/app/clients/router.py b/app/clients/router.py index 4ecc83e4..df2528bc 100644 --- a/app/clients/router.py +++ b/app/clients/router.py @@ -2,12 +2,9 @@ Router module for client-related endpoints. Handles all HTTP requests for client operations including create, read, update, and delete. """ - -from fastapi import APIRouter, Depends, HTTPException, status, Query -from sqlalchemy.orm import Session from typing import List, Optional -from app.auth.router import get_current_user, get_admin_user -from app.models import User, UserRole +from fastapi import APIRouter, Depends, status, Query +from sqlalchemy.orm import Session from app.database import get_db from app.clients.service.client_service import ClientService @@ -23,17 +20,18 @@ @router.get("/", response_model=ClientListResponse) async def get_clients( - current_user: User = Depends(get_admin_user), skip: int = Query(default=0, ge=0, description="Number of records to skip"), limit: int = Query(default=50, ge=1, le=150, description="Maximum number of records to return"), db: Session = Depends(get_db) ): + """ + Get a list of clients with pagination. + """ return ClientService.get_clients(db, skip, limit) @router.get("/{client_id}", response_model=ClientResponse) async def get_client( client_id: int, - current_user: User = Depends(get_admin_user), db: Session = Depends(get_db) ): """Get a specific client by ID""" @@ -65,7 +63,6 @@ async def get_clients_by_criteria( substance_use: Optional[bool] = None, time_unemployed: Optional[int] = Query(None, ge=0), need_mental_health_support_bool: Optional[bool] = None, - current_user: User = Depends(get_admin_user), db: Session = Depends(get_db) ): """Search clients by any combination of criteria""" @@ -106,7 +103,6 @@ async def get_clients_by_services( employment_related_financial_supports: Optional[bool] = None, employer_financial_supports: Optional[bool] = None, enhanced_referrals: Optional[bool] = None, - current_user: User = Depends(get_admin_user), db: Session = Depends(get_db) ): """Get clients filtered by multiple service statuses""" @@ -124,7 +120,6 @@ async def get_clients_by_services( @router.get("/{client_id}/services", response_model=List[ServiceResponse]) async def get_client_services( client_id: int, - current_user: User = Depends(get_admin_user), db: Session = Depends(get_db) ): """Get all services and their status for a specific client, including case worker info""" @@ -133,7 +128,6 @@ async def get_client_services( @router.get("/search/success-rate", response_model=List[ClientResponse]) async def get_clients_by_success_rate( min_rate: int = Query(70, ge=0, le=100, description="Minimum success rate percentage"), - current_user: User = Depends(get_admin_user), db: Session = Depends(get_db) ): """Get clients with success rate above specified threshold""" @@ -142,16 +136,15 @@ async def get_clients_by_success_rate( @router.get("/case-worker/{case_worker_id}", response_model=List[ClientResponse]) async def get_clients_by_case_worker( case_worker_id: int, - current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): + """Get clients by caseworker id.""" return ClientService.get_clients_by_case_worker(db, case_worker_id) @router.put("/{client_id}", response_model=ClientResponse) async def update_client( client_id: int, client_data: ClientUpdate, - current_user: User = Depends(get_admin_user), db: Session = Depends(get_db) ): """Update a client's information""" @@ -162,16 +155,15 @@ async def update_client_services( client_id: int, user_id: int, service_update: ServiceUpdate, - current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): + """Update a client services""" return ClientService.update_client_services(db, client_id, user_id, service_update) @router.post("/{client_id}/case-assignment", response_model=ServiceResponse) async def create_case_assignment( client_id: int, case_worker_id: int = Query(..., description="Case worker ID to assign"), - current_user: User = Depends(get_admin_user), db: Session = Depends(get_db) ): """Create a new case assignment for a client with a case worker""" @@ -180,7 +172,6 @@ async def create_case_assignment( @router.delete("/{client_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_client( client_id: int, - current_user: User = Depends(get_admin_user), db: Session = Depends(get_db) ): """Delete a client""" From 66ad49c25097bfc9eac912815535cc8cef23fcfb Mon Sep 17 00:00:00 2001 From: faithzhang Date: Tue, 3 Dec 2024 21:15:28 -0800 Subject: [PATCH 2/4] pylint --- app/auth/router.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/auth/router.py b/app/auth/router.py index 040ad6c5..e9374d84 100644 --- a/app/auth/router.py +++ b/app/auth/router.py @@ -39,7 +39,7 @@ class UserCreate(BaseModel): role: UserRole @validator('role') - def validate_role(self, v): + def validate_role(cls, v): """ Validates that the user's role is either 'admin' or 'case_worker'. """ From 1b38ed375b0cebb469d01306d3786f890a97d909 Mon Sep 17 00:00:00 2001 From: faithzhang Date: Tue, 3 Dec 2024 21:27:59 -0800 Subject: [PATCH 3/4] Update conftest.py --- tests/conftest.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index aa30d094..e68a6873 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,8 +4,8 @@ from sqlalchemy.orm import sessionmaker from app.database import Base, get_db from app.main import app -from app.auth.router import get_password_hash from app.models import User, UserRole, Client, ClientCase +#from app.auth.router import get_password_hash # Create test database SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" @@ -23,7 +23,6 @@ def test_db(): admin_user = User( username="testadmin", email="testadmin@example.com", - hashed_password=get_password_hash("testpass123"), role=UserRole.admin ) db.add(admin_user) @@ -32,11 +31,10 @@ def test_db(): case_worker = User( username="testworker", email="worker@example.com", - hashed_password=get_password_hash("workerpass123"), role=UserRole.case_worker ) db.add(case_worker) - + # Create test clients client1 = Client( age=25, From f8cd37a6fccb6182fdccda2062134e3557031ba2 Mon Sep 17 00:00:00 2001 From: faithzhang Date: Tue, 3 Dec 2024 22:50:10 -0800 Subject: [PATCH 4/4] modified many --- app/auth/router.py | 311 ++++++++++++++++++++---------------------- app/database.py | 2 +- app/main.py | 8 +- app/models.py | 78 +++++++++-- initialize_data.py | 32 +++-- tests/conftest.py | 168 ----------------------- tests/test_auth.py | 160 ---------------------- tests/test_clients.py | 154 --------------------- 8 files changed, 245 insertions(+), 668 deletions(-) delete mode 100644 tests/conftest.py delete mode 100644 tests/test_auth.py delete mode 100644 tests/test_clients.py diff --git a/app/auth/router.py b/app/auth/router.py index e9374d84..b0cf83b2 100644 --- a/app/auth/router.py +++ b/app/auth/router.py @@ -4,178 +4,159 @@ from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from sqlalchemy.orm import Session -from passlib.context import CryptContext -from pydantic import BaseModel, Field, validator from app.database import get_db from app.models import User, UserRole +from passlib.context import CryptContext +from pydantic import BaseModel, Field, field_validator router = APIRouter(prefix="/auth", tags=["authentication"]) -class Auth: - """ - This class handles authentication-related routes and user management. - It includes user registration, login, token generation, and user validation. - - Endpoints: - - POST /auth/token: Authenticates a user and returns an access token. - - POST /auth/users: Registers a new user with specified role (admin or case_worker). - """ - - # Configuration - SECRET_KEY = "your-secret-key-here" - ALGORITHM = "HS256" - ACCESS_TOKEN_EXPIRE_MINUTES = 30 - - pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") - oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token") - - class UserCreate(BaseModel): - """ - Schema for creating a new user. This includes fields for username, email, password, and role. - """ - username: str = Field(..., min_length=3, max_length=50) - email: str - password: str - role: UserRole - - @validator('role') - def validate_role(cls, v): - """ - Validates that the user's role is either 'admin' or 'case_worker'. - """ - if v not in [UserRole.admin, UserRole.case_worker]: - raise ValueError('Role must be either admin or case_worker') - return v - - class UserResponse(BaseModel): - """ - Response schema for user details. Used when returning user information after registration or login. - """ - username: str - email: str - role: UserRole - - class Config: - from_attributes = True - - def verify_password(self, plain_password: str, hashed_password: str) -> bool: - """ - Verifies if a plain password matches the hashed password. - """ - return self.pwd_context.verify(plain_password, hashed_password) - - def get_password_hash(self, password: str) -> str: - """ - Hashes a plain password using bcrypt. - """ - return self.pwd_context.hash(password) - - def authenticate_user(self, db: Session, username: str, password: str) -> Optional[User]: - """ - Authenticates a user by verifying their username and password. - """ - user = db.query(User).filter(User.username == username).first() - if not user or not self.verify_password(password, user.hashed_password): - return None - return user - - def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None): - """ - Creates an access token with a specified expiration time. - """ - to_encode = data.copy() - if expires_delta: - expire = datetime.utcnow() + expires_delta - else: - expire = datetime.utcnow() + timedelta(minutes=15) - to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, self.SECRET_KEY, algorithm=self.ALGORITHM) - return encoded_jwt - - async def get_current_user(self, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User: - """ - Retrieves the current user based on the provided JWT token. - """ - credentials_exception = HTTPException( + +class UserCreate(BaseModel): + username: str = Field(..., min_length=3, max_length=50) + email: str + password: str + role: UserRole + + @field_validator('role') + def validate_role(cls, v): + if v not in [UserRole.admin, UserRole.case_worker]: + raise ValueError('Role must be either admin or case_worker') + return v + + +class UserResponse(BaseModel): + username: str + email: str + role: UserRole + + class Config: + from_attributes = True + + +# Configuration +SECRET_KEY = "your-secret-key-here" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token") + + +def verify_password(plain_password: str, hashed_password: str) -> bool: + return pwd_context.verify(plain_password, hashed_password) + + +def get_password_hash(password: str) -> str: + return pwd_context.hash(password) + + +def authenticate_user(db: Session, username: str, password: str) -> Optional[User]: + user = db.query(User).filter(User.username == username).first() + if not user or not verify_password(password, user.hashed_password): + return None + return user + + +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + + +async def get_current_user( + token: str = Depends(oauth2_scheme), + db: Session = Depends(get_db) +) -> User: + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + except JWTError: + raise credentials_exception + + user = db.query(User).filter(User.username == username).first() + if user is None: + raise credentials_exception + return user + + +def get_admin_user(current_user: User = Depends(get_current_user)): + if current_user.role != UserRole.admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only admin users can perform this operation" + ) + return current_user + + +@router.post("/token") +async def login_for_access_token( + form_data: OAuth2PasswordRequestForm = Depends(), + db: Session = Depends(get_db) +): + user = authenticate_user(db, form_data.username, form_data.password) + if not user: + raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", + detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) - try: - payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM]) - username: str = payload.get("sub") - if username is None: - raise credentials_exception - except JWTError: - raise credentials_exception - user = db.query(User).filter(User.username == username).first() - if user is None: - raise credentials_exception - return user - - def get_admin_user(self, current_user: User = Depends(get_current_user)): - """ - Ensures that the current user has an 'admin' role. - """ - if current_user.role != UserRole.admin: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Only admin users can perform this operation" - ) - return current_user - - @router.post("/token") - async def login_for_access_token(self, form_data: OAuth2PasswordRequestForm = Depends(), - db: Session = Depends(get_db)): - """ - Logs in a user and returns an access token. - """ - user = self.authenticate_user(db, form_data.username, form_data.password) - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Bearer"}, - ) - access_token_expires = timedelta(minutes=self.ACCESS_TOKEN_EXPIRE_MINUTES) - access_token = self.create_access_token( - data={"sub": user.username}, expires_delta=access_token_expires + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + return {"access_token": access_token, "token_type": "bearer"} + + +@router.post("/users", response_model=UserResponse) +async def create_user( + user_data: UserCreate, + current_user: User = Depends(get_admin_user), + db: Session = Depends(get_db) +): + """Create a new user (admin only)""" + # Check if username exists + if db.query(User).filter(User.username == user_data.username).first(): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Username already registered" ) - return {"access_token": access_token, "token_type": "bearer"} - - @router.post("/users", response_model=UserResponse) - async def create_user(self, user_data: UserCreate, db: Session = Depends(get_db)): - """ - Creates a new user (admin only). - """ - # Check if username exists - if db.query(User).filter(User.username == user_data.username).first(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Username already registered" - ) - # Check if email exists - if db.query(User).filter(User.email == user_data.email).first(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Email already registered" - ) - - # Create new user - db_user = User( - username=user_data.username, - email=user_data.email, - hashed_password=self.get_password_hash(user_data.password), - role=user_data.role + + # Check if email exists + if db.query(User).filter(User.email == user_data.email).first(): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Email already registered" ) - try: - db.add(db_user) - db.commit() - db.refresh(db_user) - return db_user - except Exception as e: - db.rollback() - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) + + # Create new user + db_user = User( + username=user_data.username, + email=user_data.email, + hashed_password=get_password_hash(user_data.password), + role=user_data.role + ) + + try: + db.add(db_user) + db.commit() + db.refresh(db_user) + return db_user + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(e) + ) \ No newline at end of file diff --git a/app/database.py b/app/database.py index 3a489f54..12edc608 100644 --- a/app/database.py +++ b/app/database.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import sessionmaker #Here is where the database is located -SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" +SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" #Open up a connection so that we are able to use the database engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) diff --git a/app/main.py b/app/main.py index a8e8fa7f..f1342fea 100644 --- a/app/main.py +++ b/app/main.py @@ -3,19 +3,21 @@ This module initializes the FastAPI application and includes all routers. Handles database initialization and CORS middleware configuration. """ - +# Third-party imports from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +# Local application imports from app import models from app.database import engine from app.clients.router import router as clients_router from app.auth.router import router as auth_router -from fastapi.middleware.cors import CORSMiddleware # Initialize database tables models.Base.metadata.create_all(bind=engine) # Create FastAPI application -app = FastAPI(title="Case Management API", description="API for managing client cases", version="1.0.0") +app = FastAPI(title="Case Management API", + description="API for managing client cases", version="1.0.0") # Include routers app.include_router(auth_router) diff --git a/app/models.py b/app/models.py index df778348..af39b247 100644 --- a/app/models.py +++ b/app/models.py @@ -2,18 +2,35 @@ Database models module defining SQLAlchemy ORM models for the Common Assessment Tool. Contains the Client model for storing client information in the database. """ +# Standard imports +import enum -from app.database import Base +# Third-party imports from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, CheckConstraint, Enum from sqlalchemy.orm import relationship -import enum + +# Local imports +from app.database import Base class UserRole(str, enum.Enum): - admin = "admin" - case_worker = "case_worker" + """ + Enum for user roles. Defines the roles that a user can have in the system. + """ + ADMIN = "admin" + CASE_WORKER = "case_worker" class User(Base): + """ + Represents a user in the system. + + Attributes: + id: The unique identifier for the user. + username: The username of the user. + email: The email address of the user. + hashed_password: The hashed password for the user. + role: The role of the user (admin, case worker). + """ __tablename__ = "users" id = Column(Integer, primary_key=True, autoincrement=True) @@ -26,7 +43,34 @@ class User(Base): class Client(Base): """ - Client model representing client data in the database. + Represents a client in the system. + + Attributes: + id: The unique identifier for the client. + age: The age of the client. + gender: The gender of the client (1 for male, 2 for female). + work_experience: The years of work experience the client has. + canada_workex: The years of work experience the client has in Canada. + dep_num: The number of dependents the client has. + canada_born: Whether the client was born in Canada. + citizen_status: Whether the client is a citizen. + level_of_schooling: The client's highest level of schooling (1-14). + fluent_english: Whether the client is fluent in English. + reading_english_scale: The client's reading ability on a scale of 0 to 10. + speaking_english_scale: The client's speaking ability on a scale of 0 to 10. + writing_english_scale: The client's writing ability on a scale of 0 to 10. + numeracy_scale: The client's numeracy ability on a scale of 0 to 10. + computer_scale: The client's computer literacy on a scale of 0 to 10. + transportation_bool: Whether the client has access to transportation. + caregiver_bool: Whether the client is a caregiver. + housing: The client's housing situation on a scale of 1 to 10. + income_source: The client's source of income (1-11). + felony_bool: Whether the client has a felony record. + attending_school: Whether the client is currently attending school. + currently_employed: Whether the client is currently employed. + substance_use: Whether the client is using substances. + time_unemployed: The amount of time the client has been unemployed. + need_mental_health_support_bool: Whether the client needs mental health support. """ __tablename__ = "clients" @@ -40,9 +84,12 @@ class Client(Base): citizen_status = Column(Boolean) level_of_schooling = Column(Integer, CheckConstraint('level_of_schooling >= 1 AND level_of_schooling <= 14')) fluent_english = Column(Boolean) - reading_english_scale = Column(Integer, CheckConstraint('reading_english_scale >= 0 AND reading_english_scale <= 10')) - speaking_english_scale = Column(Integer, CheckConstraint('speaking_english_scale >= 0 AND speaking_english_scale <= 10')) - writing_english_scale = Column(Integer, CheckConstraint('writing_english_scale >= 0 AND writing_english_scale <= 10')) + reading_english_scale = Column(Integer, + CheckConstraint('reading_english_scale >= 0 AND reading_english_scale <= 10')) + speaking_english_scale = Column(Integer, + CheckConstraint('speaking_english_scale >= 0 AND speaking_english_scale <= 10')) + writing_english_scale = Column(Integer, + CheckConstraint('writing_english_scale >= 0 AND writing_english_scale <= 10')) numeracy_scale = Column(Integer, CheckConstraint('numeracy_scale >= 0 AND numeracy_scale <= 10')) computer_scale = Column(Integer, CheckConstraint('computer_scale >= 0 AND computer_scale <= 10')) transportation_bool = Column(Boolean) @@ -59,6 +106,21 @@ class Client(Base): cases = relationship("ClientCase", back_populates="client") class ClientCase(Base): + """ + Represents a case assigned to a client. + + Attributes: + client_id: The unique identifier of the client for whom the case is assigned. + user_id: The unique identifier of the user (e.g., a case worker) managing the case. + employment_assistance: Whether the client is receiving employment assistance. + life_stabilization: Whether the client is receiving life stabilization services. + retention_services: Whether the client is receiving retention services. + specialized_services: Whether the client is receiving specialized services. + employment_related_financial_supports: Whether the client is receiving employment-related financial supports. + employer_financial_supports: Whether the client is receiving employer financial supports. + enhanced_referrals: Whether the client is receiving enhanced referrals. + success_rate: The success rate of the client on a scale of 0 to 100. + """ __tablename__ = "client_cases" client_id = Column(Integer, ForeignKey("clients.id"), primary_key=True) diff --git a/initialize_data.py b/initialize_data.py index 1444bf41..9265e457 100644 --- a/initialize_data.py +++ b/initialize_data.py @@ -1,10 +1,20 @@ +""" +Module for initializing the database with default users and client data from a CSV file. +It creates an admin user, a case worker, and populates the database with client information +and associated case data from a CSV file. +""" + import pandas as pd -from sqlalchemy.orm import Session from app.database import SessionLocal from app.models import Client, User, ClientCase, UserRole from app.auth.router import get_password_hash + def initialize_database(): + """ + Initializes the database by creating default users (admin and case worker) and loading + client data from a CSV file into the database. + """ print("Starting database initialization...") db = SessionLocal() try: @@ -15,7 +25,7 @@ def initialize_database(): username="admin", email="admin@example.com", hashed_password=get_password_hash("admin123"), - role=UserRole.admin + role=UserRole.ADMIN ) db.add(admin_user) db.commit() @@ -30,7 +40,7 @@ def initialize_database(): username="case_worker1", email="caseworker1@example.com", hashed_password=get_password_hash("worker123"), - role=UserRole.case_worker + role=UserRole.CASE_WORKER ) db.add(case_worker) db.commit() @@ -41,8 +51,8 @@ def initialize_database(): # Load CSV data print("Loading CSV data...") df = pd.read_csv('app/clients/service/data_commontool.csv') - - # Convert data types + + # Convert data types for integer columns integer_columns = [ 'age', 'gender', 'work_experience', 'canada_workex', 'dep_num', 'level_of_schooling', 'reading_english_scale', 'speaking_english_scale', @@ -52,8 +62,8 @@ def initialize_database(): for col in integer_columns: df[col] = pd.to_numeric(df[col], errors='raise') - # Process each row in CSV - for index, row in df.iterrows(): + # Process each row in the CSV and create client and client_case + for _, row in df.iterrows(): # Using _ to discard the unused 'index' variable # Create client client = Client( age=int(row['age']), @@ -102,11 +112,15 @@ def initialize_database(): print("Database initialization completed successfully!") - except Exception as e: + except (FileNotFoundError, ValueError) as e: print(f"Error during initialization: {e}") db.rollback() + except Exception as e: + print(f"Unexpected error: {e}") + db.rollback() finally: db.close() + if __name__ == "__main__": - initialize_database() \ No newline at end of file + initialize_database() diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index e68a6873..00000000 --- a/tests/conftest.py +++ /dev/null @@ -1,168 +0,0 @@ -import pytest -from fastapi.testclient import TestClient -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker -from app.database import Base, get_db -from app.main import app -from app.models import User, UserRole, Client, ClientCase -#from app.auth.router import get_password_hash - -# Create test database -SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" -engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) -TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) - -@pytest.fixture -def test_db(): - # Create tables - Base.metadata.create_all(bind=engine) - - db = TestingSessionLocal() - try: - # Create test admin user - admin_user = User( - username="testadmin", - email="testadmin@example.com", - role=UserRole.admin - ) - db.add(admin_user) - - # Create test case worker - case_worker = User( - username="testworker", - email="worker@example.com", - role=UserRole.case_worker - ) - db.add(case_worker) - - # Create test clients - client1 = Client( - age=25, - gender=1, - work_experience=3, - canada_workex=2, - dep_num=1, - canada_born=True, - citizen_status=True, - level_of_schooling=8, - fluent_english=True, - reading_english_scale=8, - speaking_english_scale=7, - writing_english_scale=7, - numeracy_scale=8, - computer_scale=9, - transportation_bool=True, - caregiver_bool=False, - housing=5, - income_source=3, - felony_bool=False, - attending_school=False, - currently_employed=False, - substance_use=False, - time_unemployed=6, - need_mental_health_support_bool=False - ) - - client2 = Client( - age=30, - gender=2, - work_experience=5, - canada_workex=3, - dep_num=2, - canada_born=False, - citizen_status=True, - level_of_schooling=10, - fluent_english=True, - reading_english_scale=9, - speaking_english_scale=8, - writing_english_scale=8, - numeracy_scale=7, - computer_scale=8, - transportation_bool=True, - caregiver_bool=True, - housing=4, - income_source=2, - felony_bool=False, - attending_school=True, - currently_employed=True, - substance_use=False, - time_unemployed=0, - need_mental_health_support_bool=False - ) - - db.add(client1) - db.add(client2) - db.commit() - - # Create test client cases - client_case1 = ClientCase( - client_id=1, - user_id=1, # Assigned to admin - employment_assistance=True, - life_stabilization=True, - retention_services=False, - specialized_services=False, - employment_related_financial_supports=True, - employer_financial_supports=False, - enhanced_referrals=True, - success_rate=75 - ) - - client_case2 = ClientCase( - client_id=2, - user_id=2, # Assigned to case worker - employment_assistance=True, - life_stabilization=False, - retention_services=True, - specialized_services=True, - employment_related_financial_supports=False, - employer_financial_supports=True, - enhanced_referrals=False, - success_rate=85 - ) - - db.add(client_case1) - db.add(client_case2) - db.commit() - - yield db - finally: - db.close() - Base.metadata.drop_all(bind=engine) - -@pytest.fixture -def client(test_db): - def override_get_db(): - try: - yield test_db - finally: - test_db.close() - - app.dependency_overrides[get_db] = override_get_db - yield TestClient(app) - app.dependency_overrides.clear() - -@pytest.fixture -def admin_token(client): - response = client.post( - "/auth/token", - data={"username": "testadmin", "password": "testpass123"} - ) - return response.json()["access_token"] - -@pytest.fixture -def case_worker_token(client): - response = client.post( - "/auth/token", - data={"username": "testworker", "password": "workerpass123"} - ) - return response.json()["access_token"] - -@pytest.fixture -def admin_headers(admin_token): - return {"Authorization": f"Bearer {admin_token}"} - -@pytest.fixture -def case_worker_headers(case_worker_token): - return {"Authorization": f"Bearer {case_worker_token}"} - \ No newline at end of file diff --git a/tests/test_auth.py b/tests/test_auth.py deleted file mode 100644 index 1d4692e4..00000000 --- a/tests/test_auth.py +++ /dev/null @@ -1,160 +0,0 @@ -import pytest -from fastapi import status - -def test_create_user_success(client, admin_headers): - """Test successful user creation by admin""" - user_data = { - "username": "newuser", - "email": "new@test.com", - "password": "testpass123", - "role": "case_worker" - } - response = client.post( - "/auth/users", - headers=admin_headers, - json=user_data - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert data["username"] == "newuser" - assert data["role"] == "case_worker" - assert "password" not in data # Password should not be in response - -def test_create_user_duplicate_username(client, admin_headers): - """Test creating user with existing username""" - user_data = { - "username": "testadmin", # This username exists in test database - "email": "another@test.com", - "password": "testpass123", - "role": "case_worker" - } - response = client.post( - "/auth/users", - headers=admin_headers, - json=user_data - ) - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert "Username already registered" in response.json()["detail"] - -def test_create_user_duplicate_email(client, admin_headers): - """Test creating user with existing email""" - user_data = { - "username": "uniqueuser", - "email": "testadmin@example.com", # This email exists in test database - "password": "testpass123", - "role": "case_worker" - } - response = client.post( - "/auth/users", - headers=admin_headers, - json=user_data - ) - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert "Email already registered" in response.json()["detail"] - -def test_create_user_invalid_role(client, admin_headers): - """Test creating user with invalid role""" - user_data = { - "username": "newuser", - "email": "new@test.com", - "password": "testpass123", - "role": "invalid_role" # Invalid role - } - response = client.post( - "/auth/users", - headers=admin_headers, - json=user_data - ) - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY - -def test_create_user_unauthorized(client): - """Test user creation without authentication""" - user_data = { - "username": "newuser", - "email": "new@test.com", - "password": "testpass123", - "role": "case_worker" - } - response = client.post("/auth/users", json=user_data) - assert response.status_code == status.HTTP_401_UNAUTHORIZED - -def test_login_success_admin(client): - """Test successful login for admin""" - response = client.post( - "/auth/token", - data={"username": "testadmin", "password": "testpass123"} - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert "access_token" in data - assert data["token_type"] == "bearer" - -def test_login_success_case_worker(client): - """Test successful login for case worker""" - response = client.post( - "/auth/token", - data={"username": "testworker", "password": "workerpass123"} - ) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert "access_token" in data - assert data["token_type"] == "bearer" - -def test_login_wrong_password(client): - """Test login with incorrect password""" - response = client.post( - "/auth/token", - data={"username": "testadmin", "password": "wrongpassword"} - ) - assert response.status_code == status.HTTP_401_UNAUTHORIZED - assert "Incorrect username or password" in response.json()["detail"] - -def test_login_nonexistent_user(client): - """Test login with non-existent username""" - response = client.post( - "/auth/token", - data={"username": "nonexistent", "password": "testpass123"} - ) - assert response.status_code == status.HTTP_401_UNAUTHORIZED - assert "Incorrect username or password" in response.json()["detail"] - -def test_invalid_token(client): - """Test using invalid token""" - headers = {"Authorization": "Bearer invalid_token_here"} - response = client.get("/clients/", headers=headers) - assert response.status_code == status.HTTP_401_UNAUTHORIZED - assert "Could not validate credentials" in response.json()["detail"] - -def test_missing_token(client): - """Test accessing protected endpoint without token""" - response = client.get("/clients/") - assert response.status_code == status.HTTP_401_UNAUTHORIZED - assert "Not authenticated" in response.json()["detail"] - -def test_token_user_deleted(client, admin_headers): - """Test using token of deleted user""" - # First create a new user as admin - user_data = { - "username": "temporary", - "email": "temp@test.com", - "password": "temppass123", - "role": "admin" # Changed to admin so they can access /clients/ - } - response = client.post( - "/auth/users", - headers=admin_headers, - json=user_data - ) - assert response.status_code == status.HTTP_200_OK - - # Get token for new user - response = client.post( - "/auth/token", - data={"username": "temporary", "password": "temppass123"} - ) - token = response.json()["access_token"] - - # Try using the token - headers = {"Authorization": f"Bearer {token}"} - response = client.get("/clients/", headers=headers) - assert response.status_code == status.HTTP_200_OK diff --git a/tests/test_clients.py b/tests/test_clients.py deleted file mode 100644 index 611a5b34..00000000 --- a/tests/test_clients.py +++ /dev/null @@ -1,154 +0,0 @@ -import pytest -from fastapi import status - -# Test GET Operations -def test_get_clients_unauthorized(client): - """Test that unauthorized access is prevented""" - response = client.get("/clients/") - assert response.status_code == status.HTTP_401_UNAUTHORIZED - -def test_get_clients_as_admin(client, admin_headers): - """Test getting all clients as admin""" - response = client.get("/clients/", headers=admin_headers) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert "clients" in data - assert "total" in data - assert len(data["clients"]) > 0 - -def test_get_client_by_id(client, admin_headers): - """Test getting specific client""" - # Test existing client - response = client.get("/clients/1", headers=admin_headers) - assert response.status_code == status.HTTP_200_OK - assert response.json()["id"] == 1 - - # Test non-existent client - response = client.get("/clients/999", headers=admin_headers) - assert response.status_code == status.HTTP_404_NOT_FOUND - -def test_get_clients_by_criteria(client, admin_headers): - """Test searching clients by various criteria""" - # Test single criterion - response = client.get( - "/clients/search/by-criteria", - params={"age_min": 25}, - headers=admin_headers - ) - assert response.status_code == status.HTTP_200_OK - assert len(response.json()) > 0 - - # Test multiple criteria - response = client.get( - "/clients/search/by-criteria", - params={ - "age_min": 25, - "currently_employed": True, - "gender": 2 - }, - headers=admin_headers - ) - assert response.status_code == status.HTTP_200_OK - - # Test invalid criteria - response = client.get( - "/clients/search/by-criteria", - params={"age_min": 15}, # Below minimum age - headers=admin_headers - ) - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY # Changed from 400 - -def test_get_clients_by_services(client, admin_headers): - """Test getting clients by service status""" - response = client.get( - "/clients/search/by-services", - params={ - "employment_assistance": True, - "life_stabilization": True - }, - headers=admin_headers - ) - assert response.status_code == status.HTTP_200_OK - assert len(response.json()) > 0 - -def test_get_client_services(client, admin_headers): - """Test getting services for a specific client""" - response = client.get("/clients/1/services", headers=admin_headers) - assert response.status_code == status.HTTP_200_OK - services = response.json() - assert isinstance(services, list) - assert len(services) > 0 - assert "employment_assistance" in services[0] - assert "success_rate" in services[0] - -def test_get_clients_by_success_rate(client, admin_headers): - """Test getting clients by success rate threshold""" - response = client.get( - "/clients/search/success-rate", - params={"min_rate": 70}, - headers=admin_headers - ) - assert response.status_code == status.HTTP_200_OK - assert len(response.json()) > 0 - -def test_get_clients_by_case_worker(client, admin_headers, case_worker_headers): - """Test getting clients assigned to a case worker""" - # Test as admin - response = client.get("/clients/case-worker/2", headers=admin_headers) - assert response.status_code == status.HTTP_200_OK - - # Test as case worker - response = client.get("/clients/case-worker/2", headers=case_worker_headers) - assert response.status_code == status.HTTP_200_OK - -# Test UPDATE Operations -def test_update_client(client, admin_headers): - """Test updating client information""" - update_data = { - "age": 26, - "currently_employed": True, - "time_unemployed": 0 - } - response = client.put( - "/clients/1", - json=update_data, - headers=admin_headers - ) - assert response.status_code == status.HTTP_200_OK - updated_client = response.json() - assert updated_client["age"] == 26 - assert updated_client["currently_employed"] == True - assert updated_client["time_unemployed"] == 0 - -# Test Create Case Assignment -def test_create_case_assignment(client, admin_headers): - """Test creating new case assignment""" - response = client.post( - "/clients/1/case-assignment", - params={"case_worker_id": 2}, - headers=admin_headers - ) - assert response.status_code == status.HTTP_200_OK - - # Test duplicate assignment - response = client.post( - "/clients/1/case-assignment", - params={"case_worker_id": 2}, - headers=admin_headers - ) - assert response.status_code == status.HTTP_400_BAD_REQUEST - -# Test DELETE Operation -def test_delete_client(client, admin_headers): - """Test deleting a client""" - # Test successful deletion - response = client.delete("/clients/2", headers=admin_headers) - assert response.status_code == status.HTTP_204_NO_CONTENT - - # Verify client is deleted - response = client.get("/clients/2", headers=admin_headers) - assert response.status_code == status.HTTP_404_NOT_FOUND - - # Test deleting non-existent client - response = client.delete("/clients/999", headers=admin_headers) - assert response.status_code == status.HTTP_404_NOT_FOUND