diff --git a/src/repository/images.py b/src/repository/images.py index 9bd3086..6f5d94c 100644 --- a/src/repository/images.py +++ b/src/repository/images.py @@ -119,4 +119,28 @@ async def put_image(image_id:int, new_description:str, current_user: User, db: S db.add(new_images) db.commit() db.refresh(new_images) - return new_images \ No newline at end of file + return new_images + + +async def get_image_by_tag(hashtag_name: str, db: Session): + """ + This function asynchronously fetches image URLs from the database + based on a provided hashtag name. + + Args: + hashtag_name: The name of the hashtag to search for (str). + db: A database session object used for querying the database (Session). + + Returns: + A list of image URLs associated with the provided hashtag (List[str]). + """ + + from database.models import Base, Hashtag + + # Build the SQLAlchemy query to join Post and Hashtag tables + query = db.query(Post.image_url).join(Post.hashtags).filter(Hashtag.name == hashtag_name).all() + + # Extract image URLs from the query results + image_urls = [image_url for image_url, in query] + + return image_urls diff --git a/src/routes/images.py b/src/routes/images.py index 04489c1..cb5e970 100644 --- a/src/routes/images.py +++ b/src/routes/images.py @@ -89,38 +89,61 @@ async def put_image(image_id: int , new_description: str, return await repository_images.put_image(image_id, new_description, current_user, db) @router.get("/images/crop") -async def crop_image_view(image_url: str, width: int, height: int, description: str, hashtags: List[str], db: Session = Depends(get_db), current_user: User = Depends(auth_service.get_current_user), file: UploadFile = File(...)): +async def crop_image_view(image_id: int, width: int, height: int, description: str, db: Session = Depends(get_db), user: User = Depends(auth_service.get_current_user)): return await crop_image( - image_url=image_url, + image_id=image_id, width=width, height=height, description=description, - hashtags=hashtags, - current_user=current_user, - db=db, - file=file + user=user, + db=db ) @router.get("/images/effect") -async def apply_effect_view( image_url: str, effect: str, description: str, hashtags: List[str], db: Session = Depends(get_db), current_user: User = Depends(auth_service.get_current_user), file: UploadFile = File(...)): +async def apply_effect_view( image_id: int, effect: str, description: str, hashtags: List[str], db: Session = Depends(get_db), user: User = Depends(auth_service.get_current_user)): return await apply_effect( - image_url=image_url, + image_id=image_id, effect = effect, description=description, hashtags=hashtags, - current_user=current_user, - db=db, - file=file + user=user, + db=db ) @router.get("/images/roundcorners") -async def round_corners(image_url: str, radius: int, description: str, hashtags: List[str], db: Session = Depends(get_db), current_user: User = Depends(auth_service.get_current_user), file: UploadFile = File(...)): +async def round_corners(image_id: int, radius: int, description: str, hashtags: List[str], db: Session = Depends(get_db), user: User = Depends(auth_service.get_current_user)): return await round_corners( - image_url=image_url, + image_id=image_id, radius = radius, description=description, hashtags=hashtags, - current_user=current_user, - db=db, - file=file -) \ No newline at end of file + user=user, + db=db +) + +@router.get("/get_image_by_tag") +async def get_images_by_tag( + hashtag_name: str, # Name of the hashtag to search for (without the # symbol) + db: Session = Depends(get_db) # Database session dependency (injected using Depends) +): + + """ + Fetches images associated with a given hashtag from the database asynchronously. + + Args: + hashtag_name (str): The hashtag name to query (without the # symbol). + db (Session): The database session object (injected using Depends). + + Returns: + List[Image]: A list of Image objects matching the provided hashtag. + + Raises: + Exception: If an error occurs while fetching images from the database. + """ + + from src.repository.images import get_image_by_tag + + try: + return await get_image_by_tag(hashtag_name=hashtag_name, db=db) + except Exception as e: + print( f"Error fetching images for hashtag '{hashtag_name}': {e}") \ No newline at end of file diff --git a/src/routes/utils.py b/src/routes/utils.py index 263ba0d..1c9f3e3 100644 --- a/src/routes/utils.py +++ b/src/routes/utils.py @@ -1,40 +1,28 @@ import os from cloudinary.uploader import upload -import cloudinary +from cloudinary import CloudinaryImage from typing import List from fastapi import File, HTTPException, UploadFile, status from sqlalchemy.orm import Session from src.repository.qr_code import get_qr_code_by_url from src.repository.tags import get_or_create_tag - +from src.repository.images import get_image from src.database.models import Post, User -from dotenv import load_dotenv -load_dotenv() from src.repository import images as repository_images -async def crop_image(image_url, width, height, description: str, hashtags: List[str], user: User, db: Session, file: UploadFile)-> Post: - - # Extract public ID from the image URL (assuming the format) - public_id = image_url.split("/")[-1].split(".")[0] +async def crop_image(image_id, width, height, description: str, user: User, db: Session)-> Post: - # Generate the transformation string for cropping - transformation = f"c_crop,w_{width},h_{height}" + image = get_image(image_id, user, db) - # Construct the URL for the cropped image - cropped_url = f"{image_url.split('/upload')[0]}/upload/{transformation}/{public_id}" + cropped_url = CloudinaryImage(image).image(transformation=[{"width": width, "height": height, "crop": "fill"}]) print(f"Secure URL: {cropped_url}") - dbtags =[] - for i in hashtags: - tags_list = i.split(',') - for tag in tags_list: - dbtag = await get_or_create_tag(db, tag) - dbtags.append(dbtag) + tags = [hashtag.name for hashtag in image.hashtags] url = cropped_url qr_url = await get_qr_code_by_url(url) - images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=dbtags) + images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=tags) db.add(images) db.commit() db.refresh(images) @@ -42,27 +30,22 @@ async def crop_image(image_url, width, height, description: str, hashtags: List[ -async def apply_effect(image_url, effect, description: str, hashtags: List[str], user: User, db: Session, file: UploadFile)-> Post: +async def apply_effect(image_id, effect, description: str, user: User, db: Session)-> Post: + + image = get_image(image_id, user, db) +<<<<<<< Updated upstream # Extract public ID from the image URL (assuming the format) - public_id = image_url.split("/")[-1].split(".")[0] - - # Build the transformation string for the effect - transformation = f"e_{effect}" - - # Construct the URL for the image with effect - effect_url = f"{image_url.split('/upload')[0]}/upload/{transformation}/{public_id}" +======= +>>>>>>> Stashed changes + effected_url = CloudinaryImage(image).image(transformation=[{"effect": effect}]) + print(f"Secure URL: {effected_url}") - dbtags =[] - for i in hashtags: - tags_list = i.split(',') - for tag in tags_list: - dbtag = await get_or_create_tag(db, tag) - dbtags.append(dbtag) + tags = [hashtag.name for hashtag in image.hashtags] - url = effect_url + url = effected_url qr_url = await get_qr_code_by_url(url) - images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=dbtags) + images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=tags) db.add(images) db.commit() db.refresh(images) @@ -71,29 +54,22 @@ async def apply_effect(image_url, effect, description: str, hashtags: List[str], -async def round_corners(image_url, radius, description: str, hashtags: List[str], user: User, db: Session, file: UploadFile)-> Post: - - # Extract public ID from the image URL (assuming the format) - public_id = image_url.split("/")[-1].split(".")[0] - - # Build the transformation string for rounded corners - transformation = f"r_{radius}" +async def round_corners(image_id, radius, description: str, user: User, db: Session)-> Post: + + image = get_image(image_id, user, db) - # Construct the URL for the image with rounded corners - rounded_url = f"{image_url.split('/upload')[0]}/upload/{transformation}/{public_id}" + rounded_url = CloudinaryImage(image).image(transformation=[{"radius": radius}]) + print(f"Secure URL: {rounded_url}") - dbtags =[] - for i in hashtags: - tags_list = i.split(',') - for tag in tags_list: - dbtag = await get_or_create_tag(db, tag) - dbtags.append(dbtag) + tags = [hashtag.name for hashtag in image.hashtags] url = rounded_url qr_url = await get_qr_code_by_url(url) - images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=dbtags) + images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=tags) db.add(images) db.commit() db.refresh(images) return images + + diff --git a/tests/test_services_email.py b/tests/test_services_email.py new file mode 100644 index 0000000..8f7f5c2 --- /dev/null +++ b/tests/test_services_email.py @@ -0,0 +1,76 @@ +# test_email.py +import unittest +from unittest.mock import patch, AsyncMock, MagicMock +from pydantic import EmailStr + +from src.services.email import send_email, conf +from fastapi_mail.errors import ConnectionErrors + + +class TestSendEmail(unittest.TestCase): + + @patch('email.auth_service.create_email_token') + @patch('email.FastMail') + def test_send_email_success(self, MockFastMail, mock_create_email_token): + mock_create_email_token.return_value = 'mock_token' + mock_fastmail_instance = MockFastMail.return_value + mock_fastmail_instance.send_message = AsyncMock() + + #Following instructions should be replaced with an actual data + email = EmailStr('test@example.com') + username = 'testuser' + host = 'localhost' + + # Run the send_email function + async def run_test(): + await send_email(email, username, host) + + import asyncio + asyncio.run(run_test()) + + # Check if token was created + mock_create_email_token.assert_called_once_with({'sub': email}) + + # Check if email was sent with correct parameters + mock_fastmail_instance.send_message.assert_awaited_once() + sent_message = mock_fastmail_instance.send_message.call_args[0][0] + + self.assertEqual(sent_message.subject, "Confirm your email ") + self.assertEqual(sent_message.recipients, [email]) + self.assertEqual(sent_message.template_body['host'], host) + self.assertEqual(sent_message.template_body['username'], username) + self.assertEqual(sent_message.template_body['token'], 'mock_token') + + @patch('email.auth_service.create_email_token') + @patch('email.FastMail') + def test_send_email_connection_error(self, MockFastMail, mock_create_email_token): + mock_create_email_token.return_value = 'mock_token' + mock_fastmail_instance = MockFastMail.return_value + mock_fastmail_instance.send_message = AsyncMock(side_effect=ConnectionErrors('Connection error')) + + email = EmailStr('test@example.com') + username = 'testuser' + host = 'localhost' + + # Run the send_email function + async def run_test(): + await send_email(email, username, host) + + import asyncio + asyncio.run(run_test()) + + # Check if token was created + mock_create_email_token.assert_called_once_with({'sub': email}) + + # Check if ConnectionErrors was raised + mock_fastmail_instance.send_message.assert_awaited_once() + sent_message = mock_fastmail_instance.send_message.call_args[0][0] + + self.assertEqual(sent_message.subject, "Confirm your email ") + self.assertEqual(sent_message.recipients, [email]) + self.assertEqual(sent_message.template_body['host'], host) + self.assertEqual(sent_message.template_body['username'], username) + self.assertEqual(sent_message.template_body['token'], 'mock_token') + +if __name__ == '__main__': + unittest.main()