spark_script.py
78 lines (61 loc) · 3.22 KB
# Import modules
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import StandardScaler, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Create a Spark session and load the CSV
spark = SparkSession.builder.getOrCreate()
data_path = "C:\\Users\\ioann\\Desktop\\Programming\\Spark_Project"
file_path = data_path + "\\bank-full.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
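# Optional sanity check: confirm the inferred schema, since inferSchema can
# silently leave a column as string if any of its values are malformed
df.printSchema()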
# Drop columns that are not needed for our analysis
df = df.drop("contact", "poutcome", "day")
# Discard rows with missing values
df = df.filter(df.education != "unknown").filter(df.job != "unknown")
print("Rows after filtering:", df.count())
# Bucketize the pdays feature (days since the client was last contacted)
df = df.withColumn("pdays", F.when(df.pdays < 0, "No contact")
                             .when(df.pdays <= 7, "Within Week")
                             .when(df.pdays <= 30, "Within Month")
                             .when(df.pdays <= 90, "Within Trimester")
                             .when(df.pdays <= 180, "Within Semester")
                             .when(df.pdays <= 365, "Within Year")
                             .when(df.pdays <= 730, "Within Two Years")
                             .otherwise("More than Two Years"))
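# Optional check: inspect the distribution of the buckets created above
df.groupBy("pdays").count().show()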
# Assemble each numeric feature into a one-element vector and standard-scale it
# (StandardScaler operates on vector columns, hence the per-column assemblers)
feature_list = ["age", "balance", "duration", "campaign", "previous"]
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in feature_list]
scalers = [StandardScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in feature_list]
# String indexers: the binary yes/no columns only need an index, while the
# multi-category columns are indexed here and then one-hot encoded below
labeled_cols = ["default", "housing", "loan", "y"]
encoded_cols = ["job", "marital", "education", "month", "pdays"]
indexers = [StringIndexer(inputCol=col, outputCol=col + "_labeled") for col in labeled_cols + encoded_cols]
# One Hot Encoders
encoders = [OneHotEncoder(inputCol=col + "_labeled", outputCol=col + "_encoded") for col in encoded_cols]
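# Note: OneHotEncoder drops the last category by default (dropLast=True),
# so each encoded vector has one fewer slot than the number of categories.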
# Build and run the preprocessing pipeline
input_stages = assemblers + scalers + indexers + encoders
pipeline = Pipeline(stages=input_stages)
preprocessor = pipeline.fit(df)
df = preprocessor.transform(df)
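# Optionally persist the fitted preprocessing pipeline so the same
# transformations can be reused at inference time (PipelineModel supports
# save/load); the output path here is illustrative.
preprocessor.write().overwrite().save(data_path + "\\preprocessor_model")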
# Remove the raw and intermediate columns, keeping only the scaled and encoded versions
labeled_cols += [s + "_labeled" for s in encoded_cols]
feature_list += [s + "_vec" for s in feature_list]
df = df.drop(*feature_list, *labeled_cols, *encoded_cols)
# Write the pre-processed data to disk. Spark's CSV writer cannot serialize
# vector columns (the *_scaled and *_encoded features), so Parquet is used instead
df.write.mode("overwrite").parquet("bank-processed.parquet")
# Train test split (a fixed seed keeps the split reproducible)
(trainingData, testData) = df.randomSplit([0.8, 0.2], seed=42)
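# Illustrative sanity check: the target in this dataset is imbalanced, so
# confirm that both classes appear in the training split
trainingData.groupBy("y_labeled").count().show()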
# Assemble all remaining feature columns into a single vector
features = [c for c in df.columns if c != "y_labeled"]
assembler = VectorAssembler(inputCols=features, outputCol="features")
# Create our random forest model
classifier = RandomForestClassifier(labelCol="y_labeled", featuresCol="features")
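# A sketch with explicit hyperparameters (numTrees and maxDepth are real
# RandomForestClassifier params; these values are illustrative, not tuned):
# classifier = RandomForestClassifier(labelCol="y_labeled", featuresCol="features",
#                                     numTrees=100, maxDepth=10)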
pipeline = Pipeline(stages=[assembler, classifier])
# Fit the model to the data and predict values
rf_model = pipeline.fit(trainingData)
predicted = rf_model.transform(testData)
# The evaluator's default label column is "label", so copy y_labeled into it
predicted = predicted.withColumn("label", predicted.y_labeled)
# Evaluate the model; BinaryClassificationEvaluator reports areaUnderROC by default
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Area under ROC:", evaluator.evaluate(predicted))
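# For accuracy itself, a sketch using MulticlassClassificationEvaluator
# (a standard pyspark.ml API):
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
print("Accuracy:", accuracy_evaluator.evaluate(predicted))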