前言
上篇在源码层面对RocketMQ的Producer端的高可用进行了分析,主要解释了其负载均衡机制,那么本章就对Broker的高可用做一个简单的分析。
一、Broker高可用
broker的高可用主要体现在主从同步上,一个Broker集群拥有多个Broker分组,每个Broker分组包含两类的服务器,一类是Master主节点,只允许有一个,另一类是Slave,允许有多个从节点。Master结点只提供读写的服务,Slave从结点只提供读的服务。每个Broker分组内Master会和Slave进行通信和数据同步,但是每个Broker分组之间不进行通信,主从同步不具备主从切换功能,当主节点宕机以后,从结点不会接管消息的接收,但可以提供消息的读取。消息从Master复制到Slave有同步和异步两种复制方式。
- 同步复制:等Master和Slave都写成功后才反馈给客户端写成功的状态。
- 异步复制:只要Master写成功后,即给客户端反馈写成功的状态,数据不一定能写入Slave成功。
目前官方给了三种配置的方案2m-2s-async、2m-2s-sync和2m-noslave,分别是双主双从异步,双主双从同步以及双主的架构。
在读源码之前和上一篇一样,先提出我的几个问题。
主从服务器都在运行过程中,消息消费者是从主拉取消息还是从从服务器拉取消息。
如果主服务器宕机,那么消息消费如何保证?如果主服务器又恢复之后,消息消费者从主还是从拉取消息,现在又如何保证消息同步?
这两个问题光看Broker是回答不了的,因为涉及到消息消费者,占个坑,在下一章介绍消费者的时候再结合broker的相关内容来分析一下。下面主要从主从同步出发来进行相关原理涉及到的代码的解析。
1.1 Slave
主从同步的关键类的位置在org.apache.rocketmq.store.ha.HAService,其中HAClient是其的内部类。
HAClient
1 |
|
断点1:这步的目的是与Master建立起连接,用的是NIO的方式建立起请求进行通信。并在此步要获取一下当前最大的offset。
1 | private boolean connectMaster() throws ClosedChannelException { |
断点2:判断是否达到上报的时间间隔,默认是5s。也就是Slave每隔5秒就会向master上报一次maxOffset信息。所以说主从同步的第一步是slave先走出的,他要向Master上报自己此时的maxOffset。
1 | private boolean isTimeToReportOffset() { |
1 | private int haSendHeartbeatInterval = 1000 * 5; |
断点3:看看slave端是如何处理master返回的数据的。
1 | private boolean processReadEvent() { |
至此完成了slave向master拉取数据,也就是将master的数据同步到slave中。
1.2 Master
分析完了slave拉取master的逻辑,再看分析一下master是如何处理发送过来的请求。
这边主要分析的类位于org.apache.rocketmq.store.ha.HAConnection,先来看一眼整个类的结构。
/master%E5%A4%84%E7%90%86slave%E5%90%8C%E6%AD%A5%E7%B1%BB%E7%9A%84%E7%B1%BB%E7%BB%93%E6%9E%84.png)
1 | public void start() { |
主要的逻辑就是先读取slave传送过来的请求,接着根据offset将新的消息发送回slave。
1.2.1 readSocketService
整体的逻辑是这样子的
1 |
|
也就是master每隔1s钟来处理slave传递过来的请求,如有新的信息,则处理。
断点1:
1 | private boolean processReadEvent() { |
1.2.2 writeSocketService
读取slave请求完之后,接着就要将maxoffset之后的信息发送给slave进行主从同步了。
1 |
|
断点1:注意,这边masteroffset=masteroffset-(masterOffset%(1024*1024*1024))=masteroffset-masteroffset
masteroffset确实是master最大的偏移量,但是后面进行了减的操作,也就是0。
这块之前的理解有偏差,这边的计算结果不一定是masteroffset=0,应该是获取到最后一个commitlog的初始地址,比如说,masteroffset=1.5G的位置,那么masteroffset=1.5G-1.5G%1G=1G,也就是最后的结果是1G开始的那个偏移量地址,也就是最后一个commitlog对应的偏移量位置。
所以这边可以引申出一个问题,如果之前masterA的从服务器是slaveA,此时slaveA需要被替换成slaveB,那么slaveB还能同步到所有masterA的消息么?从之前的分析可以知道,只能从最后一个commitlog开始的位置进行主从同步,所以并不能获取到所有的消息,只能获取到最后一个commitlog起始的所有数据。
1 | masterOffset = |
注意:一次HA的传输是有最大字节数的限制的,通过设置ByteBuffer的limit来控制只传输指定长度的字节,这就意味着HA客户端收到的消息会包含不完整的消息,HA一批次传输消息最大字节通过haTransferBatchSize设置,默认值为32K。
最后给出一张主从同步的整体流程图,很好的归纳了以上的过程,包括slave向master传递自己最大偏移量位置,拉取master的消息并更新自己的commitLog,master监听slave的消息,发送自己新的消息到slave。
/%E4%B8%BB%E4%BB%8E%E5%90%8C%E6%AD%A5%E6%95%B4%E4%BD%93%E6%B5%81%E7%A8%8B.png)
补充:如何判断主从同步是否完成的依据是Slave中已成功复制的最大偏移量是否大于等于消息生产者发送消息后消息服务端返回下一条消息的起始偏移量,如果是则表示主从同步复制已经完成,唤醒消息发送线程,否则等待1s再次判断。
参考