
Commit 18374fe

Merge pull request #8 from learningOrchestra/feature-enabling-data-scientist-pipeline
Update package to use new api
2 parents 6e5febd + 7dfaf3d commit 18374fe

34 files changed: +1621 -547 lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
@@ -6,4 +6,7 @@ learning_orchestra_client/transform/__pycache__
 learning_orchestra_client/main.py
 learning_orchestra_client/explore/__pycache__
 learning_orchestra_client/builder/__pycache__
-docs
+docs
+sentiment_analysis_output.py
+mnist_output.py
+mnist_treatment.py

README.md

Lines changed: 3 additions & 177 deletions
@@ -23,182 +23,8 @@ Each functionality in learningOrchestra is contained in its own class. Check the
 
 # Example
 
-Shown below is an example usage of learning-orchestra-client using the [Titanic Dataset](https://www.kaggle.com/c/titanic/overview):
+* [Here](examples/titanic.py) is an example using the [Titanic Dataset](https://www.kaggle.com/c/titanic/overview).
+* [Here](examples/sentiment_analysis.py) is an example using the [Sentiment Analysis On IMDb reviews](https://www.kaggle.com/avnika22/imdb-perform-sentiment-analysis-with-scikit-learn).
+* [Here](examples/mnist.py) is an example using the [MNIST Dataset](http://yann.lecun.com/exdb/mnist/).
 
-```python
-from learning_orchestra_client import (
-    dataset,
-    builder,
-    transform,
-)
 
-cluster_ip = "34.95.187.26"
-
-
-dataset = Dataset(cluster_ip)
-
-print(dataset.insert_dataset_sync(
-    "titanic_training",
-    "https://filebin.net/rpfdy8clm5984a4c/titanic_training.csv?t=gcnjz1yo"))
-print(dataset.insert_dataset_sync(
-    "titanic_testing",
-    "https://filebin.net/mguee52ke97k0x9h/titanic_testing.csv?t=ub4nc1rc"))
-
-print(dataset.search_all_datasets())
-
-
-projection = Projection(cluster_ip)
-required_columns = [
-    "PassengerId",
-    "Pclass",
-    "Age",
-    "SibSp",
-    "Parch",
-    "Fare",
-    "Name",
-    "Sex",
-    "Embarked",
-    "Survived"
-]
-print(projection.insert_dataset_attributes_sync(
-    "titanic_training",
-    "titanic_training_projection",
-    required_columns))
-
-required_columns.remove("Survived")
-
-print(projection.insert_dataset_attributes_sync(
-    "titanic_testing",
-    "titanic_testing_projection",
-    required_columns))
-
-
-data_type_handler = DataType(cluster_ip)
-type_fields = {
-    "Age": "number",
-    "Fare": "number",
-    "Parch": "number",
-    "PassengerId": "number",
-    "Pclass": "number",
-    "SibSp": "number"
-}
-
-print(data_type_handler.update_dataset_types(
-    "titanic_testing_projection",
-    type_fields))
-
-type_fields["Survived"] = "number"
-
-print(data_type_handler.update_dataset_types(
-    "titanic_training_projection",
-    type_fields))
-
-
-modeling_code = '''
-from pyspark.ml import Pipeline
-from pyspark.sql.functions import (
-    mean, col, split,
-    regexp_extract, when, lit)
-
-from pyspark.ml.feature import (
-    VectorAssembler,
-    StringIndexer
-)
-
-TRAINING_DF_INDEX = 0
-TESTING_DF_INDEX = 1
-
-training_df = training_df.withColumnRenamed('Survived', 'label')
-testing_df = testing_df.withColumn('label', lit(0))
-datasets_list = [training_df, testing_df]
-
-for index, dataset in enumerate(datasets_list):
-    dataset = dataset.withColumn(
-        "Initial",
-        regexp_extract(col("Name"), "([A-Za-z]+)\.", 1))
-    datasets_list[index] = dataset
-
-misspelled_initials = [
-    'Mlle', 'Mme', 'Ms', 'Dr',
-    'Major', 'Lady', 'Countess',
-    'Jonkheer', 'Col', 'Rev',
-    'Capt', 'Sir', 'Don'
-]
-correct_initials = [
-    'Miss', 'Miss', 'Miss', 'Mr',
-    'Mr', 'Mrs', 'Mrs',
-    'Other', 'Other', 'Other',
-    'Mr', 'Mr', 'Mr'
-]
-for index, dataset in enumerate(datasets_list):
-    dataset = dataset.replace(misspelled_initials, correct_initials)
-    datasets_list[index] = dataset
-
-
-initials_age = {"Miss": 22,
-                "Other": 46,
-                "Master": 5,
-                "Mr": 33,
-                "Mrs": 36}
-for index, dataset in enumerate(datasets_list):
-    for initial, initial_age in initials_age.items():
-        dataset = dataset.withColumn(
-            "Age",
-            when((dataset["Initial"] == initial) &
-                 (dataset["Age"].isNull()), initial_age).otherwise(
-                dataset["Age"]))
-    datasets_list[index] = dataset
-
-
-for index, dataset in enumerate(datasets_list):
-    dataset = dataset.na.fill({"Embarked": 'S'})
-    datasets_list[index] = dataset
-
-
-for index, dataset in enumerate(datasets_list):
-    dataset = dataset.withColumn("Family_Size", col('SibSp')+col('Parch'))
-    dataset = dataset.withColumn('Alone', lit(0))
-    dataset = dataset.withColumn(
-        "Alone",
-        when(dataset["Family_Size"] == 0, 1).otherwise(dataset["Alone"]))
-    datasets_list[index] = dataset
-
-
-text_fields = ["Sex", "Embarked", "Initial"]
-for column in text_fields:
-    for index, dataset in enumerate(datasets_list):
-        dataset = StringIndexer(
-            inputCol=column, outputCol=column+"_index").\
-            fit(dataset).\
-            transform(dataset)
-        datasets_list[index] = dataset
-
-
-non_required_columns = ["Name", "Embarked", "Sex", "Initial"]
-for index, dataset in enumerate(datasets_list):
-    dataset = dataset.drop(*non_required_columns)
-    datasets_list[index] = dataset
-
-
-training_df = datasets_list[TRAINING_DF_INDEX]
-testing_df = datasets_list[TESTING_DF_INDEX]
-
-assembler = VectorAssembler(
-    inputCols=training_df.columns[:],
-    outputCol="features")
-assembler.setHandleInvalid('skip')
-
-features_training = assembler.transform(training_df)
-(features_training, features_evaluation) =\
-    features_training.randomSplit([0.8, 0.2], seed=33)
-features_testing = assembler.transform(testing_df)
-'''
-
-builder = Builder(cluster_ip)
-
-print(builder.run_builder_sync(
-    "titanic_training_projection",
-    "titanic_testing_projection",
-    modeling_code,
-    ["lr", "dt", "gb", "rf", "nb"]))
-```
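
All three linked examples follow the same pattern with the updated API: each functionality lives in its own class, every class is constructed with the cluster address, and later steps refer to stored objects by name. The sketch below distills that pattern from examples/sentiment_analysis.py in this commit; the cluster address, dataset name, and URL are placeholders rather than real values.

```python
# Minimal sketch of the new class-per-functionality API, distilled from
# examples/sentiment_analysis.py in this commit. The cluster address,
# dataset name, and URL below are placeholders.
from learning_orchestra_client.dataset.csv import DatasetCsv
from learning_orchestra_client.function.python import FunctionPython

CLUSTER_IP = "http://<cluster-ip>"  # placeholder cluster address

# Each functionality has its own class, constructed with the cluster address.
dataset_csv = DatasetCsv(CLUSTER_IP)
dataset_csv.insert_dataset_sync(
    dataset_name="my_dataset",                 # placeholder dataset name
    url="https://example.com/my_dataset.csv",  # placeholder URL
)

# Python snippets run against stored objects through FunctionPython;
# "$my_dataset" refers to the dataset inserted above, and the snippet
# assigns `response`, following the convention used in the example file.
function_python = FunctionPython(CLUSTER_IP)
function_python.run_function_sync(
    name="my_dataset_exploring",
    parameters={"data": "$my_dataset"},
    code='print(len(data))\nresponse = None')

print(function_python.search_execution_content(
    name="my_dataset_exploring", pretty_response=True))
```

The model, train, and predict classes used in examples/sentiment_analysis.py (ModelScikitLearn, TrainScikitLearn, PredictScikitLearn) are constructed the same way.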

examples/sentiment_analysis.py

Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
+from learning_orchestra_client.dataset.csv import DatasetCsv
+from learning_orchestra_client.function.python import FunctionPython
+from learning_orchestra_client.model.scikitlearn import ModelScikitLearn
+from learning_orchestra_client.train.scikitlearn import TrainScikitLearn
+from learning_orchestra_client.predict.scikitlearn import PredictScikitLearn
+
+CLUSTER_IP = "http://34.68.100.96"
+
+dataset_csv = DatasetCsv(CLUSTER_IP)
+
+dataset_csv.insert_dataset_sync(
+    dataset_name="sentiment_analysis",
+    url="https://drive.google.com/u/0/uc?"
+        "id=1PSLWHbKR_cuKvGKeOSl913kCfs-DJE2n&export=download",
+)
+
+function_python = FunctionPython(CLUSTER_IP)
+
+explore_dataset = '''
+pos=data[data["label"]=="1"]
+neg=data[data["label"]=="0"]
+
+total_rows = len(pos) + len(neg)
+
+print("Positive = " + str(len(pos) / total_rows))
+print("Negative = " + str(len(neg) / total_rows))
+
+response = None
+'''
+
+function_python.run_function_sync(
+    name="sentiment_analysis_exploring",
+    parameters={"data": "$sentiment_analysis"},
+    code=explore_dataset)
+
+print(function_python.search_execution_content(
+    name="sentiment_analysis_exploring",
+    limit=1,
+    skip=1,
+    pretty_response=True))
+
+dataset_preprocessing = '''
+import re;
+
+
+def preprocessor(text):
+    global re
+    text = re.sub("<[^>]*>", "", text)
+    emojis = re.findall("(?::|;|=)(?:-)?(?:\)|\(|D|P)", text)
+    text = re.sub("[\W]+", " ", text.lower()) + \
+        " ".join(emojis).replace("-", "")
+    return text
+
+
+data["text"] = data["text"].apply(preprocessor)
+
+from nltk.stem.porter import PorterStemmer
+
+porter = PorterStemmer()
+
+
+def tokenizer_porter(text):
+    global porter
+    return [porter.stem(word) for word in text.split()]
+
+
+from sklearn.feature_extraction.text import TfidfVectorizer
+
+tfidf = TfidfVectorizer(strip_accents=None,
+                        lowercase=False,
+                        preprocessor=None,
+                        tokenizer=tokenizer_porter,
+                        use_idf=True,
+                        norm="l2",
+                        smooth_idf=True)
+
+y = data.label.values
+x = tfidf.fit_transform(data.text)
+
+from sklearn.model_selection import train_test_split
+
+X_train, X_test, y_train, y_test = train_test_split(x, y,
+                                                    random_state=1,
+                                                    test_size=0.5,
+                                                    shuffle=False)
+
+response = {
+    "X_train": X_train,
+    "X_test": X_test,
+    "y_train": y_train,
+    "y_test": y_test
+}
+'''
+
+function_python.run_function_sync(
+    name="sentiment_analysis_preprocessed",
+    parameters={
+        "data": "$sentiment_analysis"
+    },
+    code=dataset_preprocessing
+)
+
+model_scikitlearn = ModelScikitLearn(CLUSTER_IP)
+
+model_scikitlearn.create_model_sync(
+    name="sentiment_analysis_logistic_regression_cv",
+    module_path="sklearn.linear_model",
+    class_name="LogisticRegressionCV",
+    class_parameters={
+        "cv": 6,
+        "scoring": "accuracy",
+        "random_state": 0,
+        "n_jobs": -1,
+        "verbose": 3,
+        "max_iter": 500
+    }
+
+)
+
+train_scikitlearn = TrainScikitLearn(CLUSTER_IP)
+train_scikitlearn.create_training_sync(
+    parent_name="sentiment_analysis_logistic_regression_cv",
+    name="sentiment_analysis_logistic_regression_cv_trained",
+    model_name="sentiment_analysis_logistic_regression_cv",
+    method_name="fit",
+    parameters={
+        "X": "$sentiment_analysis_preprocessed.X_train",
+        "y": "$sentiment_analysis_preprocessed.y_train",
+    }
+)
+
+predict_scikitlearn = PredictScikitLearn(CLUSTER_IP)
+predict_scikitlearn.create_prediction_sync(
+    parent_name="sentiment_analysis_logistic_regression_cv_trained",
+    name="sentiment_analysis_logistic_regression_cv_predicted",
+    model_name="sentiment_analysis_logistic_regression_cv",
+    method_name="predict",
+    parameters={
+        "X": "$sentiment_analysis_preprocessed.X_test",
+    }
+
+)
+
+logistic_regression_cv_accuracy = '''
+from sklearn import metrics
+
+print("Accuracy: ",metrics.accuracy_score(y_test, y_pred))
+
+response = None
+'''
+function_python.run_function_sync(
+    name="sentiment_analysis_logistic_regression_cv_accuracy",
+    parameters={
+        "y_test": "$sentiment_analysis_preprocessed.y_test",
+        "y_pred": "$sentiment_analysis_logistic_regression_cv_predicted"
+    },
+    code=logistic_regression_cv_accuracy
+)
+
+print(function_python.search_execution_content(
+    name="sentiment_analysis_logistic_regression_cv_accuracy",
+    pretty_response=True))
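
One convention in this file is worth spelling out, since it is never stated in prose: the `parameters` mappings pass earlier results by name rather than by value. Judging from the calls above, `"$<name>"` resolves to a stored dataset or execution result on the cluster, and `"$<name>.<field>"` to one key of the dict a function assigned to `response`. The sketch below illustrates that round trip under those assumptions, using a hypothetical execution name alongside the real dataset name from this file.

```python
from learning_orchestra_client.function.python import FunctionPython

CLUSTER_IP = "http://34.68.100.96"  # same cluster address as the example above

function_python = FunctionPython(CLUSTER_IP)

# Server-side snippet: whatever is assigned to `response` becomes the stored
# result of this execution (mirroring the preprocessing step above).
split_code = '''
half = len(data) // 2
response = {"head": data[:half], "tail": data[half:]}
'''

# "sentiment_analysis_split" is a hypothetical name; "data" is bound to the
# stored dataset via the "$sentiment_analysis" reference.
function_python.run_function_sync(
    name="sentiment_analysis_split",
    parameters={"data": "$sentiment_analysis"},
    code=split_code)

# A later step can then address a single field of that stored result.
function_python.run_function_sync(
    name="sentiment_analysis_head_size",
    parameters={"head": "$sentiment_analysis_split.head"},
    code='print(len(head))\nresponse = None')
```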
