diff --git a/examples/consumer-metrics/README.md b/examples/consumer-metrics/README.md new file mode 100644 index 0000000..c98c95e --- /dev/null +++ b/examples/consumer-metrics/README.md @@ -0,0 +1,45 @@ +# Consumer Metrics + +This example demonstrates two methods of measuring metrics of kafka consumer. + +**Method 1** - consumer is responsible for measuring how quickly it consumes messages, by reading current server time at each message arrive. + +**Method 2** - publisher or external process is responsible for measuring how the offset of consumer group moves forward. +This method makes the following assumptions: + - Consumer is not auto committing offsets (or it is auto-committing offsets frequently) + - Publisher can predict what will be the last offset + - The topic has a single partition. + +## How to run + +- We need two terminal windows +- We need running kafka with (auto-create topics feature) or with pre-created topic named "local_pit_consumer_metrics" +- Kafka is listening on 127.1.1:9092 and can take plaintext connection with `scram-sha-512` and credentials: `admin/admin`. + See `pit-toolkit/examples/consumer-metrics/src/core.ts` + +In terminal one run the consumer: +``` +cd pit-toolkit/examples/consumer-metrics +npm i +npm run start.consumer +``` +It will connect to kafka and listen for messages from `local_pit_consumer_metrics` topic. When messages start coming it will print current consuming metrics every few seconds. When messages stop coming and keep silence for 30 seconds, the current metrics will be reset to zero. Once messages start coming again, the new metric will start printing. + +In the second terminal run the producer: +``` +cd pit-toolkit/examples/consumer-metrics +npm i +npm run start.publisher +``` +It will start two activities concurrently: +- it will start publishing messages one by one (not in batches). +- it will connect to kafka as admin and start fetching the offset from `local_pit_consumer_metrics` topic and consumer group used by consumer from terminal one. +- once it detects that consumer's offset has moved forward to its last value, the metric will be printed and program will terminate. + +## Test run results + +Below is the table of metrics comparing 9 test sessions. First 5 sessions each used 100k messages. The remaining 4 sessions used 1mln messages each. All tests and kafka were running on Macbook. + +![](./docs/metrics.png) + +The column "Offset minus Consumer (%)" shows the discrepancy in reported metrics in per cents. In other words, it shows how much slower the Method 2 is comparing to Method 1. \ No newline at end of file diff --git a/examples/consumer-metrics/docs/metrics.png b/examples/consumer-metrics/docs/metrics.png new file mode 100644 index 0000000..1f95069 Binary files /dev/null and b/examples/consumer-metrics/docs/metrics.png differ diff --git a/examples/consumer-metrics/package-lock.json b/examples/consumer-metrics/package-lock.json new file mode 100644 index 0000000..b1825b1 --- /dev/null +++ b/examples/consumer-metrics/package-lock.json @@ -0,0 +1,450 @@ +{ + "name": "@kindredgroup/pit-k8s-deployer", + "version": "1.1.8-dev", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "@kindredgroup/pit-k8s-deployer", + "version": "1.1.8-dev", + "license": "MIT", + "dependencies": { + "hdr-histogram-js": "^3.0.0", + "kafkajs": "^2.2.4", + "lz4": "0.6.5", + "uuid": "^9.0.1", + "winston": "^3.11.0" + }, + "devDependencies": { + "@types/node": "^20.8.9", + "@types/uuid": "^9.0.8", + "typescript": "^5.2.2" + } + }, + "node_modules/@assemblyscript/loader": { + "version": "0.19.23", + "resolved": "https://registry.npmjs.org/@assemblyscript/loader/-/loader-0.19.23.tgz", + "integrity": "sha512-ulkCYfFbYj01ie1MDOyxv2F6SpRN1TOj7fQxbP07D6HmeR+gr2JLSmINKjga2emB+b1L2KGrFKBTc+e00p54nw==" + }, + "node_modules/@colors/colors": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/@colors/colors/-/colors-1.6.0.tgz", + "integrity": "sha512-Ir+AOibqzrIsL6ajt3Rz3LskB7OiMVHqltZmspbW/TJuTVuyOMirVqAkjfY6JISiLHgyNqicAC8AyHHGzNd/dA==", + "engines": { + "node": ">=0.1.90" + } + }, + "node_modules/@dabh/diagnostics": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/@dabh/diagnostics/-/diagnostics-2.0.3.tgz", + "integrity": "sha512-hrlQOIi7hAfzsMqlGSFyVucrx38O+j6wiGOf//H2ecvIEqYN4ADBSS2iLMh5UFyDunCNniUIPk/q3riFv45xRA==", + "dependencies": { + "colorspace": "1.1.x", + "enabled": "2.0.x", + "kuler": "^2.0.0" + } + }, + "node_modules/@types/node": { + "version": "20.12.7", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.12.7.tgz", + "integrity": "sha512-wq0cICSkRLVaf3UGLMGItu/PtdY7oaXaI/RVU+xliKVOtRna3PRY57ZDfztpDL0n11vfymMUnXv8QwYCO7L1wg==", + "dev": true, + "dependencies": { + "undici-types": "~5.26.4" + } + }, + "node_modules/@types/triple-beam": { + "version": "1.3.5", + "resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.5.tgz", + "integrity": "sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==" + }, + "node_modules/@types/uuid": { + "version": "9.0.8", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.8.tgz", + "integrity": "sha512-jg+97EGIcY9AGHJJRaaPVgetKDsrTgbRjQ5Msgjh/DQKEFl0DtyRr/VCOyD1T2R1MNeWPK/u7JoGhlDZnKBAfA==", + "dev": true + }, + "node_modules/async": { + "version": "3.2.5", + "resolved": "https://registry.npmjs.org/async/-/async-3.2.5.tgz", + "integrity": "sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg==" + }, + "node_modules/base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, + "node_modules/buffer": { + "version": "5.7.1", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", + "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.1.13" + } + }, + "node_modules/color": { + "version": "3.2.1", + "resolved": "https://registry.npmjs.org/color/-/color-3.2.1.tgz", + "integrity": "sha512-aBl7dZI9ENN6fUGC7mWpMTPNHmWUSNan9tuWN6ahh5ZLNk9baLJOnSMlrQkHcrfFgz2/RigjUVAjdx36VcemKA==", + "dependencies": { + "color-convert": "^1.9.3", + "color-string": "^1.6.0" + } + }, + "node_modules/color-convert": { + "version": "1.9.3", + "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz", + "integrity": "sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg==", + "dependencies": { + "color-name": "1.1.3" + } + }, + "node_modules/color-name": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.3.tgz", + "integrity": "sha512-72fSenhMw2HZMTVHeCA9KCmpEIbzWiQsjN+BHcBbS9vr1mtt+vJjPdksIBNUmKAW8TFUDPJK5SUU3QhE9NEXDw==" + }, + "node_modules/color-string": { + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/color-string/-/color-string-1.9.1.tgz", + "integrity": "sha512-shrVawQFojnZv6xM40anx4CkoDP+fZsw/ZerEMsW/pyzsRbElpsL/DBVW7q3ExxwusdNXI3lXpuhEZkzs8p5Eg==", + "dependencies": { + "color-name": "^1.0.0", + "simple-swizzle": "^0.2.2" + } + }, + "node_modules/colorspace": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/colorspace/-/colorspace-1.1.4.tgz", + "integrity": "sha512-BgvKJiuVu1igBUF2kEjRCZXol6wiiGbY5ipL/oVPwm0BL9sIpMIzM8IK7vwuxIIzOXMV3Ey5w+vxhm0rR/TN8w==", + "dependencies": { + "color": "^3.1.3", + "text-hex": "1.0.x" + } + }, + "node_modules/cuint": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/cuint/-/cuint-0.2.2.tgz", + "integrity": "sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw==" + }, + "node_modules/enabled": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/enabled/-/enabled-2.0.0.tgz", + "integrity": "sha512-AKrN98kuwOzMIdAizXGI86UFBoo26CL21UM763y1h/GMSJ4/OHU9k2YlsmBpyScFo/wbLzWQJBMCW4+IO3/+OQ==" + }, + "node_modules/fecha": { + "version": "4.2.3", + "resolved": "https://registry.npmjs.org/fecha/-/fecha-4.2.3.tgz", + "integrity": "sha512-OP2IUU6HeYKJi3i0z4A19kHMQoLVs4Hc+DPqqxI2h/DPZHTm/vjsfC6P0b4jCMy14XizLBqvndQ+UilD7707Jw==" + }, + "node_modules/fn.name": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/fn.name/-/fn.name-1.1.0.tgz", + "integrity": "sha512-GRnmB5gPyJpAhTQdSZTSp9uaPSvl09KoYcMQtsB9rQoOmzs9dH6ffeccH+Z+cv6P68Hu5bC6JjRh4Ah/mHSNRw==" + }, + "node_modules/hdr-histogram-js": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/hdr-histogram-js/-/hdr-histogram-js-3.0.0.tgz", + "integrity": "sha512-/EpvQI2/Z98mNFYEnlqJ8Ogful8OpArLG/6Tf2bPnkutBVLIeMVNHjk1ZDfshF2BUweipzbk+dB1hgSB7SIakw==", + "dependencies": { + "@assemblyscript/loader": "^0.19.21", + "base64-js": "^1.2.0", + "pako": "^1.0.3" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, + "node_modules/inherits": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", + "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==" + }, + "node_modules/is-arrayish": { + "version": "0.3.2", + "resolved": "https://registry.npmjs.org/is-arrayish/-/is-arrayish-0.3.2.tgz", + "integrity": "sha512-eVRqCvVlZbuw3GrM63ovNSNAeA1K16kaR/LRY/92w0zxQ5/1YzwblUX652i4Xs9RwAGjW9d9y6X88t8OaAJfWQ==" + }, + "node_modules/is-stream": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/is-stream/-/is-stream-2.0.1.tgz", + "integrity": "sha512-hFoiJiTl63nn+kstHGBtewWSKnQLpyb155KHheA1l39uvtO9nWIop1p3udqPcUd/xbF1VLMO4n7OI6p7RbngDg==", + "engines": { + "node": ">=8" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/kuler": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/kuler/-/kuler-2.0.0.tgz", + "integrity": "sha512-Xq9nH7KlWZmXAtodXDDRE7vs6DU1gTU8zYDHDiWLSip45Egwq3plLHzPn27NgvzL2r1LMPC1vdqh98sQxtqj4A==" + }, + "node_modules/logform": { + "version": "2.6.0", + "resolved": "https://registry.npmjs.org/logform/-/logform-2.6.0.tgz", + "integrity": "sha512-1ulHeNPp6k/LD8H91o7VYFBng5i1BDE7HoKxVbZiGFidS1Rj65qcywLxX+pVfAPoQJEjRdvKcusKwOupHCVOVQ==", + "dependencies": { + "@colors/colors": "1.6.0", + "@types/triple-beam": "^1.3.2", + "fecha": "^4.2.0", + "ms": "^2.1.1", + "safe-stable-stringify": "^2.3.1", + "triple-beam": "^1.3.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, + "node_modules/lz4": { + "version": "0.6.5", + "resolved": "https://registry.npmjs.org/lz4/-/lz4-0.6.5.tgz", + "integrity": "sha512-KSZcJU49QZOlJSItaeIU3p8WoAvkTmD9fJqeahQXNu1iQ/kR0/mQLdbrK8JY9MY8f6AhJoMrihp1nu1xDbscSQ==", + "hasInstallScript": true, + "dependencies": { + "buffer": "^5.2.1", + "cuint": "^0.2.2", + "nan": "^2.13.2", + "xxhashjs": "^0.2.2" + }, + "engines": { + "node": ">= 0.10" + } + }, + "node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" + }, + "node_modules/nan": { + "version": "2.19.0", + "resolved": "https://registry.npmjs.org/nan/-/nan-2.19.0.tgz", + "integrity": "sha512-nO1xXxfh/RWNxfd/XPfbIfFk5vgLsAxUR9y5O0cHMJu/AW9U95JLXqthYHjEp+8gQ5p96K9jUp8nbVOxCdRbtw==" + }, + "node_modules/one-time": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/one-time/-/one-time-1.0.0.tgz", + "integrity": "sha512-5DXOiRKwuSEcQ/l0kGCF6Q3jcADFv5tSmRaJck/OqkVFcOzutB134KRSfF0xDrL39MNnqxbHBbUUcjZIhTgb2g==", + "dependencies": { + "fn.name": "1.x.x" + } + }, + "node_modules/pako": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/pako/-/pako-1.0.11.tgz", + "integrity": "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw==" + }, + "node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, + "node_modules/safe-stable-stringify": { + "version": "2.4.3", + "resolved": "https://registry.npmjs.org/safe-stable-stringify/-/safe-stable-stringify-2.4.3.tgz", + "integrity": "sha512-e2bDA2WJT0wxseVd4lsDP4+3ONX6HpMXQa1ZhFQ7SU+GjvORCmShbCMltrtIDfkYhVHrOcPtj+KhmDBdPdZD1g==", + "engines": { + "node": ">=10" + } + }, + "node_modules/simple-swizzle": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/simple-swizzle/-/simple-swizzle-0.2.2.tgz", + "integrity": "sha512-JA//kQgZtbuY83m+xT+tXJkmJncGMTFT+C+g2h2R9uxkYIrE2yy9sgmcLhCnw57/WSD+Eh3J97FPEDFnbXnDUg==", + "dependencies": { + "is-arrayish": "^0.3.1" + } + }, + "node_modules/stack-trace": { + "version": "0.0.10", + "resolved": "https://registry.npmjs.org/stack-trace/-/stack-trace-0.0.10.tgz", + "integrity": "sha512-KGzahc7puUKkzyMt+IqAep+TVNbKP+k2Lmwhub39m1AsTSkaDutx56aDCo+HLDzf/D26BIHTJWNiTG1KAJiQCg==", + "engines": { + "node": "*" + } + }, + "node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, + "node_modules/text-hex": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/text-hex/-/text-hex-1.0.0.tgz", + "integrity": "sha512-uuVGNWzgJ4yhRaNSiubPY7OjISw4sw4E5Uv0wbjp+OzcbmVU/rsT8ujgcXJhn9ypzsgr5vlzpPqP+MBBKcGvbg==" + }, + "node_modules/triple-beam": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/triple-beam/-/triple-beam-1.4.1.tgz", + "integrity": "sha512-aZbgViZrg1QNcG+LULa7nhZpJTZSLm/mXnHXnbAbjmN5aSa0y7V+wvv6+4WaBtpISJzThKy+PIPxc1Nq1EJ9mg==", + "engines": { + "node": ">= 14.0.0" + } + }, + "node_modules/typescript": { + "version": "5.4.5", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.4.5.tgz", + "integrity": "sha512-vcI4UpRgg81oIRUFwR0WSIHKt11nJ7SAVlYNIu+QpqeyXP+gpQJy/Z4+F0aGxSE4MqwjyXvW/TzgkLAx2AGHwQ==", + "dev": true, + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + }, + "node_modules/undici-types": { + "version": "5.26.5", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", + "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", + "dev": true + }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==" + }, + "node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } + }, + "node_modules/winston": { + "version": "3.13.0", + "resolved": "https://registry.npmjs.org/winston/-/winston-3.13.0.tgz", + "integrity": "sha512-rwidmA1w3SE4j0E5MuIufFhyJPBDG7Nu71RkZor1p2+qHvJSZ9GYDA81AyleQcZbh/+V6HjeBdfnTZJm9rSeQQ==", + "dependencies": { + "@colors/colors": "^1.6.0", + "@dabh/diagnostics": "^2.0.2", + "async": "^3.2.3", + "is-stream": "^2.0.0", + "logform": "^2.4.0", + "one-time": "^1.0.0", + "readable-stream": "^3.4.0", + "safe-stable-stringify": "^2.3.1", + "stack-trace": "0.0.x", + "triple-beam": "^1.3.0", + "winston-transport": "^4.7.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, + "node_modules/winston-transport": { + "version": "4.7.0", + "resolved": "https://registry.npmjs.org/winston-transport/-/winston-transport-4.7.0.tgz", + "integrity": "sha512-ajBj65K5I7denzer2IYW6+2bNIVqLGDHqDw3Ow8Ohh+vdW+rv4MZ6eiDvHoKhfJFZ2auyN8byXieDDJ96ViONg==", + "dependencies": { + "logform": "^2.3.2", + "readable-stream": "^3.6.0", + "triple-beam": "^1.3.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, + "node_modules/xxhashjs": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/xxhashjs/-/xxhashjs-0.2.2.tgz", + "integrity": "sha512-AkTuIuVTET12tpsVIQo+ZU6f/qDmKuRUcjaqR+OIvm+aCBsZ95i7UVY5WJ9TMsSaZ0DA2WxoZ4acu0sPH+OKAw==", + "dependencies": { + "cuint": "^0.2.2" + } + } + } +} diff --git a/examples/consumer-metrics/package.json b/examples/consumer-metrics/package.json new file mode 100644 index 0000000..3f40ec0 --- /dev/null +++ b/examples/consumer-metrics/package.json @@ -0,0 +1,31 @@ +{ + "name": "@kindredgroup/pit-k8s-deployer", + "version": "1.1.8-dev", + "description": "The deployment utility for apps designed to run in K8s clusters", + "type": "module", + "repository": { + "type": "git", + "url": "git+https://github.com/kindredgroup/pit-toolkit.git" + }, + "author": "kindredgroup", + "license": "MIT", + "homepage": "https://github.com/kindredgroup/pit-toolkit#readme", + "devDependencies": { + "@types/node": "^20.8.9", + "@types/uuid": "^9.0.8", + "typescript": "^5.2.2" + }, + "scripts": { + "clean": "rm -rf dist", + "build": "npm run clean && npx tsc", + "start.consumer": "npm run build && node dist/consumer/index.js", + "start.publisher": "npm run build && node dist/publisher/index.js" + }, + "dependencies": { + "hdr-histogram-js": "^3.0.0", + "kafkajs": "^2.2.4", + "lz4": "0.6.5", + "uuid": "^9.0.1", + "winston": "^3.11.0" + } +} diff --git a/examples/consumer-metrics/src/consumer/index.ts b/examples/consumer-metrics/src/consumer/index.ts new file mode 100644 index 0000000..b3eda09 --- /dev/null +++ b/examples/consumer-metrics/src/consumer/index.ts @@ -0,0 +1,48 @@ +import { CONSUMER_GROUP, PRINT_PROGRESS_EVERY, TOPIC, kafkaClient } from "../core.js" +import { logger } from "../logger.js" + +const main = async () => { + logger.info("main(): Starting consumer...") + + const consumer = kafkaClient.consumer({ + groupId: CONSUMER_GROUP, + readUncommitted: false + }) + + await consumer.subscribe({ topics: [ TOPIC ] }) + + let firstMessageTime = 0 + let recentMessageTime = 0 + let count = 0 + await consumer.run({ + autoCommit: false, + eachMessage: async ({ message }) => { + if (firstMessageTime == 0) firstMessageTime = Date.now() + recentMessageTime = Date.now() + count++ + if (count % PRINT_PROGRESS_EVERY == 0) logger.info("consuming: %s", message.value?.toString()) + await consumer.commitOffsets([ { topic: TOPIC, partition: 0, offset: `${ parseInt(message.offset) + 1 }` } ]) + } + }) + + // start metrics calculator + setInterval(() => { + if (Date.now() - recentMessageTime >= 30_000) { + logger.info("auto resetting metrics...") + firstMessageTime = 0 + recentMessageTime = 0 + count = 0 + } + if (recentMessageTime) { + logger.info("consumer rate: %s TPS", (count / ((recentMessageTime - firstMessageTime) / 1_000)).toFixed(2)) + } + }, 5_000) +} + +main() + .catch(e => { + logger.error("Message: %s", e.message) + if (e.cause) logger.error(e.cause) + if (e.stack) logger.error("Stack:\n%s", e.stack) + process.exit(1) + }) \ No newline at end of file diff --git a/examples/consumer-metrics/src/core.ts b/examples/consumer-metrics/src/core.ts new file mode 100644 index 0000000..88ac036 --- /dev/null +++ b/examples/consumer-metrics/src/core.ts @@ -0,0 +1,16 @@ +import { Kafka } from "kafkajs" +import { v4 as uuid } from 'uuid' + +export const TOPIC = "local_pit_consumer_metrics" +export const CONSUMER_GROUP = "local_pit_group" +export const PRINT_PROGRESS_EVERY = 50_000 + +export const kafkaClient = new Kafka( { + brokers: [ "127.0.0.1:9092" ], + clientId: uuid(), + sasl: { + mechanism: "scram-sha-512", + username: "admin", + password: "admin", + } +}) \ No newline at end of file diff --git a/examples/consumer-metrics/src/logger.ts b/examples/consumer-metrics/src/logger.ts new file mode 100644 index 0000000..701d137 --- /dev/null +++ b/examples/consumer-metrics/src/logger.ts @@ -0,0 +1,18 @@ +import { createLogger, format, transports } from "winston" + +const logger = createLogger({ + level: "info", + format: format.combine( + format.colorize(), + format.timestamp(), + format.align(), + format.splat(), + format.printf(({ timestamp, level, message }) => `${ timestamp } - ${ level }: ${ message }`) + ), + transports: [ + new transports.Console() + ], +}) + +export const LOG_SEPARATOR_LINE = "* * * * * * * * * * * * *" +export { logger } \ No newline at end of file diff --git a/examples/consumer-metrics/src/publisher/index.ts b/examples/consumer-metrics/src/publisher/index.ts new file mode 100644 index 0000000..e0f2e7c --- /dev/null +++ b/examples/consumer-metrics/src/publisher/index.ts @@ -0,0 +1,88 @@ +import { Admin, Producer } from "kafkajs" +import * as hdr from "hdr-histogram-js" + +import { CONSUMER_GROUP, PRINT_PROGRESS_EVERY, TOPIC, kafkaClient } from "../core.js" +import { logger } from "../logger.js" + +const OFFSET_MONITOR_FREQUENCY_SECONDS = 15 +const MESSAGES_COUNT = 1_000_000 +const consumerMetrics = hdr.build() + +const main = async () => { + logger.info("main(): Starting producer...") + + const producer = kafkaClient.producer() + await producer.connect() + + const admin = kafkaClient.admin() + await admin.connect() + + const initialOffset = await getOffsetValue(admin) + const expectedOffset = Math.max(0, initialOffset) + MESSAGES_COUNT + + let sentMessages = 0 + logger.info("Initial offset: %s, expected offset: %s", initialOffset, expectedOffset) + setTimeout(async() => { + await startTest(producer, sent => { sentMessages = sent }) + await producer.disconnect() + }, 0) + + setTimeout(async() => { + const timer = await startMonitor(admin, async recentlyFetchedOffset => { + if (recentlyFetchedOffset < expectedOffset) return + if (recentlyFetchedOffset > expectedOffset && sentMessages < MESSAGES_COUNT) { + logger.warn("Incorrect value of fetched offset: %s. Current messages: %s. Approx expected offset is: %s", recentlyFetchedOffset, sentMessages, sentMessages + initialOffset) + return + } + consumerMetrics.recordValue(Date.now()) + if (timer) { + logger.info("stopping offset monitor....") + clearInterval(timer) + } + await admin.disconnect() + printMetrics() + }) + }, 0) +} + +const getOffsetValue = async(admin: Admin) => { + const offsets = await admin.fetchOffsets({ groupId: CONSUMER_GROUP, topics: [ TOPIC ] }) + const offsetsArray = offsets.flatMap(({ partitions }) => { + return partitions.map(({ offset }) => offset) + }) + return offsetsArray.length > 0 ? parseInt(offsetsArray[0]) : 0 +} + +const startMonitor = async (admin: Admin, offsetCallback: (value: number) => Promise) => { + return setInterval(async () => { + const offset = await getOffsetValue(admin) + logger.info("fetched offset: %s", offset) + await offsetCallback(offset) + }, OFFSET_MONITOR_FREQUENCY_SECONDS * 1_000) +} + +const startTest = async (producer: Producer, publishCallback: (sent: number) => void) => { + consumerMetrics.recordValue(Date.now()) + for (let i = 1; i <= MESSAGES_COUNT; i++) { + await producer.send({ + topic: TOPIC, + messages: [ { value: `Sample message: ${i}` } ], + }) + publishCallback(i) + if (i % PRINT_PROGRESS_EVERY == 0) logger.info("sent: %s", i) + } + logger.info("all messages have been published") +} + +const printMetrics = () => { + logger.info("Count : %s", MESSAGES_COUNT) + logger.info("Throughput: %s (tps)", (MESSAGES_COUNT / ((consumerMetrics.maxValue - consumerMetrics.minNonZeroValue) / 1_000)).toFixed(2)) +} + +main() + .catch(e => { + logger.error("Message: %s", e.message) + if (e.cause) logger.error(e.cause) + if (e.stack) logger.error("Stack:\n%s", e.stack) + process.exit(1) + }) \ No newline at end of file diff --git a/examples/consumer-metrics/tsconfig.json b/examples/consumer-metrics/tsconfig.json new file mode 100644 index 0000000..aaed676 --- /dev/null +++ b/examples/consumer-metrics/tsconfig.json @@ -0,0 +1,18 @@ +{ + "compilerOptions": { + "module": "node16", + "esModuleInterop": true, + "allowArbitraryExtensions": true, + "declaration": true, + "target": "ES2022", + "moduleResolution": "node16", + "sourceMap": true, + "outDir": "dist" + }, + "include": ["src/**/**/*.ts", "tests/**/**/*.ts"], + "exclude": [ + "build", + "dist", + "node_modules", + ] +} \ No newline at end of file diff --git a/examples/load-generator/src/index-pond-ordered.ts b/examples/load-generator/src/index-pond-ordered.ts new file mode 100644 index 0000000..e049143 --- /dev/null +++ b/examples/load-generator/src/index-pond-ordered.ts @@ -0,0 +1,39 @@ +import * as hdr from "hdr-histogram-js" + +import { logger } from "./utls/logger.js" +import { Pond } from "./utls/pond.js" + +const CONCURRENCY = 400 + +const main = async () => { + const pool = new Pond(CONCURRENCY) + + const list = new Array() + const ITEMS_COUNT = 100_000 + for (let i = 1; i <= ITEMS_COUNT; i++) { + list.push(i) + } + list.reverse() + + const processedItems = new Array() + for (let i = 1; i <= ITEMS_COUNT; i++) { + pool.submit(async () => { + const item = list.pop() + logger.info("%s - %s", i, item) + processedItems.push(item) + }) + } + + for (let i = 0; i < processedItems.length; i += 2) { + const a = processedItems[i] + const b = processedItems[i + 1] + if (b - a !== 1) throw Error(`out of order items: $a and $b`) + } +} + +main() + .catch(e => { + logger.error("Message: %s", e.message) + if (e.cause) logger.error(e.cause) + if (e.stack) logger.error("Stack:\n%s", e.stack) + }) \ No newline at end of file