Canal日志增量订阅和消费

基于日志增量订阅和消费的业务

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

    MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

binlog

mysql的binlog是多文件存储,定位一个LogEvent需要通过binlog filename + binlog position,进行定位

mysql的binlog数据格式,按照生成的方式,主要分为:statement-based、row-based、mixed。

1
2
3
4
5
6
7
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)

目前canal支持所有模式的增量订阅(但配合同步时,因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为ROW模式)

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

架构

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1..n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

canal-admin

v1.1.4+,引入canal-admin工程,支持面向WebUI的canal管理能力

get/ack/rollback协议介绍

  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
    a. batch id 唯一标识
    b. entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto
  • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
  • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

关键配置

服务配置canal.properties

1
2
//实例名称
canal.destinations = example

实例配置instance.properties

1
2
3
4
5
6
7
8
//mysql地址
canal.instance.master.address=192.168.x.x:3306
//数据库名
canal.instance.dbUsername=x
//数据库密码
canal.instance.dbPassword=x
//监听的schema筛选
canal.instance.filter.regex=x.*

关键代码

1
2
3
4
5
<dependency>  
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    //获取地址
public List<InetSocketAddress> getAddress() {
return BinlogConfig.ips.stream()
.map(e -> {
String[] ip = e.split(":");
return new InetSocketAddress(ip[0], Integer.parseInt(ip[1]));
})
.collect(Collectors.toList());
}
//创建连接
private boolean createConnector() {
log.info("create connector ...");
connector = CanalConnectors.newClusterConnector(getAddress(),
BinlogConfig.destination,
BinlogConfig.username,
BinlogConfig.password);
log.info("create connector success");
return true;
}
//连接
private boolean connect() {
log.info("connect ...");
connector.connect();
connector.subscribe();
log.info("connect success");
return true;
}
//处理过程
public void process() {
while (BinlogConfig.running) {
log.info("processing ...");
// 获取指定数量的数据
Message message = connector.get(BinlogConfig.batchSize);
long batchId = message.getId();

//过滤有效待处理消息
List<CanalEntry.Entry> entries = message.getEntries().stream()
.filter(e -> e.getEntryType() == CanalEntry.EntryType.ROWDATA)
.collect(Collectors.toList());

if (batchId != -1 && !CollectionUtil.isEmpty(entries)) {
int size = entries.size();
long memSize = entries
.stream()
.mapToLong(e -> e.getHeader().getEventLength())
.sum();

log.info(receiveInfo, batchId, size, memSize);
BinlogConfig.parseFunction.apply(entries);
} else {
try {
//惩罚等待
TimeUnit.SECONDS.sleep(BinlogConfig.waitSeconds);
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
}

注意,get方法和getWithoutAck方法的区别是:

  • get方法会立即调用ack
  • getWithoutAck方法不会调用ack
------ 本文结束------

本文标题:Canal日志增量订阅和消费

文章作者:Perkins

发布时间:2019年11月18日

原始链接:https://perkins4j2.github.io/posts/53545/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。