Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/repository/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,28 @@ async def put_image(image_id:int, new_description:str, current_user: User, db: S
db.add(new_images)
db.commit()
db.refresh(new_images)
return new_images
return new_images


async def get_image_by_tag(hashtag_name: str, db: Session):
"""
This function asynchronously fetches image URLs from the database
based on a provided hashtag name.

Args:
hashtag_name: The name of the hashtag to search for (str).
db: A database session object used for querying the database (Session).

Returns:
A list of image URLs associated with the provided hashtag (List[str]).
"""

from database.models import Base, Hashtag

# Build the SQLAlchemy query to join Post and Hashtag tables
query = db.query(Post.image_url).join(Post.hashtags).filter(Hashtag.name == hashtag_name).all()

# Extract image URLs from the query results
image_urls = [image_url for image_url, in query]

return image_urls
57 changes: 40 additions & 17 deletions src/routes/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,38 +89,61 @@ async def put_image(image_id: int , new_description: str,
return await repository_images.put_image(image_id, new_description, current_user, db)

@router.get("/images/crop")
async def crop_image_view(image_url: str, width: int, height: int, description: str, hashtags: List[str], db: Session = Depends(get_db), current_user: User = Depends(auth_service.get_current_user), file: UploadFile = File(...)):
async def crop_image_view(image_id: int, width: int, height: int, description: str, db: Session = Depends(get_db), user: User = Depends(auth_service.get_current_user)):
return await crop_image(
image_url=image_url,
image_id=image_id,
width=width,
height=height,
description=description,
hashtags=hashtags,
current_user=current_user,
db=db,
file=file
user=user,
db=db
)

@router.get("/images/effect")
async def apply_effect_view( image_url: str, effect: str, description: str, hashtags: List[str], db: Session = Depends(get_db), current_user: User = Depends(auth_service.get_current_user), file: UploadFile = File(...)):
async def apply_effect_view( image_id: int, effect: str, description: str, hashtags: List[str], db: Session = Depends(get_db), user: User = Depends(auth_service.get_current_user)):
return await apply_effect(
image_url=image_url,
image_id=image_id,
effect = effect,
description=description,
hashtags=hashtags,
current_user=current_user,
db=db,
file=file
user=user,
db=db
)

@router.get("/images/roundcorners")
async def round_corners(image_url: str, radius: int, description: str, hashtags: List[str], db: Session = Depends(get_db), current_user: User = Depends(auth_service.get_current_user), file: UploadFile = File(...)):
async def round_corners(image_id: int, radius: int, description: str, hashtags: List[str], db: Session = Depends(get_db), user: User = Depends(auth_service.get_current_user)):
return await round_corners(
image_url=image_url,
image_id=image_id,
radius = radius,
description=description,
hashtags=hashtags,
current_user=current_user,
db=db,
file=file
)
user=user,
db=db
)

@router.get("/get_image_by_tag")
async def get_images_by_tag(
hashtag_name: str, # Name of the hashtag to search for (without the # symbol)
db: Session = Depends(get_db) # Database session dependency (injected using Depends)
):

"""
Fetches images associated with a given hashtag from the database asynchronously.

Args:
hashtag_name (str): The hashtag name to query (without the # symbol).
db (Session): The database session object (injected using Depends).

Returns:
List[Image]: A list of Image objects matching the provided hashtag.

Raises:
Exception: If an error occurs while fetching images from the database.
"""

from src.repository.images import get_image_by_tag

try:
return await get_image_by_tag(hashtag_name=hashtag_name, db=db)
except Exception as e:
print( f"Error fetching images for hashtag '{hashtag_name}': {e}")
78 changes: 27 additions & 51 deletions src/routes/utils.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,51 @@
import os
from cloudinary.uploader import upload
import cloudinary
from cloudinary import CloudinaryImage
from typing import List
from fastapi import File, HTTPException, UploadFile, status
from sqlalchemy.orm import Session
from src.repository.qr_code import get_qr_code_by_url
from src.repository.tags import get_or_create_tag

from src.repository.images import get_image
from src.database.models import Post, User
from dotenv import load_dotenv
load_dotenv()

from src.repository import images as repository_images

async def crop_image(image_url, width, height, description: str, hashtags: List[str], user: User, db: Session, file: UploadFile)-> Post:

# Extract public ID from the image URL (assuming the format)
public_id = image_url.split("/")[-1].split(".")[0]
async def crop_image(image_id, width, height, description: str, user: User, db: Session)-> Post:

# Generate the transformation string for cropping
transformation = f"c_crop,w_{width},h_{height}"
image = get_image(image_id, user, db)

# Construct the URL for the cropped image
cropped_url = f"{image_url.split('/upload')[0]}/upload/{transformation}/{public_id}"
cropped_url = CloudinaryImage(image).image(transformation=[{"width": width, "height": height, "crop": "fill"}])
print(f"Secure URL: {cropped_url}")

dbtags =[]
for i in hashtags:
tags_list = i.split(',')
for tag in tags_list:
dbtag = await get_or_create_tag(db, tag)
dbtags.append(dbtag)
tags = [hashtag.name for hashtag in image.hashtags]

url = cropped_url
qr_url = await get_qr_code_by_url(url)
images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=dbtags)
images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=tags)
db.add(images)
db.commit()
db.refresh(images)
return images



async def apply_effect(image_url, effect, description: str, hashtags: List[str], user: User, db: Session, file: UploadFile)-> Post:
async def apply_effect(image_id, effect, description: str, user: User, db: Session)-> Post:

image = get_image(image_id, user, db)

<<<<<<< Updated upstream
# Extract public ID from the image URL (assuming the format)
public_id = image_url.split("/")[-1].split(".")[0]

# Build the transformation string for the effect
transformation = f"e_{effect}"

# Construct the URL for the image with effect
effect_url = f"{image_url.split('/upload')[0]}/upload/{transformation}/{public_id}"
=======
>>>>>>> Stashed changes
effected_url = CloudinaryImage(image).image(transformation=[{"effect": effect}])
print(f"Secure URL: {effected_url}")

dbtags =[]
for i in hashtags:
tags_list = i.split(',')
for tag in tags_list:
dbtag = await get_or_create_tag(db, tag)
dbtags.append(dbtag)
tags = [hashtag.name for hashtag in image.hashtags]

url = effect_url
url = effected_url
qr_url = await get_qr_code_by_url(url)
images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=dbtags)
images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=tags)
db.add(images)
db.commit()
db.refresh(images)
Expand All @@ -71,29 +54,22 @@ async def apply_effect(image_url, effect, description: str, hashtags: List[str],



async def round_corners(image_url, radius, description: str, hashtags: List[str], user: User, db: Session, file: UploadFile)-> Post:

# Extract public ID from the image URL (assuming the format)
public_id = image_url.split("/")[-1].split(".")[0]

# Build the transformation string for rounded corners
transformation = f"r_{radius}"
async def round_corners(image_id, radius, description: str, user: User, db: Session)-> Post:

image = get_image(image_id, user, db)

# Construct the URL for the image with rounded corners
rounded_url = f"{image_url.split('/upload')[0]}/upload/{transformation}/{public_id}"
rounded_url = CloudinaryImage(image).image(transformation=[{"radius": radius}])
print(f"Secure URL: {rounded_url}")

dbtags =[]
for i in hashtags:
tags_list = i.split(',')
for tag in tags_list:
dbtag = await get_or_create_tag(db, tag)
dbtags.append(dbtag)
tags = [hashtag.name for hashtag in image.hashtags]

url = rounded_url
qr_url = await get_qr_code_by_url(url)
images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=dbtags)
images = Post(description=description, author_id=user.id, image_url=url, qr_code_url=qr_url, hashtags=tags)
db.add(images)
db.commit()
db.refresh(images)
return images



76 changes: 76 additions & 0 deletions tests/test_services_email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# test_email.py
import unittest
from unittest.mock import patch, AsyncMock, MagicMock
from pydantic import EmailStr

from src.services.email import send_email, conf
from fastapi_mail.errors import ConnectionErrors


class TestSendEmail(unittest.TestCase):

@patch('email.auth_service.create_email_token')
@patch('email.FastMail')
def test_send_email_success(self, MockFastMail, mock_create_email_token):
mock_create_email_token.return_value = 'mock_token'
mock_fastmail_instance = MockFastMail.return_value
mock_fastmail_instance.send_message = AsyncMock()

#Following instructions should be replaced with an actual data
email = EmailStr('test@example.com')
username = 'testuser'
host = 'localhost'

# Run the send_email function
async def run_test():
await send_email(email, username, host)

import asyncio
asyncio.run(run_test())

# Check if token was created
mock_create_email_token.assert_called_once_with({'sub': email})

# Check if email was sent with correct parameters
mock_fastmail_instance.send_message.assert_awaited_once()
sent_message = mock_fastmail_instance.send_message.call_args[0][0]

self.assertEqual(sent_message.subject, "Confirm your email ")
self.assertEqual(sent_message.recipients, [email])
self.assertEqual(sent_message.template_body['host'], host)
self.assertEqual(sent_message.template_body['username'], username)
self.assertEqual(sent_message.template_body['token'], 'mock_token')

@patch('email.auth_service.create_email_token')
@patch('email.FastMail')
def test_send_email_connection_error(self, MockFastMail, mock_create_email_token):
mock_create_email_token.return_value = 'mock_token'
mock_fastmail_instance = MockFastMail.return_value
mock_fastmail_instance.send_message = AsyncMock(side_effect=ConnectionErrors('Connection error'))

email = EmailStr('test@example.com')
username = 'testuser'
host = 'localhost'

# Run the send_email function
async def run_test():
await send_email(email, username, host)

import asyncio
asyncio.run(run_test())

# Check if token was created
mock_create_email_token.assert_called_once_with({'sub': email})

# Check if ConnectionErrors was raised
mock_fastmail_instance.send_message.assert_awaited_once()
sent_message = mock_fastmail_instance.send_message.call_args[0][0]

self.assertEqual(sent_message.subject, "Confirm your email ")
self.assertEqual(sent_message.recipients, [email])
self.assertEqual(sent_message.template_body['host'], host)
self.assertEqual(sent_message.template_body['username'], username)
self.assertEqual(sent_message.template_body['token'], 'mock_token')

if __name__ == '__main__':
unittest.main()