-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcodesnippets
More file actions
80 lines (70 loc) · 2.28 KB
/
codesnippets
File metadata and controls
80 lines (70 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#Python code snippets to generate real stock quote message with uuid and ISO 8601 formatted time stamp.
from faker.providers import BaseProvider
import random
import time
from yahoo_fin import stock_info as si
import uuid
from datetime import datetime
StockNames = ["BTC-USD", "ETH-USD", "BNB-USD", "ADA-USD", "DOGE-USD"]
class RealStockProvider_uuid(BaseProvider):
def stock_name(self):
return random.choice(StockNames)
def stock_value(self, stockname):
nextval = si.get_live_price(stockname)
return nextval
def produce_msg(self):
stockname = self.stock_name()
message = {
"message_uuid": str(uuid.uuid4()),
"stock_name": stockname,
"stock_value": self.stock_value(stockname),
"message_timestamp": str(datetime.now().astimezone().isoformat()),
}
key = {"stock_name": stockname}
return message, key
#Flink source table
CREATE TABLE realstock_uuid (
message_uuid VARCHAR,
stock_name VARCHAR,
stock_value FLOAT,
message_timestamp VARCHAR
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'kafka-22264d8c-egong-kafka.j.aivencloud.com:22367',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'realstock',
'value.format' = 'json'
)
#Flink Sink Tables
CREATE TABLE pennystock (
message_uuid VARCHAR,
stock_name VARCHAR,
stock_value FLOAT,
message_timestamp VARCHAR,
PRIMARY KEY (message_uuid) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka-22264d8c-egong-kafka.j.aivencloud.com:22367',
'topic' = 'pennystock',
'value.format' = 'json',
'key.format' = 'json'
)
CREATE TABLE nonpennystock (
message_uuid VARCHAR,
stock_name VARCHAR,
stock_value FLOAT,
message_timestamp VARCHAR,
PRIMARY KEY (message_uuid) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka-22264d8c-egong-kafka.j.aivencloud.com:22367',
'topic' = 'nonpennystock',
'value.format' = 'json',
'key.format' = 'json'
)
#Flink SQL application
EXECUTE STATEMENT SET
BEGIN
INSERT INTO pennystock SELECT * FROM realstock_uuid WHERE stock_value < 1.0;
INSERT INTO nonpennystock SELECT * FROM realstock_uuid WHERE stock_value >= 1.0;
END