Dubbo之SPI

前言


Dubbo中的SPI机制,全称为 Service Provider Interface,是一种服务发现机制。是提供给扩展者使用的一种机制,比方说可以载入Dubbo中的各种可配置组件,比如:动态代理方式(ProxyFactory)、负载均衡策略(LoadBalance)、RCP协议(Protocol)、拦截器(Filter)、容器类型(Container)、集群方式(Cluster)和注册中心类型(RegistryFactory)等,增强了JDK 的SPI,使得其在最大程度的解耦。比较类似于Spring中IoC的思想。关于Spring的IoC的具体分析留到后面学习Spring源码的时候再做分析,这边主要看的是Dubbo中SPI机制的实现。另外,我们知道jdk本身就有SPI,那么Dubbo中对SPI是如何实现的。另外补充一点,SPI是一种破坏双亲委派机制的做法,关于双亲委派机制可以看本篇第四节的部分,所以说一种机制的出现肯定有好也有坏,关键看的是在何种情况下的使用。

1、两种SPI的简单用法

先来简单的看一下jdk的spi与dubbo中的spi如何使用,这边引用官方的例子。

jdk SPI

首先是一个接口和俩个实现类

1
2
3
public interface Robot {
void sayHello();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class OptimusPrime implements Robot {

@Override
public void sayHello() {
System.out.println("Hello, I am Optimus Prime.");
}
}

public class Bumblebee implements Robot {

@Override
public void sayHello() {
System.out.println("Hello, I am Bumblebee.");
}
}

接下来 META-INF/services 文件夹下创建一个文件,名称为 Robot 的全限定名 org.apache.spi.Robot。文件内容为实现类的全限定的类名,如下:

1
2
org.apache.spi.OptimusPrime
org.apache.spi.Bumblebee

测试

1
2
3
4
5
6
7
8
public class JavaSPITest {
@Test
public void sayHello() throws Exception {
ServiceLoader<Robot> serviceLoader = ServiceLoader.load(Robot.class);
System.out.println("Java SPI");
serviceLoader.forEach(Robot::sayHello);
}
}

Java SPI

Hello, I am Optimus Prime.

Hello, I am Bumblebee.

Dubbo SPI

同样的,Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下,配置内容如下,是通过一种键值对的方式进行配置的。另外,在测试 Dubbo SPI 时,需要在 Robot 接口上标注 @SPI 注解。

1
2
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee

测试

1
2
3
4
5
6
7
8
9
10
11
public class DubboSPITest {
@Test
public void sayHello() throws Exception {
ExtensionLoader<Robot> extensionLoader =
ExtensionLoader.getExtensionLoader(Robot.class);
Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
optimusPrime.sayHello();
Robot bumblebee = extensionLoader.getExtension("bumblebee");
bumblebee.sayHello();
}
}

Dubbo SPI

Hello, I am Optimus Prime.

Hello, I am Bumblebee.

对比两种SPI,给出一个比较官方的对比,Dubbo比jdk的SPI优越之处在于

  • JDK标准的SPI会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。

  • 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK标准的ScriptEngine,通过getName();获取脚本类型的名称,但如果RubyScriptEngine因为所依赖的jruby.jar不存在,导致RubyScriptEngine类加载失败,这个失败原因被吃掉了,和ruby对应不起来,当用户执行ruby脚本时,会报不支持ruby,而不是真正失败的原因。

  • 增加了对扩展点IoC和AOP的支持,一个扩展点可以直接setter注入其它扩展点。

总结一下就是两点:提高了效率和性能节约资源、增加了功能。

2、从源码层面进行分析

jdk SPI

java原生的SPI的类存在于java.util.ServiceLoader 目录下,从ServiceLoader.load(Robot.class);进入进行分析。

在此之前,要先知道一个ServiceLoader类下关于位置的静态变量PREFIX,这边可以看到已经是写死的一个位置,也就是只能加载该目录下的文件。

1
private static final String PREFIX = "META-INF/services/";
1
2
3
4
5
6
7
8
9
// 调用load方法
public static <S> ServiceLoader<S> load(Class<S> service) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return ServiceLoader.load(service, cl);
}
// 获得类加载器c1之后,把之当作参数传入,继续调用 ServiceLoader.load(service, cl);
public static <S> ServiceLoader<S> load(Class<S> service, ClassLoader loader) {
return new ServiceLoader<>(service, loader);
}
1
2
3
4
5
6
7
8
9
10
11
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
reload();
}
// 构造 LazyIterator
public void reload() {
providers.clear();
lookupIterator = new LazyIterator(service, loader);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
private class LazyIterator
implements Iterator<S>
{
Class<S> service;
ClassLoader loader;
Enumeration<URL> configs = null;
Iterator<String> pending = null;
String nextName = null;

private LazyIterator(Class<S> service, ClassLoader loader) {
this.service = service;
this.loader = loader;
}

// 判断是否有下一个 Service,
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
// 构造全名称,前缀 + 服务全名称,这边也就是META-INF/services/org.apache.spi.Robot找到这个文件
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
nextName = pending.next();
return true;
}
// 获取下一个service
private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
// 通过反射,获取目标类
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a subtype");
}
try {
// 并将其加载到 providers 列表中,就是一个map,这边可以看到,已经用newInstance()方法创建出了新的对象,这也是为什么jdk spi会实例化所有扩展点的原因。
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}

// 底层调用 hasNextService()
public boolean hasNext() {
if (acc == null) {
return hasNextService();
} else {
PrivilegedAction<Boolean> action = new PrivilegedAction<Boolean>() {
public Boolean run() { return hasNextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}

// 底层调用 nextService()
public S next() {
if (acc == null) {
return nextService();
} else {
PrivilegedAction<S> action = new PrivilegedAction<S>() {
public S run() { return nextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}

public void remove() {
throw new UnsupportedOperationException();
}
}

总结一下,jdk的spi虽然也使用了延时加载,将服务加载的这个动作延迟到使用服务的时候,但是由于LazyIterator类实现了Iterator接口,所以在使用的时候只能通过遍历来全部获取,也就是接口的实现类全部加载并实例化一遍(实例化是在nextService()函数中出现的)。如果有些实现类你是不需要的,但是仍然会被实例化,这就会造成浪费。而且迭代器并不能直接通过某个参数来直接获取对应的实现类。这便是jdk的缺点所在。

Dubbo SPI

ExtensionLoader的源码位于 com.alibaba.dubbo.common.extension 包下,详细分析以上测试代码的几句话在源码层面到底干了啥。

首先,获取ExtensionLoader实例:ExtensionLoader.getExtensionLoader(Robot.class);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
// 判断是否通过SPI注解定义的可扩展接口,也就是下面的那个函数。若不是则抛出异常。
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}

// 先从EXTENSION_LOADERS中,根据传入可扩展类型type查找,也就是可扩展的接口,如Filter、Transporter等。这边EXTENSION_LOADERS是一个map。
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
// 如果没有在EXTENSION_LOADERS中找到的话,new一个ExtensionLoader实例
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}

private static <T> boolean withExtensionAnnotation(Class<T> type) {
return type.isAnnotationPresent(SPI.class);
}

// ExtensionLoader实例的构造函数
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

ExtensionLoader在这边类似于工厂模式,提供了私有的构造器,其入参type为扩展接口类型。Dubbo通过SPI注解定义了可扩展的接口,如Filter、Transporter等。每个类型的扩展对应一个ExtensionLoader。SPI的value参数决定了默认的扩展实现。

比如,以负载均衡为例。

SPI负载均衡

文件下配置了所有负载均衡的方式,也就是之前博客中提到的几种负载均衡机制。

1
2
3
4
random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance

加载方式也就是 ExtensionLoader.getExtensionLoader(LoadBalance.class)

接下去,可以通过extensionLoader.getExtension("optimusPrime");获取到你需要的类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
// 先从缓存中取相应的扩展实现类实例
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
// double check
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建相应的扩展实现类实例
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}

private T createExtension(String name) {
// 通过这句来获得文件中对应的类路径下的名称
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}

这边判断是否有实例instance的时候,用到了double check,既保证效率也能保证线程的安全性,是一个亮点。

getExtensionClasses()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 这边看看如何获得到MATE-INF文件下的内容的
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}

// synchronized in getExtensionClasses
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}

/*** 这边加载三个位置的文件,分别是
private static final String SERVICES_DIRECTORY = "META-INF/services/";
private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
***/
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}

loadDirectory()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
// 逐行读取配置文件,提取出扩展名或扩展类路径
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}

private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
try {
String line;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
// 用Class.forName方法进行类加载
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
// 处理Adaptive注解,若存在则将该实现类保存至cachedAdaptiveClass属性
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
} else if (isWrapperClass(clazz)) {
// 尝试获取参数类型为当前扩展类型的构造器方法,若成功则表明存在该扩展的封装类型,将封装类型存入wrappers集合
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} else {
clazz.getConstructor();
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
// 处理active注解,将扩展名对应active注解存入cachedActivates
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}

整理一下整个流程

0、首先初始化extensionLoader,也就是ExtensionLoader.getExtensionLoader(Robot.class);这步做的事情。

1、想要获取到对应的名称的类,调用extensionLoader.getExtension("optimusPrime")方法。

具体步骤

1.1 先从缓存cachedInstances中取相应的扩展实现类实例,如果没有则new一个,并放入缓存中。

1.2 用double check的方法判断是否已经获得了实例,这边保证了线程的安全性。如果没有则创建相应的扩展实现类实例:instance = createExtension(name)

2、进入到createExtension(name)函数中。

具体步骤

2.1 获得文件中对应的类路径下的名称,具体的方法是Class<?> clazz = getExtensionClasses().get(name)

2.2 从EXTENSION_INSTANCES中取得对应的instance并返回,如果没有,则通过反射newInstance()方法生成一个对象,当然这个对象就是指定name的对象。然后保存在EXTENSION_INSTANCES这个map中,方便下次使用,而不用重新创建了。

3、再进入到getExtensionClasses().get(name)中看如何根据name获得到对应的配置文件。(如name是某个具体负载均衡方法的名字)

具体步骤

3.1 判断缓存cachedClasses中有没有加载过。这里同样的也是用了double check。

3.2 如果没有加载过,也就是缓存中不存在,则调用loadExtensionClasses()方法加载文件。

4、进入到loadExtensionClasses()方法中。

具体步骤

4.1 里面主要是加载了三个位置的文件,并返回。其中加载文件的方法是loadDirectory(Map<String, Class<?>> extensionClasses, String dir)

5、进入到loadDirectory()中。

具体步骤

5.1 主要是通过loadResource(extensionClasses, classLoader, resourceURL)方法,该方法中主要是通过Class.forName方法进行类加载。

6、类加载的时候,有以下几步

6.1 处理Adaptive注解,若存在则将该实现类保存至cachedAdaptiveClass属性

6.2 尝试获取参数类型为当前扩展类型的构造器方法,若成功则表明存在该扩展的封装类型,将封装类型存入wrappers集合

6.3 处理active注解,将扩展名对应active注解存入cachedActivates

至此,就很容易解释为什么dubbo能通过这种key、value的形式获取一个指定实现类的对象,而不像jdk spi那样一次性加载所有类的对象。主要的原因就是它是将所有类加载保存在一个map中,通过指定的key去获取到对应的class,在获取到之后才会对这个key的class进行反射,创建出一个对象,而不是和jdk spi一样,一下子将所有对象都创建出来。

3、仿Dubbo SPI机制

具体的流程我仿照dubbo spi写了个demo,有利于加深理解,可以访问该地址参考。


参考:

http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html

https://www.jianshu.com/p/7daa38fc9711