顺序性
Binlog本身是有序的,写入到mq之后需要保障顺序。
方案
Canal目前选择支持Kafka/Rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力。
也就是Binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择。
Canal支持MQ数据的几种路由方式:
- 单topic单分区,可以严格保证与binlog相同的顺序,但效率比较低,TPS只有2~3K。
- 多topic单分区,由于是按照表划分topic,因此可以保证表级别的有序性,但是每个表的热度不一样,对于热点表仍然会有性能问题。
- 单/多topic多分区,按照给定的hash方法来划分partition,性能无疑是最好的。但必须要多加小心,每个表的hash依据都必须是其主键或者主键组。只有保证每表每主键binlog的顺序性,才能准确恢复变动数据。
多线程消费
为了保证一个消费者中多个线程去处理时,不会使得消息的顺序被打乱,则可以在消费者中,消息分发至不同的线程时,加一个队列,消费者去做hash分发,将需要放在一起的数据,分发至同一个队列中,最后多个线程从队列中取数据。
生产指定分区
DefaultPartitioner默认的分区策略
如果key为null
- 则先根据topic名获取上次计算分区时使用的一个整数并加一。然后判断topic的可用分区数是否大于0,如果大于0则使用获取的nextValue的值和可用分区数进行取模操作。
- 如果topic的可用分区数小于等于0,则用获取的nextValue的值和总分区数进行取模操作(其实就是随机选择了一个不可用分区)。
但是消息的key不为空,则基于key的哈希值来选择一个分区。
如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区。
如果在发消息的时候指定了分区,则消息投递到指定的分区。
自定义分区策略1
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
1 | public static class CustomPartition implements Partitioner { |