Dubbo学习之SPI机制

相关阅读

简介

本文基于Spring Boot 2.6.6dubbo-spring-boot-starter 3.0.6环境。

SPI全称为Service Provider Interface,是一种服务发现机制。SPI的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类。正因此特性,我们可以很容易的通过SPI机制为我们的程序提供拓展功能。
SPI机制在第三方框架中也有所应用,比如Dubbo就是通过SPI机制加载所有的组件。不过,Dubbo并未使用Java原生的SPI机制,而是对其进行了增强,使其能够更好的满足需求。

本文将从以下角度介绍SPI机制;

  1. Java SPI;
  2. Dubbo SPI;
  3. Dubbo SPI自适应拓展;

Java SPI

示例

首先定义一个接口demo.spi.Person,代码如下:

public interface Person {

    void saySomething();
}

然后定义两个实现类,demo.spi.Studentdemo.spi.Teacher,代码如下:

public class Student implements Person {

    @Override
    public void saySomething() {
        System.out.println("I'm a student");
    }
}

public class Teacher implements Person {

    @Override
    public void saySomething() {
        System.out.println("I'm a teacher");
    }
}

接下来,在项目META-INF/services/目录下创建一个文件,文件名为接口Person的全限定名demo.spi.Person,文件内容为实现类的全限定名,具体如下:

demo.spi.Student
demo.spi.Teacher

准备好以上内容后,便可以编写代码进行测试Java SPI机制,测试代码如下:

public class SpiTest {

    public static void main(String[] args) {
        ServiceLoader<Person> serviceLoader = ServiceLoader.load(Person.class);
        serviceLoader.forEach(Person::saySomething);
    }
}

测试结果如下:

I'm a student
I'm a teacher

Dubbo SPI

简介

Dubbo并未使用Java SPI,而是重新实现了一套功能更强的SPI机制。Dubbo SPI的相关逻辑被封装在了ExtensionLoader类中,通过ExtensionLoader,我们可以加载指定的实现类。与Java SPI实现类配置不同,Dubbo SPI是通过键值对的方式进行配置,这样我们可以按需加载指定的实现类,而且Dubbo SPI中,支持文件路径如下:

  1. META-INF/dubbo/internal/
  2. META-INF/dubbo/
  3. META-INF/services/

优先级越来越高,即后者定义的内容可以覆盖前者的内容;

示例

首先定义一个接口demo.dubbo.spi.Person,需要标注@SPI,代码如下:

@SPI
public interface Person {

    void saySomething();
}

然后定义两个实现类,demo.dubbo.spi.Studentdemo.dubbo.spi.Teacher,代码如下:

public class Student implements Person {

    @Override
    public void saySomething() {
        System.out.println("I'm a student");
    }
}

public class Teacher implements Person {

    @Override
    public void saySomething() {
        System.out.println("I'm a teacher");
    }
}

接下来,在项目META-INF/dubbo/目录下创建一个文件,文件名为接口Person的全限定名demo.dubbo.spi.Person,文件内容为实现类的全限定名,具体如下:

student=demo.dubbo.spi.Student
teacher=demo.dubbo.spi.Teacher

准备好以上内容后,便可以编写代码进行测试Java SPI机制,测试代码如下:

public class SpiTest {

    public static void main(String[] args) {
        ExtensionLoader<Person> extensionLoader =
                ApplicationModel.defaultModel().getDefaultModule().getExtensionLoader(Person.class);
        Person student = extensionLoader.getExtension("student");
        student.saySomething();
        Person teacher = extensionLoader.getExtension("teacher");
        teacher.saySomething();
    }
}

测试结果如下:

I'm a student
I'm a teacher

简析

首先分析ExtensionLoader的创建,代码如下:

// ExtensionAccessor.java
default <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    return this.getExtensionDirector().getExtensionLoader(type);
}


// ExtensionDirector.java
public <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    checkDestroyed();
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    }
    if (!withExtensionAnnotation(type)) {
        // 如果type上没有SPI注解,则抛出异常
        throw new IllegalArgumentException("Extension type (" + type +
            ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    }

    // 此处实现了缓存
    // 先从本地缓存中查找
    ExtensionLoader<T> loader = (ExtensionLoader<T>) extensionLoadersMap.get(type);

    ExtensionScope scope = extensionScopeMap.get(type);
    if (scope == null) {
        // 缓存中不存在,则需要根据type解析获取
        SPI annotation = type.getAnnotation(SPI.class);
        scope = annotation.scope();
        // 缓存,方便下次直接获取
        extensionScopeMap.put(type, scope);
    }

    if (loader == null && scope == ExtensionScope.SELF) {
        // 创建实例
        loader = createExtensionLoader0(type);
    }

    // 不存在,则尝试从父类中查找,若存在父类的话
    if (loader == null) {
        if (this.parent != null) {
            loader = this.parent.getExtensionLoader(type);
        }
    }

    // 还不存在,则创建实例
    if (loader == null) {
        loader = createExtensionLoader(type);
    }

    return loader;
}

private static boolean withExtensionAnnotation(Class<?> type) {
    return type.isAnnotationPresent(SPI.class);
}

private <T> ExtensionLoader<T> createExtensionLoader(Class<T> type) {
    ExtensionLoader<T> loader = null;
    // scope是否匹配
    if (isScopeMatched(type)) {
        // 匹配,则创建ExtensionLoader
        loader = createExtensionLoader0(type);
    } else {
        // 不匹配,则返回null
    }
    return loader;
}

private <T> ExtensionLoader<T> createExtensionLoader0(Class<T> type) {
    checkDestroyed();
    ExtensionLoader<T> loader;
    // 创建ExtensionLoader实例并放入缓存
    extensionLoadersMap.putIfAbsent(type, new ExtensionLoader<T>(type, this, scopeModel));
    loader = (ExtensionLoader<T>) extensionLoadersMap.get(type);
    return loader;
}

继续分析ExtensionLoader的初始化过程,代码如下:

ExtensionLoader(Class<?> type, ExtensionDirector extensionDirector, ScopeModel scopeModel) {
    this.type = type;
    this.extensionDirector = extensionDirector;
    this.extensionPostProcessors = extensionDirector.getExtensionPostProcessors();
    initInstantiationStrategy();
    // 用于查找依赖,以便注入到创建的实例
    // 此处也使用了Dubbo SPI机制
    this.injector = (type == ExtensionInjector.class ? null : extensionDirector.getExtensionLoader(ExtensionInjector.class)
        .getAdaptiveExtension());
    this.activateComparator = new ActivateComparator(extensionDirector);
    this.scopeModel = scopeModel;
}

ExtensionLoader创建好后,便可以根据指定的名称获取对应的实现类了,代码如下:

public T getExtension(String name) {
    T extension = getExtension(name, true);
    if (extension == null) {
        throw new IllegalArgumentException("Not find extension: " + name);
    }
    return extension;
}

public T getExtension(String name, boolean wrap) {
    checkDestroyed();
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
    if ("true".equals(name)) {
        // 使用默认实例
        return getDefaultExtension();
    }
    String cacheKey = name;
    if (!wrap) {
        cacheKey += "_origin";
    }
    final Holder<Object> holder = getOrCreateHolder(cacheKey);
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            // DCL
            if (instance == null) {
                // 创建拓展实例
                instance = createExtension(name, wrap);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

private T createExtension(String name, boolean wrap) {
    // 1. 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null || unacceptableExceptions.contains(name)) {
        throw findException(name);
    }
    try {
        T instance = (T) extensionInstances.get(clazz);
        if (instance == null) {
            // 2. 创建实例并缓存若实例不存在的话
            extensionInstances.putIfAbsent(clazz, createExtensionInstance(clazz));
            instance = (T) extensionInstances.get(clazz);
            // 初始化预处理
            instance = postProcessBeforeInitialization(instance, name);
            // 3. 向实例中注入依赖
            injectExtension(instance);
            // 初始化后处理
            instance = postProcessAfterInitialization(instance, name);
        }

        if (wrap) {
            List<Class<?>> wrapperClassesList = new ArrayList<>();
            if (cachedWrapperClasses != null) {
                // 存在Wrapper
                wrapperClassesList.addAll(cachedWrapperClasses);
                wrapperClassesList.sort(WrapperComparator.COMPARATOR);
                Collections.reverse(wrapperClassesList);
            }

            if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
                // 4. 循环创建 Wrapper 实例
                for (Class<?> wrapperClass : wrapperClassesList) {
                    Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
                    boolean match = (wrapper == null) ||
                        ((ArrayUtils.isEmpty(wrapper.matches()) || ArrayUtils.contains(wrapper.matches(), name)) &&
                            !ArrayUtils.contains(wrapper.mismatches(), name));
                    if (match) {
                        // 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
                        // 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                        instance = postProcessAfterInitialization(instance, name);
                    }
                }
            }
        }

        initExtension(instance);
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
            type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}

createExtension方法主要逻辑如下:

  1. 通过getExtensionClasses获取所有的拓展类;
  2. 创建拓展对象;
  3. 向拓展对象中注入依赖;
  4. 将拓展对象包裹在相应的Wrapper对象中;

先分析getExtensionClasses方法,代码如下:

private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            // DCL
            if (classes == null) {
                // 家在拓展实现类
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

private Map<String, Class<?>> loadExtensionClasses() {
    checkDestroyed();
    // 缓存默认拓展实现类
    cacheDefaultExtensionName();

    Map<String, Class<?>> extensionClasses = new HashMap<>();

    // 遍历需要加载的目录
    for (LoadingStrategy strategy : strategies) {
        // 加载该目录下本type的拓展实现类
        loadDirectory(extensionClasses, strategy, type.getName());

        if (this.type == ExtensionInjector.class) {
            // 兼容ExtensionFactory
            loadDirectory(extensionClasses, strategy, ExtensionFactory.class.getName());
        }
    }

    return extensionClasses;
}

分析loadDirectory实现之前,先简单分析下strategies的内容,其赋值语句如下:

private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();

private static LoadingStrategy[] loadLoadingStrategies() {
    return stream(load(LoadingStrategy.class).spliterator(), false)
        .sorted()
        .toArray(LoadingStrategy[]::new);
}


// ServiceLoader.java
public static <S> ServiceLoader<S> load(Class<S> service) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    return ServiceLoader.load(service, cl);
}

这里借助Java SPI机制实现多LoadingStrategy,在dubbo-3.0.6.jarMETA-INF/services/目录下存在文件org.apache.dubbo.common.extension.LoadingStrategy,内容如下:

org.apache.dubbo.common.extension.DubboInternalLoadingStrategy
org.apache.dubbo.common.extension.DubboLoadingStrategy
org.apache.dubbo.common.extension.ServicesLoadingStrategy

这三个LoadingStrategy的核心代码如下:

public class DubboInternalLoadingStrategy implements LoadingStrategy {

    @Override
    public String directory() {
        return "META-INF/dubbo/internal/";
    }

    @Override
    public int getPriority() {
        return MAX_PRIORITY;
    }
}

public class DubboLoadingStrategy implements LoadingStrategy {

    @Override
    public String directory() {
        return "META-INF/dubbo/";
    }

    @Override
    public boolean overridden() {
        // 支持覆盖
        return true;
    }

    @Override
    public int getPriority() {
        return NORMAL_PRIORITY;
    }
}

public class ServicesLoadingStrategy implements LoadingStrategy {

    @Override
    public String directory() {
        return "META-INF/services/";
    }

    @Override
    public boolean overridden() {
        // 支持覆盖
        return true;
    }

    @Override
    public int getPriority() {
        return MIN_PRIORITY;
    }
}

至此,可以知道strategies的内容如下:

  1. DubboInternalLoadingStrategy
  2. DubboLoadingStrategy
  3. ServicesLoadingStrategy

且遍历顺序由上而下,而DubboLoadingStrategyServicesLoadingStrategy支持覆盖,所以,若三者都配置了某个Type的拓展实现类,则优先使用ServicesLoadingStrategy,其次DubboLoadingStrategy,最后才是DubboInternalLoadingStrategy

知道了strategies的内容后,继续分析loadDirectory的实现,代码如下:

private void loadDirectory(Map<String, Class<?>> extensionClasses, LoadingStrategy strategy, String type) {
    loadDirectory(extensionClasses, strategy.directory(), type, strategy.preferExtensionClassLoader(),
        strategy.overridden(), strategy.includedPackages(), strategy.excludedPackages(), strategy.onlyExtensionClassLoaderPackages());
    // 兼容以前的版本
    String oldType = type.replace("org.apache", "com.alibaba");
    loadDirectory(extensionClasses, strategy.directory(), oldType, strategy.preferExtensionClassLoader(),
        strategy.overridden(), strategy.includedPackagesInCompatibleType(), strategy.excludedPackages(), strategy.onlyExtensionClassLoaderPackages());
}

private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
                           boolean extensionLoaderClassLoaderFirst, boolean overridden, String[] includedPackages,
                           String[] excludedPackages, String[] onlyExtensionClassLoaderPackages) {
    // 生成文件路径
    String fileName = dir + type;
    try {
        List<ClassLoader> classLoadersToLoad = new LinkedList<>();

        if (extensionLoaderClassLoaderFirst) {
            ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
            if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
                classLoadersToLoad.add(extensionLoaderClassLoader);
            }
        }

        Set<ClassLoader> classLoaders = scopeModel.getClassLoaders();

        if (CollectionUtils.isEmpty(classLoaders)) {
            Enumeration<java.net.URL> resources = ClassLoader.getSystemResources(fileName);
            if (resources != null) {
                while (resources.hasMoreElements()) {
                    loadResource(extensionClasses, null, resources.nextElement(), overridden, includedPackages, excludedPackages, onlyExtensionClassLoaderPackages);
                }
            }
        } else {
            classLoadersToLoad.addAll(classLoaders);
        }

        Map<ClassLoader, Set<java.net.URL>> resources = ClassLoaderResourceLoader.loadResources(fileName, classLoadersToLoad);
        // 获取到Resource URL后,逐个加载
        resources.forEach(((classLoader, urls) -> {
            loadFromClass(extensionClasses, overridden, urls, classLoader, includedPackages, excludedPackages, onlyExtensionClassLoaderPackages);
        }));
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
            type + ", description file: " + fileName + ").", t);
    }
}

private void loadFromClass(Map<String, Class<?>> extensionClasses, boolean overridden, Set<java.net.URL> urls, ClassLoader classLoader,
                           String[] includedPackages, String[] excludedPackages, String[] onlyExtensionClassLoaderPackages) {
    if (CollectionUtils.isNotEmpty(urls)) {
        for (java.net.URL url : urls) {
            // 加载Resource
            loadResource(extensionClasses, classLoader, url, overridden, includedPackages, excludedPackages, onlyExtensionClassLoaderPackages);
        }
    }
}

private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
                          java.net.URL resourceURL, boolean overridden, String[] includedPackages, String[] excludedPackages, String[] onlyExtensionClassLoaderPackages) {
    try {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
            String line;
            String clazz;
            // 逐行遍历
            while ((line = reader.readLine()) != null) {
                final int ci = line.indexOf('#');
                if (ci >= 0) {
                    // 跳过注释
                    line = line.substring(0, ci);
                }
                line = line.trim();
                if (line.length() > 0) {
                    try {
                        String name = null;
                        int i = line.indexOf('=');
                        if (i > 0) {
                            // 键值对形式
                            name = line.substring(0, i).trim();
                            clazz = line.substring(i + 1).trim();
                        } else {
                            // Java SPI形式
                            clazz = line;
                        }
                        if (StringUtils.isNotEmpty(clazz) && !isExcluded(clazz, excludedPackages) && isIncluded(clazz, includedPackages)
                            && !isExcludedByClassLoader(clazz, classLoader, onlyExtensionClassLoaderPackages)) {
                            // 加载Class
                            loadClass(extensionClasses, resourceURL, Class.forName(clazz, true, classLoader), name, overridden);
                        }
                    } catch (Throwable t) {
                        IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type +
                            ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                        exceptions.put(line, e);
                    }
                }
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
            type + ", class file: " + resourceURL + ") in " + resourceURL, t);
    }
}

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
                       boolean overridden) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error occurred when loading extension class (interface: " +
            type + ", class line: " + clazz.getName() + "), class "
            + clazz.getName() + " is not subtype of interface.");
    }
    // 检测目标类上是否有 Adaptive 注解
    if (clazz.isAnnotationPresent(Adaptive.class)) {
        // 缓存Adaptive类
        cacheAdaptiveClass(clazz, overridden);
    } else if (isWrapperClass(clazz)) {
        // 检测 clazz 是否是 Wrapper 类型
        
        // 缓存Wrapper类
        cacheWrapperClass(clazz);
    } else {
        // 普通的拓展类

        if (StringUtils.isEmpty(name)) {
            // 优先使用注解Extension.value()作为名称若存在的话
            // 若本Class名以Type名结尾,则取去掉Type的前半部分
            // 否则直接去本Class名
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
            }
        }

        String[] names = NAME_SEPARATOR.split(name);
        if (ArrayUtils.isNotEmpty(names)) {
            // 缓存Activate类,若本类标注了Activate注解的话
            cacheActivateClass(clazz, names[0]);
            for (String n : names) {
                // 缓存本拓展类和名称的映射关系
                cacheName(clazz, n);
                // 缓存本拓展类
                saveInExtensionClass(extensionClasses, clazz, n, overridden);
            }
        }
    }
}

从配置文件中加载所有的拓展类后,通过指定的名称便可以得到指定的拓展实现类,接下来便是实例化该类,代码如下:

private Object createExtensionInstance(Class<?> type) throws ReflectiveOperationException {
    // 默认为InstantiationStrategy
    // 查找合适的构造方法,然后查找对应的构造参数进行创建实例
    return instantiationStrategy.instantiate(type);
}

拓展实现类实例创建后,便需要注入其依赖,代码如下:

private T injectExtension(T instance) {

    if (injector == null) {
        return instance;
    }

    try {
        for (Method method : instance.getClass().getMethods()) {
            if (!isSetter(method)) {
                // 跳过非Setter方法
                continue;
            }
            
            if (method.isAnnotationPresent(DisableInject.class)) {
                // 跳过@DisableInject标注的方法
                continue;
            }
            Class<?> pt = method.getParameterTypes()[0];
            if (ReflectUtils.isPrimitives(pt)) {
                // 跳过原始类型参数的Setter方法
                continue;
            }

            try {
                // 获取参数名称
                String property = getSetterProperty(method);
                // 从容器中查找对应的依赖
                Object object = injector.getInstance(pt, property);
                if (object != null) {
                    // 注入依赖
                    method.invoke(instance, object);
                }
            } catch (Exception e) {
                logger.error("Failed to inject via method " + method.getName()
                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
            }

        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

injector的赋值是在构造方法中,代码如下:

this.injector = (type == ExtensionInjector.class ? null : extensionDirector.getExtensionLoader(ExtensionInjector.class)
        .getAdaptiveExtension());

ExtensionInjector的拓展实现类的加载基于Dubbo SPI机制,其配置文件为META-INF/dubbo/internal/org.apache.dubbo.common.extension.ExtensionInjector,内容如下:

spring=org.apache.dubbo.config.spring.extension.SpringExtensionInjector

adaptive=org.apache.dubbo.common.extension.inject.AdaptiveExtensionInjector
spi=org.apache.dubbo.common.extension.inject.SpiExtensionInjector
scopeBean=org.apache.dubbo.common.beans.ScopeBeanExtensionInjector

getAdaptiveExtension()方法是基于后文介绍的Dubbo SPI 自适应拓展,这里暂不做分析,getAdaptiveExtension()方法最终获取到的是标注了@AdaptiveAdaptiveExtensionInjectorAdaptiveExtensionInjector中的injectors存放了其它的ExtensionInjector的拓展实现类,其getInstance方法代码如下:

public <T> T getInstance(Class<T> type, String name) {
    // 遍历内部的ExtensionInjector拓展实现类
    for (ExtensionInjector injector : injectors) {
        T extension = injector.getInstance(type, name);
        if (extension != null) {
            // 找到则立即返回
            return extension;
        }
    }
    return null;
}

拓展实例创建且注入相应的依赖后,最后一步便是包装;
如果需要包装,且存在包装类的话,那么就依次进行包装,核心代码如下:

// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例并注入依赖
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
// 进行初始化后处理
instance = postProcessAfterInitialization(instance, name);

Dubbo SPI 自适应拓展

简介

在Dubbo中,很多拓展都是通过SPI机制进行加载的,比如ProtocolClusterLoadBalance等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。这听起来有些矛盾,拓展未被加载,那么拓展方法就无法被调用(静态方法除外);拓展方法未被调用,拓展就无法被加载。对于这个矛盾的问题,Dubbo通过自适应拓展机制很好地解决了。
自适应拓展机制的实现逻辑比较复杂,如果存在具有代理功能的拓展实现类(标注了@Adaptive)则Dubbo直接使用,否则Dubbo会为该拓展接口生成具有代理功能的代码,然后通过javassistjdk编译这段代码,得到代理Class类;

示例

存在标注了@Adaptive的拓展实现类

以Dubbo中的ExtensionInjector为例;
ExtensionInjector的全限定名为org.apache.dubbo.common.extension.ExtensionInjector,存在对应的配置文件META-INF/dubbo/internal/org.apache.dubbo.common.extension.ExtensionInjector,文件内容如下:

spring=org.apache.dubbo.config.spring.extension.SpringExtensionInjector

adaptive=org.apache.dubbo.common.extension.inject.AdaptiveExtensionInjector
spi=org.apache.dubbo.common.extension.inject.SpiExtensionInjector
scopeBean=org.apache.dubbo.common.beans.ScopeBeanExtensionInjector

该配置文件中定义了三种获取依赖的拓展实现类,以用于不同的场景;

  1. spring,注入Spring Bean;
  2. spi,注入SPI Bean;
  3. scopeBean,注入Scope Bean;

还定义了adaptive,这是具有代理功能的拓展实现类,其内部维护多个拓展实现类,当需要注入依赖时,便遍历内部持有的拓展实现类进行依赖获取,其getInstance方法实现代码如下:

public <T> T getInstance(Class<T> type, String name) {
    // 遍历内部持有的拓展实现类
    for (ExtensionInjector injector : injectors) {
        T extension = injector.getInstance(type, name);
        if (extension != null) {
            // 获取到直接返回使用
            return extension;
        }
    }
    return null;
}

injectors成员的赋值动作是在其初始化过程中发生的,相关代码如下:

public void initialize() throws IllegalStateException {
    ExtensionLoader<ExtensionInjector> loader = extensionAccessor.getExtensionLoader(ExtensionInjector.class);
    List<ExtensionInjector> list = new ArrayList<ExtensionInjector>();
    for (String name : loader.getSupportedExtensions()) {
        // 添加ExtensionInjector的所有拓展实现类
        list.add(loader.getExtension(name));
    }
    injectors = Collections.unmodifiableList(list);
}

以上就是ExtensionInjector的配置信息,接下里看Dubbo中如何使用;

ExtensionLoader中使用ExtensionInjector成员给创建的拓展实现类注入依赖,成员的初始化发生在其构造方法中,代码如下:

ExtensionLoader(Class<?> type, ExtensionDirector extensionDirector, ScopeModel scopeModel) {
    this.type = type;
    this.extensionDirector = extensionDirector;
    this.extensionPostProcessors = extensionDirector.getExtensionPostProcessors();
    initInstantiationStrategy();
    // 通过getAdaptiveExtension()方法获取到具有代理功能的拓展实现类
    this.injector = (type == ExtensionInjector.class ? null : extensionDirector.getExtensionLoader(ExtensionInjector.class)
        .getAdaptiveExtension());
    this.activateComparator = new ActivateComparator(extensionDirector);
    this.scopeModel = scopeModel;
}

不存在标注了@Adaptive的拓展实现类

以Dubbo中的Protocol为例;
Protocol的全限定名为org.apache.dubbo.rpc.Protocol,存在对应的配置文件META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol,内容如下:

filter=org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
serializationwrapper=org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper

dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol
tri=org.apache.dubbo.rpc.protocol.tri.TripleProtocol
registry=org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.integration.RegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper

可知,Protocol的拓展实现类都未标注@Adaptive,那么就会由Dubbo动态创建具有代理功能的拓展实现类Protocol$Adaptive,其export方法代码如下:

public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
// 获取协议名称
// 默认使用Dubbo协议
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
// 根据协议名称获取对应的拓展实现类
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
// 调用具体的拓展实现类方法
return extension.export(arg0);
}

以上就是Protocol的配置信息,接下里看Dubbo中如何使用;
ServiceConfig为例,它会借助Protocol$Adaptive选择合适的Protocol的拓展实现类进行服务暴露,相关代码如下:

private void doExportUrl(URL url, boolean withMetaData) {
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    if (withMetaData) {
        invoker = new DelegateProviderMetaDataInvoker(invoker, this);
    }
    Exporter<?> exporter = protocolSPI.export(invoker);
    exporters.add(exporter);
}

protocolSPI的赋值动作在其postProcessAfterScopeModelChanged方法,代码如下:

protected void postProcessAfterScopeModelChanged(ScopeModel oldScopeModel, ScopeModel newScopeModel) {
    super.postProcessAfterScopeModelChanged(oldScopeModel, newScopeModel);
    // 通过getAdaptiveExtension()方法获取到具有代理功能的拓展实现类
    protocolSPI = this.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    proxyFactory = this.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
}

简析

入口为ExtensionLoader.getAdaptiveExtension方法,代码如下:

public T getAdaptiveExtension() {
    checkDestroyed();
    // 此处实现了缓存机制
    // 优先从缓存中获取
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError != null) {
            throw new IllegalStateException("Failed to create adaptive instance: " +
                createAdaptiveInstanceError.toString(),
                createAdaptiveInstanceError);
        }

        synchronized (cachedAdaptiveInstance) {
            instance = cachedAdaptiveInstance.get();
            // DCL
            if (instance == null) {
                try {
                    // 缓存中不存在,则需要创建代理拓展类实例
                    instance = createAdaptiveExtension();
                    // 缓存代理拓展类实例
                    cachedAdaptiveInstance.set(instance);
                } catch (Throwable t) {
                    createAdaptiveInstanceError = t;
                    throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                }
            }
        }
    }

    return (T) instance;
}

private T createAdaptiveExtension() {
    try {
        // 先获取代理拓展类
        // 然后在创建实例
        T instance = (T) getAdaptiveExtensionClass().newInstance();
        instance = postProcessBeforeInitialization(instance, null);
        // 注入依赖
        instance = injectExtension(instance);
        instance = postProcessAfterInitialization(instance, null);
        initExtension(instance);
        return instance;
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}

createAdaptiveExtension方法逻辑和createExtension方法很类似,主要差别在于getAdaptiveExtensionClass(),这里只关注该方法的实现,其它见上文中createExtension方法的分析,getAdaptiveExtensionClass()方法代码如下:

private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

这里就可以看出两种使用场景:

  1. 如果配置文件中已经存在具有代理功能的拓展实现类,则直接使用;
  2. 否则,动态创建具有代理功能的拓展实现类;

Dubbo SPI简析中,分析了getExtensionClasses的实现,在底层loadClass方法中,会对拓展实现类区分对待,逻辑如下:

  1. 标注了@Adaptive,则缓存到cachedAdaptiveClass
  2. 是Wrapper类,则缓存到cachedWrapperClasses
  3. 普通类,则缓存到extensionClasses

存在标注了@Adaptive的拓展实现类

以Dubbo中的ExtensionInjector为例,其配置文件中的拓展实现类AdaptiveExtensionInjector标注了@Adaptive,所以此刻cachedAdaptiveClass存在,那么就可以直接返回使用;

不存在标注了@Adaptive的拓展实现类

以Dubbo中的Protocol为例,其配置文件的拓展实现类都未标注@Adaptive,那么就会由Dubbo动态创建具有代理功能的拓展实现类,代码如下:

private Class<?> createAdaptiveExtensionClass() {
    ClassLoader classLoader = type.getClassLoader();
    try {
        if (NativeUtils.isNative()) {
            return classLoader.loadClass(type.getName() + "$Adaptive");
        }
    } catch (Throwable ignore) {

    }
    // 生成具有代理功能的代码
    String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
    // 基于Dubbo SPI 自适应拓展获取具有代理功能的Compiler的拓展实现类
    org.apache.dubbo.common.compiler.Compiler compiler = extensionDirector.getExtensionLoader(
        org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    // 编译代码得到拓展实现类
    return compiler.compile(type, code, classLoader);
}

生成代码和编译代码的过程不是本文重点,就不分析了,为Protocol接口动态生成的代码如下:

package org.apache.dubbo.rpc;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public void destroy()  {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort()  {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public java.util.List getServers()  {
throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}

编译得到的Protocol$Adaptive,实现了Protocol接口,在其exportrefer方法中,先根据url中获取到具体的协议名称,然后再根据ExtensionLoader.getExtension获取具体的拓展实现类,从而实现了代理功能;

到此,关于Dubbo SPI 自适应拓展的分析就结束了。