From a7a71b893057f0c6332b106a5236d1a322249348 Mon Sep 17 00:00:00 2001
From: ofirelarat <96ofir11@gmail.com>
Date: Thu, 7 Sep 2023 14:55:23 +0300
Subject: [PATCH 1/3] added to redis to local env
---
.devcontainer/docker-compose.yml | 10 +++++++++-
client/src/models/connector.ts | 8 +++++---
docker-compose.yml | 8 ++++++++
3 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml
index aa19e0f..e910b4c 100644
--- a/.devcontainer/docker-compose.yml
+++ b/.devcontainer/docker-compose.yml
@@ -12,7 +12,15 @@ services:
- ../..:/workspaces:cached
# Overrides default command so things don't shut down after the process ends.
entrypoint: /usr/local/share/docker-init.sh
- command: sleep infinity
+ command: sleep infinity
+ redis:
+ hostname: redis
+ image: redis
+ command: redis-server --include /usr/local/etc/redis/redis.conf
+ volumes:
+ - ./redis/redis.conf:/usr/local/etc/redis/redis.conf
+ ports:
+ - "6379:6379"
zookeeper:
image: wurstmeister/zookeeper
ports:
diff --git a/client/src/models/connector.ts b/client/src/models/connector.ts
index 9d72425..855e60e 100644
--- a/client/src/models/connector.ts
+++ b/client/src/models/connector.ts
@@ -1,6 +1,8 @@
export interface Connector {
name: string;
- type: string;
- brokers: string;
- group_id: string;
+ type: 'kafka' | 'redis';
+ brokers?: string;
+ group_id?: string;
+ host?: string;
+ port?: number
}
diff --git a/docker-compose.yml b/docker-compose.yml
index 2ba9620..371affd 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,6 +10,14 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
ports:
- "5000:5000"
+ redis:
+ hostname: redis
+ image: redis
+ command: redis-server --include /usr/local/etc/redis/redis.conf
+ volumes:
+ - ./redis/redis.conf:/usr/local/etc/redis/redis.conf
+ ports:
+ - "6379:6379"
zookeeper:
image: wurstmeister/zookeeper
ports:
From 9e6c99f3fe165c12bd9bbd5a8878a2875f76579e Mon Sep 17 00:00:00 2001
From: ofirelarat <96ofir11@gmail.com>
Date: Fri, 8 Sep 2023 19:34:20 +0300
Subject: [PATCH 2/3] tring to use the connector abstractly in client
---
client/src/components/connectors-table.tsx | 8 +-
client/src/models/connector.ts | 14 +-
client/src/models/pipeline-list.ts | 4 +-
client/src/pages/api/create-topics.ts | 39 ++---
client/src/pages/api/pipeline-metadata.ts | 57 ++------
client/src/pages/api/test.ts | 24 ++-
client/src/pages/api/transformed-messages.ts | 71 ++-------
client/src/tools/kafka.ts | 145 +++++++++++++++++++
8 files changed, 204 insertions(+), 158 deletions(-)
create mode 100644 client/src/tools/kafka.ts
diff --git a/client/src/components/connectors-table.tsx b/client/src/components/connectors-table.tsx
index eae1491..c782277 100644
--- a/client/src/components/connectors-table.tsx
+++ b/client/src/components/connectors-table.tsx
@@ -1,4 +1,4 @@
-import { Connector } from '@/models/connector';
+import { Connector, KafkaConnector, RedisConnector } from '@/models/connector';
import { Accordion } from '@mantine/core';
interface TableProps {
@@ -19,7 +19,11 @@ export default function PipelinesTable({ connector }: TableProps) {
Connector
- {connector.name} ({connector.type}), {connector.brokers} | {connector.group_id}
+ {connector.name} ({connector.type}),
+ {connector.type == 'kafka' ?
+ `${(connector as KafkaConnector).brokers} | ${(connector as KafkaConnector).group_id}`
+ : `${(connector as RedisConnector).host} | ${(connector as RedisConnector).port}`
+ }
diff --git a/client/src/models/connector.ts b/client/src/models/connector.ts
index 855e60e..6d56422 100644
--- a/client/src/models/connector.ts
+++ b/client/src/models/connector.ts
@@ -1,8 +1,14 @@
export interface Connector {
name: string;
type: 'kafka' | 'redis';
- brokers?: string;
- group_id?: string;
- host?: string;
- port?: number
}
+
+export interface KafkaConnector extends Connector {
+ brokers: string;
+ group_id: string;
+}
+
+export interface RedisConnector extends Connector {
+ host: string;
+ port: number
+}
\ No newline at end of file
diff --git a/client/src/models/pipeline-list.ts b/client/src/models/pipeline-list.ts
index ac7371c..4a8738d 100644
--- a/client/src/models/pipeline-list.ts
+++ b/client/src/models/pipeline-list.ts
@@ -1,8 +1,8 @@
-import { Connector } from './connector';
+import { KafkaConnector, RedisConnector } from './connector';
import { Pipeline } from './pipeline';
export interface PipelineList {
name: string;
pipelines: Pipeline[];
- connector: Connector;
+ connector: RedisConnector | KafkaConnector;
}
diff --git a/client/src/pages/api/create-topics.ts b/client/src/pages/api/create-topics.ts
index 18e2ee1..2faa031 100644
--- a/client/src/pages/api/create-topics.ts
+++ b/client/src/pages/api/create-topics.ts
@@ -1,7 +1,15 @@
-import { Kafka } from 'kafkajs';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
-import { Connector } from '@/models/connector';
+import { Connector, KafkaConnector } from '@/models/connector';
+import { createTopics } from '@/tools/kafka';
+
+const kafkaHandler = async (connector: KafkaConnector, pipelines: Pipeline[]) => {
+ await createTopics(connector, pipelines);
+}
+
+const redisHandler = () => {
+
+}
export default async function handler(
req: NextApiRequest,
@@ -12,31 +20,12 @@ export default async function handler(
const connector = req.body.kafkaConnector as Connector;
console.log(connector);
-
- // Create the client with the broker list
- const kafka = new Kafka({
- clientId: 'test-client',
- brokers: [...connector.brokers.split(',')],
- });
-
- const admin = kafka.admin();
- const topics = Array.from(new Set(pipelines.map((p) => p.input.topic)));
- try {
- console.log('start to deleting topics');
- await admin.deleteTopics({ topics: topics });
- } catch {
- console.log('failed to delete topics');
+ if (connector.type === 'kafka') {
+ await kafkaHandler(connector as KafkaConnector, pipelines);
+ } else if (connector.type === 'redis') {
+ redisHandler();
}
- try {
- console.log('start to creating topics');
- await admin.createTopics({
- topics: topics.map((x) => ({ topic: x })),
- waitForLeaders: false,
- });
- } catch {
- console.log('failed to create topics');
- }
res.status(200).json({ result: 'succeed' });
} else {
diff --git a/client/src/pages/api/pipeline-metadata.ts b/client/src/pages/api/pipeline-metadata.ts
index 3db86ca..de348b5 100644
--- a/client/src/pages/api/pipeline-metadata.ts
+++ b/client/src/pages/api/pipeline-metadata.ts
@@ -1,48 +1,10 @@
-import { Consumer, Kafka } from 'kafkajs';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
-import { Connector } from '@/models/connector';
+import { Connector, KafkaConnector } from '@/models/connector';
+import { getTopicMetadata } from '@/tools/kafka';
-let consumer: Consumer | null = null;
-
-const getPipelineMetadata = async (pipeline: Pipeline, connector: Connector) => {
- const kafka = new Kafka({
- clientId: 'test-client',
- brokers: [...connector.brokers.split(',')],
- });
-
- console.log('start get metadata');
-
- const admin = kafka.admin();
- const topicOffsets = await admin.fetchTopicOffsets(pipeline.input.topic);
- let latestMsgTime = undefined;
- const promise = new Promise(async (resolve, reject) => {
- if (!consumer) {
- consumer = kafka.consumer({ groupId: 'reader' });
- await consumer.connect();
- await consumer.subscribe({
- topics: [pipeline.output.topic],
- fromBeginning: true,
- });
-
- consumer
- .run({
- eachBatchAutoResolve: false,
- autoCommit: false,
- eachMessage: async ({ topic,partition, message}) => {
- console.log(message)
- resolve({msg: message, topicOffsets: topicOffsets});
- }
- })
- .catch((err) => reject(err));
-
- consumer.seek({topic: pipeline.input.topic,partition: topicOffsets[0].partition, offset: topicOffsets[0].offset})
- } else {
- resolve(undefined);
- }
- });
-
- return await promise;
+const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline) => {
+ return getTopicMetadata(connector, pipeline);
};
export default async function handler(
@@ -53,14 +15,13 @@ export default async function handler(
const pipeline = req.body.pipeline as Pipeline;
const connector = req.body.kafkaConnector as Connector;
- const metadata = await getPipelineMetadata(pipeline, connector);
-
- if (consumer) {
- console.log('retrived messages');
- await consumer.disconnect();
- consumer = null;
+ let metadata;
+ if (connector.type === 'kafka') {
+ metadata = await kafkaHandler(connector as KafkaConnector, pipeline);
}
+ console.log('retrived messages');
+
res.status(200).json({ pipelineMetadata: metadata });
} else {
res.status(405);
diff --git a/client/src/pages/api/test.ts b/client/src/pages/api/test.ts
index ebdfcb7..03f33b6 100644
--- a/client/src/pages/api/test.ts
+++ b/client/src/pages/api/test.ts
@@ -1,7 +1,11 @@
-import { Kafka } from 'kafkajs';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
-import { Connector } from '@/models/connector';
+import { Connector, KafkaConnector } from '@/models/connector';
+import { produceTest } from '@/tools/kafka';
+
+const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline, message: string) => {
+ await produceTest(connector, pipeline, message);
+}
export default async function handler(
req: NextApiRequest,
@@ -13,19 +17,9 @@ export default async function handler(
const message = req.body.message;
console.log(connector);
- // Create the client with the broker list
- const kafka = new Kafka({
- clientId: 'test-client',
- brokers: [...connector.brokers.split(',')],
- });
-
- const producer = kafka.producer();
-
- await producer.connect();
- await producer.send({
- topic: pipeline.input.topic,
- messages: [{ key: 'test-key', value: message }],
- });
+ if (connector.type === 'kafka') {
+ await kafkaHandler(connector as KafkaConnector, pipeline, message);
+ }
res.status(200).json({ result: 'succeed' });
} else {
diff --git a/client/src/pages/api/transformed-messages.ts b/client/src/pages/api/transformed-messages.ts
index f74bc3e..9360c2f 100644
--- a/client/src/pages/api/transformed-messages.ts
+++ b/client/src/pages/api/transformed-messages.ts
@@ -1,62 +1,10 @@
-import { Consumer, Kafka } from 'kafkajs';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
-import { Connector } from '@/models/connector';
+import { Connector, KafkaConnector } from '@/models/connector';
+import { getMessages } from '@/tools/kafka';
-let consumer: Consumer | null = null;
-
-const getMessages = async (pipeline: Pipeline, connector: Connector) => {
- const kafka = new Kafka({
- clientId: 'test-client',
- brokers: [...connector.brokers.split(',')],
- });
-
- console.log('start get messages');
- const promise = new Promise(async (resolve, reject) => {
- if (!consumer) {
- consumer = kafka.consumer({ groupId: 'reader' });
- await consumer.connect();
- await consumer.subscribe({
- topics: [pipeline.output.topic],
- fromBeginning: true,
- });
-
- consumer
- .run({
- eachBatchAutoResolve: false,
- autoCommit: false,
- eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
- const messages: {
- timestamp: string;
- value: string | undefined;
- }[] = [];
- for (let message of batch.messages) {
- console.log({
- value: message.value
- ? message.value.toString()
- : 'null',
- partition: batch.partition,
- offset: message.offset,
- timestamp: message.timestamp,
- });
- messages.push({
- timestamp: message.timestamp,
- value: message.value
- ? message.value.toString()
- : 'null',
- });
- }
-
- resolve(messages);
- },
- })
- .catch((err) => reject(err));
- } else {
- resolve(undefined);
- }
- });
-
- return await promise;
+const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline) => {
+ return getMessages(connector, pipeline)
};
export default async function handler(
@@ -67,14 +15,13 @@ export default async function handler(
const pipeline = req.body.pipeline as Pipeline;
const connector = req.body.kafkaConnector as Connector;
- const messages = await getMessages(pipeline, connector);
-
- if (consumer) {
- console.log('retrived messages');
- await consumer.disconnect();
- consumer = null;
+ let messages;
+ if (connector.type === 'kafka') {
+ messages = await kafkaHandler(connector as KafkaConnector, pipeline);
}
+ console.log('retrived messages');
+
res.status(200).json({ transformedMessages: messages });
} else {
res.status(405);
diff --git a/client/src/tools/kafka.ts b/client/src/tools/kafka.ts
new file mode 100644
index 0000000..5b3413b
--- /dev/null
+++ b/client/src/tools/kafka.ts
@@ -0,0 +1,145 @@
+import { KafkaConnector } from "@/models/connector";
+import { Pipeline } from "@/models/pipeline";
+import { Kafka, KafkaMessage, PartitionOffset } from "kafkajs";
+
+export const createTopics = async (connector: KafkaConnector, pipelines: Pipeline[]) => {
+ // Create the client with the broker list
+ const kafka = new Kafka({
+ clientId: 'test-client',
+ brokers: [...connector.brokers.split(',')],
+ });
+
+ const admin = kafka.admin();
+ const topics = Array.from(new Set(pipelines.map((p) => p.input.topic)));
+ try {
+ console.log('start to deleting topics');
+ await admin.deleteTopics({ topics: topics });
+ } catch {
+ console.log('failed to delete topics');
+ }
+
+ try {
+ console.log('start to creating topics');
+ await admin.createTopics({
+ topics: topics.map((x) => ({ topic: x })),
+ waitForLeaders: false,
+ });
+ } catch {
+ console.log('failed to create topics');
+ }
+}
+
+export const produceTest = async (connector: KafkaConnector, pipeline: Pipeline, message: string) => {
+ // Create the client with the broker list
+ const kafka = new Kafka({
+ clientId: 'test-client',
+ brokers: [...connector.brokers.split(',')],
+ });
+
+ const producer = kafka.producer();
+
+ await producer.connect();
+ await producer.send({
+ topic: pipeline.input.topic,
+ messages: [{ key: 'test-key', value: message }],
+ });
+}
+
+interface IMessage {
+ timestamp: string;
+ value: string | undefined;
+}
+
+export const getMessages = async (connector: KafkaConnector, pipeline: Pipeline): Promise => {
+ const kafka = new Kafka({
+ clientId: 'test-client',
+ brokers: [...connector.brokers.split(',')],
+ });
+
+ console.log('start get messages');
+ const promise = new Promise(async (resolve, reject) => {
+ const consumer = kafka.consumer({ groupId: 'reader' });
+ await consumer.connect();
+ await consumer.subscribe({
+ topics: [pipeline.output.topic],
+ fromBeginning: true,
+ });
+
+ consumer
+ .run({
+ eachBatchAutoResolve: false,
+ autoCommit: false,
+ eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
+ const messages: IMessage[] = [];
+ for (let message of batch.messages) {
+ console.log({
+ value: message.value
+ ? message.value.toString()
+ : 'null',
+ partition: batch.partition,
+ offset: message.offset,
+ timestamp: message.timestamp,
+ });
+ messages.push({
+ timestamp: message.timestamp,
+ value: message.value
+ ? message.value.toString()
+ : 'null',
+ });
+ }
+
+ await consumer.disconnect();
+ resolve(messages);
+ },
+ })
+ .catch(async (err) => {
+ await consumer.disconnect();
+ reject(err)
+ });
+ });
+
+ return await promise;
+};
+
+export const getTopicMetadata = async (connector: KafkaConnector, pipeline: Pipeline) => {
+ const kafka = new Kafka({
+ clientId: 'test-client',
+ brokers: [...connector.brokers.split(',')],
+ });
+
+ console.log('start get metadata');
+
+ const admin = kafka.admin();
+ const topicOffsets = await admin.fetchTopicOffsets(pipeline.input.topic);
+
+ const promise = new Promise<{ msg: KafkaMessage, topicOffsets: (PartitionOffset & {
+ high: string;
+ low: string;
+ })[] }>(async (resolve, reject) => {
+ const consumer = kafka.consumer({ groupId: 'reader' });
+ await consumer.connect();
+ await consumer.subscribe({
+ topics: [pipeline.output.topic],
+ fromBeginning: true,
+ });
+
+ consumer
+ .run({
+ eachBatchAutoResolve: false,
+ autoCommit: false,
+ eachMessage: async ({ topic, partition, message }) => {
+ console.log(message)
+ await consumer.disconnect();
+ resolve({ msg: message, topicOffsets: topicOffsets });
+ }
+ })
+ .catch(async (err) => {
+ await consumer.disconnect();
+ reject(err)
+ });
+
+ consumer.seek({ topic: pipeline.input.topic, partition: topicOffsets[0].partition, offset: topicOffsets[0].offset })
+ });
+
+ return await promise;
+};
From 79528aa92d258b8a11a5ce6667e130bc8b38a925 Mon Sep 17 00:00:00 2001
From: ofirelarat <96ofir11@gmail.com>
Date: Fri, 8 Sep 2023 19:46:55 +0300
Subject: [PATCH 3/3] added produce test message using redis connector
---
client/package-lock.json | 178 ++++++++++++++++++++++++++++++++---
client/package.json | 7 +-
client/src/pages/api/test.ts | 11 +--
client/src/tools/redis.ts | 10 ++
4 files changed, 186 insertions(+), 20 deletions(-)
create mode 100644 client/src/tools/redis.ts
diff --git a/client/package-lock.json b/client/package-lock.json
index a0d27ea..2778323 100644
--- a/client/package-lock.json
+++ b/client/package-lock.json
@@ -14,9 +14,6 @@
"@mantine/hooks": "^6.0.4",
"@mantine/next": "^6.0.4",
"@tabler/icons-react": "^2.15.0",
- "@types/node": "18.15.5",
- "@types/react": "18.0.28",
- "@types/react-dom": "18.0.11",
"axios": "^1.3.4",
"axios-hooks": "^4.0.0",
"eslint": "8.36.0",
@@ -26,11 +23,15 @@
"react": "18.2.0",
"react-dom": "18.2.0",
"react-graph-vis": "^1.0.7",
- "typescript": "5.0.2",
+ "redis": "^4.6.8",
"uuidv4": "^6.2.13"
},
"devDependencies": {
- "prettier": "^2.8.8"
+ "@types/node": "18.15.5",
+ "@types/react": "18.0.28",
+ "@types/react-dom": "18.0.11",
+ "prettier": "^2.8.8",
+ "typescript": "5.0.2"
}
},
"node_modules/@babel/code-frame": {
@@ -899,6 +900,59 @@
"react": "^16.8 || ^17.0 || ^18.0"
}
},
+ "node_modules/@redis/bloom": {
+ "version": "1.2.0",
+ "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz",
+ "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==",
+ "peerDependencies": {
+ "@redis/client": "^1.0.0"
+ }
+ },
+ "node_modules/@redis/client": {
+ "version": "1.5.9",
+ "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.9.tgz",
+ "integrity": "sha512-SffgN+P1zdWJWSXBvJeynvEnmnZrYmtKSRW00xl8pOPFOMJjxRR9u0frSxJpPR6Y4V+k54blJjGW7FgxbTI7bQ==",
+ "dependencies": {
+ "cluster-key-slot": "1.1.2",
+ "generic-pool": "3.9.0",
+ "yallist": "4.0.0"
+ },
+ "engines": {
+ "node": ">=14"
+ }
+ },
+ "node_modules/@redis/graph": {
+ "version": "1.1.0",
+ "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz",
+ "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==",
+ "peerDependencies": {
+ "@redis/client": "^1.0.0"
+ }
+ },
+ "node_modules/@redis/json": {
+ "version": "1.0.4",
+ "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz",
+ "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==",
+ "peerDependencies": {
+ "@redis/client": "^1.0.0"
+ }
+ },
+ "node_modules/@redis/search": {
+ "version": "1.1.3",
+ "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.3.tgz",
+ "integrity": "sha512-4Dg1JjvCevdiCBTZqjhKkGoC5/BcB7k9j99kdMnaXFXg8x4eyOIVg9487CMv7/BUVkFLZCaIh8ead9mU15DNng==",
+ "peerDependencies": {
+ "@redis/client": "^1.0.0"
+ }
+ },
+ "node_modules/@redis/time-series": {
+ "version": "1.0.5",
+ "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz",
+ "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==",
+ "peerDependencies": {
+ "@redis/client": "^1.0.0"
+ }
+ },
"node_modules/@rushstack/eslint-patch": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/@rushstack/eslint-patch/-/eslint-patch-1.2.0.tgz",
@@ -951,7 +1005,8 @@
"node_modules/@types/node": {
"version": "18.15.5",
"resolved": "https://registry.npmjs.org/@types/node/-/node-18.15.5.tgz",
- "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew=="
+ "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew==",
+ "dev": true
},
"node_modules/@types/parse-json": {
"version": "4.0.0",
@@ -961,12 +1016,14 @@
"node_modules/@types/prop-types": {
"version": "15.7.5",
"resolved": "https://registry.npmjs.org/@types/prop-types/-/prop-types-15.7.5.tgz",
- "integrity": "sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w=="
+ "integrity": "sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==",
+ "devOptional": true
},
"node_modules/@types/react": {
"version": "18.0.28",
"resolved": "https://registry.npmjs.org/@types/react/-/react-18.0.28.tgz",
"integrity": "sha512-RD0ivG1kEztNBdoAK7lekI9M+azSnitIn85h4iOiaLjaTrMjzslhaqCGaI4IyCJ1RljWiLCEu4jyrLLgqxBTew==",
+ "devOptional": true,
"dependencies": {
"@types/prop-types": "*",
"@types/scheduler": "*",
@@ -977,6 +1034,7 @@
"version": "18.0.11",
"resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.0.11.tgz",
"integrity": "sha512-O38bPbI2CWtgw/OoQoY+BRelw7uysmXbWvw3nLWO21H1HSh+GOlqPuXshJfjmpNlKiiSDG9cc1JZAaMmVdcTlw==",
+ "dev": true,
"dependencies": {
"@types/react": "*"
}
@@ -984,7 +1042,8 @@
"node_modules/@types/scheduler": {
"version": "0.16.2",
"resolved": "https://registry.npmjs.org/@types/scheduler/-/scheduler-0.16.2.tgz",
- "integrity": "sha512-hppQEBDmlwhFAXKJX2KnWLYu5yMfi91yazPb2l+lbJiwW+wdo1gNeRA+3RgNSO39WYX2euey41KEwnqesU2Jew=="
+ "integrity": "sha512-hppQEBDmlwhFAXKJX2KnWLYu5yMfi91yazPb2l+lbJiwW+wdo1gNeRA+3RgNSO39WYX2euey41KEwnqesU2Jew==",
+ "devOptional": true
},
"node_modules/@types/uuid": {
"version": "8.3.4",
@@ -1438,6 +1497,14 @@
"node": ">=6"
}
},
+ "node_modules/cluster-key-slot": {
+ "version": "1.1.2",
+ "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz",
+ "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==",
+ "engines": {
+ "node": ">=0.10.0"
+ }
+ },
"node_modules/color-convert": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
@@ -2481,6 +2548,14 @@
"url": "https://github.com/sponsors/ljharb"
}
},
+ "node_modules/generic-pool": {
+ "version": "3.9.0",
+ "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
+ "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==",
+ "engines": {
+ "node": ">= 4"
+ }
+ },
"node_modules/get-intrinsic": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.0.tgz",
@@ -3955,6 +4030,19 @@
"resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz",
"integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ=="
},
+ "node_modules/redis": {
+ "version": "4.6.8",
+ "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.8.tgz",
+ "integrity": "sha512-S7qNkPUYrsofQ0ztWlTHSaK0Qqfl1y+WMIxrzeAGNG+9iUZB4HGeBgkHxE6uJJ6iXrkvLd1RVJ2nvu6H1sAzfQ==",
+ "dependencies": {
+ "@redis/bloom": "1.2.0",
+ "@redis/client": "1.5.9",
+ "@redis/graph": "1.1.0",
+ "@redis/json": "1.0.4",
+ "@redis/search": "1.1.3",
+ "@redis/time-series": "1.0.5"
+ }
+ },
"node_modules/regenerator-runtime": {
"version": "0.13.11",
"resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.13.11.tgz",
@@ -5363,6 +5451,46 @@
"@babel/runtime": "^7.13.10"
}
},
+ "@redis/bloom": {
+ "version": "1.2.0",
+ "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz",
+ "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==",
+ "requires": {}
+ },
+ "@redis/client": {
+ "version": "1.5.9",
+ "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.9.tgz",
+ "integrity": "sha512-SffgN+P1zdWJWSXBvJeynvEnmnZrYmtKSRW00xl8pOPFOMJjxRR9u0frSxJpPR6Y4V+k54blJjGW7FgxbTI7bQ==",
+ "requires": {
+ "cluster-key-slot": "1.1.2",
+ "generic-pool": "3.9.0",
+ "yallist": "4.0.0"
+ }
+ },
+ "@redis/graph": {
+ "version": "1.1.0",
+ "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz",
+ "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==",
+ "requires": {}
+ },
+ "@redis/json": {
+ "version": "1.0.4",
+ "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz",
+ "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==",
+ "requires": {}
+ },
+ "@redis/search": {
+ "version": "1.1.3",
+ "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.3.tgz",
+ "integrity": "sha512-4Dg1JjvCevdiCBTZqjhKkGoC5/BcB7k9j99kdMnaXFXg8x4eyOIVg9487CMv7/BUVkFLZCaIh8ead9mU15DNng==",
+ "requires": {}
+ },
+ "@redis/time-series": {
+ "version": "1.0.5",
+ "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz",
+ "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==",
+ "requires": {}
+ },
"@rushstack/eslint-patch": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/@rushstack/eslint-patch/-/eslint-patch-1.2.0.tgz",
@@ -5404,7 +5532,8 @@
"@types/node": {
"version": "18.15.5",
"resolved": "https://registry.npmjs.org/@types/node/-/node-18.15.5.tgz",
- "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew=="
+ "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew==",
+ "dev": true
},
"@types/parse-json": {
"version": "4.0.0",
@@ -5414,12 +5543,14 @@
"@types/prop-types": {
"version": "15.7.5",
"resolved": "https://registry.npmjs.org/@types/prop-types/-/prop-types-15.7.5.tgz",
- "integrity": "sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w=="
+ "integrity": "sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==",
+ "devOptional": true
},
"@types/react": {
"version": "18.0.28",
"resolved": "https://registry.npmjs.org/@types/react/-/react-18.0.28.tgz",
"integrity": "sha512-RD0ivG1kEztNBdoAK7lekI9M+azSnitIn85h4iOiaLjaTrMjzslhaqCGaI4IyCJ1RljWiLCEu4jyrLLgqxBTew==",
+ "devOptional": true,
"requires": {
"@types/prop-types": "*",
"@types/scheduler": "*",
@@ -5430,6 +5561,7 @@
"version": "18.0.11",
"resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.0.11.tgz",
"integrity": "sha512-O38bPbI2CWtgw/OoQoY+BRelw7uysmXbWvw3nLWO21H1HSh+GOlqPuXshJfjmpNlKiiSDG9cc1JZAaMmVdcTlw==",
+ "dev": true,
"requires": {
"@types/react": "*"
}
@@ -5437,7 +5569,8 @@
"@types/scheduler": {
"version": "0.16.2",
"resolved": "https://registry.npmjs.org/@types/scheduler/-/scheduler-0.16.2.tgz",
- "integrity": "sha512-hppQEBDmlwhFAXKJX2KnWLYu5yMfi91yazPb2l+lbJiwW+wdo1gNeRA+3RgNSO39WYX2euey41KEwnqesU2Jew=="
+ "integrity": "sha512-hppQEBDmlwhFAXKJX2KnWLYu5yMfi91yazPb2l+lbJiwW+wdo1gNeRA+3RgNSO39WYX2euey41KEwnqesU2Jew==",
+ "devOptional": true
},
"@types/uuid": {
"version": "8.3.4",
@@ -5746,6 +5879,11 @@
"resolved": "https://registry.npmjs.org/clsx/-/clsx-1.1.1.tgz",
"integrity": "sha512-6/bPho624p3S2pMyvP5kKBPXnI3ufHLObBFCfgx+LkeR5lg2XYy2hqZqUf45ypD8COn2bhgGJSUE+l5dhNBieA=="
},
+ "cluster-key-slot": {
+ "version": "1.1.2",
+ "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz",
+ "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA=="
+ },
"color-convert": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
@@ -6546,6 +6684,11 @@
"resolved": "https://registry.npmjs.org/functions-have-names/-/functions-have-names-1.2.3.tgz",
"integrity": "sha512-xckBUXyTIqT97tq2x2AMb+g163b5JFysYk0x4qxNFwbfQkmNZoiRHb6sPzI9/QV33WeuvVYBUIiD4NzNIyqaRQ=="
},
+ "generic-pool": {
+ "version": "3.9.0",
+ "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
+ "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g=="
+ },
"get-intrinsic": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.0.tgz",
@@ -7544,6 +7687,19 @@
}
}
},
+ "redis": {
+ "version": "4.6.8",
+ "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.8.tgz",
+ "integrity": "sha512-S7qNkPUYrsofQ0ztWlTHSaK0Qqfl1y+WMIxrzeAGNG+9iUZB4HGeBgkHxE6uJJ6iXrkvLd1RVJ2nvu6H1sAzfQ==",
+ "requires": {
+ "@redis/bloom": "1.2.0",
+ "@redis/client": "1.5.9",
+ "@redis/graph": "1.1.0",
+ "@redis/json": "1.0.4",
+ "@redis/search": "1.1.3",
+ "@redis/time-series": "1.0.5"
+ }
+ },
"regenerator-runtime": {
"version": "0.13.11",
"resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.13.11.tgz",
diff --git a/client/package.json b/client/package.json
index 6753189..ba5913d 100644
--- a/client/package.json
+++ b/client/package.json
@@ -25,13 +25,14 @@
"react": "18.2.0",
"react-dom": "18.2.0",
"react-graph-vis": "^1.0.7",
+ "redis": "^4.6.8",
"uuidv4": "^6.2.13"
},
"devDependencies": {
- "typescript": "5.0.2",
- "prettier": "^2.8.8",
"@types/node": "18.15.5",
"@types/react": "18.0.28",
- "@types/react-dom": "18.0.11"
+ "@types/react-dom": "18.0.11",
+ "prettier": "^2.8.8",
+ "typescript": "5.0.2"
}
}
diff --git a/client/src/pages/api/test.ts b/client/src/pages/api/test.ts
index 03f33b6..daf09d3 100644
--- a/client/src/pages/api/test.ts
+++ b/client/src/pages/api/test.ts
@@ -1,11 +1,8 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
-import { Connector, KafkaConnector } from '@/models/connector';
-import { produceTest } from '@/tools/kafka';
-
-const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline, message: string) => {
- await produceTest(connector, pipeline, message);
-}
+import { Connector, KafkaConnector, RedisConnector } from '@/models/connector';
+import { produceTest as kafkaHandler } from '@/tools/kafka';
+import { produceTest as redisHandler } from '@/tools/redis';
export default async function handler(
req: NextApiRequest,
@@ -19,6 +16,8 @@ export default async function handler(
if (connector.type === 'kafka') {
await kafkaHandler(connector as KafkaConnector, pipeline, message);
+ }else if(connector.type === 'redis'){
+ await redisHandler(connector as RedisConnector, pipeline, message);
}
res.status(200).json({ result: 'succeed' });
diff --git a/client/src/tools/redis.ts b/client/src/tools/redis.ts
new file mode 100644
index 0000000..87af4de
--- /dev/null
+++ b/client/src/tools/redis.ts
@@ -0,0 +1,10 @@
+import redis from 'redis';
+import { RedisConnector } from "@/models/connector";
+import { Pipeline } from "@/models/pipeline";
+
+export const produceTest = async (connector: RedisConnector, pipeline: Pipeline, message: string) => {
+ const redisClient = redis.createClient({
+ url: `redis://${connector.host}:${connector.port}/0`
+ })
+ await redisClient.publish(pipeline.input.topic, message);
+}
\ No newline at end of file