RocketMQ源码学习之高可用分析(二)

前言


上篇在源码层面对RocketMQ的Producer端的高可用进行了分析,主要解释了其负载均衡机制,那么本章就对Broker的高可用做一个简单的分析。

一、Broker高可用

broker的高可用主要体现在主从同步上,一个Broker集群拥有多个Broker分组,每个Broker分组包含两类的服务器,一类是Master主节点,只允许有一个,另一类是Slave,允许有多个从节点。Master结点只提供读写的服务,Slave从结点只提供的服务。每个Broker分组内Master会和Slave进行通信和数据同步,但是每个Broker分组之间不进行通信,主从同步不具备主从切换功能,当主节点宕机以后,从结点不会接管消息的接收,但可以提供消息的读取。消息从Master复制到Slave有同步和异步两种复制方式。

  • 同步复制:等Master和Slave都写成功后才反馈给客户端写成功的状态。
  • 异步复制:只要Master写成功后,即给客户端反馈写成功的状态,数据不一定能写入Slave成功。

目前官方给了三种配置的方案2m-2s-async2m-2s-sync2m-noslave,分别是双主双从异步,双主双从同步以及双主的架构。

在读源码之前和上一篇一样,先提出我的几个问题。

主从服务器都在运行过程中,消息消费者是从主拉取消息还是从从服务器拉取消息。

如果主服务器宕机,那么消息消费如何保证?如果主服务器又恢复之后,消息消费者从主还是从拉取消息,现在又如何保证消息同步?

这两个问题光看Broker是回答不了的,因为涉及到消息消费者,占个坑,在下一章介绍消费者的时候再结合broker的相关内容来分析一下。下面主要从主从同步出发来进行相关原理涉及到的代码的解析。

1.1 Slave

主从同步的关键类的位置在org.apache.rocketmq.store.ha.HAService,其中HAClient是其的内部类。

HAClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
// 断点1:与Master进行连接
if (this.connectMaster()) {
// 断点2:判断是否达到上报Master的时间间隔
if (this.isTimeToReportOffset()) {
// 向当前slave服务器反馈拉取,默认会尝试3次。
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
// 每隔1s钟被唤醒一次,来处理以下事件
this.selector.select(1000);
// 断点3:处理读取事件
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}

if (!reportSlaveMaxOffsetPlus()) {
continue;
}
// Master在相应的时间段内没有返回数据,则关闭连接
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}

log.info(this.getServiceName() + " service end");
}

断点1:这步的目的是与Master建立起连接,用的是NIO的方式建立起请求进行通信。并在此步要获取一下当前最大的offset。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
// 获得Master的IP地址
String addr = this.masterAddress.get();
if (addr != null) {
/**建立起通道,并检测读的请求**/
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
// 获得当前最大的Offset
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

this.lastWriteTimestamp = System.currentTimeMillis();
}

return this.socketChannel != null;
}

断点2:判断是否达到上报的时间间隔,默认是5s。也就是Slave每隔5秒就会向master上报一次maxOffset信息。所以说主从同步的第一步是slave先走出的,他要向Master上报自己此时的maxOffset。

1
2
3
4
5
6
7
8
private boolean isTimeToReportOffset() {
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();

return needHeart;
}
1
private int haSendHeartbeatInterval = 1000 * 5;

断点3:看看slave端是如何处理master返回的数据的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 遍历完byteBufferRead
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}

return true;
}

private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();

while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
if (diff >= msgHeaderSize) {
// dispatchPosition可以防止数据“粘包”,避免数据读取不完整。
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 判断master传回的offset是否和slave的最大offset相等,如果不相等则抛出异常。
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
// 开始写数据到CommitLog
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);

HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
// 记录处理到的位置,这边要更新dispatchPosition,就是为了防止“粘包”的问题。
this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize;

if (!reportSlaveMaxOffsetPlus()) {
return false;
}

continue;
}
}
// 空间写满,重新分配空间
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}

break;
}

return true;
}

至此完成了slave向master拉取数据,也就是将master的数据同步到slave中。

1.2 Master

分析完了slave拉取master的逻辑,再看分析一下master是如何处理发送过来的请求。

这边主要分析的类位于org.apache.rocketmq.store.ha.HAConnection,先来看一眼整个类的结构。

类结构

1
2
3
4
public void start() {
this.readSocketService.start();
this.writeSocketService.start();
}

主要的逻辑就是先读取slave传送过来的请求,接着根据offset将新的消息发送回slave。

1.2.1 readSocketService

整体的逻辑是这样子的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
// 每隔1s钟被唤醒一次
this.selector.select(1000);
// 断点1:读取slave的数据
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}

long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
//...省略
}

也就是master每隔1s钟来处理slave传递过来的请求,如有新的信息,则处理。

断点1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 清空byteBufferRead
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}

while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
// 读取Slave 请求来的CommitLog的maxOffset
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;
// 记录当前处理到的slave的offset的最大位置
HAConnection.this.slaveAckOffset = readOffset;
// slave第一次请求的情况
if (HAConnection.this.slaveRequestOffset < 0) {
// 记录当前处理到的slave的offset的最大位置
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// 通知目前Slave进度。主要用于Master节点为同步类型的。
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}

return true;
}

1.2.2 writeSocketService

读取slave请求完之后,接着就要将maxoffset之后的信息发送给slave进行主从同步了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
// 设置1s的唤醒时间,每隔1s被唤醒一次。
this.selector.select(1000);
// 未读到slave请求的消息,线程等待
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// 第一次进行数据传输
if (-1 == this.nextTransferFromWhere) {
// 断点1:slave传过来的offset为0,则设置masterOffset=0,即从最开始的地方同步。目的是从最后一个commitlog开始同步。
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());

if (masterOffset < 0) {
masterOffset = 0;
}
// 下次同步的位置设置为0
this.nextTransferFromWhere = masterOffset;
} else {
// 如果slave传过来不等于0,也就是得到了slave的最大偏移量。下次同步的位置设置为slave传过来的最大slave偏移量。
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}

log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 传输完成
if (this.lastWriteOver) {

long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {

// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();

this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else { // 未传输完成
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}

// 进行CommitLog数据的传输。从之前判断的当前slave的最大偏移量位置开始。
SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}

// 一次只传送size长度的数据量,如果超过就要截断。
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;

selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;

// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
// 传输数据
this.lastWriteOver = this.transferData();
} else {
// 如果没有新的消息则挂起等待100ms
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// 下面就是最后的收尾工作:断开连接,暂停写线程,暂停读线程,释放CommitLog
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}
this.makeStop();
readSocketService.makeStop();
haService.removeConnection(HAConnection.this);
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}

try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}

HAConnection.log.info(this.getServiceName() + " service end");
}

断点1:注意,这边masteroffset=masteroffset-(masterOffset%(1024*1024*1024))=masteroffset-masteroffset

masteroffset确实是master最大的偏移量,但是后面进行了减的操作,也就是0。

这块之前的理解有偏差,这边的计算结果不一定是masteroffset=0,应该是获取到最后一个commitlog的初始地址,比如说,masteroffset=1.5G的位置,那么masteroffset=1.5G-1.5G%1G=1G,也就是最后的结果是1G开始的那个偏移量地址,也就是最后一个commitlog对应的偏移量位置。

所以这边可以引申出一个问题,如果之前masterA的从服务器是slaveA,此时slaveA需要被替换成slaveB,那么slaveB还能同步到所有masterA的消息么?从之前的分析可以知道,只能从最后一个commitlog开始的位置进行主从同步,所以并不能获取到所有的消息,只能获取到最后一个commitlog起始的所有数据。

1
2
3
4
5
6
7
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());

// CommitLog file size,default is 1G
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;

注意:一次HA的传输是有最大字节数的限制的,通过设置ByteBuffer的limit来控制只传输指定长度的字节,这就意味着HA客户端收到的消息会包含不完整的消息,HA一批次传输消息最大字节通过haTransferBatchSize设置,默认值为32K。

最后给出一张主从同步的整体流程图,很好的归纳了以上的过程,包括slave向master传递自己最大偏移量位置,拉取master的消息并更新自己的commitLog,master监听slave的消息,发送自己新的消息到slave。

主从同步流程

补充:如何判断主从同步是否完成的依据是Slave中已成功复制的最大偏移量是否大于等于消息生产者发送消息后消息服务端返回下一条消息的起始偏移量,如果是则表示主从同步复制已经完成,唤醒消息发送线程,否则等待1s再次判断。


参考

https://blog.csdn.net/prestigeding/article/details/93672079