Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
logger: npm start
aggregator: npm run start-aggregator
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,20 @@ defaults to 1MB.
- `KAFKA_CONSUMER_IDLE_TIMEOUT`: Timeout between fetch calls, defaults to
1000ms.

To enable aggregation supply the application with the following environment variables:

- `AGGREGATION_TOPIC`: Topic where pub-sub tracking messages are being produced
- `FLUSH_TO_PERSISTENCE_AFTER`: Timeout period for aggregated value for each sample
- `NUM_REALTIME_PROCESSES`: Number of real-time processes being run (this is to calculate if and how many subscribe events did we miss)
- `LOG_PUBSUB_STATS`: Set to true if you want to log the aggregated statistics

Note: If you are using one of Heroku's multi-tenant Apache Kafka plans, you must also define the "logger-group" consumer group with the following command:

`heroku kafka:consumer-groups:create logger-group -a YOUR_REFOCUS_LOGGING_APPLICATION`

you must also define the "aggregator-group" consumer group with the following command if you want aggregation enabled:

`heroku kafka:consumer-groups:create aggregator-group -a YOUR_REFOCUS_LOGGING_APPLICATION`

For more information on this feature, please see https://devcenter.heroku.com/articles/multi-tenant-kafka-on-heroku#consumer-groups.

Expand All @@ -62,5 +73,6 @@ See https://github.com/salesforce/refocus-logging-client#configuration.

## Version History

- 1.2.0 Add option for aggregating pub sub aggregation logs
- 1.1.0 Add option for consolidated Refocus logging using Kafka, group consumer
- 1.0.0
161 changes: 161 additions & 0 deletions __tests__/unit/aggregator/aggregatorHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* Copyright (c) 2019, salesforce.com, inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or
* https://opensource.org/licenses/BSD-3-Clause
*/

const { aggregationHandler } = require('../../../src/aggregator/aggregatorHandler');
const persist = require('../../../src/aggregator/persist');
const timeout = require('../../../src/config').getConfig().aggregatorTimeout;
jest.mock('../../../src/aggregator/persist');

jest.useFakeTimers();

describe('test/unit/aggregator/aggregatorHandler.js', () => {
afterEach(() => {
jest.clearAllMocks();
});

it('end-to-end OK', () => {
const key = Buffer.from(JSON.stringify({
updatedAt: new Date().toISOString(),
sampleName: 'testSample',
}));

const reqStartTime = Date.now();
const jobStartTime = reqStartTime + 2;
const publishCompletedAt = jobStartTime + 4;
const emittedAt1 = publishCompletedAt + 4;
const emittedAt2 = publishCompletedAt + 6;
const emittedAt3 = publishCompletedAt + 8;
const numClientsEmittedTo = 2;
const acknowledgedAt1 = emittedAt3 + 6;
const acknowledgedAt2 = emittedAt2 + 4;

const value1 = Buffer.from(JSON.stringify({ message:
{ type: 'requestStarted', reqStartTime, jobStartTime }, }));

const value2 = Buffer.from(JSON.stringify({ message:
{ type: 'published', publishCompletedAt }, }));

const value3 = Buffer.from(JSON.stringify({ message:
{ type: 'emitted', emittedAt: emittedAt1, numClientsEmittedTo: 2 }, }));

const value4 = Buffer.from(JSON.stringify({ message:
{ type: 'emitted', emittedAt: emittedAt2, numClientsEmittedTo: 3 }, }));

const value5 = Buffer.from(JSON.stringify({ message:
{ type: 'emitted', emittedAt: emittedAt3, numClientsEmittedTo: 1 }, }));

const value6 = Buffer.from(JSON.stringify({ message:
{ type: 'acknowledged', acknowledgedAt: acknowledgedAt1 }, }));

const value7 = Buffer.from(JSON.stringify({ message:
{ type: 'acknowledged', acknowledgedAt: acknowledgedAt2 }, }));

const messageSet = [];

messageSet.push({ message: { value: value1, key } });
messageSet.push({ message: { value: value2, key } });
messageSet.push({ message: { value: value3, key } });
messageSet.push({ message: { value: value4, key } });
messageSet.push({ message: { value: value5, key } });
messageSet.push({ message: { value: value6, key } });
messageSet.push({ message: { value: value7, key } });

const result = {
jobStartTime,
queueTime: 2,
publishLatency: 4,
avgSubscribeLatency: 6,
numSubsMissed: 0,
avgEndToEndLatency: 16,
medianEndToEndLatency: 16,
ninetyFifthPercentileEndToEndLatency: 18,
isPublished: true,
isSuccessfullyEmitted: true,
numClientsAcknowledged: 2,
numClientsEmittedTo: 6,
};

const persistMock = jest.spyOn(persist, 'persist');
aggregationHandler(messageSet, 'foo', 0);
jest.advanceTimersByTime(timeout);
expect(persistMock).toHaveBeenCalledWith(JSON.parse(key.toString()), result);
});

it('Does not include messages received after timeout', () => {
const key = Buffer.from(JSON.stringify({
updatedAt: new Date().toISOString(),
sampleName: 'testSample',
}));

const reqStartTime = Date.now();
const jobStartTime = reqStartTime + 2;
const publishCompletedAt = jobStartTime + 4;
const emittedAt1 = publishCompletedAt + 4;
const emittedAt2 = publishCompletedAt + 6;
const emittedAt3 = publishCompletedAt + 8;
const numClientsEmittedTo = 2;
const acknowledgedAt1 = emittedAt3 + 6;
const acknowledgedAt2 = emittedAt2 + 4;

const value1 = Buffer.from(JSON.stringify({ message:
{ type: 'requestStarted', reqStartTime, jobStartTime }, }));

const value2 = Buffer.from(JSON.stringify({ message:
{ type: 'published', publishCompletedAt }, }));

const value3 = Buffer.from(JSON.stringify({ message:
{ type: 'emitted', emittedAt: emittedAt1, numClientsEmittedTo: 2 }, }));

const value4 = Buffer.from(JSON.stringify({ message:
{ type: 'emitted', emittedAt: emittedAt2, numClientsEmittedTo: 3 }, }));

const value5 = Buffer.from(JSON.stringify({ message:
{ type: 'emitted', emittedAt: emittedAt3, numClientsEmittedTo: 1 }, }));

const value6 = Buffer.from(JSON.stringify({ message:
{ type: 'acknowledged', acknowledgedAt: acknowledgedAt1 }, }));

const value7 = Buffer.from(JSON.stringify({ message:
{ type: 'acknowledged', acknowledgedAt: acknowledgedAt2 }, }));

const messageSet = [];

messageSet.push({ message: { value: value1, key } });
messageSet.push({ message: { value: value2, key } });
messageSet.push({ message: { value: value3, key } });
messageSet.push({ message: { value: value4, key } });

const result = {
jobStartTime,
queueTime: 2,
publishLatency: 4,
avgSubscribeLatency: 5,
numSubsMissed: 1,
avgEndToEndLatency: null,
medianEndToEndLatency: null,
ninetyFifthPercentileEndToEndLatency: null,
isPublished: true,
isSuccessfullyEmitted: false,
numClientsAcknowledged: 0,
numClientsEmittedTo: 5,
};
const persistMock = jest.spyOn(persist, 'persist');
aggregationHandler(messageSet, 'foo', 0);
jest.advanceTimersByTime(timeout);

const messageSet2 = [];

messageSet2.push({ message: { value: value5, key } });
messageSet2.push({ message: { value: value6, key } });
messageSet2.push({ message: { value: value7, key } });
aggregationHandler(messageSet2, 'foo', 0);
jest.advanceTimersByTime(timeout);
expect(persistMock).toHaveBeenCalledWith(JSON.parse(key.toString()), result);
expect(persistMock).toHaveBeenCalledTimes(1);
});
});
45 changes: 45 additions & 0 deletions __tests__/unit/aggregator/kafkaConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (c) 2019, salesforce.com, inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or
* https://opensource.org/licenses/BSD-3-Clause
*/

const kafkaConsumer = require('../../../src/kafkaConsumer');
const kafka = require('no-kafka');
jest.mock('no-kafka');

describe('test/unit/consumer.js', () => {
it('Creates the consumer with the right arguments', async () => {
const GroupConsumerMock = jest.spyOn(kafka, 'GroupConsumer');
const topicHandlers = await kafkaConsumer.initConsumer();
expect(GroupConsumerMock).toHaveBeenCalledWith(
{
clientId: `consumer-${process.pid}`,
connectionString: 'test-url',
groupId: 'test-prefixlogger-group',
idleTimeout: 1000,
maxBytes: 1048576,
maxWaitTime: 100,
ssl: {
cert: 'test-cert',
key: 'test-key',
},
});
});

it('Init throws an error', async () => {
const GroupConsumerMock = jest.spyOn(kafka, 'GroupConsumer');
GroupConsumerMock.mockImplementationOnce(() => ({
init: () => {
throw new Error('');
},
})
);
const callback = jest.fn();
await kafkaConsumer.initConsumer(callback);
expect(callback).toHaveBeenCalled();
});

});
56 changes: 56 additions & 0 deletions __tests__/unit/aggregator/persist.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
const { persist } = require('../../../src/aggregator/persist');
const { initDb, db, aggregateTableName } = require('../../../src/aggregator/db');

describe('test/unit/aggregator/persist.js', () => {
beforeAll(async (done) => {
await initDb();
done();
});

it('persist OK', async () => {
const updatedAt = new Date().toISOString();
const key = {
updatedAt,
sampleName: 'testSample',
};

const jobStartTime = Date.now();
const result = {
jobStartTime,
queueTime: 2,
publishLatency: 4,
avgSubscribeLatency: 6,
numSubsMissed: 0,
avgEndToEndLatency: 16,
medianEndToEndLatency: 16,
ninetyFifthPercentileEndToEndLatency: 18,
isPublished: true,
isSuccessfullyEmitted: true,
numClientsAcknowledged: 2,
numClientsEmittedTo: 4,
};
await persist(key, result);

dbUpdatedAt = Date.parse(updatedAt);
const res = await db.query(`select * from ${aggregateTableName} where
updated_at = ${dbUpdatedAt}`);

const expectedRes = {
avg_end_to_end_latency: 16,
avg_subscribe_latency: 6,
is_published: true,
is_successfully_emitted: true,
job_start_time: '' + jobStartTime,
median_end_to_end_latency: 16,
ninety_fifth_percentile_end_to_end_latency: 18,
num_clients_acknowledged: 2,
num_subs_missed: 0,
publish_latency: 4,
queue_time: 2,
sample_name: 'testSample',
updated_at: '' + dbUpdatedAt,
num_clients_emitted_to: 4,
};
expect(res[0][0]).toEqual(expectedRes);
});
});
1 change: 1 addition & 0 deletions app.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"addons": ["heroku-postgresql:hobby-dev"],
"environments": {
"test": {
"formation": {
Expand Down
Loading