Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
index.py
env/
env/
venv
1,001 changes: 1,001 additions & 0 deletions data/results.csv

Large diffs are not rendered by default.

Binary file added src/__pycache__/clean_data.cpython-312.pyc
Binary file not shown.
Binary file added src/__pycache__/evaluate.cpython-312.pyc
Binary file not shown.
Binary file added src/__pycache__/load_data.cpython-312.pyc
Binary file not shown.
20 changes: 20 additions & 0 deletions src/clean_data.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,27 @@
import pandas as pd
def clean_sensor_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean sensor data by handling missing or invalid values.

    The known measurement columns are converted to numeric values (anything
    that cannot be parsed becomes NaN) and rounded to three decimal places.
    Column names are matched case-insensitively, so both "ph" and "pH" are
    cleaned. The input frame itself is never modified.

    Args:
        df (pd.DataFrame): Raw sensor data. May be None.

    Returns:
        pd.DataFrame: Cleaned copy of the data, or None when no data frame
        was provided.
    """
    if df is None:
        print("Cannot perform data cleaning. Kindly provide raw data frame")
        # Stop here: everything below needs a real DataFrame.
        return None

    # Inspect the data for anomalies. DataFrame.info() prints its report
    # itself and returns None, so it must not be wrapped in print().
    df.info()
    print(df.isnull().sum())  # count of missing values for each column

    # Work on a copy so the caller's frame is left untouched.
    df_copy = df.copy()

    # Columns that must hold numeric values (compared in lower case).
    numeric_columns = {"ph", "turbidity", "dissolved_oxygen", "temperature"}

    for column in df_copy.columns:
        if str(column).lower() not in numeric_columns:
            continue
        # Unparseable entries become NaN; valid ones keep three decimals.
        df_copy[column] = pd.to_numeric(df_copy[column], errors="coerce").round(3)

    return df_copy


57 changes: 56 additions & 1 deletion src/evaluate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,64 @@
import pandas as pd


class WaterQualityEvaluator:
    """Classify water-sensor readings as safe or unsafe.

    A reading is safe when its pH lies inside ``ph_range`` (bounds included)
    and its turbidity does not exceed ``turbidity_threshold``.
    """

    def __init__(self, ph_range=(6.5, 8.5), turbidity_threshold=1.0):
        """
        Args:
            ph_range (tuple): Inclusive (minimum, maximum) acceptable pH.
            turbidity_threshold (float): Highest acceptable turbidity.
        """
        self.ph_range = ph_range
        self.min_ph, self.max_ph = ph_range
        self.turbidity_threshold = turbidity_threshold

    @staticmethod
    def _reading(row, name):
        """Return the numeric value stored under *name* in *row*, or None.

        The exact key is tried first, then a case-insensitive match, so a
        column called "ph" is found when "pH" is requested. Missing, null and
        non-numeric values all come back as None.
        """
        value = row.get(name)
        if value is None:
            wanted = name.lower()
            for key in row.keys():
                if str(key).lower() == wanted:
                    value = row[key]
                    break
        if pd.isnull(value):
            return None
        # Text such as "7.2" is converted; anything unparseable becomes NaN.
        value = pd.to_numeric(value, errors="coerce")
        return None if pd.isnull(value) else value

    def is_safe(self, row: pd.Series) -> tuple[bool, str]:
        """
        Determine if a row of water data is safe.

        Args:
            row (pd.Series): A row from the sensor data DataFrame.

        Returns:
            tuple: A tuple containing a boolean (True if safe, False if not)
            and a string describing the status.
        """
        ph = self._reading(row, "pH")
        turbidity = self._reading(row, "turbidity")

        # Check for missing (or unusable) values first.
        if ph is None:
            return False, "Unsafe (missing pH)"
        if turbidity is None:
            return False, "Unsafe (missing turbidity)"

        # Check against thresholds.
        if not (self.min_ph <= ph <= self.max_ph):
            reason = "high" if ph > self.max_ph else "low"
            return False, f"Unsafe (pH too {reason})"

        if turbidity > self.turbidity_threshold:
            return False, "Unsafe (turbidity too high)"

        return True, "Safe"

    def evaluate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies the evaluation logic to an entire DataFrame.

        Args:
            df (pd.DataFrame): The cleaned sensor data DataFrame.

        Returns:
            pd.DataFrame: A copy of the DataFrame with two extra columns:
            'is_safe' (bool) and 'status_message' (str). None is returned
            when ``df`` is None.
        """
        if df is None:
            return None

        print("Evaluating data...")
        # Evaluate row by row. Collecting the verdicts in a list (instead of
        # apply(..., result_type='expand')) also works for an empty frame.
        verdicts = [self.is_safe(row) for _, row in df.iterrows()]

        # Assign by position on a copy: the input stays untouched, duplicate
        # index labels cannot multiply rows, and result columns left over
        # from an earlier run are simply overwritten.
        evaluated_df = df.copy()
        evaluated_df["is_safe"] = [safe for safe, _ in verdicts]
        evaluated_df["status_message"] = [message for _, message in verdicts]

        print("Evaluation complete.")
        print(evaluated_df)
        return evaluated_df
6 changes: 6 additions & 0 deletions src/load_data.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import pandas as pd


def load_csv(filepath: str) -> pd.DataFrame:
    """
    Load sensor data from a CSV file.

    Args:
        filepath (str): Path of the CSV file to read. The value is handed
            straight to ``pandas.read_csv``.

    Returns:
        pd.DataFrame: Loaded data as a pandas DataFrame.
    """
    return pd.read_csv(filepath)
41 changes: 41 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
''' ETL pipeline to Extract, Transform and Load data in Python.

Task 1 = Extract data from a CSV file. This is done by importing the load_data.py module and calling a function from it.
This function accepts one argument, which is the file to work with.
The file is passed as a command-line argument in the terminal when the Python script is run.
The data in this file is extracted into a pandas DataFrame and the result is printed out.
'''

from load_data import load_csv
from clean_data import clean_sensor_data
from evaluate import WaterQualityEvaluator
import pandas as pd
import sys
import argparse


def main() -> int:
    """Run the pipeline: load the CSV, clean it, evaluate it, save results.

    The source file is taken from the first command-line argument. The
    evaluated data is written to "data/results.csv" (relative to the current
    working directory).

    Returns:
        int: 0 when the pipeline finished, 1 when it could not (missing
        argument, unreadable file, or a failure while cleaning, evaluating
        or saving).
    """
    print("********************Running ETL Pipeline*******************************\n")

    # Check the argument explicitly, so that an IndexError raised further
    # down the pipeline is not mistaken for a missing argument.
    if len(sys.argv) < 2:
        print("Please provide source data")
        return 1
    source_path = sys.argv[1]  # get the raw data file from the terminal

    try:
        sensor_raw_data = load_csv(source_path)  # extract
    except (OSError, ValueError) as e:
        # OSError: missing or unreadable file. ValueError: pandas' empty-file
        # and parser errors, and undecodable bytes.
        print(f"Could not load '{source_path}': {e}")
        return 1
    print(sensor_raw_data.head())

    try:
        cleaned_data = clean_sensor_data(sensor_raw_data)  # transform
        evaluator = WaterQualityEvaluator()
        evaluated_data = evaluator.evaluate_data(cleaned_data)
        if evaluated_data is not None:
            evaluated_data.to_csv("data/results.csv", index=False)  # load
    except Exception as e:
        # Top-level boundary: report the problem instead of a raw traceback.
        print(e)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())