Manually Committing Kafka Offsets and Handling Rebalance in a Consumer

A rebalance is the transfer of partition ownership from one consumer to another. While a rebalance is in progress, the consumers in the group cannot read messages.

With manual commits, the consumer keeps track of an offset itself; when fetching messages from Kafka, it only pulls messages after the current offset.
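
As a minimal sketch of that idea (the bootstrap server, group id and topic below are placeholders, not part of this project), a consumer with auto-commit disabled can seek to the offset it tracks, process the records, and then commit the next offset to read:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualOffsetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // manual commit
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("demo-topic", 0);   // placeholder topic
            consumer.assign(Collections.singletonList(partition));

            // Resume from the last committed offset (or the beginning if nothing was committed yet).
            OffsetAndMetadata committed = consumer.committed(partition);
            long nextOffset = committed == null ? 0L : committed.offset();
            consumer.seek(partition, nextOffset);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // ... process the record ...
                nextOffset = record.offset() + 1; // offset of the next record to read
            }

            // Commit the next offset to read, so a restart or rebalance resumes after the last processed record.
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
        }
    }
}

The full DemoConsumer class below applies the same pattern, but subscribes with a ConsumerRebalanceListener so the commit and seek also happen around every rebalance.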

@Slf4j
public class DemoConsumer extends Thread {

    private final KafkaConsumer<String, byte[]> consumer;
    private final String topic;

    /**
     * Latest consumed position per partition. The value stored is the offset of the
     * next record to read, which is also the offset that gets committed.
     */
    final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new ConcurrentHashMap<>();

    /**
     * Initialization.
     *
     * @param topic topic to consume
     */
    public DemoConsumer(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BusConfig.servers.get(0));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, BusConfig.group);
        // whether offsets are committed automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, BusConfig.autoCommit);
        // auto-commit interval
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // session timeout after which the coordinator considers the consumer dead: 1 minute
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        // maximum interval between two polls before the consumer leaves the group: 5 minutes
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        // heartbeat interval
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
        // maximum bytes per fetch: 10 KB
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "10240");
        // maximum number of records returned per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        this.topic = topic;

        consumer = new KafkaConsumer<>(props);
    }

    public ConsumerRebalanceListener listen() {

        return new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after partitions have been reassigned and before the consumer starts
                // reading messages: look up the latest committed offset and seek to it.
                log.warn("kafka rebalance onPartitionsAssigned");
                for (TopicPartition partition : partitions) {
                    // Fetch the committed offset; under the hood this sends a request to the
                    // coordinator. It can be null if the group has never committed this partition.
                    OffsetAndMetadata offset = consumer.committed(partition);
                    if (offset == null) {
                        continue;
                    }
                    log.warn("kafka rebalance offset:" + offset.offset());
                    // Set the local fetch position; the next poll starts from this offset.
                    consumer.seek(partition, offset.offset());
                }
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before the rebalance starts and after the consumer has stopped
                // reading messages: commit the offsets collected so far.
                log.warn("kafka rebalance onPartitionsRevoked");
                commit();
            }
        };
    }

    /**
     * Commit the collected offsets synchronously.
     */
    private void commit() {
        // Keep the synchronized block that locks the offset map as short as possible.
        Map<TopicPartition, OffsetAndMetadata> unmodifiedMap;
        synchronized (offsetAndMetadataMap) {
            if (offsetAndMetadataMap.isEmpty()) {
                return;
            }
            unmodifiedMap = Collections.unmodifiableMap(new HashMap<>(offsetAndMetadataMap));

            unmodifiedMap.forEach((k, v) -> log.info("commit last offset:" + v.offset()));

            offsetAndMetadataMap.clear();
        }
        consumer.commitSync(unmodifiedMap);
    }

    /**
     * Consume loop.
     */
    @Override
    public void run() {
        // Subscribe and listen for rebalance events.
        consumer.subscribe(Collections.singletonList(this.topic), listen());

        // Check the thread's interrupt flag; if it is set, the caller wants this task to stop.
        while (!Thread.currentThread().isInterrupted()) {
            // Commit offsets collected since the previous iteration.
            commit();

            // Consumption is disabled: back off and check again.
            if (!BusConfig.enable) {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition can see it.
                    Thread.currentThread().interrupt();
                }
                continue;
            }

            // Consumption is enabled: poll for records.
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
            if (records != null && records.count() > 0) {
                log.info("consume message,size:{}", records.count());
                records.forEach(this::handle);
            }

            // Throttle the loop.
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Asynchronous handling.
     *
     * @param record consumed record
     */
    public void handle(ConsumerRecord<String, byte[]> record) {
        Runnable runnable = () -> {
            Message message = null;

            try {
                message = Message.parseFrom(record.value());
                if (message == null) {
                    return;
                }
                BusConfig.businessHandler.handle(message);
            } catch (Exception e) {
                // message may still be null here if parsing itself failed
                log.error("consume handle error:" + message, e);
            }

            log.info("consume messageId:{},offset:{}", record.key(), record.offset());

            TopicPartition topicPartition = new TopicPartition(topic, record.partition());
            synchronized (offsetAndMetadataMap) {
                // Offset of the next record to read; this is the value to commit.
                long lastOffset = record.offset() + 1;
                if (!offsetAndMetadataMap.containsKey(topicPartition)) {
                    offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(lastOffset));
                } else if (offsetAndMetadataMap.get(topicPartition).offset() < lastOffset) {
                    offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(lastOffset));
                }
            }
        };
        TaskThreadPool.getInstance().execute(runnable);
    }

    /**
     * Shut down: stop consuming and wake up a poll() that may be blocking.
     */
    public void close() {
        BusConfig.enable = false;
        consumer.wakeup();
    }
}
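
For completeness, a hypothetical way to wire the class up (BusConfig and the business handler are project-specific and assumed to be configured elsewhere; the topic name is a placeholder):

DemoConsumer demoConsumer = new DemoConsumer("demo-topic"); // placeholder topic name
demoConsumer.start();   // runs the poll/commit loop on its own thread

// ... on shutdown ...
demoConsumer.close();   // disables consumption and wakes up a blocking poll()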
------ End of article ------

Title: Manually Committing Kafka Offsets and Handling Rebalance in a Consumer

Author: Perkins

Published: November 18, 2019

Original link: https://perkins4j2.github.io/posts/32177/

License: CC BY-NC-ND 4.0 International. Please retain the original link and author attribution when reposting.