Dubbo SPI源码分析

Dubbo SPI源码分析

对于一个优秀的框架需要很好的扩展性,给出一个接口,自己可以给出默认实现,同时也允许其他人实现拓展。即“对扩展开放,对修改封闭”的原则。Dubbo 采用微内核+插件的方式来实现,微内核架构中,内核通常采用 Factory、IoC、OSGi 等方式管理插件生命周期,Dubbo 最终决定采用 SPI 机制来加载插件,Dubbo SPI 参考 JDK 原生的 SPI 机制,进行了性能优化以及功能增强。

我们来看看 SPI 定义:

Service Provider Interface (SPI) is an API intended to be implemented or extended by a third party. It can be used to enable framework extension and replaceable components.

JDK SPI

JDK SPI 最比较常见的在访问数据库会使用到java.sql.Driver这个接口,不同的数据库产商会有不同的实现,JDK SPI机制可以为某个接口寻找服务实现。

JDK SPI 机制

我们先看一个例子,模拟连接数据库,先定义一个 Driver 接口。

package com.cuzz.api;

public interface Driver {
void connect(String url);
}

然后不同的产商有不同的实现,以 mysql 和 oracle 两个实现。

package com.cuzz.mysql;

import com.cuzz.api.Driver;

public class MysqlDriver implements Driver {
@Override
public void connect(String url) {
System.out.println("connect mysql: " + url);
}
}

// -----------------

package com.cuzz.oracle;

import com.cuzz.api.Driver;

public class OracleDriver implements Driver {
@Override
public void connect(String url) {
System.out.println("connect oracle: " + url);
}
}

在项目的 resources/META-INF/services 目录下添加一个名为 com.cuzz.api.Driver 的文件,这是 JDK SPI 需要读取的配置文件,具体内容如下:

com.cuzz.mysql.MysqlDriver
com.cuzz.oracle.OracleDriver

加载配置:

public class Main {
public static void main(String[] args) {
// Java spi 机制
ServiceLoader<Driver> serviceLoader = ServiceLoader.load(Driver.class);
System.out.println(serviceLoader);
Iterator<Driver> iterator = serviceLoader.iterator();
while (iterator.hasNext()) {
Driver driver = iterator.next();
driver.connect("localhost:3306");
}
}
}

运行结果:

java.util.ServiceLoader[com.cuzz.api.Driver]
connect mysql: localhost:3306
connect oracle: localhost:3306

JDK SPI 源码分析

我们从ServiceLoader<Driver> serviceLoader = ServiceLoader.load(Driver.class);定位到ServiceLoader构造方法中的java.util.ServiceLoader#reload方法

// 前缀
private static final String PREFIX = "META-INF/services/";

// The class or interface representing the service being loaded
private final Class<S> service;

// The class loader used to locate, load, and instantiate providers
private final ClassLoader loader;

// The access control context taken when the ServiceLoader is created
private final AccessControlContext acc;

// Cached providers, in instantiation order
// 缓存
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();

// The current lazy-lookup iterator
// 懒加载迭代器
private LazyIterator lookupIterator;

public void reload() {
providers.clear();
lookupIterator = new LazyIterator(service, loader);
}

重点看看这个 LazyIterator 类,这是一个内部类,主要以懒加载形式实现。Iterator 这个接口需要实现 Iterator#hasNext 方法和 Iterator#next 方法,hasNext方法调用了LazyIterator#hasNextService,而next方法调用LazyIterator#nextService。

private class LazyIterator implements Iterator<S> {
Class<S> service;
ClassLoader loader;
// 像这样的URL file:/Users/cuzz/Projects/Java/dubbo/cuzz-demo/cuzz-demo-spi/target/classes/META-INF/services/com.cuzz.api.Driver
Enumeration<URL> configs = null;
Iterator<String> pending = null;
String nextName = null;

private LazyIterator(Class<S> service, ClassLoader loader) {
this.service = service;
this.loader = loader;
}

private boolean hasNextService() {
if (nextName != null) {
return true;
}
// 第一次获取,config 为空开始加载文件
if (configs == null) {
try {
// 获取文件名 META-INF/services/com.cuzz.api.Driver
String fullName = PREFIX + service.getName();
// 加载配置路径
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
// 解析文件
pending = parse(service, configs.nextElement());
}
// 把实现类的名称记录下来 com.cuzz.mysql.MysqlDriver
nextName = pending.next();
return true;
}

private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
// 存一个备份
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
// 通过反射获取该实现类
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a subtype");
}
try {
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}

public boolean hasNext() {
if (acc == null) {
return hasNextService();
} else {
PrivilegedAction<Boolean> action = new PrivilegedAction<Boolean>() {
public Boolean run() { return hasNextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}

public S next() {
if (acc == null) {
return nextService();
} else {
PrivilegedAction<S> action = new PrivilegedAction<S>() {
public S run() { return nextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}

public void remove() {
throw new UnsupportedOperationException();
}

}

最后我们来 ServiceLoader#iterator 这个方法是怎么实现的,主要是先走缓存,在走懒加载。

public Iterator<S> iterator() {
return new Iterator<S>() {

Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();

public boolean hasNext() {
// 先走缓存,在走懒加载
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}

public S next() {
// 先走缓存,在走懒加载
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}

public void remove() {
throw new UnsupportedOperationException();
}
};
}

JDK SPI 在 JDBC 中的应用

当我们引入mysql 驱动时候,在 META-INF/services 目录下,有一个 java.sql.Driver 文件,内容如下。

om.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver

当我们要链接 JDBC 会通过 DriverManager驱动管理来连接。

String url = "jdbc:mysql://localhost:3306/demo?useSSL=true&useUnicode=true&characterEncoding=UTF-8";
String username = "root";
String pwd = "12345";
Connection conn = DriverManager.getConnection(url, username, pwd);

DriverManager类的静态方法在 JVM加载类的时候会执行,执行 loadInitialDrivers 方法。

private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();

static {
loadInitialDrivers();
println("JDBC DriverManager initialized");
}

private static void loadInitialDrivers() {
// 看看系统属性是否配置了jdbc.drivers
String drivers;
try {
drivers = AccessController.doPrivileged(new PrivilegedAction<String>() {
public String run() {
return System.getProperty("jdbc.drivers");
}
});
} catch (Exception ex) {
drivers = null;
}
// If the driver is packaged as a Service Provider, load it.
// Get all the drivers through the classloader
// exposed as a java.sql.Driver.class service.
// ServiceLoader.load() replaces the sun.misc.Providers()

AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {

// JDK SPI 方式加载并实例化
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
Iterator<Driver> driversIterator = loadedDrivers.iterator();

/* Load these drivers, so that they can be instantiated.
* It may be the case that the driver class may not be there
* i.e. there may be a packaged driver with the service class
* as implementation of java.sql.Driver but the actual class
* may be missing. In that case a java.util.ServiceConfigurationError
* will be thrown at runtime by the VM trying to locate
* and load the service.
*
* Adding a try catch block to catch those runtime errors
* if driver not available in classpath but it's
* packaged as service and that service is there in classpath.
*/
try{
while(driversIterator.hasNext()) {
driversIterator.next();
}
} catch(Throwable t) {
// Do nothing
}
return null;
}
});

println("DriverManager.initialize: jdbc.drivers = " + drivers);

if (drivers == null || drivers.equals("")) {
return;
}
// 配置了jdbc.dirvers属性通过反射实例化
String[] driversList = drivers.split(":");
println("number of Drivers:" + driversList.length);
for (String aDriver : driversList) {
try {
println("DriverManager.Initialize: loading " + aDriver);
Class.forName(aDriver, true,
ClassLoader.getSystemClassLoader());
} catch (Exception ex) {
println("DriverManager.Initialize: load failed: " + ex);
}
}
}

实例化 java.sql.Driver 接口实现类,在MySQL提供的,会吧自己注册到 DriverManager 中。

package com.mysql.jdbc;

import java.sql.SQLException;

public class Driver extends NonRegisteringDriver implements java.sql.Driver {
// Register ourselves with the DriverManager
static {
try {
// 注册到DriverManager的CopyOnWriteArrayList中
java.sql.DriverManager.registerDriver(new Driver());
} catch (SQLException E) {
throw new RuntimeException("Can't register driver!");
}
}
}

最后调用 DriverManager#getConnection 从注册中获取连接。

//  Worker method called by the public getConnection() methods.
private static Connection getConnection(
String url, java.util.Properties info, Class<?> caller) throws SQLException {
// 循环从注册中获取,获取到一个就返回。
for(DriverInfo aDriver : registeredDrivers) {
try {
Connection con = aDriver.driver.connect(url, info);
if (con != null) {
// Success!
println("getConnection returning " + aDriver.driver.getClass().getName());
return (con);
}
} catch (SQLException ex) {
if (reason == null) {
reason = ex;
}
}
}
}

JDK SPI 的缺点

  • 虽然ServiceLoader也算是使用的延迟加载,但是基本只能通过遍历全部获取,也就是接口的实现类全部加载并实例化一遍。如果你并不想用某些实现类,它也被加载并实例化了,这就造成了浪费。
  • 获取某个实现类的方式不够灵活,只能通过Iterator形式获取,不能根据某个参数来获取对应的实现类。

Dubbo SPI

Dubbo SPI 对 JDK SPI 进行了扩展,由原来的提供者类的全限定名列表改成了 K-V 形式,如果 SPI 配置文件中定义了多个实现类,而我们只需要使用其中一个实现类时,就会生成不必要的对象,除此之外 Dubbo 对 JDK SPI 做了三个方面的扩展:

  • 方便获取扩展实现:JDK SPI仅仅通过接口类名获取所有实现,而 ExtensionLoader 则通过接口类名和key值获取一个实现。

  • IOC依赖注入功能:Adaptive实现,就是生成一个代理类,这样就可以根据实际调用时的一些参数动态决定要调用的类了。

  • 采用装饰器模式进行功能增强,自动包装实现,这种实现的类一般是自动激活的,常用于包装类,比如:Protocol的两个实现类:ProtocolFilterWrapper、ProtocolListenerWrapper。

Dubbo 按照 SPI 配置文件的用途,将其分成了三类目录。

  • META-INF/services/ 目录:该目录下的 SPI 配置文件用来兼容 JDK SPI 。

  • META-INF/dubbo/ 目录:该目录用于存放用户自定义 SPI 配置文件。

  • META-INF/dubbo/internal/ 目录:该目录用于存放 Dubbo 内部使用的 SPI 配置文件。

Dubbo SPI 机制

定义一个接口,用 @SPI 标识表示是 Dubbo SPI。

@SPI
public interface Driver {
void connect(String url);
}

实现类:

package com.cuzz.mysql;

import com.cuzz.api.Driver;

public class MysqlDriver implements Driver {
@Override
public void connect(String url) {
System.out.println("connect mysql: " + url);
}
}

// -----------------

package com.cuzz.oracle;

import com.cuzz.api.Driver;

public class OracleDriver implements Driver {
@Override
public void connect(String url) {
System.out.println("connect oracle: " + url);
}
}

在项目的 resources/META-INF/dubbo 目录下添加一个名为 com.cuzz.api.Driver 的文件,这是 Dubbo SPI 需要读取的配置文件,与JDK SPI 不一样是KV形式,具体内容如下:

mysqlDriver=com.cuzz.mysql.MysqlDriver
oracleDriver=com.cuzz.oracle.OracleDriver

获取实现类:

public class App {
public static void main(String[] args) {
Driver driver = ExtensionLoader.getExtensionLoader(Driver.class).getExtension("mysqlDriver");
driver.connect("localhost:3306");
}
}

输出:

connect mysql: localhost:3306

Dubbo SPI 主流程

我们先从获取 ExtensLoader 实例开始,ExtensionLoader#getExtensionLoader

/**
* Dubbo 中一个扩展接口对应一个 ExtensionLoader 实例,该集合缓存了全部 ExtensionLoader 实例,
* 其中的 Key 为扩展接口,Value 为加载其扩展实现的 ExtensionLoader 实例。
*/
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
// 必须为接口
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
// 必须有@SPI接口
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}

// 从缓存中获取
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
// 如果已经存在 key 就不往 map 中添加
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); // --->
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}

接着看看 ExtensionLoader#ExtensionLoader 构造方法,如果 type 不为 ExtensionFactory.class 初始化拓展适配器。

/**
* 表示拓展类实例工厂,可以通过工厂创建实例
*/
private final ExtensionFactory objectFactory;

private ExtensionLoader(Class<?> type) {
this.type = type;
// 初始化拓展适配器
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

获取拓展实现类,ExtensionLoader#getExtension

/**
* 缓存了该 ExtensionLoader 加载的扩展名与扩展实现对象之间的映射关系。
*/
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();

public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
// @SPI中value有值,如@SPI("dubbo") 默认获取 key 为 dubbo 的 Extension
if ("true".equals(name)) {
return getDefaultExtension();
}
// getOrCreateHolder()方法中封装了查找cachedInstances缓存的逻辑
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) { // 双重锁防止并发
instance = holder.get();
if (instance == null) {
instance = createExtension(name); // --->
holder.set(instance);
}
}
}
return (T) instance;
}

private Holder<Object> getOrCreateHolder(String name) {
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<>());
holder = cachedInstances.get(name);
}
return holder;
}

ExtensionLoader#createExtension 方法中完成了 SPI 配置文件的查找以及相应扩展实现类的实例化,同时还实现了自动装配以及自动 Wrapper 包装等功能。

private T createExtension(String name) {
// 获取 cachedClasses 缓存,根据扩展名从 cachedClasses 缓存中获取扩展实现类。
Class<?> clazz = getExtensionClasses().get(name); // ---> 1
if (clazz == null) {
throw findException(name);
}
try {
// 根据扩展实现类从 EXTENSION_INSTANCES 缓存中查找相应的实例。如果查找失败,会通过反射创建扩展实现对象。
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 自动装配扩展实现对象中的属性(即调用其 setter)。这里涉及 ExtensionFactory 以及自动装配的相关内容。
injectExtension(instance);
// 自动包装扩展实现对象。这里涉及 Wrapper 类以及自动包装特性的相关内容.
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
// 如果扩展实现类实现了 Lifecycle 接口,在 initExtension() 方法中会调用 initialize() 方法进行初始化。
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}

Dubbo SPI 获取拓展类

ExtensionLoader#getExtensionClasses

/**
* 缓存了该 ExtensionLoader 加载的扩展名与扩展实现类之间的映射关系。cachedNames 集合的反向关系缓存。
*/
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();

private Map<String, Class<?>> getExtensionClasses() {
// 先从缓存中获取
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 加载类
classes = loadExtensionClasses(); // --->
cachedClasses.set(classes);
}
}
}
return classes;
}

ExtensionLoader#loadExtensionClasses

/**
* synchronized in getExtensionClasses
*/
private Map<String, Class<?>> loadExtensionClasses() {
// 只能有一个默认值
cacheDefaultExtensionName();

// 加载的扩展名与扩展实现类之间的映射关系
Map<String, Class<?>> extensionClasses = new HashMap<>();

for (LoadingStrategy strategy : strategies) {
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages()); // --->
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}

return extensionClasses;
}

private void cacheDefaultExtensionName() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation == null) {
return;
}

String value = defaultAnnotation.value();
// 只能有一个车默认值,这种 @SPI("dubbo,http") 就会报错
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) {
cachedDefaultName = names[0];
}
}
}

ExtensionLoader#loadDirectory

private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
String fileName = dir + type;
try {
Enumeration<java.net.URL> urls = null;
ClassLoader classLoader = findClassLoader();

// try to load from ExtensionLoader's ClassLoader first
if (extensionLoaderClassLoaderFirst) {
ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
urls = extensionLoaderClassLoader.getResources(fileName);
}
}

if (urls == null || !urls.hasMoreElements()) {
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
}

// 循环获取
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages); // --->
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", description file: " + fileName + ").", t);
}
}

ExtensionLoader#loadResource

private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
try {
// 必须 utf-8 格式
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) {
// 去掉注释
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
// 没有被排除外
if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name, overridden); // --->
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}

ExtensionLoader#loadClass

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
// 处理Adaptive注解,若存在则将该实现类保存至cachedAdaptiveClass属性
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
}
// 是否为包装类,是包装类缓存到 cachedWrapperClasses Set中
else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
} else {
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
// key可以为多个,如:mysqlDriver,mysqlDriver2=com.cuzz.mysql.MysqlDriver
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
// 缓存到 cachedActivates 属性中
cacheActivateClass(clazz, names[0]);
for (String n : names) {
// 缓存了该 ExtensionLoader 加载的扩展实现类与扩展名之间的映射关系。
cacheName(clazz, n);
// 加载的扩展名与扩展实现类之间的映射关系
saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
}

private void cacheAdaptiveClass(Class<?> clazz, boolean overridden) {
if (cachedAdaptiveClass == null || overridden) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getName()
+ ", " + clazz.getName());
}
}

Dubbo SPI 的自动包装和自动注入

回到前面我们分析ExtensionLoader#createExtension方法,现在我们重点关注 ExtensionLoader#injectExtension 方法

private T createExtension(String name) {
// 获取 cachedClasses 缓存,根据扩展名从 cachedClasses 缓存中获取扩展实现类。
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
// 根据扩展实现类从 EXTENSION_INSTANCES 缓存中查找相应的实例。如果查找失败,会通过反射创建扩展实现对象。
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 自动装配扩展实现对象中的属性(即调用其 setter)。这里涉及 ExtensionFactory 以及自动装配的相关内容。
injectExtension(instance); // --->
// 自动包装扩展实现对象。这里涉及 Wrapper 类以及自动包装特性的相关内容。
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
// 遍历所有的包装类,包装类需要有一个参数类被包装类型的构造器。
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
// 如果扩展实现类实现了 Lifecycle 接口,在 initExtension() 方法中会调用 initialize() 方法进行初始化。
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}

ExtensionLoader#injectExtension

private T injectExtension(T instance) {

if (objectFactory == null) {
return instance;
}

try {
for (Method method : instance.getClass().getMethods()) {
// 判断是否为set方法
if (!isSetter(method)) {
continue;
}

// 如果有 @DisableInject 注解也不注入
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
// 获取参数类型,如果是基本类型也忽略
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
continue;
}

try {
// 根据 Setter 方法获取属性名
String property = getSetterProperty(method);
// 加载这个类,并实例化
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// 反射注入
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("Failed to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}

}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}

Dubbo SPI 的 @Adaptive 注解与适配器

在dubbo扩展中,适配器模式被广泛使用,其作用在于为同一扩展类型下的多个扩展实现的调用提供路由功能,如指定优先级等。dubbo提供了两种方式来生成扩展适配器:

  • 静态代码形式的默认适配器:这些类会被Adaptive注解修饰,且一个接口只能有一个这样的静态适配器。这种形式仅应用于一些特殊的接口,如:AdaptiveCompiler、AdaptiveExtensionFactory这两个适配器,ExtensionLoader需要依赖它们来工作,所以使用了这种特殊的构建方式。

  • 动态代码适配器:实际上其余的接口都是使用动态适配器,ExtensionLoader 根据接口定义动态生成一段适配器代码,并构建这个动态类的实例。这个时候接口中的一些方法具有 Adaptive 标记,它提供了一些用于查找具体 Extension 的key,如果这些方法中有URL类型的参数,则会依次在url中查找这些key对应的value,再以此为 name 确定要使用的 Extension。如果没有从url中找到该参数,则会使用 SPI 注解中的默认值 name 进行构建。

我们回到构造方法中ExtensionLoader#getAdaptiveExtension

private ExtensionLoader(Class<?> type) {
this.type = type;
// 初始化拓展适配器
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

ExtensionLoader#getAdaptiveExtension

public T getAdaptiveExtension() {
// 先从缓存中获取
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}

synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建
instance = createAdaptiveExtension(); // ---> 1
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}

return (T) instance;
}

private T createAdaptiveExtension() {
try {
// 注入属性
return injectExtension((T) getAdaptiveExtensionClass().newInstance()); // ---> 2
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}

private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass(); // ---> 3
}

private Class<?> createAdaptiveExtensionClass() {
// 创建适配器类,并继承 type 接口
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); // ---> 4
ClassLoader classLoader = findClassLoader();
// ExtensionLoader再调用默认的JavassitCompiler进行编译和类加载
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

ExtensionLoader#createAdaptiveExtensionClass

以 Transsporter为例子

@SPI("netty") 
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

Dubbo 会生成一个 Transporter$Adaptive 适配器类,该类继承了 Transporter 接口:

public class Transporter$Adaptive implements Transporter { 
public org.apache.dubbo.remoting.Client connect(URL arg0, ChannelHandler arg1) throws RemotingException {
// 必须传递URL参数
if (arg0 == null) throw new IllegalArgumentException("url == null");
URL url = arg0;
// 确定扩展名,优先从URL中的client参数获取,其次是transporter参数
// 这两个参数名称由@Adaptive注解指定,最后是@SPI注解中的默认值
String extName = url.getParameter("client",
url.getParameter("transporter", "netty"));
if (extName == null)
throw new IllegalStateException("...");
// 通过ExtensionLoader加载Transporter接口的指定扩展实现
Transporter extension = (Transporter) ExtensionLoader
.getExtensionLoader(Transporter.class)
.getExtension(extName);
return extension.connect(arg0, arg1);
}
... // 省略bind()方法
}

Dubbo SPI 的 @Activate注解与自动激活特性

这里以 Dubbo 中的 Filter 为例说明自动激活特性的含义,org.apache.dubbo.rpc.Filter 接口有非常多的扩展实现类,在一个场景中可能需要某几个 Filter 扩展实现类协同工作,而另一个场景中可能需要另外几个实现类一起工作。这样,就需要一套配置来指定当前场景中哪些 Filter 实现是可用的,这就是 @Activate 注解要做的事情。

@Activate 注解标注在扩展实现类上,有 group、value 以及 order 三个属性。

  • group 属性:修饰的实现类是在 Provider 端被激活还是在 Consumer 端被激活。
  • value 属性:修饰的实现类只在 URL 参数中出现指定的 key 时才会被激活。
  • order 属性:用来确定扩展实现类的排序。

如 Filter 接口和实现类:

@SPI
public interface Filter {
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;1
}

@Activate(group = Constants.PROVIDER)
public class TimeoutFilter implements Filter {
...
}

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MonitorFilter implements Filter {
...
}

首先来关注 getActivateExtension() 方法的参数:url 中包含了配置信息,values 是配置中指定的扩展名,group 为 Provider 或 Consumer。

public List<T> getActivateExtension(URL url, String[] values, 
String group) {
    List<T> activateExtensions = new ArrayList<>();
    // values配置就是扩展名
    List<String> names = values == null ?
new ArrayList<>(0) : asList(values);
    if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {// ---1
        getExtensionClasses(); // 触发cachedActivates等缓存字段的加载
        for (Map.Entry<String, Object> entry :
cachedActivates.entrySet()) {
            String name = entry.getKey(); // 扩展名
            Object activate = entry.getValue(); // @Activate注解
            String[] activateGroup, activateValue;
            if (activate instanceof Activate) { // @Activate注解中的配置
                activateGroup = ((Activate) activate).group();
                activateValue = ((Activate) activate).value();
            } else {
                continue;
            }
            if (isMatchGroup(group, activateGroup) // 匹配group
// 没有出现在values配置中的,即为默认激活的扩展实现
                    && !names.contains(name)
// 通过"-"明确指定不激活该扩展实现
                    && !names.contains(REMOVE_VALUE_PREFIX + name)
// 检测URL中是否出现了指定的Key
                    && isActive(activateValue, url)) {
// 加载扩展实现的实例对象,这些都是激活的
                activateExtensions.add(getExtension(name));
            }
        }
        // 排序 --- 2
        activateExtensions.sort(ActivateComparator.COMPARATOR);
    }

    List<T> loadedExtensions = new ArrayList<>();
    for (int i = 0; i < names.size(); i++) { // ---3
        String name = names.get(i);
// 通过"-"开头的配置明确指定不激活的扩展实现,直接就忽略了
        if (!name.startsWith(REMOVE_VALUE_PREFIX)
                && !names.contains(REMOVE_VALUE_PREFIX + name)) {
            if (DEFAULT_KEY.equals(name)) {
                if (!loadedExtensions.isEmpty()) {
// 按照顺序,将自定义的扩展添加到默认扩展集合前面
                    activateExtensions.addAll(0, loadedExtensions);
                    loadedExtensions.clear();
                }
            } else {
                loadedExtensions.add(getExtension(name));
            }
        }
    }
    if (!loadedExtensions.isEmpty()) {
// 按照顺序,将自定义的扩展添加到默认扩展集合后面
        activateExtensions.addAll(loadedExtensions);
    }
    return activateExtensions;
}

总结

本文总结了 JDK SPI 和 Dubbo SPI 机制和原理,参考了很多文章,以下几点需要值得注意:

  • JDK SPI 需要对加载实例化所有的推展对象,而 Dubbo SPI 根据 KV 形式,只需要实例化需要的拓展。
  • Dubbo SPI 对 JDK SPI 拓展了自动注入、自动注入以及自动激活等特性。

参考

Comments