-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpush_data.py
More file actions
60 lines (45 loc) · 1.7 KB
/
push_data.py
File metadata and controls
60 lines (45 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from dotenv import load_dotenv
from network_security.logging.logger import logging
from network_security.exceptions.exception import NetworkSecurityException
import os, sys
import json
import certifi
import pandas as pd
import numpy as np
import pymongo
# Read a local .env file (if one exists) into the process environment
# before any settings are looked up.
load_dotenv()
# MongoDB connection string taken from the MONGODB_URI environment variable;
# os.getenv returns None when the variable is not set.
MONGO_DB_URL = os.getenv("MONGODB_URI")
# Filesystem path of certifi's CA certificate bundle.
# NOTE(review): `ca` is not referenced anywhere in the visible code —
# presumably it was meant to be passed to MongoClient (tlsCAFile); confirm.
ca = certifi.where()
# implementing the ETL pipeline
class NetworkDataExtraction:
    """Extract records from a CSV file and load them into MongoDB.

    The two public methods form a small extract/load pipeline:
    ``csv_to_json_convertor`` turns a CSV file into a list of JSON-style
    dicts, and ``push_data_to_mongodb`` inserts such a list into a
    collection. Any failure is re-raised as ``NetworkSecurityException``.
    """

    def __init__(self):
        # The client is created lazily on the first push and then reused,
        # so repeated pushes do not each open a new connection pool.
        self.mongo_client = None

    def csv_to_json_convertor(self, file_path):
        """Read a CSV file and return its rows as a list of dicts.

        Args:
            file_path: Path (or anything ``pandas.read_csv`` accepts) of the
                CSV file to read.

        Returns:
            A list with one dict per CSV row, keyed by column name. Values
            are plain JSON types; missing cells become ``None``.

        Raises:
            NetworkSecurityException: If reading or converting the file fails.
        """
        try:
            data = pd.read_csv(file_path)
            # Drop whatever index read_csv produced; only column data is kept.
            data.reset_index(drop=True, inplace=True)
            # Serialise row-wise instead of transposing first: a transpose
            # upcasts integer columns to float whenever the frame also holds
            # float columns, which would store 1 as 1.0.
            return json.loads(data.to_json(orient="records"))
        except Exception as e:
            raise NetworkSecurityException(e, sys) from e

    def push_data_to_mongodb(self, records, database, collection):
        """Insert ``records`` into ``database.collection`` and return the count.

        Args:
            records: List of dict documents to insert.
            database: Name of the target MongoDB database.
            collection: Name of the target collection inside ``database``.

        Returns:
            The number of records handed to MongoDB (0 for an empty input).

        Raises:
            NetworkSecurityException: If connecting or inserting fails.
        """
        try:
            # insert_many rejects an empty document list, so an empty input
            # is reported as "nothing inserted" without contacting the server.
            if not records:
                return 0
            # Reuse the client from an earlier call instead of replacing it
            # with a fresh, never-closed one on every push.
            if getattr(self, "mongo_client", None) is None:
                self.mongo_client = pymongo.MongoClient(MONGO_DB_URL)
            self.mongo_client[database][collection].insert_many(records)
            return len(records)
        except Exception as e:
            raise NetworkSecurityException(e, sys) from e
if __name__ == "__main__":
    # One-off load: read the phishing CSV and insert every row into MongoDB.
    csv_path = "network_data/phisingData.csv"
    target_db, target_collection = "aryan", "NetworkData"

    extractor = NetworkDataExtraction()
    # Number of records handed to MongoDB by the push.
    len_records = extractor.push_data_to_mongodb(
        records=extractor.csv_to_json_convertor(file_path=csv_path),
        database=target_db,
        collection=target_collection,
    )