Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions backend/app/repositories/csv_repository.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
import logging
from uuid import UUID

import pandas as pd
from supabase._async.client import AsyncClient

from app.core.exceptions import NotFoundException
from app.schemas.csv_schemas import CSVInsertResult

logger = logging.getLogger(__name__)


class CSVRepository:
"""Repository for CSV-based run ingestion and stride data storage."""

def __init__(self, supabase: AsyncClient) -> None:
self.supabase = supabase
self.metrics_table = "run_metrics"
self.run_table = "run"

async def verify_athlete_belongs_to_coach(
self, athlete_id: str, coach_id: UUID
) -> None:
"""Verify an athlete belongs to the coach. Raises NotFoundException if not."""
result = (
await self.supabase.table("athletes")
.select("athlete_id")
.eq("athlete_id", athlete_id)
.eq("coach_id", str(coach_id))
.execute()
)
if not result.data:
raise NotFoundException("Athlete", athlete_id)

async def create_record(
self,
athlete_id: str,
Expand Down
25 changes: 7 additions & 18 deletions backend/app/routes/csv_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,10 @@ async def upload_data_csv(
status_code=400, detail=f"Failed to read CSV file: {str(e)}"
) from e

try:
result = await service.ingest_stride_csv(
raw_df,
athlete_id=str(athlete_id),
event_type=event_type,
name=name,
elapsed_ms=elapsed_ms,
)
return result
except HTTPException:
span.set_attribute("error", True)
raise
except Exception as e:
logger.exception("Failed to ingest run data frame")
span.set_attribute("error", True)
raise HTTPException(
status_code=500, detail=f"Failed to ingest run data frame: {str(e)}"
) from e
return await service.ingest_stride_csv(
raw_df,
athlete_id=str(athlete_id),
event_type=event_type,
name=name,
elapsed_ms=elapsed_ms,
)
71 changes: 21 additions & 50 deletions backend/app/services/csv_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from uuid import UUID

import pandas as pd
from fastapi import HTTPException
from opentelemetry.trace import StatusCode

from app.core.observability import get_tracer
from app.repositories.csv_repository import CSVRepository
Expand All @@ -13,6 +13,8 @@


class CSVService:
"""Service for CSV-based run ingestion."""

def __init__(self, repository: CSVRepository, coach_id: UUID) -> None:
self.repository = repository
self.coach_id = coach_id
Expand All @@ -25,48 +27,20 @@ async def ingest_stride_csv(
name: str | None = None,
elapsed_ms: int | None = None,
) -> CSVUploadResponse:

# Athlete Check
athlete_check = (
await self.repository.supabase.table("athletes")
.select("athlete_id")
.eq("athlete_id", athlete_id)
.eq("coach_id", str(self.coach_id))
.execute()
)
if not athlete_check.data:
raise HTTPException(status_code=404, detail="Athlete not found")

# Transform
try:
transformed_df = transform_feet_to_stride_cycles(raw_df)
except Exception as e:
logger.exception("Service: Run data transform failed")
raise HTTPException(
status_code=500, detail=f"Run data transform failed: {str(e)}"
) from e
"""Transform raw CSV data into stride cycles and persist a complete run."""
await self.repository.verify_athlete_belongs_to_coach(athlete_id, self.coach_id)

tracer = get_tracer()
with tracer.start_as_current_span("csv.ingest") as span:
span.set_attribute("csv.rows_in", len(raw_df))
try:
span.set_attribute("csv.rows_in", len(raw_df))

# Use client-provided elapsed_ms (wall-clock); fall back to CSV Time delta
if elapsed_ms is None and "Time" in raw_df.columns and len(raw_df) > 0:
elapsed_ms = int(raw_df["Time"].max() - raw_df["Time"].min())
if elapsed_ms is None and "Time" in raw_df.columns and len(raw_df) > 0:
elapsed_ms = int(raw_df["Time"].max() - raw_df["Time"].min())

# Transform
try:
transformed_df = transform_feet_to_stride_cycles(raw_df)
span.set_attribute("csv.rows_transformed", len(transformed_df))
except Exception as e:
logger.exception("Service: Run data transform failed")
span.set_attribute("error", True)
raise HTTPException(
status_code=500, detail=f"Run data transform failed: {str(e)}"
) from e

# Load
try:
result = await self.repository.insert_complete_run(
df=transformed_df,
athlete_id=athlete_id,
Expand All @@ -76,20 +50,17 @@ async def ingest_stride_csv(
)
span.set_attribute("csv.run_id", result.run_id)
span.set_attribute("csv.rows_inserted", result.rows_inserted)
except Exception as e:
logger.exception("Service: Transformed run data insert failed")
span.set_attribute("error", True)
raise HTTPException(
status_code=500,
detail=f"Transformed run data insert failed: {str(e)}",
) from e

logger.info(
f"Service: ingest_stride_csv rows_in={len(raw_df)} rows_out={len(transformed_df)} run_id={result.run_id}"
)
logger.info(
f"Service: ingest_stride_csv rows_in={len(raw_df)} rows_out={len(transformed_df)} run_id={result.run_id}"
)

return CSVUploadResponse(
message=f"CSV uploaded successfully. Run ID: {result.run_id}, Rows inserted: {result.rows_inserted}",
run_id=str(result.run_id),
rows_inserted=result.rows_inserted,
)
return CSVUploadResponse(
message=f"CSV uploaded successfully. Run ID: {result.run_id}, Rows inserted: {result.rows_inserted}",
run_id=str(result.run_id),
rows_inserted=result.rows_inserted,
)
except Exception as e:
span.set_status(StatusCode.ERROR, str(e))
span.record_exception(e)
raise
Loading
Loading