Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: Run Jest Tests

on:
push:
branches: [ '**' ] # Run on all branches
pull_request:
branches: [ '**' ] # Run on PRs targeting any branch

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- name: Set up Node.js
uses: actions/setup-node@v3
with:
node-version: '18'
cache: 'npm'

- name: Install dependencies
run: npm ci

- name: Run tests
run: npm test

- name: Cleanup
if: always()
run: |
echo "Cleaning up..."
rm -rf node_modules
rm -rf .npm
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ node test/testTimeSync.js

## Usage

The index.js file contains a simple logger built for the CDP Studio example case.
The value.js file contains a simple logger built for the CDP Studio example case.

1. Set up and run the Logger in CDP Studio.
(Refer to Help → Framework - Data Logging → How to Setup Logging in Automation System)
https://cdpstudio.com/manual/cdp/cdplogger/cdplogger-configuration-example.html

2. Run the index.js file from the command line:
2. Run the value.js file from the command line:

```bash
node examples/index.js
node examples/value.js
```

For usage related to events run:
Expand Down
170 changes: 85 additions & 85 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ const WebSocket = require('ws');
const root = require('./generated/containerPb.js');
const Container = root.DBMessaging.Protobuf.Container;
const CDPValueType = root.ICD.Protobuf.CDPValueType;
const EventQuery = root.DBMessaging.Protobuf.EventQuery;


/**
* A client for interacting with a CDP Logger or LogServer via WebSocket.
Expand All @@ -14,7 +16,7 @@ const CDPValueType = root.ICD.Protobuf.CDPValueType;
class Client {
// Defined property names to use instead of ambiguous numbers.
static EventQueryFlags = Object.freeze({
None: 0, // Client.EventQueryFlags.None === 0
None: 0, // cdplogger.Client.EventQueryFlags.None === 0
NewestFirst: 1,
TimeRangeBeginExclusive: 2,
TimeRangeEndExclusive: 4,
Expand All @@ -38,35 +40,35 @@ class Client {
if (!/^wss?:\/\//.test(url)) {
url = `ws://${url}`;
}

this.reqId = -1;
this.autoReconnect = autoReconnect;
this.enableTimeSync = true; // Time synchronization is enabled by default.

this.isOpen = false;
this.queuedRequests = {};
this.storedPromises = {};
this.nameToId = {};
this.idToName = {};

// Mapping for signal types (in case we need to interpret values).
this.nameToType = {};

// Time-diff related
this.timeDiff = 0;
this.timeReceived = null;
this.lastTimeRequest = Date.now() / 1000;
this.haveSentQueuedReq = false;
this.roundTripTimes = {};

// Initialize the cache for sender tags and pending tag requests.
this.senderTags = {}; // Cache for event sender tags (keyed by sender)
this.pendingSenderTags = {}; // Holds pending promises for sender tags

// Create the WebSocket connection
this.ws = this._connect(url);
}


/**
* Enable or disable time synchronization with the server.
Expand All @@ -83,7 +85,7 @@ class Client {
setEnableTimeSync(enable) {
this.enableTimeSync = enable;
if (!enable) {
// Cancel any pending time sync requests so they wont update timeDiff later.
// Cancel any pending time sync requests so they won't update timeDiff later.
for (const key in this.storedPromises) {
this.storedPromises[key].reject(new Error("Time sync disabled"));
}
Expand Down Expand Up @@ -127,7 +129,7 @@ class Client {
* larger data sets should be downloaded in patches.
* - 4.0 (2024-01, CDP 4.12):
* - Added NodeTag support to save custom tags for logged values (e.g. Unit or Description),
* accessible via the clients API.
* accessible via the client's API.
* - Reduced network usage by having data responses only include changes instead of repeating unchanged values.
* - Added support for string values and events.
*
Expand Down Expand Up @@ -288,52 +290,52 @@ class Client {
* dataConditions: {
* Text: ["Invalid or missing feature license detected."],
* // Multiple data conditions can be specified:
* Level: { value: "ERROR", matchType: Client.MatchType.Exact }
* Level: { value: "ERROR", matchType: cdplogger.Client.MatchType.Exact }
* },
* limit: 100,
* offset: 0,
* flags: Client.EventQueryFlags.NewestFirst
* flags: cdplogger.Client.EventQueryFlags.NewestFirst
* });
*
* @param {Object} query - A simple plain object representing the EventQuery.
* @returns {Promise<Array>} Resolves with an array of event objects.
*/
// Modified requestEvents() to wait for missing sender tag info.
requestEvents(query) {
this._timeRequest();
const requestId = this._getRequestId();
const eventQuery = this._buildEventQuery(query);
if (!this.isOpen) {
this.queuedRequests[requestId] = { type: "events", query: eventQuery };
} else {
this._sendEventsRequest(requestId, eventQuery);
}
return new Promise((resolve, reject) => {
this.storedPromises[requestId] = { resolve, reject };
})
.then(events => {
// Collect the unique sender names from events that lack cached tags.
const missingSenders = Array.from(new Set(
events
.filter(evt => !this.senderTags[evt.sender])
.map(evt => evt.sender)
));

if (missingSenders.length === 0) {
return events;
// Modified requestEvents() to wait for missing sender tag info.
requestEvents(query) {
this._timeRequest();
const requestId = this._getRequestId();
const eventQuery = this._buildEventQuery(query);
if (!this.isOpen) {
this.queuedRequests[requestId] = { type: "events", query: eventQuery };
} else {
this._sendEventsRequest(requestId, eventQuery);
}
// Request tag info for all missing senders.
return Promise.all(
missingSenders.map(sender => this.getSenderTags(sender))
).then(() => {
// Attach tags to events after tag info is available.
events.forEach(evt => {
evt.tags = this.senderTags[evt.sender];
return new Promise((resolve, reject) => {
this.storedPromises[requestId] = { resolve, reject };
})
.then(events => {
// Collect the unique sender names from events that lack cached tags.
const missingSenders = Array.from(new Set(
events
.filter(evt => !this.senderTags[evt.sender])
.map(evt => evt.sender)
));

if (missingSenders.length === 0) {
return events;
}
// Request tag info for all missing senders.
return Promise.all(
missingSenders.map(sender => this.getSenderTags(sender))
).then(() => {
// Attach tags to events after tag info is available.
events.forEach(evt => {
evt.tags = this.senderTags[evt.sender];
});
return events;
});
});
return events;
});
});
}
}

/**
* Request a count of events that match the given query.
Expand Down Expand Up @@ -437,18 +439,17 @@ requestEvents(query) {
return s;
}

/**
* Retrieves the tags associated with a given sender.
*
* This method checks if the tags for the specified sender are already cached. If so, it returns a
* resolved promise with the cached tags. Otherwise, it initializes a pending promise for the sender,
* sends a request for the sender's tags using `_sendEventSenderTagsRequest`, and returns a promise that
* resolves when the tags are received. If no response is received within 5000 ms, it falls back to resolving
* with an empty object.
*
* @param {string} sender - The identifier of the event sender.
* @returns {Promise<Object>} A promise that resolves with an object representing the tags for the sender.
*/
/**
* Retrieves the tags associated with a given sender.
*
* This method checks if the tags for the specified sender are already cached. If so, it returns a
* resolved promise with the cached tags. Otherwise, it initializes a pending promise for the sender,
* sends a request for the sender's tags using `_sendEventSenderTagsRequest`, and returns a promise that
* resolves when the tags are received.
*
* @param {string} sender - The identifier of the event sender.
* @returns {Promise<Object>} A promise that resolves with an object representing the tags for the sender.
*/
getSenderTags(sender) {
if (this.senderTags && this.senderTags[sender]) {
return Promise.resolve(this.senderTags[sender]);
Expand All @@ -458,16 +459,8 @@ requestEvents(query) {
this.pendingSenderTags[sender] = [];
this._sendEventSenderTagsRequest(sender);
}
return new Promise(resolve => {
this.pendingSenderTags[sender].push(resolve);
// Increase timeout to 5000 ms to wait longer for tag info.
setTimeout(() => {
if (this.pendingSenderTags[sender]) {
this.senderTags[sender] = {}; // Fallback to empty object.
this.pendingSenderTags[sender].forEach(fn => fn({}));
delete this.pendingSenderTags[sender];
}
}, 5000);
return new Promise((resolve, reject) => {
this.pendingSenderTags[sender].push({ resolve, reject });
});
}

Expand Down Expand Up @@ -497,14 +490,20 @@ requestEvents(query) {
if (!error) {
error = new Error("Something went wrong");
}
if (!this.autoReconnect) {
for (const key in this.storedPromises) {
this.storedPromises[key].reject(error);
}
this.storedPromises = {};
this.queuedRequests = {};
// Reject all stored promises.
for (const key in this.storedPromises) {
this.storedPromises[key].reject(error);
}
this.storedPromises = {};
this.queuedRequests = {};

// Reject any pending sender tag promises.
for (const sender in this.pendingSenderTags) {
this.pendingSenderTags[sender].forEach(promiseObj => promiseObj.reject(error));
delete this.pendingSenderTags[sender];
}
}


_onClose(ws) {
this.isOpen = false;
Expand Down Expand Up @@ -663,7 +662,7 @@ requestEvents(query) {
}
break;
}


case Container.Type.eCountEventsResponse: {
if (this.storedPromises[data.countEventsResponse.requestId]) {
Expand All @@ -673,7 +672,7 @@ requestEvents(query) {
}
break;
}

case Container.Type.eEventSenderTagsResponse: {
// Get the mapping of sender names to TagMap objects.
const tagsMapping = data.eventSenderTagsResponse.senderTags;
Expand All @@ -683,13 +682,14 @@ requestEvents(query) {
this.senderTags[sender] = tags;
// Resolve any pending promises waiting for tags for this sender.
if (this.pendingSenderTags[sender]) {
this.pendingSenderTags[sender].forEach(resolveFn => resolveFn(tags));
this.pendingSenderTags[sender].forEach(promiseObj => promiseObj.resolve(tags));
delete this.pendingSenderTags[sender];
}
}
break;
}



default:
console.error("Unknown message type", data.messageType);
}
Expand Down Expand Up @@ -866,7 +866,7 @@ requestEvents(query) {

_reqDataPoints(nodeNames, startS, endS, noOfDataPoints, limit, requestId) {
const _getDataPoints = (nodeIds) => {
this._sendDataPointsRequest(nodeIds, startS, endS, requestId, limit, noOfDataPoints);
this._sendDataPointsRequest(nodeIds, startS, endS, requestId, noOfDataPoints, limit);
};

const rejectRequest = (error) => {
Expand Down Expand Up @@ -908,7 +908,7 @@ requestEvents(query) {
});
}

_sendDataPointsRequest(nodeIds, startS, endS, requestId, limit, noOfDataPoints) {
_sendDataPointsRequest(nodeIds, startS, endS, requestId, noOfDataPoints, limit) {
const container = Container.create();
container.messageType = Container.Type.eSignalDataRequest;
container.signalDataRequest = {
Expand Down Expand Up @@ -945,7 +945,7 @@ requestEvents(query) {
container.countEventsRequest = { requestId, query };
const buffer = Container.encode(container).finish();
this.ws.send(buffer);
}
}

_sendEventSenderTagsRequest(sender) {
const container = Container.create();
Expand Down Expand Up @@ -1017,9 +1017,6 @@ requestEvents(query) {
// Validate the query object before building the EventQuery.
this._validateEventQuery(query);

const root = require('./generated/containerPb.js');
const { EventQuery } = root.DBMessaging.Protobuf;

// Conditionally include these fields only if the user has set them
const optionalFields = [
"timeRangeBegin",
Expand Down Expand Up @@ -1120,4 +1117,7 @@ requestEvents(query) {
}
}

module.exports = Client;
const cdplogger = {};
cdplogger.Client = Client;

module.exports = cdplogger;
Loading