【Flink】Syncing MySQL Data to Elasticsearch with Flink CDC

1. Requirements

We need to aggregate and sync MySQL data into Elasticsearch to provide search capability over large volumes of data.

2. Candidate Solutions

  • Synchronously call the ES API whenever MySQL data changes
  • Asynchronously call the ES API whenever MySQL data changes (publish a message to a message queue first; the search microservice consumes it and calls the ES API)
  • Use canal to listen to the MySQL binlog and sync changes to ES
  • Use Flink CDC to listen to the MySQL binlog and sync changes to ES

3. Chosen Solution

We ultimately chose Flink CDC. First, the go-canal repository is inactive: it has only three issues, and reported bugs go unfixed; besides, we may have real-time big-data analytics needs later, so we need a Flink cluster anyway. Second, canal's releases are rarely updated; the last one was in April 2021, and we were afraid the project might be abandoned. Third, I tried to deploy canal myself and failed 💦.

4. Deployment Environment

We use docker-compose to deploy MySQL, Flink, and Elasticsearch:

```yaml
version: '3'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.13
    container_name: elasticsearch
    user: root
    environment:
      - discovery.type=single-node
      - 'ES_JAVA_OPTS=-Xms512m -Xmx512m'
      - TZ=Asia/Shanghai
    volumes:
      - ./data/elasticsearch/data:/usr/share/elasticsearch/data
    restart: always
    ports:
      - '9200:9200'
      - '9300:9300'
    networks:
      - douyin_net

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.13
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - TZ=Asia/Shanghai
    restart: always
    networks:
      - douyin_net
    ports:
      - '5601:5601'
    depends_on:
      - elasticsearch

  mysql:
    image: mysql:latest
    container_name: mysql
    environment:
      TZ: Asia/Shanghai
      MYSQL_ROOT_PASSWORD: my-secret-pw
    ports:
      - '3306:3306'
    volumes:
      # Data mounting
      - ./data/mysql/data:/var/lib/mysql
      - ./data/mysql/my.cnf:/etc/my.cnf
    privileged: true
    restart: always
    networks:
      - douyin_net

  jobmanager:
    image: flink:1.17 # pin to the version the connector jars below target
    container_name: jobmanager
    expose:
      - '6123'
    ports:
      - '8081:8081'
    command: jobmanager
    environment:
      - TZ=Asia/Shanghai
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    restart: always
    networks:
      - douyin_net

  taskmanager:
    image: flink:1.17 # pin to the version the connector jars below target
    container_name: taskmanager
    expose:
      - '6121'
      - '6122'
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - TZ=Asia/Shanghai
    restart: always
    networks:
      - douyin_net

networks:
  douyin_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.16.0.0/16
```

The configuration file mounted into the MySQL container (./data/mysql/my.cnf):

```ini
[mysqld]
skip-host-cache
skip-name-resolve
datadir=/var/lib/mysql
socket=/var/run/mysqld/mysqld.sock
secure-file-priv=/var/lib/mysql-files
user=mysql
log-bin=mysql-bin
binlog-format=ROW
server_id=66
pid-file=/var/run/mysqld/mysqld.pid
[client]
socket=/var/run/mysqld/mysqld.sock

!includedir /etc/mysql/conf.d/
```

The main purpose is to enable the binlog in ROW format, which Flink CDC requires.
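Optionally, rather than connecting as root, you can create a dedicated account for Flink CDC. A minimal sketch, assuming a made-up user name and password; the listed privileges are the ones the MySQL CDC connector requires:

```sql
-- Hypothetical account for Flink CDC; choose your own name and password.
CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'flinkcdc-pw';
-- Privileges needed by the (Debezium-based) MySQL CDC connector.
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc'@'%';
FLUSH PRIVILEGES;
```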

5. Configuration

First, start the cluster:

```bash
docker-compose up -d
```

Check that binlog is enabled:

```bash
docker exec -it mysql mysql -uroot -pmy-secret-pw
show variables like 'log_bin';
show variables like 'binlog_format';
```
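With the configuration above, log_bin should report ON and binlog_format should report ROW.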

Download the MySQL and Elasticsearch connectors (published on Maven Central) and copy them into the JobManager container:

```bash
docker cp ./flink-sql-connector-elasticsearch7-3.0.1-1.17.jar jobmanager:/opt/flink/lib/
docker cp ./flink-sql-connector-mysql-cdc-2.4.1.jar jobmanager:/opt/flink/lib/
docker restart jobmanager
```
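Jars under /opt/flink/lib/ are part of each node's framework classpath and are not shipped to other nodes automatically, so in this standalone cluster the TaskManager needs the same connectors. A sketch doing the equivalent copy for it:

```bash
docker cp ./flink-sql-connector-elasticsearch7-3.0.1-1.17.jar taskmanager:/opt/flink/lib/
docker cp ./flink-sql-connector-mysql-cdc-2.4.1.jar taskmanager:/opt/flink/lib/
docker restart taskmanager
```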

6. Testing

Enter the MySQL container (or connect with any client) and create the database and table:

```sql
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
```
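Because of AUTO_INCREMENT = 10001, the three rows get order_id 10001, 10002, and 10003; we will reuse these IDs at the end to test change propagation.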

Enter the jobmanager container and start the SQL client:

```bash
docker exec -it jobmanager /bin/bash
cd bin && ./sql-client.sh
```

Then, inside the SQL client, enable checkpointing and create the CDC source table and the ES sink table:

```sql
-- Checkpointing also controls how often the sink flushes to ES.
SET 'execution.checkpointing.interval' = '3s';
CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  -- 192.168.2.248 is the Docker host's LAN IP; since all services share
  -- douyin_net, the container name 'mysql' would work here as well.
  'hostname' = '192.168.2.248',
  'port' = '3306',
  'username' = 'root',
  'password' = 'my-secret-pw',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

CREATE TABLE es_orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  -- With a primary key defined, the sink runs in upsert mode and uses
  -- order_id as the Elasticsearch document id.
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.2.248:9200',
  'index' = 'orders'
);

INSERT INTO es_orders SELECT * FROM orders;
```
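This INSERT submits a continuously running job, which you can watch in the Flink web UI at http://localhost:8081. By default the connector first takes a snapshot of the existing rows and then streams subsequent binlog changes.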

Open Kibana (Dev Tools, http://localhost:5601) and query the data in ES:

```json
GET orders/_search
{
  "query":{
    "match_all": {}
  }
}
```
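You should see the three orders come back. To confirm that changes propagate, modify a row in MySQL and re-run the query; with a 3s checkpoint interval, the result should update within a few seconds. For example (using the IDs generated earlier):

```sql
-- Run in MySQL: the CDC job upserts/deletes the matching ES documents.
UPDATE orders SET order_status = true WHERE order_id = 10001;
DELETE FROM orders WHERE order_id = 10003;
```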