Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions biolit/export_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Standard library
import os
import re
import unicodedata

# Third-party
import polars as pl
import requests
import structlog

LOGGER = structlog.get_logger()

# ------------------------------
# FETCH API
# ------------------------------
def fetch_biolit_from_api():
    """Fetch raw Biolit observations from the API.

    Reads the endpoint URL from the ``BIOLIT_API_URL`` environment
    variable.

    Returns:
        list: decoded JSON payload (presumably one dict per observation
        -- TODO confirm against the API contract).

    Raises:
        ValueError: if ``BIOLIT_API_URL`` is not set.
        requests.HTTPError: if the API answers with an error status.
    """
    url = os.getenv("BIOLIT_API_URL")
    # Fail fast with a clear message instead of letting requests choke
    # on a None URL.
    if not url:
        raise ValueError("Missing BIOLIT_API_URL")

    # A timeout prevents the pipeline from hanging forever on a stuck
    # connection; raise_for_status surfaces HTTP-level failures.
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    data = response.json()

    # Use the module's structured logger (previously defined but unused)
    # instead of a bare print.
    LOGGER.info("observations fetched", count=len(data))
    return data

# ------------------------------
# RENAME OF COLUMNS
# ------------------------------


def normalize_column_name(col: str) -> str:
    """Convert an API column name to clean snake_case.

    Lowercases, turns ``-`` and spaces into underscores, strips accents,
    then drops any remaining character outside ``[a-z0-9_]``.

    Accent removal uses Unicode NFKD decomposition instead of a
    hard-coded replace chain, so every accented letter is covered
    (e, a, u, o as before, but also e-circumflex, i-circumflex,
    c-cedilla, u-umlaut, ...) rather than being silently deleted by the
    final regex.
    """
    col = col.lower().replace("-", "_").replace(" ", "_")
    # NFKD splits each accented letter into base letter + combining
    # mark(s); dropping the marks removes the accent but keeps the letter.
    decomposed = unicodedata.normalize("NFKD", col)
    col = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return re.sub(r"[^a-z0-9_]", "", col)


# Explicit mapping from raw API field names (keys) to the French
# snake_case column names used downstream (values). Any API field not
# listed here falls back to normalize_column_name() in
# adapt_api_to_dataframe().
COLUMN_MAPPING = {
    "id": "id_observation",
    "date": "date_observation",
    "link": "lien_observation",
    "author": "observateur",
    "_url_sortie": "url_sortie",
    "espece-identifiee": "espece_identifiee",
    "heure-debut": "heure_debut",
    "heure-fin": "heure_fin",
    "latitude": "latitude",
    "longitude": "longitude",
    "photos": "photos",
    "relais": "relais",
    "espece_id": "id_espece",
    "espece": "nom_scientifique",
    "common": "nom_commun",
    "categorie-programme": "categorie_programme",
    "programme": "programme",
}


# ------------------------------
# ADAPT API -> PARQUET
# ------------------------------
def adapt_api_to_dataframe(data: list) -> pl.DataFrame:
    """Rename API fields and materialize the payload as a Polars DataFrame.

    Fields present in COLUMN_MAPPING get their explicit target name;
    any other field is renamed automatically via normalize_column_name().
    """
    renamed_rows = [
        {
            COLUMN_MAPPING.get(field, normalize_column_name(field)): value
            for field, value in record.items()
        }
        for record in data
    ]
    return pl.DataFrame(renamed_rows)


# ------------------------------
# LOAD (Fetch + Adapt)
# ------------------------------
def load_biolit_from_api() -> pl.DataFrame:
    """Fetch observations from the Biolit API and return them as a DataFrame.

    Convenience wrapper chaining fetch_biolit_from_api() and
    adapt_api_to_dataframe().
    """
    return adapt_api_to_dataframe(fetch_biolit_from_api())

124 changes: 124 additions & 0 deletions biolit/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import os
import polars as pl
from sqlalchemy import create_engine, text


# -------------------------
# Connexion DB
# -------------------------

def get_engine():
    """Build a SQLAlchemy engine from the POSTGRES_URL environment variable.

    Raises:
        ValueError: if POSTGRES_URL is not set.
    """
    postgres_url = os.getenv("POSTGRES_URL")
    if postgres_url:
        return create_engine(postgres_url)
    raise ValueError("Missing POSTGRES_URL")


# -------------------------
# Préparation des données
# -------------------------
def prepare_dataframe_for_postgres(df: pl.DataFrame) -> pl.DataFrame:
    """Coerce API columns to the types expected by the Postgres schema.

    Integer-like columns are cast through Float64 first so values that
    arrive as floats or unparseable strings become nulls instead of
    raising (strict=False), with NaN mapped back to null. Coordinates
    are trimmed then parsed as floats; date/time strings are parsed
    leniently (unparseable values become null).
    """
    return df.with_columns([

        # -------------------------
        # IDs
        # -------------------------
        # Primary key: strict cast on purpose -- a bad id should fail loudly.
        pl.col("id_observation")
        .cast(pl.Int64),

        # Float64 detour lets "3.0"-style values survive; NaN -> null
        # before the final Int64 cast.
        pl.col("id_espece")
        .cast(pl.Float64, strict=False)
        .fill_nan(None)
        .cast(pl.Int64, strict=False),

        pl.col("categorie_programme")
        .cast(pl.Float64, strict=False)
        .fill_nan(None)
        .cast(pl.Int64, strict=False),

        # Empty strings are treated as missing before the numeric casts.
        pl.col("relais")
        .cast(pl.Utf8)
        .replace("", None)
        .cast(pl.Float64, strict=False)
        .fill_nan(None)
        .cast(pl.Int64, strict=False),

        # -------------------------
        # Coordinates
        # -------------------------
        # Trim surrounding whitespace, then parse; bad values -> null.
        pl.col("latitude")
        .cast(pl.Utf8)
        .str.strip_chars()
        .cast(pl.Float64, strict=False),

        pl.col("longitude")
        .cast(pl.Utf8)
        .str.strip_chars()
        .cast(pl.Float64, strict=False),

        # -------------------------
        # Dates
        # -------------------------
        # Lenient parsing: unparseable timestamps become null.
        pl.col("date_observation")
        .str.strptime(pl.Datetime, strict=False),

        pl.col("heure_debut")
        .str.strptime(pl.Time, strict=False),

        pl.col("heure_fin")
        .str.strptime(pl.Time, strict=False),
    ])

# -------------------------
# Insert avec sécurité (UPSERT)
# -------------------------

def insert_dataframe(df: pl.DataFrame):
    """Insert observation rows into Postgres, skipping existing ids.

    Rows whose ``id_observation`` already exists are skipped via
    ``ON CONFLICT ... DO NOTHING`` -- note this is insert-or-ignore,
    not a true upsert: existing rows are never updated.

    All rows are sent in a single executemany call within one
    transaction, instead of one round-trip per row.
    """
    rows = df.to_dicts()
    if not rows:
        # Empty frame: nothing to insert, don't even open a connection.
        return

    engine = get_engine()

    stmt = text("""
        INSERT INTO observations (
            id_observation,
            date_observation,
            lien_observation,
            observateur,
            url_sortie,
            espece_identifiee,
            heure_debut,
            heure_fin,
            latitude,
            longitude,
            photos,
            relais,
            id_espece,
            nom_scientifique,
            nom_commun,
            categorie_programme,
            programme
        ) VALUES (
            :id_observation,
            :date_observation,
            :lien_observation,
            :observateur,
            :url_sortie,
            :espece_identifiee,
            :heure_debut,
            :heure_fin,
            :latitude,
            :longitude,
            :photos,
            :relais,
            :id_espece,
            :nom_scientifique,
            :nom_commun,
            :categorie_programme,
            :programme
        )
        ON CONFLICT (id_observation) DO NOTHING
    """)

    with engine.begin() as conn:
        # Passing the full list of dicts triggers SQLAlchemy's
        # executemany path: one batched statement, not N executes.
        conn.execute(stmt, rows)
25 changes: 24 additions & 1 deletion pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,30 @@ Les pipelines utilisent le dossier `data/` (non versionné) comme espace de trav

## Flux quotidien (API → ML → Label Studio)

1. **Récupération quotidienne** depuis l'API (à venir) ou CSV local.
1. **Ingestion & préparation des données** depuis l'API.
1. **Objectifs**
- Récupérer les données depuis l’API Biolit
- Standardiser et nettoyer les données
- Les stocker dans une base PostgreSQL
- Mettre les données à disposition des autres systèmes (ML, dataviz)
2. **Étapes du pipeline**
1. Ingestion
- appel à l’API Biolit
- récupération des observations

2. Transformation
- normalisation des noms de colonnes
- typage des champs
- nettoyage des données (dates, coordonnées, identifiants)

3. Chargement
- insertion dans PostgreSQL
   - gestion des doublons via `ON CONFLICT DO NOTHING` (les lignes déjà présentes sont ignorées, pas mises à jour — insertion-ou-ignore, et non un véritable UPSERT)
3. **Variables d'environnement**
- POSTGRES_URL=postgresql://user:password@host:port/dbname
- BIOLIT_API_URL=https://biolit.fr/wp-json/biolit/v1/observations?token=XXX
4. **Lancer le pipeline**
- uv run python -m pipelines.run
2. **Qualité** : si l'image est mauvaise → stop.
3. **YOLOv8** : détection + crop.
- si aucune détection → **Label Studio (CROP)**
Expand Down
22 changes: 22 additions & 0 deletions pipelines/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from biolit.export_api import fetch_biolit_from_api, adapt_api_to_dataframe
from biolit.postgres import prepare_dataframe_for_postgres, insert_dataframe


def run_pipeline():
    """End-to-end pipeline: fetch from the API, transform, load into Postgres."""
    print("Fetching data...")
    raw_observations = fetch_biolit_from_api()

    print("Transforming...")
    frame = adapt_api_to_dataframe(raw_observations)

    print("Preparing for Postgres...")
    frame = prepare_dataframe_for_postgres(frame)

    print("Loading into Postgres...")
    insert_dataframe(frame)

    print("DONE ✅")


# Script entry point: `uv run python -m pipelines.run` (see pipelines/README.md).
if __name__ == "__main__":
    run_pipeline()
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ dependencies = [
"plotly>=6.5.0",
"polars>=1.36.1",
"pre-commit>=4.5.1",
"prefect>=3.6.22",
"pyarrow>=23.0.0",
"psycopg2-binary>=2.9.11",
"pyarrow>=23.0.1",
"pytest>=9.0.2",
"requests>=2.32.3",
"ruff>=0.14.10",
"sqlalchemy>=2.0.44",
"prefect>=3.6.22",
"requests>=2.32.3",
"shapely>=2.1.2",
"structlog>=25.5.0",
"tqdm>=4.67.3",
Expand Down
Loading