Hazelcast队列和发布订阅实现

队列和订阅区别

  • 队列是queue,订阅是topic
  • 队列是只能被消费一次,订阅是被消费多次
  • 队列消费是主动轮询take,订阅是消息被动通知

队列实现

  • 采用FIFO先进先出顺序消费
  • 元素没有批量处理,只能迭代逐个take
  • 元素被复制到本地处理
  • 可能使用ItemListener监听队列新增和删除操作

生产

1
2
3
4
5
6
7
8
9
10
11
12
public class ProducerMember {
public static void main( String[] args ) throws Exception {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
IQueue<Integer> queue = hz.getQueue( "queue" );
for ( int k = 1; k < 100; k++ ) {
queue.put( k );
System.out.println( "Producing: " + k );
Thread.sleep(1000);
}
queue.put( -1 );
System.out.println( "Producer Finished!" );
} }

消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ConsumerMember {
public static void main( String[] args ) throws Exception {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
IQueue<Integer> queue = hz.getQueue( "queue" );
while ( true ) {
int item = queue.take();
System.out.println( "Consumed: " + item );
if ( item == -1 ) {
queue.put( -1 );
break; }
Thread.sleep( 5000 );
}
System.out.println( "Consumer Finished!" );
}
}

效率提升

  • 提供多个消费者
  • 非严格顺序,可以创建多个队列分别生产和消费
  • 消费者可以启动定时任务,提高消费频度

可靠性

  • 一个队列成员,会有一个备份
  • 队列元素使用全局递增itemId,不会重复
  • QueueStore提供队列内存时,同时也持久化,在超内存限制时数据也会持久化
  • max-size可以设置队列长度,多余元素put会被阻塞,直到有队列空间
  • 高吞吐量时,可支持多个消费者并发
  • 网络分区故障时,提供Split-Brain脑裂保护,避免仅部分员执行操作成功

订阅实现

  • 发布/订阅模型,发布是异步操作
  • 订阅同一个topic,所有订阅者均收到消息
  • 支持全局顺序

实现

发布

1
2
3
4
5
6
7
public class TopicPublisher {
public static void main(String[] args) {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
ITopic<Date> topic = hz.getTopic("topic");
topic.publish(new Date());
}
}

订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TopicSubscriber {
  public static void main(String[] args) {
  HazelcastInstance hz = Hazelcast.newHazelcastInstance();
  ITopic<Date> topic = hz.getTopic("topic");
  topic.addMessageListener(new MessageListenerImpl());
  System.out.println("Subscribed");
  }
  private static class MessageListenerImpl implements MessageListener<Date> {
  public void onMessage(Message<Date> m) {
  System.out.println("Received: " + m.getMessageObject());
  }
  }
}

MessageListener 可以监听和消费具体消息

全局顺序

  • 消息是顺序发布,订阅者是顺序接受
  • 消息也可能是全局顺序,所有订阅者接受多个发布者消息的顺序都是一致

例如有3个成员,member1发布消息a1 和 a2. Member3 发布消息 c1 and c2. 可能有以下顺序:

member1 → c1, a1, a2, c2

member2 → c1, c2, a1, a2

  • 开启 globalOrderEnabled

如果member1 → a1, c1, a2, c2

则member2 → a1, c1, a2, c2

确保所有成员接受所有消息的顺序是一致,而非来自某个发布者。

  • StripedExecutor 有很多线程,负责消息的全局顺序发布。
  • hazelcast.event.thread.count是线程数量。
  • Topic的名称取hash%线程数量,即为处理该Topic的线程id

Reliable Topic

  • Reliable Topic 根据Ringbuffer数据结构进行备份
  • 可靠性高,解决事件丢失问题
  • 快速消费,更稳定和安全
1
ITopic<Long> topic = hz.getReliableTopic("sometopic");
  • TopicOverloadPolicy 超过容量可以设置过载策略,DISCARD_OLDEST丢失最老、DISCARD_NEWEST丢弃最新、BLOCK阻塞直到超时、ERROR抛出异常
  • read-batch-size,支持批量读取消息
------ 本文结束------

本文标题:Hazelcast队列和发布订阅实现

文章作者:Perkins

发布时间:2019年10月16日

原始链接:https://perkins4j2.github.io/posts/48149/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。