Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ features/redis_conf.rb
config/redis.json

*~
node_modules/
environment.json
6 changes: 3 additions & 3 deletions broadcaster/package.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"name": "broadcaster",
"version": "1.0.0",
"type": "module",
"dependencies": {
"socket.io": "0.9.17",
"hiredis": "",
"redis": ""
"redis": "^4.5.1",
"socket.io": "0.9.19"
}
}
186 changes: 186 additions & 0 deletions broadcaster/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 64 additions & 64 deletions broadcaster/server.js
Original file line number Diff line number Diff line change
@@ -1,80 +1,67 @@
if (process.argv.length != 3) {
console.error("Specify the path of the environment.json file please!");
console.error("Usage: node server.js environment.json");
process.exit(2);
}
import io from 'socket.io';
import redis from 'redis';

var fs = require('fs');
var env = JSON.parse(fs.readFileSync(process.argv[2]));
import { createServer } from 'http';
import { readFileSync } from 'fs';

var trackerConfig = env['tracker_config'];
import HTTPPolling from 'socket.io/lib/transports/http-polling.js';
import XHRPolling from 'socket.io/lib/transports/xhr-polling.js';

var app = require('http').createServer(httpHandler),
io = require('socket.io').listen(app),
redis = require('redis').createClient(Number(env['redis_port'] || 6379),
env['redis_host'] || '127.0.0.1',
Number(env['redis_db'] || 0)),
numberOfClients = 0,
recentMessages = {};
const httpHandler = async (request, response) => {
let m;
let output;

app.listen(8080);

redis.on("error", function (err) {
console.log("Error " + err);
});

redis.on("message", redisHandler);

function httpHandler(request, response) {
var m;
if (m = request.url.match(/^\/recent\/(.+)/)) {
var channel = m[1];
response.writeHead(200, {"Content-Type": "text/plain; charset=UTF-8",
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Credentials': 'true'});
let channel = m[1];
response.writeHead(200, {
"Content-Type": "text/plain; charset=UTF-8",
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Credentials': 'true'
});
output = JSON.stringify(recentMessages[channel] || []);
response.end(output);

} else {
response.writeHead(200, {"Content-Type": "text/plain"});
output = "" + numberOfClients;
response.end(output);
}

response.end(output);
}

function redisHandler(pubsubChannel, message) {
console.log(message);
var msgParsed = JSON.parse(message);
console.log(msgParsed);
var channel = msgParsed['log_channel'];
if (!recentMessages[channel]) {
recentMessages[channel] = [];
}
var msgList = recentMessages[channel];
msgList.push(msgParsed);
while (msgList.length > 20) {
msgList.shift();
}
io.of('/'+channel).emit('log_message', message);
if (process.argv.length != 3) {
console.error("Specify the path of the environment.json file please!");
console.error("Usage: node server.js environment.json");
process.exit(2);
}

const env = JSON.parse(readFileSync(process.argv[2])),
trackerConfig = env['tracker_config'];

const app = createServer(httpHandler),
ioApp = io.listen(app),
redisClient = redis.createClient(Number(env['redis_port'] || 6379),
env['redis_host'] || '127.0.0.1',
Number(env['redis_db'] || 0)),
recentMessages = {};

await redisClient.connect();
if (env['redis_password']) {
await redisClient.auth(env['redis_password']);
}

var numberOfClients = 0;

io.configure(function() {
io.set("transports", ["websocket", "xhr-polling"]);
io.set("polling duration", 10);
app.listen(8080);

var path = require('path');
var HTTPPolling = require(path.join(
path.dirname(require.resolve('socket.io')),'lib', 'transports','http-polling')
);
var XHRPolling = require(path.join(
path.dirname(require.resolve('socket.io')),'lib','transports','xhr-polling')
);
ioApp.configure(function () {
ioApp.set("transports", ["websocket", "xhr-polling"]);
ioApp.set("polling duration", 10);
ioApp.set("log level", 2); // INFO

XHRPolling.prototype.doWrite = function(data) {
XHRPolling.prototype.doWrite = function (data) {
HTTPPolling.prototype.doWrite.call(this);

var headers = {
const headers = {
'Content-Type': 'text/plain; charset=UTF-8',
'Content-Length': (data && Buffer.byteLength(data)) || 0
};
Expand All @@ -88,20 +75,33 @@ io.configure(function() {

this.response.writeHead(200, headers);
this.response.write(data);
// this.log.debug(this.name + ' writing', data);
};
});

io.sockets.on('connection', function(socket) {
ioApp.sockets.on('connection', function (socket) {
numberOfClients++;
socket.on('disconnect', function() {
socket.on('disconnect', function () {
numberOfClients--;
});
});

const subscriber = redisClient.duplicate();
await subscriber.connect();

if (env['redis_password']) {
redis.auth(env['redis_password']);
}
redis.subscribe(trackerConfig['redis_pubsub_channel']);
await subscriber.subscribe(trackerConfig['redis_pubsub_channel'], (message, channel) => {
const msgParsed = JSON.parse(message);
const logChannel = msgParsed['log_channel'];
if (!recentMessages[logChannel]) {
recentMessages[logChannel] = [];
}
const msgList = recentMessages[logChannel];
msgList.push(msgParsed);
while (msgList.length > 20) {
msgList.shift();
}
ioApp.of('/' + logChannel).emit('log_message', message);
});

redisClient.on("error", (err) => {
console.error("redis error ", err);
});