-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathDriverNode.py
More file actions
36 lines (30 loc) · 1.11 KB
/
DriverNode.py
File metadata and controls
36 lines (30 loc) · 1.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from kafka import KafkaConsumer, KafkaProducer
import json
import time
class DriverNode:
def __init__(self, nodeID):
self.nodeID = nodeID
self.consumer = KafkaConsumer()
def initialize(self):
self.consumer.subscribe(['test_config', 'trigger'])
n = 2
while True:
msg = self.consumer.poll(1000)
if len(msg) > 0:
print('--------------------------------')
# print('Length of message: ',len(msg),'\n\n',type(msg),'\n\n',msg,"\n\n\n")
print(f"Topic: {list(msg.values())[0][0].topic}")
print(f"Message: {list(msg.values())[0][0].value.decode()}")
print('--------------------------------')
n -= 1
# data = json.loads(msg.values().decode('utf-8'))
# print(data)
if n == 0:
self.consumer.close() #! Safety Measure
print('Consumer Closed')
break
# Start thread
if __name__ == "__main__":
nodeID = '302A'
driver = DriverNode(nodeID)
driver.initialize()