Dubbo之集群容错前导篇(上)

前言


在Dubbo集群容错分析之前,要先有一个宏观的了解,集群容错源码分别是服务目录 Directory、服务路由 Router、集群 Cluster和负载均衡LoadBalance四个部分,这个在官网的博客中有简单的介绍,本篇就结合官网的介绍来简单介绍一下服务目录 Directory部分。

ps:以下所有的源码分析都是基于Dubbo2.6.6版本

Directory

1、简介

Directory也成为服务目录,我们知道,在服务消费者发送请求之前,我们需要知道它要往哪些服务提供者上发送请求,通过服务目录,服务消费者就能够获取到服务提供者的信息,就能够通过Netty进行相关函数的远程调用,所以也可以说,服务目录的功能和注册中心的功能是类似的,只不过它是将注册中心的每条服务提供者的信息封装成了一个Invoker对象, 在服务提供者发生变化的时候,服务目录也会随着注册中心动态更新。

官网上有这么一句话,Directory 代表多个 Invoker,可以把它看成 List ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更。注意,这边说的是可能是动态变化的,为什么呢?因为Directory有两种,一种是静态服务目录[StaticDirectory],它是静态的,也就是不会动态变化的。另一个是RegistryDirectory,它是一个动态的服务目录。这边给出一张官网的继承体系图。

服务目录继承体系图

好像用的比较多的是RegistryDirectoryStaticDirectory用得比较少,主要用在服务对多注册中心的引用。

2、源码分析

AbstractDirectory源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}

AbstractDirectory里面最重要的一个函数就是list函数,返回的是一个List<Invoker>对象,也就是返回服务目录。那么它是如何得到的呢,主要是通过两个主要方法

1.doList(invocation);方法:主要用来获得Invoker列表,它是由具体的子类实现的。

2.router.route(invokers, getConsumerUrl(), invocation);:进行服务路由。

好了,简单说完了AbstractDirectory,就来说说它的两个子类的实现。第一个StaticDirectory

StaticDirectory源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public StaticDirectory(List<Invoker<T>> invokers) {
this(null, invokers, null);
}

public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers) {
this(null, invokers, routers);
}

public StaticDirectory(URL url, List<Invoker<T>> invokers) {
this(url, invokers, null);
}

public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
if (invokers == null || invokers.isEmpty())
throw new IllegalArgumentException("invokers == null");
this.invokers = invokers;
}

@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {

return invokers;
}

StaticDirectory的实现很简单,因为是静态的服务目录,所以在构造函数中就把invokers对象传进去了,后面也不能做动态的变化。doList()函数也就是很简单的把invokers对象返回出去。

RegistryDirectory源码分析

首先它实现了NotifyListener接口。当注册中心服务配置发生变化后,RegistryDirectory可收到与当前服务相关的变化,也就是它之所以能根据注册中心动态变化的根源所在。收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表。RegistryDirectory 中有几个比较重要的逻辑,第一是 Invoker 的列举逻辑,第二是接收服务配置变更的逻辑,第三是 Invoker 列表的刷新逻辑。接下来按顺序对这三块逻辑。

doList()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
}
List<Invoker<T>> invokers = null;
// methodInvokerMap对象,是类中的一个成员变量,也就是Invoker的本地缓存,后面的List<Invoker<T>>也就是从这里读出来的
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
// 通过 方法名 + 第一个参数名称 查询 Invoker 列表
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
}
if (invokers == null) {
// 通过方法名获取 Invoker 列表
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
// 通过星号 * 获取 Invoker 列表
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
// 冗余逻辑,pull request #2861 移除了下面的 if 分支代码
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

和静态服务目录的doList()方法不同,RegistryDirectory中的doList()方法就复杂多了。主要获取逻辑就是从methodInvokerMap对象中把Invoker读取出来就行了,读取逻辑是根据key = 方法名+参数的各种组合进行map的映射,也就是methodNameargs对象。至于methodInvokerMap是啥,后面再分析一下,这边我猜测应该是所有在注册中心注册服务的服务提供者的方法和对应的List<Invoker>,所以服务消费者只要根据key就能把List映射下来。

notify()方法

上面提到了它实现了NotifyListener接口,它是为了获取注册中心的变更通知,这边看一下它的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
public synchronized void notify(List<URL> urls) {
// 定义三个集合,分别用于存放服务提供者 url,路由 url,配置器 url
List<URL> invokerUrls = new ArrayList<URL>(); // 服务提供者 url,后面刷新也是刷新的这个
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls); // 刷新服务提供者 url
}

下面来看刷新函数

refreshInvoker(invokerUrls)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void refreshInvoker(List<URL> invokerUrls) {
// 禁用服务,关闭所有Invoker
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access 禁止访问
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers 关闭所有Invoker
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// 将 url列表转成 Invoker列表。这边最后得到的是<url, Invoker> 映射关系表
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// 将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射。这边得到的是方法名到 Invoker 列表的映射关系
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
// 合并多个组的 Invoker
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
// 销毁无用的Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

当注册中心有变化的时候,更新两个值,一个是methodInvokerMap,另一个是urlInvokerMap。上面在doList()中有对methodInvokerMap对象读的操作,这边是对methodInvokerMap对象写的操作。

这边涉及到的三个比较主要的函数这边就不分析了,具体的分析官方文档给出了详细的说明,我的理解是

1
2
3
4
5
6
7
8
// 返回值对应<url, Invoker>
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// 返回值对应<方法名, List<Invoker<T>>>
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
// 返回值对应<方法名, List<Invoker<T>>>,但是这边是对group进行了分组
toMergeMethodInvokerMap(newMethodInvokerMap)`;
// 最后销毁无用Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);

这四步是紧密关联的,最终动态改变成员变量methodInvokerMap