Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,001 changes: 1,001 additions & 0 deletions data/results.csv

Large diffs are not rendered by default.

Binary file added src/__pycache__/clean_data.cpython-313.pyc
Binary file not shown.
Binary file added src/__pycache__/evaluate.cpython-313.pyc
Binary file not shown.
Binary file added src/__pycache__/load_data.cpython-313.pyc
Binary file not shown.
21 changes: 15 additions & 6 deletions src/clean_data.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import pandas as pd


def clean_sensor_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean sensor data by handling missing or invalid values.

    The ``pH``, ``turbidity`` and ``temperature`` columns are converted to
    numbers; anything that cannot be parsed, and any +/-infinity, is treated
    as missing. ``pH`` is limited to the 0-14 scale. Missing values are then
    replaced with the mean of the remaining values in the same column. A
    column with no usable value at all stays NaN.

    Args:
        df (pd.DataFrame): Raw sensor data. It is not modified.

    Returns:
        pd.DataFrame: Cleaned copy of the data.

    Raises:
        KeyError: If any of the three sensor columns is absent.
    """
    sensor_columns = ['pH', 'turbidity', 'temperature']

    # Fail early with one message naming every absent column, instead of
    # stopping on the first missing column halfway through the loop.
    missing = [name for name in sensor_columns if name not in df.columns]
    if missing:
        raise KeyError(f"Missing sensor column(s): {', '.join(missing)}")

    cleaned = df.copy()
    for column_name in sensor_columns:
        values = pd.to_numeric(cleaned[column_name], errors='coerce')

        # An infinite reading would make the column mean infinite and so
        # poison every imputed value; treat it as missing instead.
        values = values.replace([float('inf'), float('-inf')], float('nan'))

        if column_name == 'pH':
            # Clip before computing the mean so that out-of-scale readings
            # cannot drag the fill value outside the valid pH range.
            values = values.clip(lower=0, upper=14)

        # Fill missing values (NaN) with the mean of the column
        cleaned[column_name] = values.fillna(values.mean())

    return cleaned
67 changes: 59 additions & 8 deletions src/evaluate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,60 @@
import pandas as pd
from typing import List, Tuple

class SensorReading:
    """One water-quality sample reported by a sensor at a location.

    ``temperature`` is stored with the reading but is not taken into account
    by :meth:`evaluate_safety`.
    """

    def __init__(self, sensor_id: str, location: str, ph: float, turbidity: float, temperature: float):
        self.sensor_id = sensor_id
        self.location = location
        self.ph = ph
        self.turbidity = turbidity
        self.temperature = temperature
        # Filled in by evaluate_safety(); None means "not evaluated yet".
        self.status: bool | None = None
        self.reason: str | None = None

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(sensor_id={self.sensor_id!r}, "
            f"location={self.location!r}, ph={self.ph!r}, "
            f"turbidity={self.turbidity!r}, temperature={self.temperature!r})"
        )

    def evaluate_safety(
        self,
        ph_range: Tuple[float, float] = (6.5, 8.5),
        turbidity_threshold: float = 1.0,
    ) -> Tuple[bool, str]:
        """Decide whether this reading is safe and record the outcome.

        Args:
            ph_range: Inclusive ``(lowest, highest)`` acceptable pH.
            turbidity_threshold: Highest acceptable turbidity (inclusive).

        Returns:
            ``(is_safe, reason)``. ``reason`` is ``"Safe"`` when nothing is
            wrong, otherwise a comma-separated list of the problems found.
            The same two values are stored in ``self.status`` and
            ``self.reason``.
        """
        lowest_ph, highest_ph = ph_range
        reasons: List[str] = []

        # Evaluate pH (a missing value is reported instead of compared)
        if pd.isna(self.ph):
            reasons.append("pH missing")
        elif self.ph < lowest_ph:
            reasons.append("pH too low")
        elif self.ph > highest_ph:
            reasons.append("pH too high")

        # Evaluate turbidity
        if pd.isna(self.turbidity):
            reasons.append("turbidity missing")
        elif self.turbidity > turbidity_threshold:
            reasons.append("turbidity too high")

        # The reading is safe exactly when no problem was recorded.
        self.status = not reasons
        self.reason = ", ".join(reasons) if reasons else "Safe"
        return self.status, self.reason


class WaterQualityEvaluator:
    """Collect sensor readings and report which of them are safe.

    NOTE: ``ph_range`` and ``turbidity_threshold`` are used by
    :meth:`is_safe` only. :meth:`evaluate_all` and
    :meth:`count_safety_status` call ``SensorReading.evaluate_safety()``
    without arguments, so stored readings are judged by that method's own
    limits.
    """

    def __init__(self, ph_range=(6.5, 8.5), turbidity_threshold=1.0):
        # A single constructor: it accepts the optional limits and also
        # creates the reading list, so it works with or without arguments.
        self.ph_range = ph_range
        self.turbidity_threshold = turbidity_threshold
        self.readings: List[SensorReading] = []

    def is_safe(self, row: pd.Series) -> bool:
        """
        Determine if a row of water data is safe.

        ``row`` must provide ``'pH'`` and ``'turbidity'`` entries (the column
        names used by the cleaning step and by ``main``). A row is safe when
        both values are present, pH lies inside ``self.ph_range``
        (inclusive), and turbidity does not exceed
        ``self.turbidity_threshold``.
        """
        ph = row['pH']
        turbidity = row['turbidity']
        if pd.isna(ph) or pd.isna(turbidity):
            return False
        lowest_ph, highest_ph = self.ph_range
        # bool() turns a numpy boolean into a plain Python bool.
        return bool(lowest_ph <= ph <= highest_ph and turbidity <= self.turbidity_threshold)

    def add_reading(self, sensor_id: str, location: str, ph: float, turbidity: float, temperature: float):
        """Store a new reading built from the given values."""
        self.readings.append(SensorReading(sensor_id, location, ph, turbidity, temperature))

    def evaluate_all(self) -> List[Tuple[str, str, bool, str]]:
        """Evaluate every stored reading.

        Returns:
            One ``(sensor_id, location, is_safe, reason)`` tuple per reading,
            in insertion order.
        """
        return [(r.sensor_id, r.location, *r.evaluate_safety()) for r in self.readings]

    def count_safety_status(self) -> Tuple[int, int]:
        """Return ``(safe_count, unsafe_count)`` over all stored readings."""
        # Ensure all readings have been evaluated
        for r in self.readings:
            if r.status is None:
                r.evaluate_safety()
        safe_count = sum(1 for r in self.readings if r.status)
        unsafe_count = len(self.readings) - safe_count
        return safe_count, unsafe_count

17 changes: 8 additions & 9 deletions src/load_data.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import pandas as pd


def load_csv(filepath: str) -> pd.DataFrame:
    """
    Load sensor data from a CSV file.

    Args:
        filepath (str): Path to the CSV file.

    Returns:
        pd.DataFrame: Loaded data as a pandas DataFrame.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
        pd.errors.EmptyDataError: If the file contains no data.
    """
    return pd.read_csv(filepath)
def load_data(file_path):
    """Read the CSV at ``file_path`` into a DataFrame.

    A missing file is not raised to the caller: "File not found." is printed
    and ``None`` is returned instead. Any other error from ``pd.read_csv``
    propagates.
    """
    try:
        frame = pd.read_csv(file_path)
    except FileNotFoundError:
        print("File not found.")
        frame = None
    return frame
77 changes: 77 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import sys
import pandas as pd
from pathlib import Path
from load_data import load_data
from clean_data import clean_sensor_data
from evaluate import WaterQualityEvaluator

# Locate the data directory relative to this file (src/main.py -> <repo>/data)
# instead of an absolute path inside one user's home directory, so the script
# runs from any checkout of the repository.
DATA_PATH = Path(__file__).resolve().parent.parent / "data"

def assign_sensor_and_location(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with generated ``sensor_id`` and ``location`` columns.

    Rows are labelled by position: the sensor ID is the 1-based row number
    zero-padded to three digits (Sensor 001, Sensor 002, ...), and the
    location cycles through the 26 capital letters (Lake A ... Lake Z, then
    Lake A again). The input frame is left untouched.
    """
    sensor_ids = []
    locations = []
    for position in range(len(df)):
        number = position + 1
        letter = chr(65 + (position % 26))
        sensor_ids.append(f"Sensor {number:03d}")
        locations.append(f"Lake {letter}")

    labelled = df.copy()
    labelled['sensor_id'] = sensor_ids
    labelled['location'] = locations
    return labelled

def main(location_filter: str | None = None) -> None:
    """Load, clean and evaluate the sensor data, then print and save the results.

    Steps: read ``sensor_data.csv`` from ``DATA_PATH``, attach sensor IDs and
    locations, clean the sensor columns, evaluate every reading, print a
    preview of up to ten results and a summary, and write all (filtered)
    results to ``results.csv`` in ``DATA_PATH``.

    Args:
        location_filter: Optional case-insensitive substring; when given,
            only results whose location contains it are shown, counted and
            saved.
    """
    try:
        df = load_data(DATA_PATH / "sensor_data.csv")
    except FileNotFoundError:
        print("Error: File not found.")
        return
    except pd.errors.EmptyDataError:
        print("Error: CSV file is empty.")
        return
    except Exception as e:
        print(f"Unexpected error: {e}")
        return

    # load_data() handles a missing file itself: it prints a message and
    # returns None rather than raising, so stop here instead of crashing
    # on df.copy() below.
    if df is None:
        return

    df = assign_sensor_and_location(df)
    df_clean = clean_sensor_data(df)

    evaluator = WaterQualityEvaluator()
    for _, row in df_clean.iterrows():
        evaluator.add_reading(
            row['sensor_id'],
            row['location'],
            row['pH'],
            row['turbidity'],
            row['temperature']
        )

    results = evaluator.evaluate_all()

    # Apply optional location filtering
    if location_filter:
        wanted = location_filter.lower()
        results = [r for r in results if wanted in r[1].lower()]

    # Show a preview only; the complete list goes to the CSV file below.
    # (Previously every result was printed and then the first ten repeated.)
    preview = results[:10]
    print(f"\nShowing first {len(preview)} of {len(results)} results:\n")
    for sensor_id, location, is_safe, reason in preview:
        status = "[Safe]" if is_safe else f"[Unsafe] ({reason})"
        print(f"{sensor_id} at {location}: {status}")

    # Summary: count the filtered results so the numbers agree with what
    # is displayed and saved, not with every reading held by the evaluator.
    safe_count = sum(1 for _, _, is_safe, _ in results if is_safe)
    unsafe_count = len(results) - safe_count
    print(f"\nSummary: {safe_count} safe, {unsafe_count} unsafe")

    # Save results
    results_df = pd.DataFrame(results, columns=['sensor_id', 'location', 'is_safe', 'reason'])
    output_file = DATA_PATH / "results.csv"
    results_df.to_csv(output_file, index=False)
    print(f"Results saved to {output_file}")

if __name__ == "__main__":
    # The first command-line argument, when present, is the location filter;
    # with no arguments every location is included.
    cli_args = sys.argv[1:]
    main(cli_args[0] if cli_args else None)