Technical architecture:

First, create the test MySQL tables:

CREATE DATABASE mydb;
USE mydb;

DROP TABLE IF EXISTS mydb.products;
DROP TABLE IF EXISTS mydb.orders;
DROP TABLE IF EXISTS mydb.shipments;


-- Products table
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default, "scooter", "Small 2-wheel scooter"),
       (default, "car battery", "12V car battery"),
       (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3"),
       (default, "hammer", "12oz carpenter's hammer"),
       (default, "hammer", "14oz carpenter's hammer"),
       (default, "hammer", "16oz carpenter's hammer"),
       (default, "rocks", "box of assorted rocks"),
       (default, "jacket", "water resistent black wind breaker"),
       (default, "spare tire", "24 inch spare tire");


-- Orders table
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- whether the order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);


-- Shipments table
CREATE TABLE shipments (
  shipment_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_id BIGINT NOT NULL,
  origin VARCHAR(255) NOT NULL,
  destination VARCHAR(255) NOT NULL,
  is_arrived BOOLEAN NOT NULL
) AUTO_INCREMENT = 1001;

INSERT INTO shipments
VALUES (default, 10001, 'Beijing', 'Shanghai', false),
       (default, 10002, 'Hangzhou', 'Shanghai', false),
       (default, 10003, 'Shanghai', 'Hangzhou', false);
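
Note that the MySQL CDC connector reads the binlog, so the source instance must have binary logging enabled in row format. A quick sanity check from the MySQL shell (a minimal sketch, not part of the original setup):

-- Sketch: verify the CDC prerequisites on the MySQL side
SHOW VARIABLES LIKE 'log_bin';        -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expected: ROW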

This article uses two connector jars (the MySQL CDC connector and the Elasticsearch 7 SQL connector), which need to be placed under flink/lib.

Next, start the Flink cluster, open the Flink SQL client, and create the CDC tables:

[root@hadoop04 flink-1.12]# bin/sql-client.sh embedded
No default environment specified.
Searching for '/home/module/flink-1.12/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/home/module/flink-1.12/conf/sql-client-defaults.yaml
No session environment specified.

Command history file path: /root/.flink-sql-history
[Flink squirrel and "Flink SQL Client BETA" ASCII-art banner]

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL> CREATE TABLE products (
> id INT,
> name STRING,
> description STRING
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '143382',
> 'database-name' = 'mydb',
> 'table-name' = 'products'
> );
[INFO] Table has been created.

Flink SQL>
> CREATE TABLE orders (
> order_id INT,
> order_date TIMESTAMP(0),
> customer_name STRING,
> price DECIMAL(10, 5),
> product_id INT,
> order_status BOOLEAN
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '143382',
> 'database-name' = 'mydb',
> 'table-name' = 'orders'
> );
[INFO] Table has been created.

Flink SQL>
> CREATE TABLE shipments (
> shipment_id INT,
> order_id INT,
> origin STRING,
> destination STRING,
> is_arrived BOOLEAN
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '143382',
> 'database-name' = 'mydb',
> 'table-name' = 'shipments'
> );
[INFO] Table has been created.

Flink SQL>
> CREATE TABLE enriched_orders (
> order_id INT,
> order_date TIMESTAMP(0),
> customer_name STRING,
> price DECIMAL(10, 5),
> product_id INT,
> order_status BOOLEAN,
> product_name STRING,
> product_description STRING,
> shipment_id INT,
> origin STRING,
> destination STRING,
> is_arrived BOOLEAN,
> PRIMARY KEY (order_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = 'http://hadoop04:9200',
> 'index' = 'enriched_orders'
> );
[INFO] Table has been created.

Flink SQL>
> INSERT INTO enriched_orders
> SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
> FROM orders AS o
> LEFT JOIN products AS p ON o.product_id = p.id
> LEFT JOIN shipments AS s ON o.order_id = s.order_id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: bfe9e15269924fec5bb9028e175efdab
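
As an aside, before submitting the INSERT job the same join can be previewed directly in the SQL client without any sink, which helps when debugging the query. A minimal sketch (only a subset of columns shown):

-- Sketch: preview the enriched stream in the SQL client; no sink involved
SELECT o.order_id, o.customer_name, p.name AS product_name,
       s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;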

Checking the Flink web UI shows the Flink SQL CDC job running.

Checking in Kibana, the enriched_orders index currently holds 3 documents:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "enriched_orders",
        "_type" : "_doc",
        "_id" : "10001",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 10001,
          "order_date" : "2020-07-30 10:08:22",
          "customer_name" : "Jark",
          "price" : 50.5,
          "product_id" : 102,
          "order_status" : false,
          "product_name" : "car battery",
          "product_description" : "12V car battery",
          "shipment_id" : 1001,
          "origin" : "Beijing",
          "destination" : "Shanghai",
          "is_arrived" : false
        }
      },
      {
        "_index" : "enriched_orders",
        "_type" : "_doc",
        "_id" : "10002",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 10002,
          "order_date" : "2020-07-30 10:11:09",
          "customer_name" : "Sally",
          "price" : 15,
          "product_id" : 105,
          "order_status" : false,
          "product_name" : "hammer",
          "product_description" : "14oz carpenter's hammer",
          "shipment_id" : 1002,
          "origin" : "Hangzhou",
          "destination" : "Shanghai",
          "is_arrived" : false
        }
      },
      {
        "_index" : "enriched_orders",
        "_type" : "_doc",
        "_id" : "10003",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 10003,
          "order_date" : "2020-07-30 12:00:30",
          "customer_name" : "Edward",
          "price" : 25.25,
          "product_id" : 106,
          "order_status" : false,
          "product_name" : "hammer",
          "product_description" : "16oz carpenter's hammer",
          "shipment_id" : 1003,
          "origin" : "Shanghai",
          "destination" : "Hangzhou",
          "is_arrived" : false
        }
      }
    ]
  }
}

Next, run some inserts, updates, and a delete against MySQL, and watch how the document count and contents change in Elasticsearch:

-- MySQL: insert a new order
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

-- MySQL: insert the matching shipment
INSERT INTO shipments
VALUES (default, 10004, 'Shanghai', 'Beijing', false);

-- MySQL: mark the order as placed
UPDATE orders SET order_status = true WHERE order_id = 10004;

-- MySQL: mark the shipment as arrived
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

-- MySQL: finally, delete the order
DELETE FROM orders WHERE order_id = 10004;

As observed: after the first insert, the order data gains one row and Elasticsearch shows 4 documents; the shipment and order-status updates are then reflected in place; and once the order is deleted, the count in ES drops back to 3. Because the Elasticsearch sink declares order_id as its primary key, each order becomes a document keyed by order_id, so updates arrive as upserts and the delete removes document 10004.

{
  "took" : 993,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "enriched_orders",
        "_type" : "_doc",
        "_id" : "10001",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 10001,
          "order_date" : "2020-07-30 10:08:22",
          "customer_name" : "Jark",
          "price" : 50.5,
          "product_id" : 102,
          "order_status" : false,
          "product_name" : "car battery",
          "product_description" : "12V car battery",
          "shipment_id" : 1001,
          "origin" : "Beijing",
          "destination" : "Shanghai",
          "is_arrived" : false
        }
      },
      {
        "_index" : "enriched_orders",
        "_type" : "_doc",
        "_id" : "10002",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 10002,
          "order_date" : "2020-07-30 10:11:09",
          "customer_name" : "Sally",
          "price" : 15,
          "product_id" : 105,
          "order_status" : false,
          "product_name" : "hammer",
          "product_description" : "14oz carpenter's hammer",
          "shipment_id" : 1002,
          "origin" : "Hangzhou",
          "destination" : "Shanghai",
          "is_arrived" : false
        }
      },
      {
        "_index" : "enriched_orders",
        "_type" : "_doc",
        "_id" : "10003",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 10003,
          "order_date" : "2020-07-30 12:00:30",
          "customer_name" : "Edward",
          "price" : 25.25,
          "product_id" : 106,
          "order_status" : false,
          "product_name" : "hammer",
          "product_description" : "16oz carpenter's hammer",
          "shipment_id" : 1003,
          "origin" : "Shanghai",
          "destination" : "Hangzhou",
          "is_arrived" : false
        }
      },
      {
        "_index" : "enriched_orders",
        "_type" : "_doc",
        "_id" : "10004",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 10004,
          "order_date" : "2020-07-30 15:22:00",
          "customer_name" : "Jark",
          "price" : 29.71,
          "product_id" : 104,
          "order_status" : true,
          "product_name" : "hammer",
          "product_description" : "12oz carpenter's hammer",
          "shipment_id" : 1004,
          "origin" : "Shanghai",
          "destination" : "Beijing",
          "is_arrived" : true
        }
      }
    ]
  }
}

The CDC sink can be Elasticsearch, but it can just as well be Kafka or another system. Flink SQL CDC cuts out a lot of the tedious hand-written integration code, and it is genuinely pleasant to use.
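
For illustration only, a Kafka variant of the sink might look like the sketch below, using the upsert-kafka connector available since Flink 1.12; the topic name and broker address are assumptions and should be adjusted to your environment:

-- Sketch: write the same enriched stream to Kafka instead of Elasticsearch
CREATE TABLE enriched_orders_kafka (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  product_name STRING,
  product_description STRING,
  shipment_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'enriched_orders',                          -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',    -- assumed broker address
  'key.format' = 'json',
  'value.format' = 'json'
);

The accompanying INSERT INTO statement would stay the same apart from the sink table name.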
