Canal及MQ顺序性

顺序性

Binlog本身是有序的,写入到mq之后需要保障顺序。

方案

Canal目前选择支持Kafka/Rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力。

也就是Binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择。

Canal支持MQ数据的几种路由方式:

  • 单topic单分区,可以严格保证与binlog相同的顺序,但效率比较低,TPS只有2~3K。
  • 多topic单分区,由于是按照表划分topic,因此可以保证表级别的有序性,但是每个表的热度不一样,对于热点表仍然会有性能问题。
  • 单/多topic多分区,按照给定的hash方法来划分partition,性能无疑是最好的。但必须要多加小心,每个表的hash依据都必须是其主键或者主键组。只有保证每表每主键binlog的顺序性,才能准确恢复变动数据。

多线程消费

为了保证一个消费者中多个线程去处理时,不会使得消息的顺序被打乱,则可以在消费者中,消息分发至不同的线程时,加一个队列,消费者去做hash分发,将需要放在一起的数据,分发至同一个队列中,最后多个线程从队列中取数据。

生产指定分区

DefaultPartitioner默认的分区策略

  • 如果key为null

    • 则先根据topic名获取上次计算分区时使用的一个整数并加一。然后判断topic的可用分区数是否大于0,如果大于0则使用获取的nextValue的值和可用分区数进行取模操作。
    • 如果topic的可用分区数小于等于0,则用获取的nextValue的值和总分区数进行取模操作(其实就是随机选择了一个不可用分区)。
  • 但是消息的key不为空,则基于key的哈希值来选择一个分区。

  • 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区。

  • 如果在发消息的时候指定了分区,则消息投递到指定的分区。

自定义分区策略

1
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static class CustomPartition implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
if (null == o) {
return 0;
}
long id = getPartitionByKey(String.valueOf(o));
return (int) id;
}

private long getPartitionByKey(String key) {
long id = key.hashCode();
// key取模
int partition = (int) (id & (BusConfig.partitionNum - 1));
return Math.abs(partition);
}
@Override
public void close() {

}
@Override
public void configure(Map<String, ?> map){
}
}
------ 本文结束------

本文标题:Canal及MQ顺序性

文章作者:Perkins

发布时间:2019年11月29日

原始链接:https://perkins4j2.github.io/posts/47529/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。