RocketMQ源码学习之高可用分析(三)

前言


上一篇开始的地方占了个坑,其实那两个问题也涉及到消费者这边的相关逻辑,本篇主要从保证消费者消费高可用出发,其中涉及到两个方面,其一消费者消费时,会对broker集群的所有队列进行选择,也就需要考虑消费者的负载均衡机制。其二,在Consumer的配置文件中不需要设置消费者是从master读取数据还是slave读取,当master繁忙的时候,Consumer会被自动切换到从slave读取数据,也之所以有了这种机制,当master发生故障的时候,Consumer仍然可以从slave读取数据,不影响Consumer的消费,达到高可用。

一、Consumer高可用

1.1 Consumer端的负载均衡机制

先引入一个消费者组的概念

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。

负载均衡会发生在两种情况下

每次有新的consumer加入group就会重新做一下负载。

每10秒自动做一次负载。

源码中提供了六种消息队列消费者负载均衡的方法

AllocateMessageQueueAveragely:平均算法(默认)
AllocateMessageQueueAveragelyByCircle:平均轮询算法
AllocateMessageQueueByConfig:根据配置负载均衡算法
AllocateMessageQueueByMachineRoom:根据机房负载均衡算法
AllocateMessageQueueConsistentHash:一致性哈希负载均衡算法
AllocateMachineRoomNearby:靠近机房策略

AllocateMessageQueueAveragely

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* @param consumerGroup 当前消费者组名称
* @param currentCID 当前消费者id
* @param mqAll 当前topic下面所有的消息队列
* @param cidAll 当前消费者组下面所有的消费者id
* @return
*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}

List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}

int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

整个逻辑是为了实现消息队列的平均分配,比如有8个队列,3个消费者,那么最终分配是这样子的

消费者0:队列0、队列1、队列2

消费者1:队列3、队列4、队列5

消费者2:队列6、队列7

AllocateMessageQueueAveragelyByCircle

和之前的平均的那个很类似,但是这个是间隔的分配,比如按照之前的可以划分为

消费者0:队列0、队列3、队列6

消费者1:队列1、队列4、队列7

消费者2:队列2、队列5

AllocateMessageQueueByConfig

这个是根据消息自定义配置固定的消息队列。

AllocateMessageQueueByMachineRoom

根据brokerName找到机房,也就是找到消息队列,然后再分配,与第一个平均分配的差不多,但是这个是先根据mod平均值的数量分配,然后再继续重新分配剩下的。还是上面的那个例子,8/3=2,剩下2个再继续分配。

消费者0:队列0、队列1、队列6

消费者1:队列2、队列3、队列7

消费者2:队列4、队列5

AllocateMessageQueueConsistentHash

这个一致性哈希的方法在之前介绍Dubbo的负载均衡的时候就有提到过,这边就不赘述了。可以访问此篇博客

AllocateMachineRoomNearby

同机房分配策略,首先统计消费者与broker所在机房,保证broker中的消息优先被同机房的消费者消费,如果机房中没有消费者,则有其他机房消费者消费。实际的队列分配(同机房或跨机房)可以是指定其他算法。假设有三个机房,实际负载策略使用算法1,机房1和机房3中存在消费者,机房2没有消费者。机房1、机房3中的队列会分配给各自机房中的消费者,机房2的队列会被虽有的消费者平均分配。

消息消费者执行消费逻辑的时候会调用相应的函数

MQClientInstance.java中的start方法里就有设置负载均衡的相关方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void start() throws MQClientException {

synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}

RebalanceService.java中的函数

1
2
3
4
5
6
7
8
9
10
11
@Override
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}

log.info(this.getServiceName() + " service end");
}

1.2 Consumer主从选择

之前还提到一点消费者高可用的实现就是主从的选择,当主结点处于繁忙或者主节点宕机的时候,Consumer会自动切换到从节点进行消息的消费,简单看一下实现。

在消费者向broker中消息队列拉取消息的时候,会调用到org.apache.rocketmq.broker.processor.PullMessageProcessor这个类

1
2
3
4
5
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}

当设置了suggestPullingFromSlave属性为true的时候,就允许从从服务器来拉取消息,这个属性是在org.apache.rocketmq.store.GetMessageResult中的

1
2
// 默认是false
private boolean suggestPullingFromSlave = false;

注意:以上true or false的选择是基于主服务器不宕机的情况,如果主服务器宕机的话,因为是要发心跳的,所以直接是找不到主的ip地址,自然而然的消费的是slave中的消息。以上的情况是发生在主服务器消息在内存中的占用超过40%的情况下而发生的主从切换。

问:主从服务器都在运行过程中,消息消费者是从主拉取消息还是从从服务器拉取消息?

答:当主服务器没有宕机且消息占用内存大小小于40%的时候,是从主服务器拉取消息。当master宕机或者占用超过40%的时候,是从从服务器上拉取消息。

问:如果主服务器宕机,那么消息消费如何保证?如果主服务器又恢复之后,消息消费者从主还是从拉取消息,现在又如何保证消息同步?

答:主服务器宕机之后,消息消费从从服务器拉取。当主服务器恢复之后,消息消费者从主服务器拉取消息,消息消费者内存中存在消息消费的进度,因为和Broker有长连接,所以会让master主动更新消息消费进度。从而达到主从的消息同步,不会再去消费那些在从服务器中已经消费过的消息。

二、消费者消息拉取

RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。

消费进度的管理

广播模式:同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是对立的、互不影响的,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定。消费进度存储在消费者本地

集群模式:同一个消费者组内的所有消息消费者共享消息主题下的所有消息,同一条消息(同一个消息消费队列)在同一时间内只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度需要保存在每个消费者都能访问到的地方。消息进度存储文件放在消息服务端Broker

定时消息是如何实现的

RocketMQ并不支持任意时间精度的的定时消息,主要考虑是性能上的损耗,所以设置了18个级别的延迟级别,具体的实现是这样的。

  1. producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别
  2. broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1
  3. mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列
  4. 根据消费偏移量offset从commitLog中解析出对应消息
  5. 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递
  6. 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递

参考

https://blog.csdn.net/mxlmxlmxl33/article/details/85949429