-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSparkStreaming.py
More file actions
17 lines (14 loc) · 792 Bytes
/
SparkStreaming.py
File metadata and controls
17 lines (14 loc) · 792 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming.kafka import KafkaUtils
# Kafka consumer parameters handed to KafkaUtils.createDirectStream below.
kafka_params = {
    'bootstrap.servers': 'localhost:9092',  # broker(s) used to bootstrap the connection
    'group.id': 'StreamingGroup',           # consumer group id
    'auto.offset.reset': 'largest',         # Kafka 0.8-style value: begin at the latest offsets
}
if __name__ == '__main__':
    # Count how many times each distinct Kafka message value occurs in every
    # micro-batch read from the 'Test' topic, and print the counts.
    #
    # NOTE: pyspark.streaming.kafka (KafkaUtils) is Spark's Kafka 0.8
    # integration; it exists only in Spark 2.x and earlier (removed in 3.0).
    from operator import add

    sc = SparkContext(appName='StreamingAp')
    # Second argument is the batch interval in seconds: the input is cut into
    # 4-second micro-batches (this is not a sliding window).
    ssc = StreamingContext(sc, 4)

    # Initial DStream: a direct stream over topic 'Test'; each record is a
    # (key, value) pair, so record[1] is the decoded message value.
    lines = KafkaUtils.createDirectStream(ssc, ['Test'], kafka_params)

    # Map each record to (message value, 1), then sum the 1s per value within
    # each batch.
    counts = lines.map(lambda record: (record[1], 1)).reduceByKey(add)
    # pprint() is an output action (prints the first elements of each batch)
    # and returns None, so it is called for its side effect only.
    counts.pprint()

    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate