前言
NameServer是RocketMQ中一个十分重要的概念,还记得上一篇中提到启动消息队列的步骤吗,就是先启动NameServer,然后再启动Broker。NameServer是一个简单的路由注册中心,也就是相当于之前dubbo中ZK的地位,在开始本篇之前,先提出几个问题:
- NameServer在整个架构中的作用是什么
- NameServer中存储了Broker中的哪些信息
- NameServer与Producer以及Consumer之前是什么关系
一、基础概念
官方文档这么介绍NameServer
NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
按照我的理解总结一下:NameServer的作用是解耦Broker、Producer、Consumer三者。由Broker提供路由信息表,供生产者提供对应Topic的消息,消费者消费对应Topic的消息。
写到这边我有一个疑问,为什么rocketMQ不直接使用ZK,而要自己造轮子,单独弄了个NameServer呢?结合找到的一些资料以及自己的理解,总结原因:其实在rocketMQ的早期版本是用到ZK来作为路由注册中心的,但是由于ZK太重了,而rocketMQ中只需要使用到一些简单的功能,比如topic的路由以及一些KV配置的管理,所以才有了这种轻量级的NameServer,也能减少运维成本。
二、启动过程
还记得在当时启动消息队列的时候先执行的命令
1 | nohup sh mqnamesrv & |
看看里面到底做了什么操作
发现mqnamesrv中最后一句
1 | sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@ |
很容易发现,这是调用了org.apache.rocketmq.namesrv.NamesrvStartup这个启动类啊,runserver.sh中大多是一些虚拟机配置等等,就不多说了,直接找到源码来看一下这个类。

三、源码分析
整个NameServer启动涉及到的类并不是很多,先给出一个总的调用框架图

先有个宏观的认识,首先创建两个配置类:namesrvConfig和nettyServerConfig,通过都配置文件的方式将配置内容存放在这两个类中,很显然,一个是用来做nameserver的配置,一个用于netty的启动。接着new出一个nameserver的控制器,接下去就是初始化这个控制器,控制器启动,Netty启动。其中主要用到了三个关键的类

NamesrvStartup、NamesrvController和RouteInfoManager分别为启动类、核心控制器类以及存放路由信息的类。
3.1 NamesrvStartup
NamesrvStartup类,基本的流程见注释。
1 | public static NamesrvController main0(String[] args) { |
进到createNamesrvController(args)方法中看一下具体流程
1 | public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { |
以上一段代码是启动namesrv的关键代码部分,细节的地方暂时先不考究,比如利用commandLine命令行工具读取配置文件,如何装配两个配置对象等等,这边我的关注点主要流程,即在此步我们完成了两个配置文件的装配工作以及控制器对象的创建工作,在看源码的过程中再加入一些猜测,其实就可以猜到后面的步骤应该就是初始化控制器,启动控制器并且后面MQ各个部分通信肯定是要通过网络的方式进行的,也就是启动Netty。
再看接下去的一步start(controller)
1 | public static NamesrvController start(final NamesrvController controller) throws Exception { |
虽然这步的函数名是start,但是里面主要是做了控制器初始化、控制器优雅停止以及控制器启动三步,具体的步骤还要看红色方框中的第二个重要的类NamesrvController。
到此为止,继续跟踪到NamesrvController中对这三个步骤的具体实现。注意,控制性优雅停止这边使用的是钩子线程,这个不是本篇的重点,暂且可以认为是在JVM关闭时(以下五种场景会被调用 a.程序正常退出使用。b.System.exit()。c.终端使用Ctrl+C触发的中断。d.系统关闭。e.使用Kill pid命令干掉进程。)触发这个线程,来进行一些资源的回收工作,可以参考这篇博客。
3.2 NamesrvController
NamesrvController类中的构造函数
1 | public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { |
NamesrvController类中的controller.initialize()方法
1 | public boolean initialize() { |
总结一下,initialize()方法中主要是KV配置的加载,创建远程调用对象,后续用来调用netty,创建两个定时任务:监控broker路由信息以及KV信息。
shutdown():主要用于回收线程池、netty连接等资源。
1 | public void shutdown() { |
start():开启NettyRemotingServer服务,也就是netty服务开启。
1 | public void start() throws Exception { |
至此为止,nameserver启动完毕,整体分析思路都是按照一开始的流程图进行,也很容易理解。
3.3 RouteInfoManager
在NamesrvController的构造函数中,其中有一个RouteInfoManager类的创建,用于管理路由信息,也就是broker与nameserver之间的信息,来看看都保存了哪些信息。
1 | /**成员变量*/ |
topicQueueTable
首先要知道的是,一个Topic拥有多个消息队列,如果不指定队列的数量,一个Broker会为每个Topic创建4个读队列和4个写队列。key是topic的名字,value值是List
1 | private String brokerName; //broker名称 |
brokerAddrTable
key是brokerName,value是BrokerData对象,其中包括内容
1 | private String cluster; // 集群名称 |
从上一篇的结构图中可以看出,rocketMQ主从模式允许有一台主服务器,也就是Master,多台从服务器,也就是slavers,其中它们的brokerName是一样的,但是brokerId确实不同的,默认0是主服务器的id,从服务器id从1开始递增即可。
clusterAddrTable
key是集群名,value是brokerName的集合。
brokerLiveTable
key是broker地址,value是BrokerLiveInfo对象,其中包括
1 | private long lastUpdateTimestamp; //最后的更新时间 |
filterServerTable
key是brokerAddr,也就是broker地址,value是过滤的服务,因为一个broker可能会过滤多个服务,所以对应的value是一个集合。
在了解了RouteInfoManager中保存的一些信息之后,我们知道broker会通过定时任务定时发送心跳包,这块先暂时不分析,本篇简单说一下当之前启动的netty服务监听到心跳包后,如何通过broker发送的心跳包来更新nameserver中的这些信息的。
1 | public RegisterBrokerResult registerBroker( |
清除不活跃的broker。在3.2节的时候我们就有讲到在NameServer控制器初始化的时候就会通过定时任务删除不活跃的broker,也就是下面的这个函数
1 | public void scanNotActiveBroker() { |
遍历brokerLiveTable,通过最后更新的时间来决定是否清除这个broker
1 | private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; |
当超过2分钟没有更新,则移除它并执行销毁逻辑,也就是用onChannelDestroy()方法从其他hashmap中移除掉这台服务器。
补充一点:RocketMQ中有两个触发点来触发路由删除,一个就是上面所说的定时扫描brokerLiveTable来进行判断,若超过两分钟没有更新活跃的时间,则删除改broker。还有一个在broker正常关闭的时候,会执行unregisterBroker执行。但是两者的删除方式都是一样的,都是从路由表中删除与该broker相关的信息。
注意,这边总结一下:每隔30秒broker向nameserver发送一次心跳包,nameserver接收到心跳包之后更新自己的路由信息,且nameserver每隔10s扫描一次brokerLiveTable,去清除brokerId和brokerAddr不符的broker或者是那些已经超过两分钟没有活跃的broker。
在nameserver接收到每一个心跳包后,将更新brokerLiveTable中关于Broker的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)。更新上述路由表使用了锁粒度较少的读写锁,允许多个消息发送者并发读,保证消息发送时的高并发。但同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行。这也是读写锁经典的使用场景。
参考