RocketMQ源码学习之高可用分析(四)

前言


之前三篇已经从Producer、Broker、Consumer三者角度出发分析了RocketMQ框架的高可用,本篇简单介绍RocketMQ中的高可用地方。之前对NameServer已经做过详细的分析,就简单的介绍一下吧。

所有高可用部分总结

一、NameServer高可用

NameServer作为路由注册中心,与Producer、Broker、Consumer都建立了长连接,来进行互相之间的通信。并且,NameServer本身就是由多台服务器组成的集群,集群中每台服务器中数据都是一致的,并且互不影响,从而达到NameServer的高可用。

因此,Broker向NameServer注册的时候需要向每个NameServer都注册自己的信息。在Producer生产或者Consumer消费的时候只需要连接其中一台NameServer进行通信获得路由信息即可。下面具体来看看这部分的代码。

1.1 Broker注册路由信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {

final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// 装载发送的消息brokerAddr、brokerId、brokerName、clusterName、haServerAddr、compressed
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);

RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// 只有当for循环中所有线程都执行完之后才进行以下的操作
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 向每个NameServer注册Broker的信息
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}

log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}

try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}

return registerBrokerResultList;
}

1.2 Producer通过NameServer选择消息队列

在之前分析RocketMQ源码学习之高可用分析(一)文章中我们关注的重点是两种容错机制,在此之前Producer还需要通过NameServer获得所有的指定topic的消息队列所在的Broker的地址才能进行选择,也就是之前那篇文章中断点1的部分,我们deep进去,函数之间的调用逻辑比较复杂,如下:

1
sendDefaultImpl()--->tryToFindTopicPublishInfo()--->updateTopicRouteInfoFromNameServer()--->getTopicRouteInfoFromNameServer()--->invokeSync()--->getAndCreateChannel()--->getAndCreateNameserverChannel()

来看最终的这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
// 返回可以被选择到的NameServer
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}

final List<String> addrList = this.namesrvAddrList.get();
if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
// 返回可以被选择到的NameServer
addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
// 从NameServer列表中选择一个连接的返回
if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);

this.namesrvAddrChoosed.set(newAddr);
log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null) {
return channelNew;
}
}
throw new RemotingConnectException(addrList.toString());
}
} finally {
this.lockNamesrvChannel.unlock();
}
} else {
log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}

return null;
}

至此,便连接上了一台NameServer服务器用于获得对应的路由表。

1.3 Consumer通过NameServer选择消息队列

与Producer类似的,在消费端进行消费的时候,也是通过相同的函数getAndCreateNameserverChannel()来获得某个NameServer服务器,从而获得路由表信息,这边就不赘述了。