-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathservice.lua
More file actions
133 lines (116 loc) · 3.95 KB
/
service.lua
File metadata and controls
133 lines (116 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
local zmq = require 'lzmq'
local zloop = require 'lzmq.loop'
local cjson = require 'cjson'
local signal = require 'posix.signal'
local helpers = require 'somata.helpers'
local Connection = require 'somata.connection'
local Binding = require 'somata.binding'
local Service = {}
Service.__index = Service
function Service.create(name, methods, options)
local service = {}
setmetatable(service, Service)
service.id = name .. '~' .. helpers.randomString(5)
service.name = name
service.port = math.random(5000, 35000)
service.methods = methods
service.subscriptions = {}
service.heartbeat = 5000
if options ~= nil then
if options.heartbeat ~= nil then
service.heartbeat = options.heartbeat
end
end
service.ctx = zmq.context()
service.loop = zloop.new(1, service.ctx)
service.registry_connection = Connection.create(service.ctx, service.loop, "tcp://localhost:8420", 'registry')
service.binding = Binding.create(service.ctx, service.loop, service.port, function(client_id, message) service:handleMessage(client_id, message) end)
if service.heartbeat > 0 then
service:sendPing()
end
service.loop:start()
return service
end
function Service:register()
local registration = {
id=self.id,
name=self.name,
port=self.port,
heartbeat=self.heartbeat
}
self.registry_connection:sendMethod('registerService', {registration}, function()
print(string.format("Registered %s on :%d", self.id, self.port))
end)
-- Deregister handler
deregister = function() self:deregister() end
signal.signal(signal.SIGINT, deregister)
signal.signal(signal.SIGTERM, deregister)
end
function Service:deregister()
print(string.format("Deregistering %s...", self.id))
self.registry_connection:sendMethod('deregisterService', {self.name, self.id})
end
function Service:sendPing()
local ping = 'ping'
if self.ping_id == nil then
self.ping_id = helpers.randomString(8)
ping = 'hello'
end
self.registry_connection:sendMessage({kind='ping', ping=ping, id=self.ping_id}, function(err, pong)
if pong == 'welcome' then
self:register()
end
self.loop:add_once(self.heartbeat, function() self:sendPing() end)
end)
end
function Service:handleMessage(client_id, message)
if message.kind == 'method' then
self:handleMethod(client_id, message.id, message.method, message.args)
elseif message.kind == 'subscribe' then
self:handleSubscribe(client_id, message.id, message.type)
end
end
function Service:handleMethod(client_id, message_id, method, args)
function cb(err, data)
local response
if err ~= nil then
response = {
id=message_id,
kind='error',
error=err
}
else
response = {
id=message_id,
kind='response',
response=data
}
end
local response_json = cjson.encode(response)
self.binding.socket:sendx(client_id, response_json)
end
print('[handleMethod]', client_id, method, args)
table.insert(args, cb)
self.methods[method](unpack(args))
end
function Service:handleSubscribe(client_id, message_id, event_type)
if self.subscriptions[event_type] == nil then
self.subscriptions[event_type] = {}
end
print('[handleSubscribe]', client_id, event_type)
table.insert(self.subscriptions[event_type], {client_id, message_id})
end
function Service:publish(event_type, data)
for i, subscription in pairs(self.subscriptions[event_type]) do
local client_id = subscription[1]
local message_id = subscription[2]
local event = {
id=message_id,
kind='event',
event=data
}
local event_json = cjson.encode(event)
self.binding.socket:sendx(client_id, event_json)
end
end
return Service