背景

在很多场景中我们都会使用到消费mysql的实时增量数据.那么canal就是为这个场景而生的.canal是阿里巴巴开源的一个MySQL 数据库增量日志解析,提供增量数据订阅和消费,可以搭配kafka.各种mq或者ES以及其他nosql数据库使用


使用场景

大家如果不想看原理.可以先跳转到下边,搭建出来.再去理解各个模块.也不失为一个好方法

canal工作原理

mysql的主从复制原理图


  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)


canal架构


说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1..n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

EventParser(数据源接入)设计


整个parser过程大致可分为几步:

  1. Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
  2. Connection建立链接,发送BINLOG_DUMP指令
    // 0. write command number

// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name

  1. Mysql开始推送Binaly Log
  2. 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
    // 补充字段名字,字段类型,主键信息,unsigned类型处理
  3. 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
  4. 存储成功后,定时记录Binaly Log位置


mysql的binlog文件格式

EventSink(数据过滤,加工,分发的工作)设计


说明:

  • 数据过滤:支持通配符的过滤模式,表名,字段内容等
  • 数据路由/分发:解决1:n (1个parser对应多个store的模式)
  • 数据归并:解决n:1 (多个parser对应1个store)
  • 数据加工:在进入store之前进行额外的处理,比如join

EventStore(数据存储)设计

  1. 目前仅实现了Memory内存模式,后续计划增加本地file存储,mixed混合模式
  2. 借鉴了Disruptor的RingBuffer的实现思路


定义了3个cursor

  • Put : Sink模块进行数据存储的最后一次写入位置
  • Get : 数据订阅获取的最后一次提取位置
  • Ack : 数据消费成功的最后一次消费位置

借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:


instance 设计


instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。

抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:

  • manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是阿里公司内部使用)
  • spring方式:基于spring xml + properties进行定义,构建spring配置.

server 设计


server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现

  • Embeded : 对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)
  • Netty : 基于netty封装了一层网络协议,由canal server保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠拢,push在数据量大的时候会有一些问题)

canal的配置启动

获取对应的canal安装包

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

解压到对应目录

tar xf canal.deployer-1.1.5.tar.gz
查看目录结构
[root@localhost ~]# ll /canal/
总用量 58800
drwxr-xr-x 2 root root 93 8月 10 16:44 bin
-rw-r--r-- 1 root root 60205298 8月 10 13:14 canal.deployer-1.1.5.tar.gz
drwxr-xr-x 5 root root 123 8月 10 20:18 conf
drwxr-xr-x 2 root root 4096 8月 10 13:22 lib
drwxrwxrwx 4 root root 34 8月 10 14:21 logs
drwxrwxrwx 2 root root 177 4月 19 16:15 plugin

前期准备工作:

注意对应参数设置
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

在MySQL数据库内创建canal专用用户

CREATE USER canal IDENTIFIED BY 'canal'; 注意:如果是8.0的话创建用户使用密码插件mysql_native_password
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

canal.properties配置文件(全局配置文件)

全局配置文件分为:common argument模块,destinations模块,以及对应的消息队列模块RocketMQ Kafka RabbitMQ
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka #设置你需要的对应模式,我这里的消息中间件是kafka
canal.destinations = example #对应的实例,默认是example 如果多个实例.就在conf/下边创建对应目录
列如这里是canal.destinations = example1 那么就需要在conf目录下创建example1目录

选择你的消息中间件模块进行配置.我的是kafka.所以我这里只配置kafka的配置
kafka.bootstrap.servers = 127.0.0.1:9092 #指定kafka的ip及端口
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

instance参数文件:

  1. 在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件
    比如:

canal.destinations = example1,example2

这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.

  1. canal自带了一份instance.properties demo,可直接复制conf/example目录进行配置修改
  2. 如果canal.properties未定义instance列表,但开启了canal.auto.scan时
  • server第一次启动时,会自动扫描conf目录下,将文件名做为instance name,启动对应的instance
  • server运行过程中,会根据canal.auto.scan.interval定义的频率,进行扫描发现目录有新增,启动新的instance发现目录有删除,关闭老的instance发现对应目录的instance.properties有变化,重启instance

vim conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*..*

canal.mq.topic=example --默认topic,后端kafka或者mq的topic,根据实际进行配置
canal.mq.dynamicTopic=test --动态topic,如果这里指定了.那么就会在kafka生成这里指定的topic,没有指定就会默认存储在默认topic
指定匹配对应的库,一个库的所有表都会发送到库名的topic上
# canal.mq.dynamicTopic=mytest1.user,mytest2..*,.*..* 语法

canal.mq.dynamicTopic 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

例子1:test.test 指定匹配的单表,发送到以test_test为名字的topic上
例子2:.*..* 匹配所有表,则每个表都会发送到各自表名的topic上
例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
例子4:test..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
例子5:test,test1.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值
为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

例子1: test:test.test 指定匹配的单表,发送到以test为名字的topic上
例子2: test:.*..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
例子4:testA:test..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
例子5:test0:test,test1:test1.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值
大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

canal.mq.partitionHash 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

例子1:test.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
例子2:.*..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
例子3:.*..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
例子4: 匹配规则啥都不写,则默认发到0这个partition上
例子5:.*..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
例子6: test.test:id,...* , 针对test的表按照id散列,其余的表按照table散列
注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

instance文件主要是配置当前要开启的canal实例的配置

启动canal

进到对应的canal目录执行
sh bin/startup.sh
关闭
sh bin/stop.sh

查看server日志

2021-08-10 16:44:41.376 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2021-08-10 16:44:41.462 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.enable' was supplied but isn't a known config.
2021-08-10 16:44:41.464 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.krb5.file' was supplied but isn't a known config.
2021-08-10 16:44:41.464 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.jaas.file' was supplied but isn't a known config.
2021-08-10 16:44:41.465 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.

查看instance日志

2021-08-10 16:44:17.441 [Thread-6] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop CannalInstance for null-example
2021-08-10 16:44:17.450 [Thread-6] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
2021-08-10 16:44:42.421 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2021-08-10 16:44:42.432 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*..*$
2021-08-10 16:44:42.432 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql.slave_.*$
2021-08-10 16:44:42.524 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

配置kafka

安装kafka

https://github.com/alibaba/canal/wiki/Kafka-QuickStart
https://kafka.apache.org/quickstart

启动,停止:

bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-stop.sh

查看消息是否同步过来:

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

kafka常用命令

创建一个topic
[root@localhost kafka_2.13-2.8.0]# bin/kafka-topics.sh --create --topic lzm --bootstrap-server localhost:9092
Created topic lzm.
查看对应topic描述
[root@localhost kafka_2.13-2.8.0]# bin/kafka-topics.sh --describe --topic lzm --bootstrap-server localhost:9092
Topic: lzm TopicId: KBSTdK-uRxyw8pGEc-pFag PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: lzm Partition: 0 Leader: 0 Replicas: 0 Isr: 0

插入数据:
[root@localhost kafka_2.13-2.8.0]# bin/kafka-console-producer.sh --topic lzm --bootstrap-server localhost:9092
>lzm
>this is test

查看对应topic下的数据
[root@localhost kafka_2.13-2.8.0]# bin/kafka-console-consumer.sh --topic lzm --from-beginning --bootstrap-server localhost:9092
lzm
this is test

列出所有的topic:
[root@localhost kafka_2.13-2.8.0]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注