队列和订阅区别
- 队列是queue,订阅是topic
- 队列是只能被消费一次,订阅是被消费多次
- 队列消费是主动轮询take,订阅是消息被动通知
队列实现
生产
1 | public class ProducerMember { |
消费
1 | public class ConsumerMember { |
效率提升
- 提供多个消费者
- 非严格顺序,可以创建多个队列分别生产和消费
- 消费者可以启动定时任务,提高消费频度
可靠性
- 一个队列成员,会有一个备份
- 队列元素使用全局递增itemId,不会重复
- QueueStore提供队列内存时,同时也持久化,在超内存限制时数据也会持久化
- max-size可以设置队列长度,多余元素put会被阻塞,直到有队列空间
- 高吞吐量时,可支持多个消费者并发
- 网络分区故障时,提供Split-Brain脑裂保护,避免仅部分员执行操作成功
订阅实现
- 发布/订阅模型,发布是异步操作
- 订阅同一个topic,所有订阅者均收到消息
- 支持全局顺序
实现
发布1
2
3
4
5
6
7public class TopicPublisher {
public static void main(String[] args) {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
ITopic<Date> topic = hz.getTopic("topic");
topic.publish(new Date());
}
}
订阅1
2
3
4
5
6
7
8
9
10
11
12
13public class TopicSubscriber {
public static void main(String[] args) {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
ITopic<Date> topic = hz.getTopic("topic");
topic.addMessageListener(new MessageListenerImpl());
System.out.println("Subscribed");
}
private static class MessageListenerImpl implements MessageListener<Date> {
public void onMessage(Message<Date> m) {
System.out.println("Received: " + m.getMessageObject());
}
}
}
MessageListener 可以监听和消费具体消息
全局顺序
- 消息是顺序发布,订阅者是顺序接受
- 消息也可能是全局顺序,所有订阅者接受多个发布者消息的顺序都是一致
例如有3个成员,member1发布消息a1 和 a2. Member3 发布消息 c1 and c2. 可能有以下顺序:
member1 → c1, a1, a2, c2
member2 → c1, c2, a1, a2
- 开启 globalOrderEnabled
如果member1 → a1, c1, a2, c2
则member2 → a1, c1, a2, c2
确保所有成员接受所有消息的顺序是一致,而非来自某个发布者。
- StripedExecutor 有很多线程,负责消息的全局顺序发布。
- hazelcast.event.thread.count是线程数量。
- Topic的名称取hash%线程数量,即为处理该Topic的线程id
Reliable Topic
- Reliable Topic 根据Ringbuffer数据结构进行备份
- 可靠性高,解决事件丢失问题
- 快速消费,更稳定和安全
1 | ITopic<Long> topic = hz.getReliableTopic("sometopic"); |
- TopicOverloadPolicy 超过容量可以设置过载策略,DISCARD_OLDEST丢失最老、DISCARD_NEWEST丢弃最新、BLOCK阻塞直到超时、ERROR抛出异常
- read-batch-size,支持批量读取消息