Dubbo学习之SPI机制
相关阅读
简介
本文基于Spring Boot 2.6.6
,dubbo-spring-boot-starter 3.0.6
环境。
SPI全称为Service Provider Interface
,是一种服务发现机制。SPI的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类。正因此特性,我们可以很容易的通过SPI机制为我们的程序提供拓展功能。
SPI机制在第三方框架中也有所应用,比如Dubbo就是通过SPI机制加载所有的组件。不过,Dubbo并未使用Java原生的SPI机制,而是对其进行了增强,使其能够更好的满足需求。
本文将从以下角度介绍SPI机制;
- Java SPI;
- Dubbo SPI;
- Dubbo SPI自适应拓展;
Java SPI
示例
首先定义一个接口demo.spi.Person
,代码如下:
public interface Person {
void saySomething();
}
然后定义两个实现类,demo.spi.Student
、demo.spi.Teacher
,代码如下:
public class Student implements Person {
@Override
public void saySomething() {
System.out.println("I'm a student");
}
}
public class Teacher implements Person {
@Override
public void saySomething() {
System.out.println("I'm a teacher");
}
}
接下来,在项目META-INF/services/
目录下创建一个文件,文件名为接口Person
的全限定名demo.spi.Person
,文件内容为实现类的全限定名,具体如下:
demo.spi.Student
demo.spi.Teacher
准备好以上内容后,便可以编写代码进行测试Java SPI机制,测试代码如下:
public class SpiTest {
public static void main(String[] args) {
ServiceLoader<Person> serviceLoader = ServiceLoader.load(Person.class);
serviceLoader.forEach(Person::saySomething);
}
}
测试结果如下:
I'm a student
I'm a teacher
Dubbo SPI
简介
Dubbo并未使用Java SPI,而是重新实现了一套功能更强的SPI机制。Dubbo SPI的相关逻辑被封装在了ExtensionLoader
类中,通过ExtensionLoader
,我们可以加载指定的实现类。与Java SPI实现类配置不同,Dubbo SPI是通过键值对的方式进行配置,这样我们可以按需加载指定的实现类,而且Dubbo SPI中,支持文件路径如下:
META-INF/dubbo/internal/
;META-INF/dubbo/
;META-INF/services/
;
优先级越来越高,即后者定义的内容可以覆盖前者的内容;
示例
首先定义一个接口demo.dubbo.spi.Person
,需要标注@SPI
,代码如下:
@SPI
public interface Person {
void saySomething();
}
然后定义两个实现类,demo.dubbo.spi.Student
、demo.dubbo.spi.Teacher
,代码如下:
public class Student implements Person {
@Override
public void saySomething() {
System.out.println("I'm a student");
}
}
public class Teacher implements Person {
@Override
public void saySomething() {
System.out.println("I'm a teacher");
}
}
接下来,在项目META-INF/dubbo/
目录下创建一个文件,文件名为接口Person
的全限定名demo.dubbo.spi.Person
,文件内容为实现类的全限定名,具体如下:
student=demo.dubbo.spi.Student
teacher=demo.dubbo.spi.Teacher
准备好以上内容后,便可以编写代码进行测试Java SPI机制,测试代码如下:
public class SpiTest {
public static void main(String[] args) {
ExtensionLoader<Person> extensionLoader =
ApplicationModel.defaultModel().getDefaultModule().getExtensionLoader(Person.class);
Person student = extensionLoader.getExtension("student");
student.saySomething();
Person teacher = extensionLoader.getExtension("teacher");
teacher.saySomething();
}
}
测试结果如下:
I'm a student
I'm a teacher
简析
首先分析ExtensionLoader
的创建,代码如下:
// ExtensionAccessor.java
default <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
return this.getExtensionDirector().getExtensionLoader(type);
}
// ExtensionDirector.java
public <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
checkDestroyed();
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
if (!withExtensionAnnotation(type)) {
// 如果type上没有SPI注解,则抛出异常
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}
// 此处实现了缓存
// 先从本地缓存中查找
ExtensionLoader<T> loader = (ExtensionLoader<T>) extensionLoadersMap.get(type);
ExtensionScope scope = extensionScopeMap.get(type);
if (scope == null) {
// 缓存中不存在,则需要根据type解析获取
SPI annotation = type.getAnnotation(SPI.class);
scope = annotation.scope();
// 缓存,方便下次直接获取
extensionScopeMap.put(type, scope);
}
if (loader == null && scope == ExtensionScope.SELF) {
// 创建实例
loader = createExtensionLoader0(type);
}
// 不存在,则尝试从父类中查找,若存在父类的话
if (loader == null) {
if (this.parent != null) {
loader = this.parent.getExtensionLoader(type);
}
}
// 还不存在,则创建实例
if (loader == null) {
loader = createExtensionLoader(type);
}
return loader;
}
private static boolean withExtensionAnnotation(Class<?> type) {
return type.isAnnotationPresent(SPI.class);
}
private <T> ExtensionLoader<T> createExtensionLoader(Class<T> type) {
ExtensionLoader<T> loader = null;
// scope是否匹配
if (isScopeMatched(type)) {
// 匹配,则创建ExtensionLoader
loader = createExtensionLoader0(type);
} else {
// 不匹配,则返回null
}
return loader;
}
private <T> ExtensionLoader<T> createExtensionLoader0(Class<T> type) {
checkDestroyed();
ExtensionLoader<T> loader;
// 创建ExtensionLoader实例并放入缓存
extensionLoadersMap.putIfAbsent(type, new ExtensionLoader<T>(type, this, scopeModel));
loader = (ExtensionLoader<T>) extensionLoadersMap.get(type);
return loader;
}
继续分析ExtensionLoader
的初始化过程,代码如下:
ExtensionLoader(Class<?> type, ExtensionDirector extensionDirector, ScopeModel scopeModel) {
this.type = type;
this.extensionDirector = extensionDirector;
this.extensionPostProcessors = extensionDirector.getExtensionPostProcessors();
initInstantiationStrategy();
// 用于查找依赖,以便注入到创建的实例
// 此处也使用了Dubbo SPI机制
this.injector = (type == ExtensionInjector.class ? null : extensionDirector.getExtensionLoader(ExtensionInjector.class)
.getAdaptiveExtension());
this.activateComparator = new ActivateComparator(extensionDirector);
this.scopeModel = scopeModel;
}
ExtensionLoader
创建好后,便可以根据指定的名称获取对应的实现类了,代码如下:
public T getExtension(String name) {
T extension = getExtension(name, true);
if (extension == null) {
throw new IllegalArgumentException("Not find extension: " + name);
}
return extension;
}
public T getExtension(String name, boolean wrap) {
checkDestroyed();
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
// 使用默认实例
return getDefaultExtension();
}
String cacheKey = name;
if (!wrap) {
cacheKey += "_origin";
}
final Holder<Object> holder = getOrCreateHolder(cacheKey);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
// DCL
if (instance == null) {
// 创建拓展实例
instance = createExtension(name, wrap);
holder.set(instance);
}
}
}
return (T) instance;
}
private T createExtension(String name, boolean wrap) {
// 1. 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null || unacceptableExceptions.contains(name)) {
throw findException(name);
}
try {
T instance = (T) extensionInstances.get(clazz);
if (instance == null) {
// 2. 创建实例并缓存若实例不存在的话
extensionInstances.putIfAbsent(clazz, createExtensionInstance(clazz));
instance = (T) extensionInstances.get(clazz);
// 初始化预处理
instance = postProcessBeforeInitialization(instance, name);
// 3. 向实例中注入依赖
injectExtension(instance);
// 初始化后处理
instance = postProcessAfterInitialization(instance, name);
}
if (wrap) {
List<Class<?>> wrapperClassesList = new ArrayList<>();
if (cachedWrapperClasses != null) {
// 存在Wrapper
wrapperClassesList.addAll(cachedWrapperClasses);
wrapperClassesList.sort(WrapperComparator.COMPARATOR);
Collections.reverse(wrapperClassesList);
}
if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
// 4. 循环创建 Wrapper 实例
for (Class<?> wrapperClass : wrapperClassesList) {
Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
boolean match = (wrapper == null) ||
((ArrayUtils.isEmpty(wrapper.matches()) || ArrayUtils.contains(wrapper.matches(), name)) &&
!ArrayUtils.contains(wrapper.mismatches(), name));
if (match) {
// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
// 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
instance = postProcessAfterInitialization(instance, name);
}
}
}
}
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
createExtension
方法主要逻辑如下:
- 通过
getExtensionClasses
获取所有的拓展类; - 创建拓展对象;
- 向拓展对象中注入依赖;
- 将拓展对象包裹在相应的
Wrapper
对象中;
先分析getExtensionClasses
方法,代码如下:
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
// DCL
if (classes == null) {
// 家在拓展实现类
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
private Map<String, Class<?>> loadExtensionClasses() {
checkDestroyed();
// 缓存默认拓展实现类
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
// 遍历需要加载的目录
for (LoadingStrategy strategy : strategies) {
// 加载该目录下本type的拓展实现类
loadDirectory(extensionClasses, strategy, type.getName());
if (this.type == ExtensionInjector.class) {
// 兼容ExtensionFactory
loadDirectory(extensionClasses, strategy, ExtensionFactory.class.getName());
}
}
return extensionClasses;
}
分析loadDirectory
实现之前,先简单分析下strategies
的内容,其赋值语句如下:
private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();
private static LoadingStrategy[] loadLoadingStrategies() {
return stream(load(LoadingStrategy.class).spliterator(), false)
.sorted()
.toArray(LoadingStrategy[]::new);
}
// ServiceLoader.java
public static <S> ServiceLoader<S> load(Class<S> service) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return ServiceLoader.load(service, cl);
}
这里借助Java SPI机制实现多LoadingStrategy
,在dubbo-3.0.6.jar
的META-INF/services/
目录下存在文件org.apache.dubbo.common.extension.LoadingStrategy
,内容如下:
org.apache.dubbo.common.extension.DubboInternalLoadingStrategy
org.apache.dubbo.common.extension.DubboLoadingStrategy
org.apache.dubbo.common.extension.ServicesLoadingStrategy
这三个LoadingStrategy
的核心代码如下:
public class DubboInternalLoadingStrategy implements LoadingStrategy {
@Override
public String directory() {
return "META-INF/dubbo/internal/";
}
@Override
public int getPriority() {
return MAX_PRIORITY;
}
}
public class DubboLoadingStrategy implements LoadingStrategy {
@Override
public String directory() {
return "META-INF/dubbo/";
}
@Override
public boolean overridden() {
// 支持覆盖
return true;
}
@Override
public int getPriority() {
return NORMAL_PRIORITY;
}
}
public class ServicesLoadingStrategy implements LoadingStrategy {
@Override
public String directory() {
return "META-INF/services/";
}
@Override
public boolean overridden() {
// 支持覆盖
return true;
}
@Override
public int getPriority() {
return MIN_PRIORITY;
}
}
至此,可以知道strategies
的内容如下:
DubboInternalLoadingStrategy
;DubboLoadingStrategy
;ServicesLoadingStrategy
;
且遍历顺序由上而下,而DubboLoadingStrategy
和ServicesLoadingStrategy
支持覆盖,所以,若三者都配置了某个Type
的拓展实现类,则优先使用ServicesLoadingStrategy
,其次DubboLoadingStrategy
,最后才是DubboInternalLoadingStrategy
;
知道了strategies
的内容后,继续分析loadDirectory
的实现,代码如下:
private void loadDirectory(Map<String, Class<?>> extensionClasses, LoadingStrategy strategy, String type) {
loadDirectory(extensionClasses, strategy.directory(), type, strategy.preferExtensionClassLoader(),
strategy.overridden(), strategy.includedPackages(), strategy.excludedPackages(), strategy.onlyExtensionClassLoaderPackages());
// 兼容以前的版本
String oldType = type.replace("org.apache", "com.alibaba");
loadDirectory(extensionClasses, strategy.directory(), oldType, strategy.preferExtensionClassLoader(),
strategy.overridden(), strategy.includedPackagesInCompatibleType(), strategy.excludedPackages(), strategy.onlyExtensionClassLoaderPackages());
}
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
boolean extensionLoaderClassLoaderFirst, boolean overridden, String[] includedPackages,
String[] excludedPackages, String[] onlyExtensionClassLoaderPackages) {
// 生成文件路径
String fileName = dir + type;
try {
List<ClassLoader> classLoadersToLoad = new LinkedList<>();
if (extensionLoaderClassLoaderFirst) {
ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
classLoadersToLoad.add(extensionLoaderClassLoader);
}
}
Set<ClassLoader> classLoaders = scopeModel.getClassLoaders();
if (CollectionUtils.isEmpty(classLoaders)) {
Enumeration<java.net.URL> resources = ClassLoader.getSystemResources(fileName);
if (resources != null) {
while (resources.hasMoreElements()) {
loadResource(extensionClasses, null, resources.nextElement(), overridden, includedPackages, excludedPackages, onlyExtensionClassLoaderPackages);
}
}
} else {
classLoadersToLoad.addAll(classLoaders);
}
Map<ClassLoader, Set<java.net.URL>> resources = ClassLoaderResourceLoader.loadResources(fileName, classLoadersToLoad);
// 获取到Resource URL后,逐个加载
resources.forEach(((classLoader, urls) -> {
loadFromClass(extensionClasses, overridden, urls, classLoader, includedPackages, excludedPackages, onlyExtensionClassLoaderPackages);
}));
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", description file: " + fileName + ").", t);
}
}
private void loadFromClass(Map<String, Class<?>> extensionClasses, boolean overridden, Set<java.net.URL> urls, ClassLoader classLoader,
String[] includedPackages, String[] excludedPackages, String[] onlyExtensionClassLoaderPackages) {
if (CollectionUtils.isNotEmpty(urls)) {
for (java.net.URL url : urls) {
// 加载Resource
loadResource(extensionClasses, classLoader, url, overridden, includedPackages, excludedPackages, onlyExtensionClassLoaderPackages);
}
}
}
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
java.net.URL resourceURL, boolean overridden, String[] includedPackages, String[] excludedPackages, String[] onlyExtensionClassLoaderPackages) {
try {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
String line;
String clazz;
// 逐行遍历
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) {
// 跳过注释
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
// 键值对形式
name = line.substring(0, i).trim();
clazz = line.substring(i + 1).trim();
} else {
// Java SPI形式
clazz = line;
}
if (StringUtils.isNotEmpty(clazz) && !isExcluded(clazz, excludedPackages) && isIncluded(clazz, includedPackages)
&& !isExcludedByClassLoader(clazz, classLoader, onlyExtensionClassLoaderPackages)) {
// 加载Class
loadClass(extensionClasses, resourceURL, Class.forName(clazz, true, classLoader), name, overridden);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type +
", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
// 检测目标类上是否有 Adaptive 注解
if (clazz.isAnnotationPresent(Adaptive.class)) {
// 缓存Adaptive类
cacheAdaptiveClass(clazz, overridden);
} else if (isWrapperClass(clazz)) {
// 检测 clazz 是否是 Wrapper 类型
// 缓存Wrapper类
cacheWrapperClass(clazz);
} else {
// 普通的拓展类
if (StringUtils.isEmpty(name)) {
// 优先使用注解Extension.value()作为名称若存在的话
// 若本Class名以Type名结尾,则取去掉Type的前半部分
// 否则直接去本Class名
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
// 缓存Activate类,若本类标注了Activate注解的话
cacheActivateClass(clazz, names[0]);
for (String n : names) {
// 缓存本拓展类和名称的映射关系
cacheName(clazz, n);
// 缓存本拓展类
saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
}
从配置文件中加载所有的拓展类后,通过指定的名称便可以得到指定的拓展实现类,接下来便是实例化该类,代码如下:
private Object createExtensionInstance(Class<?> type) throws ReflectiveOperationException {
// 默认为InstantiationStrategy
// 查找合适的构造方法,然后查找对应的构造参数进行创建实例
return instantiationStrategy.instantiate(type);
}
拓展实现类实例创建后,便需要注入其依赖,代码如下:
private T injectExtension(T instance) {
if (injector == null) {
return instance;
}
try {
for (Method method : instance.getClass().getMethods()) {
if (!isSetter(method)) {
// 跳过非Setter方法
continue;
}
if (method.isAnnotationPresent(DisableInject.class)) {
// 跳过@DisableInject标注的方法
continue;
}
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
// 跳过原始类型参数的Setter方法
continue;
}
try {
// 获取参数名称
String property = getSetterProperty(method);
// 从容器中查找对应的依赖
Object object = injector.getInstance(pt, property);
if (object != null) {
// 注入依赖
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("Failed to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
injector
的赋值是在构造方法中,代码如下:
this.injector = (type == ExtensionInjector.class ? null : extensionDirector.getExtensionLoader(ExtensionInjector.class)
.getAdaptiveExtension());
ExtensionInjector
的拓展实现类的加载基于Dubbo SPI机制,其配置文件为META-INF/dubbo/internal/org.apache.dubbo.common.extension.ExtensionInjector
,内容如下:
spring=org.apache.dubbo.config.spring.extension.SpringExtensionInjector
adaptive=org.apache.dubbo.common.extension.inject.AdaptiveExtensionInjector
spi=org.apache.dubbo.common.extension.inject.SpiExtensionInjector
scopeBean=org.apache.dubbo.common.beans.ScopeBeanExtensionInjector
getAdaptiveExtension()
方法是基于后文介绍的Dubbo SPI 自适应拓展,这里暂不做分析,getAdaptiveExtension()
方法最终获取到的是标注了@Adaptive
的AdaptiveExtensionInjector
,AdaptiveExtensionInjector
中的injectors
存放了其它的ExtensionInjector
的拓展实现类,其getInstance
方法代码如下:
public <T> T getInstance(Class<T> type, String name) {
// 遍历内部的ExtensionInjector拓展实现类
for (ExtensionInjector injector : injectors) {
T extension = injector.getInstance(type, name);
if (extension != null) {
// 找到则立即返回
return extension;
}
}
return null;
}
拓展实例创建且注入相应的依赖后,最后一步便是包装;
如果需要包装,且存在包装类的话,那么就依次进行包装,核心代码如下:
// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例并注入依赖
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
// 进行初始化后处理
instance = postProcessAfterInitialization(instance, name);
Dubbo SPI 自适应拓展
简介
在Dubbo中,很多拓展都是通过SPI机制进行加载的,比如Protocol
、Cluster
、LoadBalance
等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。这听起来有些矛盾,拓展未被加载,那么拓展方法就无法被调用(静态方法除外);拓展方法未被调用,拓展就无法被加载。对于这个矛盾的问题,Dubbo通过自适应拓展机制很好地解决了。
自适应拓展机制的实现逻辑比较复杂,如果存在具有代理功能的拓展实现类(标注了@Adaptive
)则Dubbo直接使用,否则Dubbo会为该拓展接口生成具有代理功能的代码,然后通过javassist或jdk编译这段代码,得到代理Class类;
示例
存在标注了@Adaptive的拓展实现类
以Dubbo中的ExtensionInjector
为例;
ExtensionInjector
的全限定名为org.apache.dubbo.common.extension.ExtensionInjector
,存在对应的配置文件META-INF/dubbo/internal/org.apache.dubbo.common.extension.ExtensionInjector
,文件内容如下:
spring=org.apache.dubbo.config.spring.extension.SpringExtensionInjector
adaptive=org.apache.dubbo.common.extension.inject.AdaptiveExtensionInjector
spi=org.apache.dubbo.common.extension.inject.SpiExtensionInjector
scopeBean=org.apache.dubbo.common.beans.ScopeBeanExtensionInjector
该配置文件中定义了三种获取依赖的拓展实现类,以用于不同的场景;
- spring,注入Spring Bean;
- spi,注入SPI Bean;
- scopeBean,注入Scope Bean;
还定义了adaptive
,这是具有代理功能的拓展实现类,其内部维护多个拓展实现类,当需要注入依赖时,便遍历内部持有的拓展实现类进行依赖获取,其getInstance
方法实现代码如下:
public <T> T getInstance(Class<T> type, String name) {
// 遍历内部持有的拓展实现类
for (ExtensionInjector injector : injectors) {
T extension = injector.getInstance(type, name);
if (extension != null) {
// 获取到直接返回使用
return extension;
}
}
return null;
}
其injectors
成员的赋值动作是在其初始化过程中发生的,相关代码如下:
public void initialize() throws IllegalStateException {
ExtensionLoader<ExtensionInjector> loader = extensionAccessor.getExtensionLoader(ExtensionInjector.class);
List<ExtensionInjector> list = new ArrayList<ExtensionInjector>();
for (String name : loader.getSupportedExtensions()) {
// 添加ExtensionInjector的所有拓展实现类
list.add(loader.getExtension(name));
}
injectors = Collections.unmodifiableList(list);
}
以上就是ExtensionInjector
的配置信息,接下里看Dubbo中如何使用;
ExtensionLoader
中使用ExtensionInjector
成员给创建的拓展实现类注入依赖,成员的初始化发生在其构造方法中,代码如下:
ExtensionLoader(Class<?> type, ExtensionDirector extensionDirector, ScopeModel scopeModel) {
this.type = type;
this.extensionDirector = extensionDirector;
this.extensionPostProcessors = extensionDirector.getExtensionPostProcessors();
initInstantiationStrategy();
// 通过getAdaptiveExtension()方法获取到具有代理功能的拓展实现类
this.injector = (type == ExtensionInjector.class ? null : extensionDirector.getExtensionLoader(ExtensionInjector.class)
.getAdaptiveExtension());
this.activateComparator = new ActivateComparator(extensionDirector);
this.scopeModel = scopeModel;
}
不存在标注了@Adaptive的拓展实现类
以Dubbo中的Protocol
为例;
Protocol
的全限定名为org.apache.dubbo.rpc.Protocol
,存在对应的配置文件META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
,内容如下:
filter=org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
serializationwrapper=org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol
tri=org.apache.dubbo.rpc.protocol.tri.TripleProtocol
registry=org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.integration.RegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
可知,Protocol
的拓展实现类都未标注@Adaptive
,那么就会由Dubbo动态创建具有代理功能的拓展实现类Protocol$Adaptive
,其export
方法代码如下:
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
// 获取协议名称
// 默认使用Dubbo协议
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
// 根据协议名称获取对应的拓展实现类
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
// 调用具体的拓展实现类方法
return extension.export(arg0);
}
以上就是Protocol
的配置信息,接下里看Dubbo中如何使用;
以ServiceConfig
为例,它会借助Protocol$Adaptive
选择合适的Protocol
的拓展实现类进行服务暴露,相关代码如下:
private void doExportUrl(URL url, boolean withMetaData) {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
Exporter<?> exporter = protocolSPI.export(invoker);
exporters.add(exporter);
}
protocolSPI
的赋值动作在其postProcessAfterScopeModelChanged
方法,代码如下:
protected void postProcessAfterScopeModelChanged(ScopeModel oldScopeModel, ScopeModel newScopeModel) {
super.postProcessAfterScopeModelChanged(oldScopeModel, newScopeModel);
// 通过getAdaptiveExtension()方法获取到具有代理功能的拓展实现类
protocolSPI = this.getExtensionLoader(Protocol.class).getAdaptiveExtension();
proxyFactory = this.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
}
简析
入口为ExtensionLoader.getAdaptiveExtension
方法,代码如下:
public T getAdaptiveExtension() {
checkDestroyed();
// 此处实现了缓存机制
// 优先从缓存中获取
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
// DCL
if (instance == null) {
try {
// 缓存中不存在,则需要创建代理拓展类实例
instance = createAdaptiveExtension();
// 缓存代理拓展类实例
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
private T createAdaptiveExtension() {
try {
// 先获取代理拓展类
// 然后在创建实例
T instance = (T) getAdaptiveExtensionClass().newInstance();
instance = postProcessBeforeInitialization(instance, null);
// 注入依赖
instance = injectExtension(instance);
instance = postProcessAfterInitialization(instance, null);
initExtension(instance);
return instance;
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
createAdaptiveExtension
方法逻辑和createExtension
方法很类似,主要差别在于getAdaptiveExtensionClass()
,这里只关注该方法的实现,其它见上文中createExtension
方法的分析,getAdaptiveExtensionClass()
方法代码如下:
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
这里就可以看出两种使用场景:
- 如果配置文件中已经存在具有代理功能的拓展实现类,则直接使用;
- 否则,动态创建具有代理功能的拓展实现类;
在Dubbo SPI的简析中,分析了getExtensionClasses
的实现,在底层loadClass
方法中,会对拓展实现类区分对待,逻辑如下:
- 标注了
@Adaptive
,则缓存到cachedAdaptiveClass
; - 是Wrapper类,则缓存到
cachedWrapperClasses
; - 普通类,则缓存到
extensionClasses
;
存在标注了@Adaptive的拓展实现类
以Dubbo中的ExtensionInjector
为例,其配置文件中的拓展实现类AdaptiveExtensionInjector
标注了@Adaptive
,所以此刻cachedAdaptiveClass
存在,那么就可以直接返回使用;
不存在标注了@Adaptive的拓展实现类
以Dubbo中的Protocol
为例,其配置文件的拓展实现类都未标注@Adaptive
,那么就会由Dubbo动态创建具有代理功能的拓展实现类,代码如下:
private Class<?> createAdaptiveExtensionClass() {
ClassLoader classLoader = type.getClassLoader();
try {
if (NativeUtils.isNative()) {
return classLoader.loadClass(type.getName() + "$Adaptive");
}
} catch (Throwable ignore) {
}
// 生成具有代理功能的代码
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
// 基于Dubbo SPI 自适应拓展获取具有代理功能的Compiler的拓展实现类
org.apache.dubbo.common.compiler.Compiler compiler = extensionDirector.getExtensionLoader(
org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
// 编译代码得到拓展实现类
return compiler.compile(type, code, classLoader);
}
生成代码和编译代码的过程不是本文重点,就不分析了,为Protocol
接口动态生成的代码如下:
package org.apache.dubbo.rpc;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public java.util.List getServers() {
throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
编译得到的Protocol$Adaptive
,实现了Protocol
接口,在其export
和refer
方法中,先根据url
中获取到具体的协议名称,然后再根据ExtensionLoader.getExtension
获取具体的拓展实现类,从而实现了代理功能;
到此,关于Dubbo SPI 自适应拓展的分析就结束了。