Hazelcast服务及注册中心实现

分区和复制

1
2
3
4
5
//backup
new MapConfig()
.setBackupCount(0)
.setAsyncBackupCount(1)
.setReadBackupData(true)

默认情况是同步复制一份,以上为异步复制1份,也可同步+异步。

  • 分区采用一致性Hash算法,便于节点变更复制
  • 计算分区,MOD(hash result, partition count) ,其中将key取byte数组,再取hash值,然后对分区默认271数取模,则为分区位置
  • 如果不指定备份数,则没有复制,节点下线后会丢失数据
  • 若有备份,则节点数据对等,单节点宕机,备份会成为查询节点
  • 若有备份,新增节点时,分区数据因一致性算法则最小化复制扩容
  • 读写操作时,会根据key将请求分发至该分区
  • 区分表由最早启动的节点告知其他节点,包括后续的变更通知以便重新分区,通知周期默认15s
  • 同步复制可防止脏数据,性能不如异步
  • 备份数据也可提供读,节省查询其他节点的分区的网络开销,但可能存在脏数据,且不计入主分区的命中次数,也不影响空闲过期时间

持久化

  • 通过MapStore实现单个、批量的读写持久化数据,MapStore继承了LoadStore。
  • 持久化应是中心化数据库,因为多个节点情况下,查询执行在主分区上,导致不同节点的数据可能不一致;在节点变化时,主分区变更至其他节点,则单点持久化数据失效。

    读时持久化

    如果配置和开启MapStore, IMap.get分区查询内存数据不存在时,调用MapStore.load查询并加载到内存;但不会新增备份。

    写时持久化

    write-delay-seconds为0时,map.put则MapStore.store同步写入数据至持久化,且可同步备份。

    写后持久化

  • write-delay-seconds大于0,map.put异步延迟持久化数据。
  • 该数据会标记为脏数据,存入队列中,直至被持久化。
  • 当write-coalescing即保留所有变更时,需要设置hazelcast.map.write.behind.queue.capacity以防脏数据在内存中过多,导致OOM。

Map及NearCache配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
Config config = new Config();

config.setClusterName("cluster");

//Network
config.setNetworkConfig(new NetworkConfig()
.setJoin(new JoinConfig()
.setMulticastConfig(
new MulticastConfig()
.setEnabled(false))
.setEurekaConfig(
new EurekaConfig()
.setEnabled(true)
.setProperty("self-registration", "true")
.setProperty("namespace", "hazelcast"))
)
);

//Serialize
config.setSerializationConfig(
new SerializationConfig()
.addDataSerializableFactory(SerializableFactory.FACTORY_ID, new SerializableFactory())
);

//backpressure
config.setProperty("hazelcast.backpressure.enabled", "true");
config.setProperty("hazelcast.backpressure.max.concurrent.invocations.per.partition", "100");
config.setProperty("hazelcast.backpressure.syncwindow", "500");
config.setProperty("hazelcast.backpressure.backoff.timeout.millis", "60000");
config.setProperty("hazelcast.operation.backup.timeout.millis", "60000");
config.setProperty("hazelcast.client.max.concurrent.invocations", "200");

//thread 3*3(accept request+read data from other+write data to other)
config.setProperty("hazelcast.io.thread.count", "3");
config.setProperty("hazelcast.operation.thread.count", "100");

//slow invalidation
config.setProperty("hazelcast.slow.operation.detector.enabled", "true");
config.setProperty("hazelcast.slow.operation.detector.stacktrace.logging.enabled", "true");
config.setProperty("hazelcast.slow.operation.detector.log.purge.interval.seconds", "600");
config.setProperty("hazelcast.slow.operation.detector.log.retention.seconds", "600");
config.setProperty("hazelcast.slow.operation.detector.threshold.millis", "300");

//near cache invalidation
config.setProperty("hazelcast.map.invalidation.batch.enabled", "true");
config.setProperty("hazelcast.map.invalidation.batch.size", "100");
config.setProperty("hazelcast.map.invalidation.batchfrequency.seconds", "5");

//当前 write-coalescing is false即保留全部变更, set per-node maximum capacity
config.setProperty("hazelcast.map.write.behind.queue.capacity", "5000");

//map config
List<CacheConfig> list = new ArrayList<>();
list.add(new CacheConfig("cache", 900, 50000));

list.forEach(e -> {
config.addMapConfig(new MapConfig()
.setName(e.getName())
.setInMemoryFormat(InMemoryFormat.BINARY)
.setMetadataPolicy(MetadataPolicy.OFF)
//backup
.setBackupCount(0)
.setAsyncBackupCount(1)
//timeout
.setTimeToLiveSeconds(e.getTimeout())
.setMaxIdleSeconds(e.getTimeout() / 2)
//evict
.setEvictionConfig(new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.LFU)
.setMaxSizePolicy(MaxSizePolicy.PER_NODE)
.setSize(e.getSize()))
//backup read
.setReadBackupData(true)
//partition
.setPartitioningStrategyConfig(new PartitioningStrategyConfig()
.setPartitioningStrategy(new StringPartitioningStrategy()))
//near cache
.setNearCacheConfig(new NearCacheConfig()
.setName(e.getName() + "_nc")
.setInMemoryFormat(InMemoryFormat.OBJECT)
.setCacheLocalEntries(true)
.setInvalidateOnChange(true)
.setLocalUpdatePolicy(NearCacheConfig.LocalUpdatePolicy.INVALIDATE)
.setMaxIdleSeconds(e.getTimeout() / 10)
.setEvictionConfig(new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.LFU)
.setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT)
.setSize(e.getSize() / 100))
)
//map store
.setMapStoreConfig(new MapStoreConfig()
.setEnabled(true)
//保留最新,而不是全部变更
.setWriteCoalescing(true)
)
);


});

return Hazelcast.newHazelcastInstance(config);

Eureka注册中心

1
2
3
4
5
6
7
8
9
10
11
12
config.setNetworkConfig(new NetworkConfig()
.setJoin(new JoinConfig()
.setMulticastConfig(
new MulticastConfig()
.setEnabled(false))
.setEurekaConfig(
new EurekaConfig()
.setEnabled(true)
.setProperty("self-registration", "true")
.setProperty("namespace", "hazelcast"))
)
);

eureka-client.properties,默认配置名称,路径在classpath下。

1
2
3
hazelcast.shouldUseDns=false
hazelcast.name=hz-instance
hazelcast.serviceUrl.default=,http://192.168.1.100:8000/eureka/

以上可实现复用eureka配置中心,实现各hazelcast节点同步,无需广播。

------ 本文结束------

本文标题:Hazelcast服务及注册中心实现

文章作者:Perkins

发布时间:2020年10月09日

原始链接:https://perkins4j2.github.io/posts/5377900/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。