前言
上一篇开始的地方占了个坑,其实那两个问题也涉及到消费者这边的相关逻辑,本篇主要从保证消费者消费高可用出发,其中涉及到两个方面,其一消费者消费时,会对broker集群的所有队列进行选择,也就需要考虑消费者的负载均衡机制。其二,在Consumer的配置文件中不需要设置消费者是从master读取数据还是slave读取,当master繁忙的时候,Consumer会被自动切换到从slave读取数据,也之所以有了这种机制,当master发生故障的时候,Consumer仍然可以从slave读取数据,不影响Consumer的消费,达到高可用。
一、Consumer高可用
1.1 Consumer端的负载均衡机制
先引入一个消费者组的概念
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。
负载均衡会发生在两种情况下
每次有新的consumer加入group就会重新做一下负载。
每10秒自动做一次负载。
源码中提供了六种消息队列消费者负载均衡的方法
AllocateMessageQueueAveragely:平均算法(默认)
AllocateMessageQueueAveragelyByCircle:平均轮询算法
AllocateMessageQueueByConfig:根据配置负载均衡算法
AllocateMessageQueueByMachineRoom:根据机房负载均衡算法
AllocateMessageQueueConsistentHash:一致性哈希负载均衡算法
AllocateMachineRoomNearby:靠近机房策略
AllocateMessageQueueAveragely
1 | /** |
整个逻辑是为了实现消息队列的平均分配,比如有8个队列,3个消费者,那么最终分配是这样子的
消费者0:队列0、队列1、队列2
消费者1:队列3、队列4、队列5
消费者2:队列6、队列7
AllocateMessageQueueAveragelyByCircle
和之前的平均的那个很类似,但是这个是间隔的分配,比如按照之前的可以划分为
消费者0:队列0、队列3、队列6
消费者1:队列1、队列4、队列7
消费者2:队列2、队列5
AllocateMessageQueueByConfig
这个是根据消息自定义配置固定的消息队列。
AllocateMessageQueueByMachineRoom
根据brokerName找到机房,也就是找到消息队列,然后再分配,与第一个平均分配的差不多,但是这个是先根据mod平均值的数量分配,然后再继续重新分配剩下的。还是上面的那个例子,8/3=2,剩下2个再继续分配。
消费者0:队列0、队列1、队列6
消费者1:队列2、队列3、队列7
消费者2:队列4、队列5
AllocateMessageQueueConsistentHash
这个一致性哈希的方法在之前介绍Dubbo的负载均衡的时候就有提到过,这边就不赘述了。可以访问此篇博客。
AllocateMachineRoomNearby
同机房分配策略,首先统计消费者与broker所在机房,保证broker中的消息优先被同机房的消费者消费,如果机房中没有消费者,则有其他机房消费者消费。实际的队列分配(同机房或跨机房)可以是指定其他算法。假设有三个机房,实际负载策略使用算法1,机房1和机房3中存在消费者,机房2没有消费者。机房1、机房3中的队列会分配给各自机房中的消费者,机房2的队列会被虽有的消费者平均分配。
消息消费者执行消费逻辑的时候会调用相应的函数
MQClientInstance.java中的start方法里就有设置负载均衡的相关方法
1 | public void start() throws MQClientException { |
RebalanceService.java中的函数
1 |
|
1.2 Consumer主从选择
之前还提到一点消费者高可用的实现就是主从的选择,当主结点处于繁忙或者主节点宕机的时候,Consumer会自动切换到从节点进行消息的消费,简单看一下实现。
在消费者向broker中消息队列拉取消息的时候,会调用到org.apache.rocketmq.broker.processor.PullMessageProcessor这个类
1 | if (getMessageResult.isSuggestPullingFromSlave()) { |
当设置了suggestPullingFromSlave属性为true的时候,就允许从从服务器来拉取消息,这个属性是在org.apache.rocketmq.store.GetMessageResult中的
1 | // 默认是false |
注意:以上true or false的选择是基于主服务器不宕机的情况,如果主服务器宕机的话,因为是要发心跳的,所以直接是找不到主的ip地址,自然而然的消费的是slave中的消息。以上的情况是发生在主服务器消息在内存中的占用超过40%的情况下而发生的主从切换。
问:主从服务器都在运行过程中,消息消费者是从主拉取消息还是从从服务器拉取消息?
答:当主服务器没有宕机且消息占用内存大小小于40%的时候,是从主服务器拉取消息。当master宕机或者占用超过40%的时候,是从从服务器上拉取消息。
问:如果主服务器宕机,那么消息消费如何保证?如果主服务器又恢复之后,消息消费者从主还是从拉取消息,现在又如何保证消息同步?
答:主服务器宕机之后,消息消费从从服务器拉取。当主服务器恢复之后,消息消费者从主服务器拉取消息,消息消费者内存中存在消息消费的进度,因为和Broker有长连接,所以会让master主动更新消息消费进度。从而达到主从的消息同步,不会再去消费那些在从服务器中已经消费过的消息。
二、消费者消息拉取
RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。
消费进度的管理
广播模式:同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是对立的、互不影响的,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定。消费进度存储在消费者本地
集群模式:同一个消费者组内的所有消息消费者共享消息主题下的所有消息,同一条消息(同一个消息消费队列)在同一时间内只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度需要保存在每个消费者都能访问到的地方。消息进度存储文件放在消息服务端Broker
定时消息是如何实现的
RocketMQ并不支持任意时间精度的的定时消息,主要考虑是性能上的损耗,所以设置了18个级别的延迟级别,具体的实现是这样的。
- producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别
- broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1
- mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列
- 根据消费偏移量offset从commitLog中解析出对应消息
- 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递
- 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递
参考