diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index aa19e0f..e910b4c 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -12,7 +12,15 @@ services: - ../..:/workspaces:cached # Overrides default command so things don't shut down after the process ends. entrypoint: /usr/local/share/docker-init.sh - command: sleep infinity + command: sleep infinity + redis: + hostname: redis + image: redis + command: redis-server --include /usr/local/etc/redis/redis.conf + volumes: + - ./redis/redis.conf:/usr/local/etc/redis/redis.conf + ports: + - "6379:6379" zookeeper: image: wurstmeister/zookeeper ports: diff --git a/client/package-lock.json b/client/package-lock.json index a0d27ea..2778323 100644 --- a/client/package-lock.json +++ b/client/package-lock.json @@ -14,9 +14,6 @@ "@mantine/hooks": "^6.0.4", "@mantine/next": "^6.0.4", "@tabler/icons-react": "^2.15.0", - "@types/node": "18.15.5", - "@types/react": "18.0.28", - "@types/react-dom": "18.0.11", "axios": "^1.3.4", "axios-hooks": "^4.0.0", "eslint": "8.36.0", @@ -26,11 +23,15 @@ "react": "18.2.0", "react-dom": "18.2.0", "react-graph-vis": "^1.0.7", - "typescript": "5.0.2", + "redis": "^4.6.8", "uuidv4": "^6.2.13" }, "devDependencies": { - "prettier": "^2.8.8" + "@types/node": "18.15.5", + "@types/react": "18.0.28", + "@types/react-dom": "18.0.11", + "prettier": "^2.8.8", + "typescript": "5.0.2" } }, "node_modules/@babel/code-frame": { @@ -899,6 +900,59 @@ "react": "^16.8 || ^17.0 || ^18.0" } }, + "node_modules/@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/client": { + "version": "1.5.9", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.9.tgz", + "integrity": "sha512-SffgN+P1zdWJWSXBvJeynvEnmnZrYmtKSRW00xl8pOPFOMJjxRR9u0frSxJpPR6Y4V+k54blJjGW7FgxbTI7bQ==", + "dependencies": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@redis/graph": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz", + "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/json": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz", + "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/search": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.3.tgz", + "integrity": "sha512-4Dg1JjvCevdiCBTZqjhKkGoC5/BcB7k9j99kdMnaXFXg8x4eyOIVg9487CMv7/BUVkFLZCaIh8ead9mU15DNng==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/time-series": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz", + "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, "node_modules/@rushstack/eslint-patch": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/@rushstack/eslint-patch/-/eslint-patch-1.2.0.tgz", @@ -951,7 +1005,8 @@ "node_modules/@types/node": { "version": "18.15.5", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.15.5.tgz", - "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew==" + "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew==", + "dev": true }, "node_modules/@types/parse-json": { "version": "4.0.0", @@ -961,12 +1016,14 @@ "node_modules/@types/prop-types": { "version": "15.7.5", "resolved": "https://registry.npmjs.org/@types/prop-types/-/prop-types-15.7.5.tgz", - "integrity": "sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==" + "integrity": "sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==", + "devOptional": true }, "node_modules/@types/react": { "version": "18.0.28", "resolved": "https://registry.npmjs.org/@types/react/-/react-18.0.28.tgz", "integrity": "sha512-RD0ivG1kEztNBdoAK7lekI9M+azSnitIn85h4iOiaLjaTrMjzslhaqCGaI4IyCJ1RljWiLCEu4jyrLLgqxBTew==", + "devOptional": true, "dependencies": { "@types/prop-types": "*", "@types/scheduler": "*", @@ -977,6 +1034,7 @@ "version": "18.0.11", "resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.0.11.tgz", "integrity": "sha512-O38bPbI2CWtgw/OoQoY+BRelw7uysmXbWvw3nLWO21H1HSh+GOlqPuXshJfjmpNlKiiSDG9cc1JZAaMmVdcTlw==", + "dev": true, "dependencies": { "@types/react": "*" } @@ -984,7 +1042,8 @@ "node_modules/@types/scheduler": { "version": "0.16.2", "resolved": "https://registry.npmjs.org/@types/scheduler/-/scheduler-0.16.2.tgz", - "integrity": "sha512-hppQEBDmlwhFAXKJX2KnWLYu5yMfi91yazPb2l+lbJiwW+wdo1gNeRA+3RgNSO39WYX2euey41KEwnqesU2Jew==" + "integrity": "sha512-hppQEBDmlwhFAXKJX2KnWLYu5yMfi91yazPb2l+lbJiwW+wdo1gNeRA+3RgNSO39WYX2euey41KEwnqesU2Jew==", + "devOptional": true }, "node_modules/@types/uuid": { "version": "8.3.4", @@ -1438,6 +1497,14 @@ "node": ">=6" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -2481,6 +2548,14 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "engines": { + "node": ">= 4" + } + }, "node_modules/get-intrinsic": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.0.tgz", @@ -3955,6 +4030,19 @@ "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==" }, + "node_modules/redis": { + "version": "4.6.8", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.8.tgz", + "integrity": "sha512-S7qNkPUYrsofQ0ztWlTHSaK0Qqfl1y+WMIxrzeAGNG+9iUZB4HGeBgkHxE6uJJ6iXrkvLd1RVJ2nvu6H1sAzfQ==", + "dependencies": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.5.9", + "@redis/graph": "1.1.0", + "@redis/json": "1.0.4", + "@redis/search": "1.1.3", + "@redis/time-series": "1.0.5" + } + }, "node_modules/regenerator-runtime": { "version": "0.13.11", "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.13.11.tgz", @@ -5363,6 +5451,46 @@ "@babel/runtime": "^7.13.10" } }, + "@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "requires": {} + }, + "@redis/client": { + "version": "1.5.9", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.9.tgz", + "integrity": "sha512-SffgN+P1zdWJWSXBvJeynvEnmnZrYmtKSRW00xl8pOPFOMJjxRR9u0frSxJpPR6Y4V+k54blJjGW7FgxbTI7bQ==", + "requires": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + } + }, + "@redis/graph": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz", + "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==", + "requires": {} + }, + "@redis/json": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz", + "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==", + "requires": {} + }, + "@redis/search": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.3.tgz", + "integrity": "sha512-4Dg1JjvCevdiCBTZqjhKkGoC5/BcB7k9j99kdMnaXFXg8x4eyOIVg9487CMv7/BUVkFLZCaIh8ead9mU15DNng==", + "requires": {} + }, + "@redis/time-series": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz", + "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==", + "requires": {} + }, "@rushstack/eslint-patch": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/@rushstack/eslint-patch/-/eslint-patch-1.2.0.tgz", @@ -5404,7 +5532,8 @@ "@types/node": { "version": "18.15.5", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.15.5.tgz", - "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew==" + "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew==", + "dev": true }, "@types/parse-json": { "version": "4.0.0", @@ -5414,12 +5543,14 @@ "@types/prop-types": { "version": "15.7.5", "resolved": "https://registry.npmjs.org/@types/prop-types/-/prop-types-15.7.5.tgz", - "integrity": "sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==" + "integrity": "sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==", + "devOptional": true }, "@types/react": { "version": "18.0.28", "resolved": "https://registry.npmjs.org/@types/react/-/react-18.0.28.tgz", "integrity": "sha512-RD0ivG1kEztNBdoAK7lekI9M+azSnitIn85h4iOiaLjaTrMjzslhaqCGaI4IyCJ1RljWiLCEu4jyrLLgqxBTew==", + "devOptional": true, "requires": { "@types/prop-types": "*", "@types/scheduler": "*", @@ -5430,6 +5561,7 @@ "version": "18.0.11", "resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.0.11.tgz", "integrity": "sha512-O38bPbI2CWtgw/OoQoY+BRelw7uysmXbWvw3nLWO21H1HSh+GOlqPuXshJfjmpNlKiiSDG9cc1JZAaMmVdcTlw==", + "dev": true, "requires": { "@types/react": "*" } @@ -5437,7 +5569,8 @@ "@types/scheduler": { "version": "0.16.2", "resolved": "https://registry.npmjs.org/@types/scheduler/-/scheduler-0.16.2.tgz", - "integrity": "sha512-hppQEBDmlwhFAXKJX2KnWLYu5yMfi91yazPb2l+lbJiwW+wdo1gNeRA+3RgNSO39WYX2euey41KEwnqesU2Jew==" + "integrity": "sha512-hppQEBDmlwhFAXKJX2KnWLYu5yMfi91yazPb2l+lbJiwW+wdo1gNeRA+3RgNSO39WYX2euey41KEwnqesU2Jew==", + "devOptional": true }, "@types/uuid": { "version": "8.3.4", @@ -5746,6 +5879,11 @@ "resolved": "https://registry.npmjs.org/clsx/-/clsx-1.1.1.tgz", "integrity": "sha512-6/bPho624p3S2pMyvP5kKBPXnI3ufHLObBFCfgx+LkeR5lg2XYy2hqZqUf45ypD8COn2bhgGJSUE+l5dhNBieA==" }, + "cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==" + }, "color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -6546,6 +6684,11 @@ "resolved": "https://registry.npmjs.org/functions-have-names/-/functions-have-names-1.2.3.tgz", "integrity": "sha512-xckBUXyTIqT97tq2x2AMb+g163b5JFysYk0x4qxNFwbfQkmNZoiRHb6sPzI9/QV33WeuvVYBUIiD4NzNIyqaRQ==" }, + "generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==" + }, "get-intrinsic": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.0.tgz", @@ -7544,6 +7687,19 @@ } } }, + "redis": { + "version": "4.6.8", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.8.tgz", + "integrity": "sha512-S7qNkPUYrsofQ0ztWlTHSaK0Qqfl1y+WMIxrzeAGNG+9iUZB4HGeBgkHxE6uJJ6iXrkvLd1RVJ2nvu6H1sAzfQ==", + "requires": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.5.9", + "@redis/graph": "1.1.0", + "@redis/json": "1.0.4", + "@redis/search": "1.1.3", + "@redis/time-series": "1.0.5" + } + }, "regenerator-runtime": { "version": "0.13.11", "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.13.11.tgz", diff --git a/client/package.json b/client/package.json index 6753189..ba5913d 100644 --- a/client/package.json +++ b/client/package.json @@ -25,13 +25,14 @@ "react": "18.2.0", "react-dom": "18.2.0", "react-graph-vis": "^1.0.7", + "redis": "^4.6.8", "uuidv4": "^6.2.13" }, "devDependencies": { - "typescript": "5.0.2", - "prettier": "^2.8.8", "@types/node": "18.15.5", "@types/react": "18.0.28", - "@types/react-dom": "18.0.11" + "@types/react-dom": "18.0.11", + "prettier": "^2.8.8", + "typescript": "5.0.2" } } diff --git a/client/src/components/connectors-table.tsx b/client/src/components/connectors-table.tsx index eae1491..c782277 100644 --- a/client/src/components/connectors-table.tsx +++ b/client/src/components/connectors-table.tsx @@ -1,4 +1,4 @@ -import { Connector } from '@/models/connector'; +import { Connector, KafkaConnector, RedisConnector } from '@/models/connector'; import { Accordion } from '@mantine/core'; interface TableProps { @@ -19,7 +19,11 @@ export default function PipelinesTable({ connector }: TableProps) { Connector - {connector.name} ({connector.type}), {connector.brokers} | {connector.group_id} + {connector.name} ({connector.type}), + {connector.type == 'kafka' ? + `${(connector as KafkaConnector).brokers} | ${(connector as KafkaConnector).group_id}` + : `${(connector as RedisConnector).host} | ${(connector as RedisConnector).port}` + } diff --git a/client/src/models/connector.ts b/client/src/models/connector.ts index 9d72425..6d56422 100644 --- a/client/src/models/connector.ts +++ b/client/src/models/connector.ts @@ -1,6 +1,14 @@ export interface Connector { name: string; - type: string; + type: 'kafka' | 'redis'; +} + +export interface KafkaConnector extends Connector { brokers: string; group_id: string; } + +export interface RedisConnector extends Connector { + host: string; + port: number +} \ No newline at end of file diff --git a/client/src/models/pipeline-list.ts b/client/src/models/pipeline-list.ts index ac7371c..4a8738d 100644 --- a/client/src/models/pipeline-list.ts +++ b/client/src/models/pipeline-list.ts @@ -1,8 +1,8 @@ -import { Connector } from './connector'; +import { KafkaConnector, RedisConnector } from './connector'; import { Pipeline } from './pipeline'; export interface PipelineList { name: string; pipelines: Pipeline[]; - connector: Connector; + connector: RedisConnector | KafkaConnector; } diff --git a/client/src/pages/api/create-topics.ts b/client/src/pages/api/create-topics.ts index 18e2ee1..2faa031 100644 --- a/client/src/pages/api/create-topics.ts +++ b/client/src/pages/api/create-topics.ts @@ -1,7 +1,15 @@ -import { Kafka } from 'kafkajs'; import type { NextApiRequest, NextApiResponse } from 'next'; import { Pipeline } from '@/models/pipeline'; -import { Connector } from '@/models/connector'; +import { Connector, KafkaConnector } from '@/models/connector'; +import { createTopics } from '@/tools/kafka'; + +const kafkaHandler = async (connector: KafkaConnector, pipelines: Pipeline[]) => { + await createTopics(connector, pipelines); +} + +const redisHandler = () => { + +} export default async function handler( req: NextApiRequest, @@ -12,31 +20,12 @@ export default async function handler( const connector = req.body.kafkaConnector as Connector; console.log(connector); - - // Create the client with the broker list - const kafka = new Kafka({ - clientId: 'test-client', - brokers: [...connector.brokers.split(',')], - }); - - const admin = kafka.admin(); - const topics = Array.from(new Set(pipelines.map((p) => p.input.topic))); - try { - console.log('start to deleting topics'); - await admin.deleteTopics({ topics: topics }); - } catch { - console.log('failed to delete topics'); + if (connector.type === 'kafka') { + await kafkaHandler(connector as KafkaConnector, pipelines); + } else if (connector.type === 'redis') { + redisHandler(); } - try { - console.log('start to creating topics'); - await admin.createTopics({ - topics: topics.map((x) => ({ topic: x })), - waitForLeaders: false, - }); - } catch { - console.log('failed to create topics'); - } res.status(200).json({ result: 'succeed' }); } else { diff --git a/client/src/pages/api/pipeline-metadata.ts b/client/src/pages/api/pipeline-metadata.ts index 3db86ca..de348b5 100644 --- a/client/src/pages/api/pipeline-metadata.ts +++ b/client/src/pages/api/pipeline-metadata.ts @@ -1,48 +1,10 @@ -import { Consumer, Kafka } from 'kafkajs'; import type { NextApiRequest, NextApiResponse } from 'next'; import { Pipeline } from '@/models/pipeline'; -import { Connector } from '@/models/connector'; +import { Connector, KafkaConnector } from '@/models/connector'; +import { getTopicMetadata } from '@/tools/kafka'; -let consumer: Consumer | null = null; - -const getPipelineMetadata = async (pipeline: Pipeline, connector: Connector) => { - const kafka = new Kafka({ - clientId: 'test-client', - brokers: [...connector.brokers.split(',')], - }); - - console.log('start get metadata'); - - const admin = kafka.admin(); - const topicOffsets = await admin.fetchTopicOffsets(pipeline.input.topic); - let latestMsgTime = undefined; - const promise = new Promise(async (resolve, reject) => { - if (!consumer) { - consumer = kafka.consumer({ groupId: 'reader' }); - await consumer.connect(); - await consumer.subscribe({ - topics: [pipeline.output.topic], - fromBeginning: true, - }); - - consumer - .run({ - eachBatchAutoResolve: false, - autoCommit: false, - eachMessage: async ({ topic,partition, message}) => { - console.log(message) - resolve({msg: message, topicOffsets: topicOffsets}); - } - }) - .catch((err) => reject(err)); - - consumer.seek({topic: pipeline.input.topic,partition: topicOffsets[0].partition, offset: topicOffsets[0].offset}) - } else { - resolve(undefined); - } - }); - - return await promise; +const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline) => { + return getTopicMetadata(connector, pipeline); }; export default async function handler( @@ -53,14 +15,13 @@ export default async function handler( const pipeline = req.body.pipeline as Pipeline; const connector = req.body.kafkaConnector as Connector; - const metadata = await getPipelineMetadata(pipeline, connector); - - if (consumer) { - console.log('retrived messages'); - await consumer.disconnect(); - consumer = null; + let metadata; + if (connector.type === 'kafka') { + metadata = await kafkaHandler(connector as KafkaConnector, pipeline); } + console.log('retrived messages'); + res.status(200).json({ pipelineMetadata: metadata }); } else { res.status(405); diff --git a/client/src/pages/api/test.ts b/client/src/pages/api/test.ts index ebdfcb7..daf09d3 100644 --- a/client/src/pages/api/test.ts +++ b/client/src/pages/api/test.ts @@ -1,7 +1,8 @@ -import { Kafka } from 'kafkajs'; import type { NextApiRequest, NextApiResponse } from 'next'; import { Pipeline } from '@/models/pipeline'; -import { Connector } from '@/models/connector'; +import { Connector, KafkaConnector, RedisConnector } from '@/models/connector'; +import { produceTest as kafkaHandler } from '@/tools/kafka'; +import { produceTest as redisHandler } from '@/tools/redis'; export default async function handler( req: NextApiRequest, @@ -13,19 +14,11 @@ export default async function handler( const message = req.body.message; console.log(connector); - // Create the client with the broker list - const kafka = new Kafka({ - clientId: 'test-client', - brokers: [...connector.brokers.split(',')], - }); - - const producer = kafka.producer(); - - await producer.connect(); - await producer.send({ - topic: pipeline.input.topic, - messages: [{ key: 'test-key', value: message }], - }); + if (connector.type === 'kafka') { + await kafkaHandler(connector as KafkaConnector, pipeline, message); + }else if(connector.type === 'redis'){ + await redisHandler(connector as RedisConnector, pipeline, message); + } res.status(200).json({ result: 'succeed' }); } else { diff --git a/client/src/pages/api/transformed-messages.ts b/client/src/pages/api/transformed-messages.ts index f74bc3e..9360c2f 100644 --- a/client/src/pages/api/transformed-messages.ts +++ b/client/src/pages/api/transformed-messages.ts @@ -1,62 +1,10 @@ -import { Consumer, Kafka } from 'kafkajs'; import type { NextApiRequest, NextApiResponse } from 'next'; import { Pipeline } from '@/models/pipeline'; -import { Connector } from '@/models/connector'; +import { Connector, KafkaConnector } from '@/models/connector'; +import { getMessages } from '@/tools/kafka'; -let consumer: Consumer | null = null; - -const getMessages = async (pipeline: Pipeline, connector: Connector) => { - const kafka = new Kafka({ - clientId: 'test-client', - brokers: [...connector.brokers.split(',')], - }); - - console.log('start get messages'); - const promise = new Promise(async (resolve, reject) => { - if (!consumer) { - consumer = kafka.consumer({ groupId: 'reader' }); - await consumer.connect(); - await consumer.subscribe({ - topics: [pipeline.output.topic], - fromBeginning: true, - }); - - consumer - .run({ - eachBatchAutoResolve: false, - autoCommit: false, - eachBatch: async ({ batch, resolveOffset, heartbeat }) => { - const messages: { - timestamp: string; - value: string | undefined; - }[] = []; - for (let message of batch.messages) { - console.log({ - value: message.value - ? message.value.toString() - : 'null', - partition: batch.partition, - offset: message.offset, - timestamp: message.timestamp, - }); - messages.push({ - timestamp: message.timestamp, - value: message.value - ? message.value.toString() - : 'null', - }); - } - - resolve(messages); - }, - }) - .catch((err) => reject(err)); - } else { - resolve(undefined); - } - }); - - return await promise; +const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline) => { + return getMessages(connector, pipeline) }; export default async function handler( @@ -67,14 +15,13 @@ export default async function handler( const pipeline = req.body.pipeline as Pipeline; const connector = req.body.kafkaConnector as Connector; - const messages = await getMessages(pipeline, connector); - - if (consumer) { - console.log('retrived messages'); - await consumer.disconnect(); - consumer = null; + let messages; + if (connector.type === 'kafka') { + messages = await kafkaHandler(connector as KafkaConnector, pipeline); } + console.log('retrived messages'); + res.status(200).json({ transformedMessages: messages }); } else { res.status(405); diff --git a/client/src/tools/kafka.ts b/client/src/tools/kafka.ts new file mode 100644 index 0000000..5b3413b --- /dev/null +++ b/client/src/tools/kafka.ts @@ -0,0 +1,145 @@ +import { KafkaConnector } from "@/models/connector"; +import { Pipeline } from "@/models/pipeline"; +import { Kafka, KafkaMessage, PartitionOffset } from "kafkajs"; + +export const createTopics = async (connector: KafkaConnector, pipelines: Pipeline[]) => { + // Create the client with the broker list + const kafka = new Kafka({ + clientId: 'test-client', + brokers: [...connector.brokers.split(',')], + }); + + const admin = kafka.admin(); + const topics = Array.from(new Set(pipelines.map((p) => p.input.topic))); + try { + console.log('start to deleting topics'); + await admin.deleteTopics({ topics: topics }); + } catch { + console.log('failed to delete topics'); + } + + try { + console.log('start to creating topics'); + await admin.createTopics({ + topics: topics.map((x) => ({ topic: x })), + waitForLeaders: false, + }); + } catch { + console.log('failed to create topics'); + } +} + +export const produceTest = async (connector: KafkaConnector, pipeline: Pipeline, message: string) => { + // Create the client with the broker list + const kafka = new Kafka({ + clientId: 'test-client', + brokers: [...connector.brokers.split(',')], + }); + + const producer = kafka.producer(); + + await producer.connect(); + await producer.send({ + topic: pipeline.input.topic, + messages: [{ key: 'test-key', value: message }], + }); +} + +interface IMessage { + timestamp: string; + value: string | undefined; +} + +export const getMessages = async (connector: KafkaConnector, pipeline: Pipeline): Promise => { + const kafka = new Kafka({ + clientId: 'test-client', + brokers: [...connector.brokers.split(',')], + }); + + console.log('start get messages'); + const promise = new Promise(async (resolve, reject) => { + const consumer = kafka.consumer({ groupId: 'reader' }); + await consumer.connect(); + await consumer.subscribe({ + topics: [pipeline.output.topic], + fromBeginning: true, + }); + + consumer + .run({ + eachBatchAutoResolve: false, + autoCommit: false, + eachBatch: async ({ batch, resolveOffset, heartbeat }) => { + const messages: IMessage[] = []; + for (let message of batch.messages) { + console.log({ + value: message.value + ? message.value.toString() + : 'null', + partition: batch.partition, + offset: message.offset, + timestamp: message.timestamp, + }); + messages.push({ + timestamp: message.timestamp, + value: message.value + ? message.value.toString() + : 'null', + }); + } + + await consumer.disconnect(); + resolve(messages); + }, + }) + .catch(async (err) => { + await consumer.disconnect(); + reject(err) + }); + }); + + return await promise; +}; + +export const getTopicMetadata = async (connector: KafkaConnector, pipeline: Pipeline) => { + const kafka = new Kafka({ + clientId: 'test-client', + brokers: [...connector.brokers.split(',')], + }); + + console.log('start get metadata'); + + const admin = kafka.admin(); + const topicOffsets = await admin.fetchTopicOffsets(pipeline.input.topic); + + const promise = new Promise<{ msg: KafkaMessage, topicOffsets: (PartitionOffset & { + high: string; + low: string; + })[] }>(async (resolve, reject) => { + const consumer = kafka.consumer({ groupId: 'reader' }); + await consumer.connect(); + await consumer.subscribe({ + topics: [pipeline.output.topic], + fromBeginning: true, + }); + + consumer + .run({ + eachBatchAutoResolve: false, + autoCommit: false, + eachMessage: async ({ topic, partition, message }) => { + console.log(message) + await consumer.disconnect(); + resolve({ msg: message, topicOffsets: topicOffsets }); + } + }) + .catch(async (err) => { + await consumer.disconnect(); + reject(err) + }); + + consumer.seek({ topic: pipeline.input.topic, partition: topicOffsets[0].partition, offset: topicOffsets[0].offset }) + }); + + return await promise; +}; diff --git a/client/src/tools/redis.ts b/client/src/tools/redis.ts new file mode 100644 index 0000000..87af4de --- /dev/null +++ b/client/src/tools/redis.ts @@ -0,0 +1,10 @@ +import redis from 'redis'; +import { RedisConnector } from "@/models/connector"; +import { Pipeline } from "@/models/pipeline"; + +export const produceTest = async (connector: RedisConnector, pipeline: Pipeline, message: string) => { + const redisClient = redis.createClient({ + url: `redis://${connector.host}:${connector.port}/0` + }) + await redisClient.publish(pipeline.input.topic, message); +} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 2ba9620..371affd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,14 @@ services: - /var/run/docker.sock:/var/run/docker.sock ports: - "5000:5000" + redis: + hostname: redis + image: redis + command: redis-server --include /usr/local/etc/redis/redis.conf + volumes: + - ./redis/redis.conf:/usr/local/etc/redis/redis.conf + ports: + - "6379:6379" zookeeper: image: wurstmeister/zookeeper ports: