forked from airsend-io/websocket
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsumer.js
More file actions
65 lines (47 loc) · 1.81 KB
/
Consumer.js
File metadata and controls
65 lines (47 loc) · 1.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
const Kafka = require('node-rdkafka');
const host = require('./util/host');
class Consumer
{
constructor(logger, config, socketServer) {
this.logger = logger;
this.socketServer = socketServer;
this.consumer = new Kafka.KafkaConsumer({
'group.id': 'websocketgroup',
'metadata.broker.list': config.get('kafka_host'),
'client.id': `websocket-${host()}`,
}, {});
}
connect(kafkaTopic) {
return new Promise((resolve, reject) => {
this.consumer.connect({}, () => {
reject({message: 'Kafka service is not available, try again...'});
});
this.consumer.on('event.error', (err) => {
reject("Consumer Error: " + err);
});
// when the connection is ready to consume, resolve it
this.consumer.on('ready', () => {
this.consumer.subscribe([kafkaTopic]);
// Consume from the topic.
// No callback required. Will cause data event to be raised
this.consumer.consume();
resolve();
});
// set the data handler
this.consumer.on('data', (message) => {
// Output the actual message contents
const data = message.value.toString();
//logger.info(data);
try {
let dataObj = JSON.parse(data);
let key = dataObj.token.user_id + "_" + dataObj.token.finger_print;
// Send to clients
this.socketServer.sendToClient(key, JSON.stringify(dataObj.rtm_payload));
} catch (e) {
this.logger.error(e);
}
});
});
}
}
module.exports = Consumer;