forked from airsend-io/websocket
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathZookeeper.js
More file actions
133 lines (100 loc) · 4.29 KB
/
Zookeeper.js
File metadata and controls
133 lines (100 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
const zookeeper = require('node-zookeeper-client');
const host = require('./util/host');
class Zookeeper
{
constructor(logger, config, consumerTopic) {
this.logger = logger;
this.config = config;
this.consumerTopic = consumerTopic;
this.zooHost = config.get('zoo_host');
this.dataDelimiter = '#';
// the parent node for all websocket nodes on zookeeper
this.parentNodePath = config.get('zoo_rtm_nodes');
// the service is not connected on construction
this.connected = false;
}
connect() {
return new Promise((resolve, reject) => {
this.logger.info(`Trying zookeeper host: '${this.zooHost}'`);
const zooClient = zookeeper.createClient(this.zooHost);
zooClient.connect();
// set the event handlers
zooClient.on('connected', () => {
this.connected = true;
zooClient.exists(this.parentNodePath, (error, stat) => {
if (stat) {
// parent node exists, just include this node
this.logger.info(this.parentNodePath + " node found!")
this.writeThisNodeToZookeeper(zooClient);
} else {
this.logger.info(this.parentNodePath + " node not found. Creating...")
// Parent node still doesn't exists, needs to be created
zooClient.create(this.parentNodePath, (error) => {
if (error) {
this.logger.error(`CRITICAL: Failed to create parent node: ${error}`);
}
this.writeThisNodeToZookeeper(zooClient);
});
}
});
resolve();
});
zooClient.on('disconnected', async () => {
this.logger.info('ZOOKEEPER DISCONNCTED - Attempting RECONNECT');
this.connected = false;
await this.connect();
});
zooClient.on('expired', async () => {
this.logger.info('ZOOKEEPER EXPIRED - Attempting RECONNECT');
this.close();
this.connected = false;
await this.connect();
});
// if after 5 seconds we're not connected, reject
setTimeout(() => {
if (!this.connected) {
zooClient.close();
reject({message: 'Zookeeper service is not available, try again...'});
}
}, 5000)
});
}
writeThisNodeToZookeeper(client)
{
const data = [
this.config.get('websocket_protocol'),
host(),
this.config.get('websocket_port'),
this.consumerTopic,
];
const path = `${this.parentNodePath}/${data.join(this.dataDelimiter)}`;
this.logger.info(`Writing node '${path}' to Zookeeper.`);
client.exists(path, (error, stat) => {
if (stat) {
// Node is there, delete it first and recreate it
client.remove(path, -1, (error) => {
// Create a ephemeral node to allow node to be removed on this client crash
client.create(path, '', zookeeper.CreateMode.EPHEMERAL, (error) => {
if (error) {
this.logger.error(error);
} else {
this.logger.info(`Ephemeral Node: '${path}' created`);
}
// Dont close this. If we close the node will be deleted
});
});
} else {
// Create a ephemeral node to allow node to be removed on this client crash
client.create(path, '', zookeeper.CreateMode.EPHEMERAL, (error) => {
if (error) {
this.logger.info(error);
} else {
this.logger.info(`Ephemeral Node: '${path}' created`);
}
// Dont close this. If we close the node will be deleted
});
}
});
}
}
module.exports = Zookeeper;