RocketMQ源码学习之NameServer

前言


NameServer是RocketMQ中一个十分重要的概念,还记得上一篇中提到启动消息队列的步骤吗,就是先启动NameServer,然后再启动Broker。NameServer是一个简单的路由注册中心,也就是相当于之前dubbo中ZK的地位,在开始本篇之前,先提出几个问题:

  • NameServer在整个架构中的作用是什么
  • NameServer中存储了Broker中的哪些信息
  • NameServer与Producer以及Consumer之前是什么关系

一、基础概念

官方文档这么介绍NameServer

NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

按照我的理解总结一下:NameServer的作用是解耦Broker、Producer、Consumer三者。由Broker提供路由信息表,供生产者提供对应Topic的消息,消费者消费对应Topic的消息。

写到这边我有一个疑问,为什么rocketMQ不直接使用ZK,而要自己造轮子,单独弄了个NameServer呢?结合找到的一些资料以及自己的理解,总结原因:其实在rocketMQ的早期版本是用到ZK来作为路由注册中心的,但是由于ZK太重了,而rocketMQ中只需要使用到一些简单的功能,比如topic的路由以及一些KV配置的管理,所以才有了这种轻量级的NameServer,也能减少运维成本。

二、启动过程

还记得在当时启动消息队列的时候先执行的命令

1
nohup sh mqnamesrv &

看看里面到底做了什么操作

发现mqnamesrv中最后一句

1
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@

很容易发现,这是调用了org.apache.rocketmq.namesrv.NamesrvStartup这个启动类啊,runserver.sh中大多是一些虚拟机配置等等,就不多说了,直接找到源码来看一下这个类。

nameserver启动类

三、源码分析

整个NameServer启动涉及到的类并不是很多,先给出一个总的调用框架图

NameServer调用框架图

先有个宏观的认识,首先创建两个配置类:namesrvConfignettyServerConfig,通过都配置文件的方式将配置内容存放在这两个类中,很显然,一个是用来做nameserver的配置,一个用于netty的启动。接着new出一个nameserver的控制器,接下去就是初始化这个控制器,控制器启动,Netty启动。其中主要用到了三个关键的类

namesrv关键类

NamesrvStartupNamesrvControllerRouteInfoManager分别为启动类、核心控制器类以及存放路由信息的类。

3.1 NamesrvStartup

NamesrvStartup类,基本的流程见注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static NamesrvController main0(String[] args) {
try {
// 填充了namesrvConfig和nettyServerConfig两个配置文件,并将其二作为参数创建Nameserver控制器对象
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}

进到createNamesrvController(args)方法中看一下具体流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
/**
这段主要是根据运行时传递的参数生成commandLine命令行对象
*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 这边创建两个配置的对象,也就是之前提及的namesrvConfig和nettyServerConfig。并通过读取配置文件的方式来初始化这两个对象
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
// 输入流读取文件
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 装载配置
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);

namesrvConfig.setConfigStorePath(file);

System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}

if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 配置日志信息
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 通过之前初始化的namesrvConfig和nettyServerConfig创建NamesrvController控制器对象
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

return controller;
}

以上一段代码是启动namesrv的关键代码部分,细节的地方暂时先不考究,比如利用commandLine命令行工具读取配置文件,如何装配两个配置对象等等,这边我的关注点主要流程,即在此步我们完成了两个配置文件的装配工作以及控制器对象的创建工作,在看源码的过程中再加入一些猜测,其实就可以猜到后面的步骤应该就是初始化控制器,启动控制器并且后面MQ各个部分通信肯定是要通过网络的方式进行的,也就是启动Netty。

再看接下去的一步start(controller)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static NamesrvController start(final NamesrvController controller) throws Exception {

if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 控制器初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 控制器优雅停止
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 控制器启动
controller.start();

return controller;
}

虽然这步的函数名是start,但是里面主要是做了控制器初始化、控制器优雅停止以及控制器启动三步,具体的步骤还要看红色方框中的第二个重要的类NamesrvController

到此为止,继续跟踪到NamesrvController中对这三个步骤的具体实现。注意,控制性优雅停止这边使用的是钩子线程,这个不是本篇的重点,暂且可以认为是在JVM关闭时(以下五种场景会被调用 a.程序正常退出使用。b.System.exit()。c.终端使用Ctrl+C触发的中断。d.系统关闭。e.使用Kill pid命令干掉进程。)触发这个线程,来进行一些资源的回收工作,可以参考这篇博客

3.2 NamesrvController

NamesrvController类中的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
// nameserver配置文件,传入的参数
this.namesrvConfig = namesrvConfig;
// netty配置文件,传入的参数
this.nettyServerConfig = nettyServerConfig;
// KV配置管理创建,配置文件以json的格式存放,位置是/namesrv/kvConfig.json,
this.kvConfigManager = new KVConfigManager(this);
// 路由信息管理创建(下文详细说,比较重要)
this.routeInfoManager = new RouteInfoManager();
// broker连接事件处理服务,处理Broker连接发生变化的服务,判断一个broker是否存活,就像是ZK中的心跳检测
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

NamesrvController类中的controller.initialize()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean initialize() {
// KV配置管理加载
this.kvConfigManager.load();
// 创建远程服务对象,和netty相关
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

this.registerProcessor();

// 定时任务创建:从方法名就能看出这步的作用是管理broker的路由信息,扫描不活跃的broker,定期清除。在5s后开始执行,10s执行一次。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 定时任务创建:在1分钟后开始执行,每10分钟打印一次KV配置信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
/**......省略*/

return true;
}

总结一下,initialize()方法中主要是KV配置的加载,创建远程调用对象,后续用来调用netty,创建两个定时任务:监控broker路由信息以及KV信息。

shutdown():主要用于回收线程池、netty连接等资源。

1
2
3
4
5
6
7
8
9
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();

if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}

start():开启NettyRemotingServer服务,也就是netty服务开启。

1
2
3
4
5
6
7
public void start() throws Exception {
this.remotingServer.start();

if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}

至此为止,nameserver启动完毕,整体分析思路都是按照一开始的流程图进行,也很容易理解。

3.3 RouteInfoManager

在NamesrvController的构造函数中,其中有一个RouteInfoManager类的创建,用于管理路由信息,也就是broker与nameserver之间的信息,来看看都保存了哪些信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**成员变量*/
// topic消息队列路由表信息,用来保存topic对应的broker名称等。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker结点信息。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker集群信息,存储了集群中所有的Brokername。
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker状态信息,Nameserver每次收到Broker的心跳包就会更新该信息。
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 维护broker地址与Filter列表关系
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
/**构造函数*/
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}

topicQueueTable

首先要知道的是,一个Topic拥有多个消息队列,如果不指定队列的数量,一个Broker会为每个Topic创建4个读队列和4个写队列。key是topic的名字,value值是List,简单看一下QueueData类包括哪些内容

1
2
3
4
5
private String brokerName;  //broker名称
private int readQueueNums; // 可读的队列数
private int writeQueueNums; //可写的队列数
private int perm; // 读写权限
private int topicSynFlag; // 同步标志

brokerAddrTable

key是brokerName,value是BrokerData对象,其中包括内容

1
2
3
private String cluster;  // 集群名称
private String brokerName; // brokerName
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; // 存储同一个brokerName下的所有主从设备的id和地址

从上一篇的结构图中可以看出,rocketMQ主从模式允许有一台主服务器,也就是Master,多台从服务器,也就是slavers,其中它们的brokerName是一样的,但是brokerId确实不同的,默认0是主服务器的id,从服务器id从1开始递增即可。

clusterAddrTable

key是集群名,value是brokerName的集合。

brokerLiveTable

key是broker地址,value是BrokerLiveInfo对象,其中包括

1
2
3
4
private long lastUpdateTimestamp;  //最后的更新时间
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;

filterServerTable

key是brokerAddr,也就是broker地址,value是过滤的服务,因为一个broker可能会过滤多个服务,所以对应的value是一个集合。

在了解了RouteInfoManager中保存的一些信息之后,我们知道broker会通过定时任务定时发送心跳包,这块先暂时不分析,本篇简单说一下当之前启动的netty服务监听到心跳包后,如何通过broker发送的心跳包来更新nameserver中的这些信息的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 获得当前集群下所有的brokerNames
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
// 初始化
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);

boolean registerFirst = false;
// 获取该brokerName下的BrokerData对象
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// 初始化
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
// 获得该brokerName下所有机器,也就是主从设备的broker id和对应的机器地址
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
// 遍历当前brokerName下的机器
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
// 如果发送心跳包的这台服务器在nameserver的brokerAddrTable中的id发生改变,则移除
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
// 判断是否已经注册过,也可以说是没有发生变化
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// 如果brokerid是0,也就是该服务器是主服务器,则创建topic队列
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新brokerLiveTable信息
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
// 更新filterServerTable信息
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 当前发送心跳包的broker是从服务器,则保存到result中并返回
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}

return result;
}

清除不活跃的broker。在3.2节的时候我们就有讲到在NameServer控制器初始化的时候就会通过定时任务删除不活跃的broker,也就是下面的这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
// key为broker地址
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}

遍历brokerLiveTable,通过最后更新的时间来决定是否清除这个broker

1
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

当超过2分钟没有更新,则移除它并执行销毁逻辑,也就是用onChannelDestroy()方法从其他hashmap中移除掉这台服务器。

补充一点:RocketMQ中有两个触发点来触发路由删除,一个就是上面所说的定时扫描brokerLiveTable来进行判断,若超过两分钟没有更新活跃的时间,则删除改broker。还有一个在broker正常关闭的时候,会执行unregisterBroker执行。但是两者的删除方式都是一样的,都是从路由表中删除与该broker相关的信息。

注意,这边总结一下:每隔30秒broker向nameserver发送一次心跳包,nameserver接收到心跳包之后更新自己的路由信息,且nameserver每隔10s扫描一次brokerLiveTable,去清除brokerId和brokerAddr不符的broker或者是那些已经超过两分钟没有活跃的broker。

在nameserver接收到每一个心跳包后,将更新brokerLiveTable中关于Broker的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)。更新上述路由表使用了锁粒度较少的读写锁,允许多个消息发送者并发读,保证消息发送时的高并发。但同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行。这也是读写锁经典的使用场景。


参考

https://cloud.tencent.com/developer/article/1374005

https://juejin.im/post/5c98c5246fb9a070ee4296fd#heading-1