1. Overview
Canal is an open-source tool from Alibaba that parses MySQL incremental binlogs and provides incremental data subscription and consumption. It supports use cases such as:
- Database mirroring
- Real-time database backup
- Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, and so on)
- Business cache invalidation
- Incremental data processing with business logic
Canal's internals are covered in detail in the official wiki; in short, it emulates MySQL primary/replica replication and works as follows:
- Canal masquerades as a MySQL replica and sends a dump request to the primary
- The MySQL primary receives the dump request and starts pushing binlogs to Canal
- Canal parses the binlog stream
Canal consists of the following services:
- canal-server: the Canal server, where the core work happens
- canal-admin: the Canal admin console, which provides operations-facing features such as centralized configuration management and node operations; it requires Canal 1.1.4 or later
Canal already supports Prometheus, Kafka, and more, and can be deployed either standalone or as a cluster. Here we will try it out together with MySQL, ZooKeeper, Kafka, Elasticsearch, and Kibana.
2. Preparation
2.1 Deployment plan
Before installing, let's look at the current environment and plan the deployment. As with etcd and the other services installed earlier, everything will run on node1 (usefulness=devops).

| Node | Role | IP | Spec | Label |
|---|---|---|---|---|
| master | master, etcd | 192.168.1.100 | 4 cores / 4 GB / 50 GB | usefulness=schedule |
| node1 | worker | 192.168.1.101 | 8 cores / 32 GB / 100 GB | usefulness=devops |
| node2 | worker | 192.168.1.102 | 8 cores / 12 GB / 100 GB | usefulness=business |
| node3 | worker | 192.168.1.103 | 8 cores / 12 GB / 100 GB | usefulness=business |
2.2 Domains
The following domains must all point to node1, because all of the services here, as well as Traefik, are deployed on node1 (192.168.1.101), and Traefik listens on node1's port 80.
- kafka.local.com
- canal.local.com
- canal-admin.local.com
- kibana.local.com
2.3 Dependent services
Deployment of the dependent services has all been covered before; follow these guides to set them up:
- For MySQL, see "Deploying MySQL on K8s".
- For ZooKeeper and Kafka, see "Installing ZooKeeper and Kafka with Helm".
- For Elasticsearch and Kibana, see "Collecting Logs on K8s with Elastic Stack and Kafka".
I have already deployed them; the internal DNS names and external addresses are:
- MySQL: mysql-primary.devops.svc.cluster.local:3306
- Elasticsearch
  - inside k8s: elasticsearch-client.devops.svc.cluster.local:9200
  - outside k8s: es.local.com:4511
- Kibana: kibana.local.com
- ZooKeeper: zookeeper.devops.svc.cluster.local:2181
- Kafka
  - inside k8s: kafka.devops.svc.cluster.local:9092
  - outside k8s: kafka.local.com:19090, kafka.local.com:19091, kafka.local.com:19092
2.4 Canal
Here is a summary of the pitfalls I ran into while deploying Canal; use it as a reference for your own deployment.
2.4.1 The database
- MySQL must have binlog enabled. On 8.0 and above it is enabled by default and already set to row format:

  [mysqld]
  # enable binlog
  log-bin=mysql-bin
  # use ROW format
  binlog-format=ROW
  # required for MySQL replication; must not clash with Canal's slaveId
  server_id=1
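To double-check these settings from code rather than a MySQL shell, a minimal Go sketch; it assumes the go-sql-driver/mysql driver and the MySQL address used elsewhere in this post, and the credentials are placeholders:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // MySQL driver
)

func main() {
	// any account that can read system variables will do
	db, err := sql.Open("mysql", "root:secret@tcp(mysql-primary.devops.svc.cluster.local:3306)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var logBin, format, serverID string
	// expect log_bin=1, binlog_format=ROW, and a non-zero server_id
	err = db.QueryRow("SELECT @@log_bin, @@binlog_format, @@server_id").Scan(&logBin, &format, &serverID)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("log_bin=%s binlog_format=%s server_id=%s\n", logBin, format, serverID)
}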
- Create an account named canal for Canal with replication privileges:

  # canal
  CREATE USER canal IDENTIFIED BY 'si1n39zuybQ=';
  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
  FLUSH PRIVILEGES;
- Create an account named canal_admin for Canal Admin to access its database:

  # canal_admin
  CREATE USER canal_admin IDENTIFIED BY '2HcZUUTVyNo=';
  GRANT ALL PRIVILEGES ON canal_admin.* TO 'canal_admin'@'%';
  FLUSH PRIVILEGES;
- canal-admin's default database is canal_manager; I renamed it to canal_admin, so the database must be created and the schema imported before deploying:

  # create the database
  create schema canal_admin;
- The SQL schema for canal-admin can be downloaded here. Note that by default it inserts an admin account into the canal_user table with the password 123456; to change it, edit the password hash in that INSERT statement before importing. The hash is generated as described in the next item.
- How MySQL generates the password hash (strip the leading * from the result):

  # mysql below 5.7.5:
  select password('xxx');
  # mysql 5.7.5 and above:
  SELECT CONCAT('*', UPPER(SHA1(UNHEX(SHA1('xxx')))));
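If you don't have a MySQL shell handy, the same double-SHA1 scheme is easy to reproduce in Go; a small helper of my own, not part of Canal:

package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"strings"
)

// mysqlPassword reproduces MySQL's PASSWORD() function:
// UPPER(SHA1(SHA1(plain))), with the inner SHA1 taken over the raw bytes.
// Canal stores the hash without the leading '*'.
func mysqlPassword(plain string) string {
	h1 := sha1.Sum([]byte(plain))
	h2 := sha1.Sum(h1[:])
	return strings.ToUpper(hex.EncodeToString(h2[:]))
}

func main() {
	// prints 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9, the hash of the
	// default 123456 password shipped in the canal-admin SQL dump
	fmt.Println(mysqlPassword("123456"))
}

All the hashed passwords appearing later in this post (web login, canal.passwd, canal.admin.passwd) follow this same scheme.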
- This test needs a test database and a test table:

  # create the database
  create schema canal_test;
  # switch to it
  use canal_test;
  # create the table
  create table if not exists user
  (
      id         int(11) unsigned auto_increment comment 'ID',
      name       varchar(100) not null comment 'name',
      gender     tinyint(1) not null comment 'gender, 0: male, 1: female',
      mobile     varchar(100) not null comment 'mobile number',
      email      varchar(100) not null comment 'email',
      created_at int(10) unsigned default 0 not null comment 'created time',
      updated_at int(10) unsigned default 0 not null comment 'updated time',
      deleted_at int(10) unsigned default 0 not null comment 'deleted time',
      primary key (`id`),
      index index_mobile (`mobile`),
      index index_email (`email`)
  ) engine = innodb
    default charset = utf8mb4
    collate = utf8mb4_general_ci comment 'user table';
2.4.2 Accounts and passwords
Canal's credentials are a little confusing, mainly because there are several account/password pairs and a mix of plaintext and hashed configuration. In short:
- Every password hash is generated with the MySQL method described above
- Some passwords are configured as plaintext in one place and as the hash in another

| # | Type | Account | Password | Purpose |
|---|---|---|---|---|
| 1 | database | canal | si1n39zuybQ= | lets Canal read from MySQL as a replica |
| 2 | database | canal_admin | 2HcZUUTVyNo= | canal-admin's database account |
| 3 | web | admin | O8avksPO95s= | canal-admin web login |
| 4 | tcp | admin | QLyPwzWazjg= | used by canal-server to connect to admin |
| 5 | tcp | canal | dAsQZ2otzK4= | used by clients to connect to canal-server, configured in canal.properties |

Note that 3 and 4 are two separate credentials: one logs into the web UI, the other authenticates canal-server against admin.
3. Deploying Canal
Official Docker images are provided for Canal, so we will use them to write our own Deployment (admin) and StatefulSet (server) and deploy to K8s.
3.1 Deploying Canal Admin
Canal Admin is configured as follows:
- admin_user and admin_password
  The credentials canal-server uses to connect to admin; admin configures the plaintext, while the server configures the hash.
- Datasource settings
  All of these are plaintext. Note that the username and password must match the canal_admin account created in the database earlier (the database itself stores the password hashed).
The ConfigMap values are:
apiVersion: v1
kind: ConfigMap
metadata:
name: canal-admin
namespace: devops
data:
admin_user: "admin"
admin_password: "QLyPwzWazjg="
datasource_address: "mysql-primary.devops.svc.cluster.local:3306"
datasource_database: "canal_admin"
datasource_username: "canal_admin"
datasource_password: "2HcZUUTVyNo="
The complete manifest follows. Note that the service listens on port 8089, and an IngressRoute routes traffic for the domain canal-admin.local.com to it:
apiVersion: v1
kind: ConfigMap
metadata:
name: canal-admin
namespace: devops
data:
admin_user: "admin"
admin_password: "QLyPwzWazjg="
datasource_address: "mysql-primary.devops.svc.cluster.local:3306"
datasource_database: "canal_admin"
datasource_username: "canal_admin"
datasource_password: "2HcZUUTVyNo="
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: canal-admin
namespace: devops
labels:
app.kubernetes.io/name: canal-admin
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: canal-admin
template:
metadata:
name: canal-admin
creationTimestamp: null
labels:
app.kubernetes.io/name: canal-admin
spec:
containers:
- name: canal-admin
image: 'canal/canal-admin:latest'
imagePullPolicy: IfNotPresent
ports:
- name: web
containerPort: 8089
protocol: TCP
env:
- name: server.port
value: '8089'
- name: canal.adminUser
valueFrom:
configMapKeyRef:
name: canal-admin
key: admin_user
- name: canal.adminPasswd
valueFrom:
configMapKeyRef:
name: canal-admin
key: admin_password
- name: spring.datasource.address
valueFrom:
configMapKeyRef:
name: canal-admin
key: datasource_address
- name: spring.datasource.database
valueFrom:
configMapKeyRef:
name: canal-admin
key: datasource_database
- name: spring.datasource.username
valueFrom:
configMapKeyRef:
name: canal-admin
key: datasource_username
- name: spring.datasource.password
valueFrom:
configMapKeyRef:
name: canal-admin
key: datasource_password
resources:
requests:
cpu: 250m
memory: 256Mi
livenessProbe:
httpGet:
path: /
port: 8089
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 30
readinessProbe:
httpGet:
path: /
port: 8089
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 30
restartPolicy: Always
nodeSelector:
usefulness: devops
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/name: canal-admin
namespaces:
- devops
topologyKey: kubernetes.io/hostname
revisionHistoryLimit: 10
---
kind: Service
apiVersion: v1
metadata:
name: canal-admin
namespace: devops
spec:
ports:
- protocol: TCP
port: 8089
targetPort: 8089
selector:
app.kubernetes.io/name: canal-admin
---
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
name: canal-admin
namespace: devops
spec:
entryPoints:
- web
routes:
- match: Host(`canal-admin.local.com`)
kind: Rule
services:
- name: canal-admin
port: 8089
After kubectl apply we have a ConfigMap, a Deployment, a Service, and an IngressRoute:
root@master:/data/yaml/canal# kubectl apply -f admin.yml
configmap/canal-admin unchanged
deployment.apps/canal-admin created
service/canal-admin created
ingressroute.traefik.containo.us/canal-admin created
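As a quick smoke test you can hit the console over the IngressRoute, mirroring the HTTP probes in the Deployment above; a 200 from the login page means the pod is ready. A minimal sketch:

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// the host routed by the IngressRoute defined above
	resp, err := http.Get("http://canal-admin.local.com/")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println("canal-admin:", resp.Status)
}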
3.2 Cluster configuration
Once Canal Admin is deployed, the console is available at canal-admin.local.com. After logging in, click Cluster Management in the left sidebar, then New Cluster; enter a cluster name and the ZooKeeper address. I named the cluster local; after clicking OK the new cluster shows up.
Now click Main Configuration under the cluster's action menu to maintain canal.properties. In standalone mode this file lives on each server, but in a cluster it is maintained in admin, and the servers pull it once they connect.
The configuration is empty the first time; click Load Template and then edit. The following items need to change:
- canal.user and canal.passwd
  The credentials clients use when connecting to canal-server. The password must be the hash; I used canal with 64D406545EB954F76A16939C5CA9A7F49C10DDF2, whose plaintext is dAsQZ2otzK4=.
- canal.admin.passwd
  The password canal-server uses when connecting to admin (the default account/password is admin/admin). It is configured as a hash here, and the matching canal-server setting is also the hash. I used 15C038E1A3F6286C301FC986FE8E28CFFBD73C5B, whose plaintext is QLyPwzWazjg=.
- canal.zkServers
  The ZooKeeper address; I used zookeeper.devops.svc.cluster.local:2181.
- kafka.bootstrap.servers
  The Kafka address; I used kafka.devops.svc.cluster.local:9092.
- canal.serverMode = kafka
  Sends the parsed changes to Kafka.
- canal.instance.global.spring.xml
  The global Spring component file; set it to classpath:spring/default-instance.xml.
After changing these items click Save; note that saving restarts the Canal Servers. The complete configuration:
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = 64D406545EB954F76A16939C5CA9A7F49C10DDF2
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 15C038E1A3F6286C301FC986FE8E28CFFBD73C5B
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers = zookeeper.devops.svc.cluster.local:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = kafka.devops.svc.cluster.local:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
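Before saving and letting the servers restart, it can be worth verifying that the kafka.bootstrap.servers address is actually reachable. A minimal sketch using the same sarama library as the consumer code later in this post; run it from a pod inside the cluster so the service DNS resolves:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// the bootstrap address from kafka.bootstrap.servers above
	client, err := sarama.NewClient([]string{"kafka.devops.svc.cluster.local:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topics, err := client.Topics()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("kafka reachable, topics:", topics)
}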
3.3 Deploying Canal Server
I deploy the server as a StatefulSet with replicas set to 3, and create three matching Services, each bound to a node port so the servers can also be reached from outside k8s:
- canal.local.com:11111
- canal.local.com:11112
- canal.local.com:11113
Canal Server is configured as follows:
- admin_user and admin_password
  The credentials canal-server uses to connect to admin. Admin holds the plaintext while the server holds the hash, and they must match the canal.admin.user and canal.admin.passwd set in the cluster configuration above.
- admin_register_cluster
  The name of the cluster created in the previous step; mine is local.
- admin_register_auto
  Whether to register automatically: with true the server registers itself with the cluster, otherwise it must be added manually in the console.
The ConfigMap values are:
apiVersion: v1
kind: ConfigMap
metadata:
name: canal-server
namespace: devops
data:
admin_manager: "canal-admin.devops.svc.cluster.local:8089"
admin_port: "11110"
admin_user: "admin"
admin_password: "17F69641AD5AF38C9BE516C3D3AC78D2D672D802"
admin_register_cluster: "local"
admin_register_auto: "true"
The complete manifest:
apiVersion: v1
kind: ConfigMap
metadata:
name: canal-server
namespace: devops
data:
admin_manager: "canal-admin.devops.svc.cluster.local:8089"
admin_port: "11110"
admin_user: "admin"
admin_password: "15C038E1A3F6286C301FC986FE8E28CFFBD73C5B"
admin_register_cluster: "local"
admin_register_auto: "true"
---
kind: StatefulSet
apiVersion: apps/v1
metadata:
name: canal-server
namespace: devops
labels:
app.kubernetes.io/name: canal-server
spec:
replicas: 3
selector:
matchLabels:
app.kubernetes.io/name: canal-server
template:
metadata:
name: canal-server
creationTimestamp: null
labels:
app.kubernetes.io/name: canal-server
spec:
containers:
- name: canal-server
image: 'canal/canal-server:latest'
imagePullPolicy: IfNotPresent
ports:
- name: tcp
containerPort: 11111
protocol: TCP
env:
- name: canal.admin.manager
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_manager
- name: canal.admin.port
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_port
- name: canal.admin.user
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_user
- name: canal.admin.passwd
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_password
- name: canal.admin.register.cluster
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_register_cluster
- name: canal.admin.register.auto
valueFrom:
configMapKeyRef:
name: canal-server
key: admin_register_auto
resources:
requests:
cpu: 250m
memory: 256Mi
livenessProbe:
tcpSocket:
port: 11112
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 30
readinessProbe:
tcpSocket:
port: 11112
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 30
restartPolicy: Always
nodeSelector:
usefulness: devops
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/name: canal-server
namespaces:
- devops
topologyKey: kubernetes.io/hostname
serviceName: canal-server
revisionHistoryLimit: 10
If canal.serverMode is set to tcp, the container listens on port 11111 and a few Services can be created to reach it; in mq mode the port is not opened.
kind: Service
apiVersion: v1
metadata:
name: canal-server-0-external
namespace: devops
labels:
app.kubernetes.io/name: canal-server
pod: canal-server-0
spec:
ports:
- name: tcp-canal-server
protocol: TCP
port: 11111
targetPort: 11111
nodePort: 11111
selector:
app.kubernetes.io/name: canal-server
statefulset.kubernetes.io/pod-name: canal-server-0
type: NodePort
sessionAffinity: None
externalTrafficPolicy: Cluster
ipFamilies:
- IPv4
ipFamilyPolicy: SingleStack
---
kind: Service
apiVersion: v1
metadata:
name: canal-server-1-external
namespace: devops
labels:
app.kubernetes.io/name: canal-server
pod: canal-server-1
spec:
ports:
- name: tcp-canal-server
protocol: TCP
port: 11111
targetPort: 11111
nodePort: 11112
selector:
app.kubernetes.io/name: canal-server
statefulset.kubernetes.io/pod-name: canal-server-1
type: NodePort
sessionAffinity: None
externalTrafficPolicy: Cluster
ipFamilies:
- IPv4
ipFamilyPolicy: SingleStack
---
kind: Service
apiVersion: v1
metadata:
name: canal-server-2-external
namespace: devops
labels:
app.kubernetes.io/name: canal-server
pod: canal-server-2
spec:
ports:
- name: tcp-canal-server
protocol: TCP
port: 11111
targetPort: 11111
nodePort: 11113
selector:
app.kubernetes.io/name: canal-server
statefulset.kubernetes.io/pod-name: canal-server-2
type: NodePort
sessionAffinity: None
externalTrafficPolicy: Cluster
ipFamilies:
- IPv4
ipFamilyPolicy: SingleStack
The result of kubectl apply:
root@master:/data/yaml/canal# kubectl apply -f server.yml
configmap/canal-server created
statefulset.apps/canal-server created
root@master:/data/yaml/canal# kubectl apply -f service.yml
service/canal-server-0-external created
service/canal-server-1-external created
service/canal-server-2-external created
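Remember that these NodePorts only answer in tcp mode; with canal.serverMode = kafka, as configured here, the dials below are expected to fail. A quick reachability check for when you do run in tcp mode, using the addresses listed above:

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	addrs := []string{"canal.local.com:11111", "canal.local.com:11112", "canal.local.com:11113"}
	for _, addr := range addrs {
		conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
		if err != nil {
			fmt.Println(addr, "unreachable:", err)
			continue
		}
		conn.Close()
		fmt.Println(addr, "reachable")
	}
}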
Looking at the console now, the three servers have joined the cluster:
3.4 Creating an Instance
In the console, open Instance Management in the left menu and click New Instance. Enter a name, pick the cluster/host, click Load Template, change the following items, then save:
- canal.instance.master.address
  The MySQL connection address; I used mysql-primary.devops.svc.cluster.local:3306.
- canal.instance.dbUsername and canal.instance.dbPassword
  The MySQL credentials; I used canal and si1n39zuybQ=.
- canal.mq.topic
  The message-queue topic to write to; I used canal_test.
- canal.instance.filter.regex
  A regex selecting the schemas and tables to watch; I used canal_test\..* (see the sanity-check sketch after the configuration below).
The complete configuration:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=mysql-primary.devops.svc.cluster.local:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=si1n39zuybQ=
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=canal_test\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=canal_test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
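Canal treats canal.instance.filter.regex as a comma-separated list of Java regexes matched against schema.table names. For a simple pattern like canal_test\..* Go's regexp engine behaves the same way, so a quick local sanity check might look like this (the table names are illustrative):

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// same pattern as canal.instance.filter.regex, anchored to emulate a full match
	filter := regexp.MustCompile(`^canal_test\..*$`)
	for _, table := range []string{"canal_test.user", "canal_test.order", "mysql.user"} {
		fmt.Printf("%-18s captured=%v\n", table, filter.MatchString(table))
	}
}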
Because the instance is assigned to the cluster, no host is allocated while it is stopped; click Start in the action menu and a host gets allocated:
3.5 Testing
We will write Go code that consumes the Kafka topic canal_test, run SQL statements by hand, and watch what shows up in Kibana after each statement.
On startup the Go program checks whether the user index exists in Elasticsearch and creates it if it does not. Start the program once to create the index, then create an index pattern in Kibana; after that we can run SQL and watch the results, as shown below:
We then run Insert, Update, and Delete statements in turn.
3.5.1 Insert
The SQL:
insert into user (`name`, `gender`, `mobile`, `email`, `created_at`, `updated_at`, `deleted_at`)
values ('周杰伦', '0', '13565654521', 'zhoujielun@gmail.com', 1646453618, 1646453618, 0),
       ('张学友', '0', '13565654522', 'zhangxueyou@gmail.com', 1646453618, 1646453618, 0),
       ('林俊杰', '0', '13565654523', 'linjunjie@gmail.com', 1646453618, 1646453618, 0),
       ('王力宏', '0', '13565654524', 'wanglihong@gmail.com', 1646453618, 1646453618, 0),
       ('孙燕姿', '1', '13565654525', 'sunyanzi@gmail.com', 1646453618, 1646453618, 0),
       ('梁静茹', '1', '13565654526', 'liangjingru@gmail.com', 1646453618, 1646453618, 0),
       ('谭维维', '1', '13565654527', 'tanweiwei@gmail.com', 1646453618, 1646453618, 0),
       ('五月天', '0', '13565654528', 'wuyuetian@gmail.com', 1646453618, 1646453618, 0),
       ('腾格尔', '0', '13565654529', 'tenggeer@gmail.com', 1646453618, 1646453618, 0),
       ('陈奕迅', '0', '13565654530', 'chenyixun@gmail.com', 1646453618, 1646453618, 0);
Checking Kibana afterwards, the 10 users have been written:
3.5.2 Update
The SQL:
update user set email = 'jay@gmail.com', updated_at = '1646453628' where id = 1;
Kibana now shows that 周杰伦's email and update time have both changed:
3.5.3 Delete
A delete removes the row from the database, but the document in ES is not deleted; only its is_deleted and deleted_at fields change.
The SQL:
delete from user where id = 1;
Kibana shows that the is_deleted and deleted_at fields have been updated:
3.6 The code
The complete Go program:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/Shopify/sarama"
"github.com/olivere/elastic/v7"
"github.com/olivere/elastic/v7/config"
)
// esServer is the ES endpoint
var esServer = "http://es.local.com:19200"

// esIndex is the ES index name
var esIndex = "user"

// esUsername is the ES account
var esUsername = "elastic"

// esPassword is the ES password
var esPassword = "eYTmelHfmA4="

// esClient is the ES client
var esClient *elastic.Client

// kafkaServers are the Kafka broker addresses
var kafkaServers = []string{
	"kafka.local.com:19090",
	"kafka.local.com:19091",
	"kafka.local.com:19092",
}

// kafkaTopic is the Kafka topic to consume
var kafkaTopic = "canal_test"

// kafkaConsumer is the Kafka consumer
var kafkaConsumer sarama.Consumer

// shared context
var ctx = context.Background()
func main() {
var err error
	// fetch the ES client
esClient, err = getEsClient()
if err != nil {
panic(err)
}
	// build the Kafka consumer
sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
kafkaConsumer, _ = sarama.NewConsumer(kafkaServers, nil)
partitions, _ := kafkaConsumer.Partitions(kafkaTopic)
	// start one consumer goroutine per partition; range over the partition
	// IDs themselves rather than the slice indexes
	for _, partition := range partitions {
		go func(partition int32) {
			pc, _ := kafkaConsumer.ConsumePartition(kafkaTopic, partition, sarama.OffsetOldest)
			for v := range pc.Messages() {
				var kafkaMsg KafkaMsg
				_ = json.Unmarshal(v.Value, &kafkaMsg)
				for _, user := range kafkaMsg.Data {
					data, _ := json.Marshal(kafkaMsg.Data)
					fmt.Println(kafkaMsg.Type, kafkaMsg.Database, kafkaMsg.Table, string(data), kafkaMsg.Ts)
					if kafkaMsg.Type == "DELETE" {
						user.DeletedAt = fmt.Sprintf("%d", kafkaMsg.Ts/1000)
					}
					if err := saveUserToEs(user); err != nil {
						fmt.Printf("failed to save user, error: %v\n", err)
					}
				}
			}
		}(partition)
	}
}
select {}
}
// KafkaMsg is the flat message Canal writes to Kafka
type KafkaMsg struct {
Data []*User `json:"data"`
Database string `json:"database"`
Es int64 `json:"es"`
ID int `json:"id"`
IsDdl bool `json:"isDdl"`
MysqlType struct {
ID string `json:"id"`
Name string `json:"name"`
Gender string `json:"gender"`
Mobile string `json:"mobile"`
Email string `json:"email"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
DeletedAt string `json:"deleted_at"`
} `json:"mysqlType"`
Old interface{} `json:"old"`
PkNames []string `json:"pkNames"`
SQL string `json:"sql"`
SQLType struct {
ID int `json:"id"`
Name int `json:"name"`
Gender int `json:"gender"`
Mobile int `json:"mobile"`
Email int `json:"email"`
CreatedAt int `json:"created_at"`
UpdatedAt int `json:"updated_at"`
DeletedAt int `json:"deleted_at"`
} `json:"sqlType"`
Table string `json:"table"`
Ts int64 `json:"ts"`
Type string `json:"type"`
}
// User is one row of the user table
type User struct {
ID string `json:"id"`
Name string `json:"name"`
Gender string `json:"gender"`
Mobile string `json:"mobile"`
Email string `json:"email"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
DeletedAt string `json:"deleted_at"`
}
// esMapping is the ES index mapping
const esMapping = `
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"gender": {
"type": "keyword"
},
"mobile": {
"type": "keyword"
},
"email": {
"type": "keyword"
},
"is_deleted": {
"type": "keyword"
},
"created_at": {
"type": "date"
},
"updated_at": {
"type": "date"
},
"deleted_at": {
"type": "date"
}
}
}
}`
// getEsClient builds the ES client and ensures the index exists
func getEsClient() (*elastic.Client, error) {
	// disable sniffing and health checks
off := false
client, err := elastic.NewClientFromConfig(
&config.Config{
URL: esServer,
Index: esIndex,
Username: esUsername,
Password: esPassword,
Sniff: &off,
Healthcheck: &off,
},
)
if err != nil {
return nil, err
}
	// create the index if it does not exist yet
res, err := client.IndexExists(esIndex).Do(ctx)
if err != nil {
return nil, err
}
if !res {
_, err = client.CreateIndex(esIndex).BodyString(esMapping).Do(ctx)
if err != nil {
return nil, err
}
}
return client, nil
}
// saveUserToEs writes a user document to ES
func saveUserToEs(user *User) error {
m := make(map[string]interface{})
b, _ := json.Marshal(user)
_ = json.Unmarshal(b, &m)
m["deleted_at"] = "0"
m["is_deleted"] = "否"
m["gender"] = "男"
if user.DeletedAt != "0" {
m["is_deleted"] = "是"
m["deleted_at"] = parseTime(user.DeletedAt)
}
if user.Gender != "0" {
m["gender"] = "女"
}
m["created_at"] = parseTime(m["created_at"].(string))
m["updated_at"] = parseTime(m["updated_at"].(string))
b, _ = json.Marshal(m)
_, err := esClient.Index().Index(esIndex).Id(user.ID).BodyString(fmt.Sprintf("%s", b)).Do(ctx)
return err
}
// parseTime converts a unix-timestamp string to the ES date format
func parseTime(t string) string {
tsp, _ := strconv.ParseInt(t, 10, 64)
return time.Unix(tsp, 0).Format("2006-01-02T15:04:05")
}
The test logs:
[Sarama] 2022/03/11 12:01:45 Initializing new client
[Sarama] 2022/03/11 12:01:45 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2022/03/11 12:01:45 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2022/03/11 12:01:45 client/metadata fetching metadata for all topics from broker kafka.local.com:19091
[Sarama] 2022/03/11 12:01:45 Connected to broker at kafka.local.com:19091 (unregistered)
[Sarama] 2022/03/11 12:01:45 client/brokers registered new broker #0 at 192.168.1.101:19090
[Sarama] 2022/03/11 12:01:45 client/brokers registered new broker #2 at 192.168.1.101:19092
[Sarama] 2022/03/11 12:01:45 client/brokers registered new broker #1 at 192.168.1.101:19091
[Sarama] 2022/03/11 12:01:45 Successfully initialized new client
[Sarama] 2022/03/11 12:01:45 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2022/03/11 12:01:45 Connected to broker at 192.168.1.101:19090 (registered as #0)
[Sarama] 2022/03/11 12:01:45 consumer/broker/0 added subscription to canal_test/0
INSERT canal_test user [{"id":"1","name":"周杰伦","gender":"0","mobile":"13565654521","email":"zhoujielun@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"2","name":"张学友","gender":"0","mobile":"13565654522","email":"zhangxueyou@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"3","name":"林俊杰","gender":"0","mobile":"13565654523","email":"linjunjie@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"4","name":"王力宏","gender":"0","mobile":"13565654524","email":"wanglihong@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"5","name":"孙燕姿","gender":"1","mobile":"13565654525","email":"sunyanzi@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"6","name":"梁静茹","gender":"1","mobile":"13565654526","email":"liangjingru@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"7","name":"谭维维","gender":"1","mobile":"13565654527","email":"tanweiwei@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"8","name":"五月天","gender":"0","mobile":"13565654528","email":"wuyuetian@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"9","name":"腾格尔","gender":"0","mobile":"13565654529","email":"tenggeer@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"10","name":"陈奕迅","gender":"0","mobile":"13565654530","email":"chenyixun@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"}] 1646969682003
UPDATE canal_test user [{"id":"1","name":"周杰伦","gender":"0","mobile":"13565654521","email":"jay@gmail.com","created_at":"1646453618","updated_at":"1646453628","deleted_at":"0"}] 1646970591004
DELETE canal_test user [{"id":"1","name":"周杰伦","gender":"0","mobile":"13565654521","email":"jay@gmail.com","created_at":"1646453618","updated_at":"1646453628","deleted_at":"0"}] 1646970723986
4. Summary
This post is a record of my study of Canal: a brief look at how it works, followed by a complete end-to-end walkthrough. Canal has a large number of configuration options, and within the limits of one article only the important ones were covered; for the full picture, see the official documentation.