-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapache-data-serialization-2.py
More file actions
73 lines (64 loc) · 2.46 KB
/
apache-data-serialization-2.py
File metadata and controls
73 lines (64 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
from datetime import datetime
from decimal import Decimal
import logging
import argparse
class ParseCSVToPSV(beam.DoFn):
    """Parse one raw CSV line into a (product_name, metrics) key/value pair.

    Column layout expected by the indexing below:
        0: unix timestamp, 1: product name, 2: quantity sold, 3: sale amount
    """

    def process(self, element):
        """Yield a (product_name, metrics_dict) pair for a single CSV row.

        Args:
            element: one line of the input CSV file (str).

        Yields:
            Tuple of (product_name, dict) holding the sale timestamp,
            quantity, and amount.
        """
        # Use the csv module (already imported at file level) instead of a
        # bare str.split(',') so quoted fields containing commas parse
        # correctly.
        row = next(csv.reader([element]))
        product_name = row[1]
        # Hoist the conversion: the original computed it twice from row[0].
        sale_time = datetime.fromtimestamp(int(row[0]))
        # NOTE(review): a single row is one sale, so "First Sale" and
        # "Last Sale" are intentionally the same timestamp here; merging
        # them per product would need a downstream group/combine step.
        yield (product_name, {
            "First Sale": sale_time,
            "Last Sale": sale_time,
            "Total Quantity Sold": int(row[2]),
            "Total Sales Amount": Decimal(row[3])
        })
class WriteToPSV(beam.DoFn):
    """Format (product_name, metrics) pairs as pipe-separated (PSV) lines."""

    def process(self, element):
        """Yield a PSV header line followed by one data line per product.

        Args:
            element: a single (product_name, metrics_dict) pair as emitted
                by ParseCSVToPSV, or an iterable of such pairs (e.g. the
                output of a GroupByKey/Combine step).

        Yields:
            str: a '|'-joined header row, then one '|'-joined data row
            per product.
        """
        # BUG FIX: dict(element) on a single (key, value) 2-tuple raises
        # ValueError, because dict() expects an iterable OF pairs.  Wrap a
        # bare pair in a list; pass anything else straight through so an
        # iterable of pairs still works.
        if isinstance(element, tuple) and len(element) == 2:
            sales_report = dict([element])
        else:
            sales_report = dict(element)
        header_names = ["Product Name", "First Sale", "Last Sale", "Total Quantity Sold", "Total Sales Amount"]
        yield '|'.join(header_names)
        # Iterate items() directly instead of keys() + repeated lookups.
        for product, metrics in sales_report.items():
            yield '|'.join([
                product,
                str(metrics.get("First Sale")),
                str(metrics.get("Last Sale")),
                str(metrics.get("Total Quantity Sold")),
                str(metrics.get("Total Sales Amount"))
            ])
def run_pipeline(argv=None):
    """Main entry point; defines and runs the CSV parser pipeline.

    Reads a CSV sales file, converts each data row into a per-product
    record, formats the records as pipe-separated text, and writes the
    result to the output path (and to the console).

    Args:
        argv: optional argument list; unrecognized args are forwarded to
            Beam as pipeline options via parse_known_args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        # Required: ReadFromText(None) would otherwise fail later with a
        # far less clear error.
        required=True,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        sales_data = (
            pipeline
            | 'ReadCSVFile' >> beam.io.ReadFromText(known_args.input)
        )
        processed_data = (
            sales_data
            # Drop the header row; data rows start with a numeric timestamp.
            | 'SkipHeaderRow' >> beam.Filter(lambda row: not row.startswith('Timestamp'))
            | 'ParseCSVtoPSV' >> beam.ParDo(ParseCSVToPSV())
        )
        psv_lines = (
            processed_data
            | 'WriteProcessedData' >> beam.ParDo(WriteToPSV())
        )
        # BUG FIX: the original chained Map(print) AFTER WriteToText, so it
        # printed the shard file names WriteToText emits, not the PSV lines.
        # Branch both sinks off the formatted collection instead.
        psv_lines | 'WriteToNewFile' >> beam.io.WriteToText(
            known_args.output, file_name_suffix='.psv')
        psv_lines | 'PrintToConsole' >> beam.Map(print)
if __name__ == '__main__':
    # Raise the root logger to INFO so Beam runner progress messages are
    # visible when the script is executed directly.
    logging.getLogger().setLevel(logging.INFO)
    run_pipeline()