Dubbo服务发布流程
Dubbo 中的 URL
在 Dubbo 中 URL 可以看做是一个被封装的信息承载结构,用于信息的传递。可以简单的认为 Dubbo 中的 URL 就是一个承载信息的实体类,在整个流程中负责数据传递。 一个标准的 URL 格式至多可以包含如下的几个部分
protocol://username:password@host:port/path?key=value&key=value
在 dubbo 中,也使用了类似的 URL,主要用于在各个扩展点之间传递数据,组成此 URL 对象的具体参数如下:
- protocol:一般是 dubbo 中的各种协议 如:dubbo thrift http zk
- username/password:用户名/密码
- host/port:主机/端口
- path:接口名称
- parameters:参数键值对
一些典型的 Dubbo URL如下:
dubbo://192.168.1.6:20880/moe.cnkirito.sample.HelloService?timeout=3000
描述一个 dubbo 协议的服务
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=1214&qos.port=33333×tamp=1545721981946
描述一个 zookeeper 注册中心
consumer://30.5.120.217/org.apache.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=1209&qos.port=33333&side=consumer×tamp=1545721827784
描述一个消费者
我们以如下代码进行分析,注册中心使用zk :
public class SimpleProvider {
public static void main(String[] args) throws IOException {
// 进行一些Dubbo 配置
ServiceConfig<SimpleDemoService> serviceConfig = DubboUtil.serviceConfig("dubbo-provider", SimpleDemoService.class, new MainSimpleDemoServiceImpl());
serviceConfig.getRegistry().setSimplified(true);
// MetadataReportConfig metadataReportConfig = new MetadataReportConfig();
// metadataReportConfig.setAddress("zookeeper://127.0.0.1:2181");
// serviceConfig.setMetadataReportConfig(metadataReportConfig);
// ConfigCenterConfig configCenterConfig = new ConfigCenterConfig();
// configCenterConfig.setAddress("zookeeper://127.0.0.1:2181");
// serviceConfig.setConfigCenter(configCenterConfig);
// 服务暴露
serviceConfig.export();
System.out.println("service is start");
System.in.read();
}
}
其中 DubboUtil#serviceConfig 是自己封装的一个简单方法,如下:
public static <T> ServiceConfig<T> serviceConfig(String applicationName, Class<T> tClass, T obj) {
ServiceConfig<T> serviceConfig = new ServiceConfig<>();
// 设置服务名称
final ApplicationConfig applicationConfig = new ApplicationConfig(applicationName);
serviceConfig.setApplication(applicationConfig);
// 设置注册中心地址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
registryConfig.setSimplified(true);
serviceConfig.setRegistry(registryConfig);
// 设置暴露接口
serviceConfig.setInterface(tClass);
serviceConfig.setRef(obj);
// 设置 版本 和 分组
serviceConfig.setVersion("1.0.0");
serviceConfig.setGroup("dubbo");
return serviceConfig;
}
暴露流程
Dubbo 的服务暴露逻辑始于 ServiceConfig#export,下面我们来看看其实现:
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
public synchronized void export() {
// 1. 对默认配置进行检查,某些配置没有提供时,提供缺省值。
checkAndUpdateSubConfigs();
// 获取到服务提供者的属性: 是否导出服务,是否延迟发布
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
// 如果不导出服务,则直接结束
if (export != null && !export) {
return;
}
// 如果配置了延迟发布,则过了延迟时间再发布,这里 delayExportExecutor 是一个线程池
if (delay != null && delay > 0) {
delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
} else {
// 2.否则直接发布
doExport();
}
}
ServiceConfig#export 的逻辑还是比较清楚的 ,如下:
- 对暴露的服务进行参数校验,这里是通过 ServiceConfig#checkAndUpdateSubConfigs 完成。
- 判断服务是否需要发布。
- 判断服务是否需要延迟发布。
- ServiceConfig#doExport 来发布服务。 上面我们只关注 ServiceConfig#checkAndUpdateSubConfigs 参数校验的过程 和 ServiceConfig#doExport 服务暴露的过程。
ServiceConfig#checkAndUpdateSubConfigs
ServiceConfig#checkAndUpdateSubConfigs 不仅仅是基本配置进行检查,对于一些用户没有进行定义的配置,将提供配置的缺省值。其详细代码如下:
public void checkAndUpdateSubConfigs() {
// 1.1 使用显示配置的provider、module、application 来进行一些全局配置,其优先级为 ServiceConfig > provider > module > application
completeCompoundConfigs();
// Config Center should always being started first.
// 1.2 如果配置了配置中心,这里会启动配置中心,并加载外部化配置。
startConfigCenter();
// 下面的步骤就是检查provider、application、registry、protocol是否存在,如果不存在,就默认一个,并调用refresh方法
// 1.3 参数的处理
checkDefault();
checkApplication();
checkRegistry();
checkProtocol();
this.refresh();
// 1.4 元数据中心的处理
checkMetadataReport();
// 接口参数校验
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
// 1.5 对泛化实现的处理
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
// 根据接口全路径名反射获取 Class
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 校验 MethodConfig 配置的是否是接口中的方法
checkInterfaceAndMethods(interfaceClass, methods);
// 校验 ref 引用是否是null 和是否是接口类的实现
checkRef();
// 设置泛化调用为false
generic = Boolean.FALSE.toString();
}
//1.6 对local 和 sub 属性的检查
if (local != null) {
// 如果 local 为 true,则认为是默认情况local 接口是 接口名 + Local
if ("true".equals(local)) {
local = interfaceName + "Local";
}
// 否则通过反射去获取指定 Local 类
Class<?> localClass;
try {
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 获取的Local 不是接口的子类抛出异常
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
// local 已经过时,现在都使用 stub 作为本地存根,逻辑同 local
if (stub != null) {
if ("true".equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
// 对 local 和 stub 的检查
checkStubAndLocal(interfaceClass);
// 对 mock 数据的检查
checkMock(interfaceClass);
}
这一步对 提供者启动时的大部分参数进行了合法性校验。包括配置中心、元数据中心、本地存根、本地mock等功能。
completeCompoundConfigs
这一步的目的是 使用显示配置的provider、module、application 来进行一些全局配置,其优先级为 ServiceConfig > provider > module > application 。即按照优先级合并一些重复的配置。其实现如下:
// 当前是在 org.apache.dubbo.config.ServiceConfig#completeCompoundConfigs 方法中执行
// 所以这里的 provider 、application 等都是 ServiceConfig 的属性
private void completeCompoundConfigs() {
// 如果 provider 不为空
if (provider != null) {
// 如果 application 没设置,则使用 provider 提供的 application。下面以此类推
if (application == null) {
setApplication(provider.getApplication());
}
if (module == null) {
setModule(provider.getModule());
}
if (registries == null) {
setRegistries(provider.getRegistries());
}
if (monitor == null) {
setMonitor(provider.getMonitor());
}
if (protocols == null) {
setProtocols(provider.getProtocols());
}
if (configCenter == null) {
setConfigCenter(provider.getConfigCenter());
}
}
if (module != null) {
if (registries == null) {
setRegistries(module.getRegistries());
}
if (monitor == null) {
setMonitor(module.getMonitor());
}
}
if (application != null) {
if (registries == null) {
setRegistries(application.getRegistries());
}
if (monitor == null) {
setMonitor(application.getMonitor());
}
}
}
startConfigCenter
在 Dubbo 2.7 及以上版本, Dubbo除了注册中心外还提供了配置中心和元数据中心。配置中心中可以存储一些配置信息。我们这里的逻辑是启动加载配置中心的配置,如下:
void startConfigCenter() {
// 如果未配置中心,尝试从 ConfigManager 中加载
if (configCenter == null) {
ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc);
}
// 如果获取到了配置中心的配置
if (this.configCenter != null) {
// TODO there may have duplicate refresh
// 1. 刷新配置中心,按照优先级合并配置信息,因为配置文件具有优先级,系统配置优先级最高,如下配置顺序
// isConfigCenterFirst = true : SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
// isConfigCenterFirst = false : SystemConfiguration -> AbstractConfig -> ExternalConfiguration -> AppExternalConfiguration -> PropertiesConfiguration
this.configCenter.refresh();
// 2. 环境准备,读取配置中心的配置加载到 Environment 中
prepareEnvironment();
}
// 3. 刷新全部配置,将外部配置中心的配置应用到本地
ConfigManager.getInstance().refreshAll();
}
这里可以分为三步:
- this.configCenter.refresh(); :这里会刷新配置中心的配置,合并当前关于配置中心的属性。如配置中心的地址、协议等可能会在环境变量、外部配置、代码执行等多种方式指定,此时需要根据优先级来获取优先级最高的配置属性参数作为最终参数用于初始化配置中心。简单来说就是确定需要加载的配置中心的一些信息。
- prepareEnvironment(); : 根据第一步中获取到的配置中心属性获取到配置中心实例,并读取配置中心的配置文件的内容,并保存到 Environment 中。这里会根据配置的优先级进行保存,优先级高的可以先获取到。
- ConfigManager.getInstance().refreshAll(); :这里会触发其他配置类的配置刷新操作,其他配置类会从 Environment 中读取到配置中心设置的内容,以完成自身内容的更新。
这里的逻辑可以简单理解为: 如果存在配置中心配置,则通过 this.configCenter.refresh(); 首先确定配置中心的配置。在确定配置中心的信息后在 prepareEnvironment(); 中加载配置中心实例,并获取配置中心上的配置内容,保存到 Environment 中。ConfigManager.getInstance().refreshAll(); 触发其他Dubbo 配置类的刷新操作,这个刷新操作会从 Environment 中获取属于自己的配置信息并加载。
默认参数的处理
这一部分都是,对默认参数的处理。
// 对默认参数的检查,如果不存在则补充一个缺省值
checkDefault();
checkApplication();
checkRegistry();
checkProtocol();
// 将当前配置添加到环境中, 并且循环方法,并且获取覆盖值并将新值设置回方法
this.refresh();
// 元数据中心的检查
checkMetadataReport();
我们这里主要来看 ServiceConfig#refresh,这里的实现是 AbstractConfig#refresh:
public void refresh() {
try {
// 1. 这里从根据指定的前缀和 id 从环境中获取配置
// getPrefix() 是按照一定规则拼接 : 对 ServiceConfig 来说 为dubbo.service.{interfaceName}
// getId 获取的是 interfaceName 名,对 ServiceConfig 来说会在 serviceConfig.setInterface 时会赋值 id
CompositeConfiguration compositeConfiguration = Environment.getInstance().getConfiguration(getPrefix(), getId());
// 2. 构建当前类的配置类 InmemoryConfiguration
InmemoryConfiguration config = new InmemoryConfiguration(getPrefix(), getId());
config.addProperties(getMetaData());
// 3. 按照规则配置将现在的配置添加到复合配置中。
if (Environment.getInstance().isConfigCenterFirst()) {
// The sequence would be: SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
compositeConfiguration.addConfiguration(3, config);
} else {
// The sequence would be: SystemConfiguration -> AbstractConfig -> ExternalConfiguration -> AppExternalConfiguration -> PropertiesConfiguration
compositeConfiguration.addConfiguration(1, config);
}
// loop methods, get override value and set the new value back to method
// 4. 和复合配置中的属性合并,这里通过反射将自身的一些属性设置为复合配置中的属性。
Method[] methods = getClass().getMethods();
for (Method method : methods) {
if (ClassHelper.isSetter(method)) {
try {
String value = compositeConfiguration.getString(extractPropertyName(getClass(), method));
// isTypeMatch() is called to avoid duplicate and incorrect update, for example, we have two 'setGeneric' methods in ReferenceConfig.
if (StringUtils.isNotEmpty(value) && ClassHelper.isTypeMatch(method.getParameterTypes()[0], value)) {
method.invoke(this, ClassHelper.convertPrimitive(method.getParameterTypes()[0], value));
}
} catch (NoSuchMethodException e) {
logger.info("Failed to override the property " + method.getName() + " in " +
this.getClass().getSimpleName() +
", please make sure every property has getter/setter method provided.");
}
}
}
} catch (Exception e) {
logger.error("Failed to override ", e);
}
}
- 从根据指定的前缀和 id 从环境中获取配置。
- 构建当前类的配置类 InmemoryConfiguration,用于后面合并当前配置类的属性
- 按照规则配置将现在的配置添加到复合配置中。
- 和复合配置中的属性合并,这里通过反射将自身的一些属性设置为复合配置中的属性。经过这一步后,当前配置类中的属性配置就是最新的配置。
关于上面的复合配置 :
由于 Dubbo 中存在很多作用域的配置,如注册中心的配置、配置中心的配置、服务接口的配置等, Dubbo将这些配置保存到 Environment 中,不同的配置存在不同前缀,如配置中心的前缀 dubbo.config-center、监控中心的前缀dubbo.monitor 等。当需要加载不同的配置时只需要指定前缀,如果配置精确到服务级别则使用 id来区分不同的服务。又由于 Dubbo 相同配置间存在优先级,所以在 Environment 中每个优先级存在一个 Map,而在上面的代码中,我们看到,如果设置了 configCenterFirst = true。则优先级为 SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration 否则为 SystemConfiguration -> AbstractConfig -> ExternalConfiguration -> AppExternalConfiguration -> PropertiesConfiguration 在 Environment 中每个优先级声明为 一个 Map,其key 是作用域,value 为对应配置。 systemConfigs 是系统级别配置、externalConfigs 和 appExternalConfigs 是配置中心外部化配置,propertiesConfigs 是 属性配置等。
那么这里 我们再来看看这里 Environment#getConfiguration的实现如下:
public CompositeConfiguration getConfiguration(String prefix, String id) {
CompositeConfiguration compositeConfiguration = new CompositeConfiguration();
// Config center has the highest priority
compositeConfiguration.addConfiguration(this.getSystemConfig(prefix, id));
compositeConfiguration.addConfiguration(this.getAppExternalConfig(prefix, id));
compositeConfiguration.addConfiguration(this.getExternalConfig(prefix, id));
compositeConfiguration.addConfiguration(this.getPropertiesConfig(prefix, id));
return compositeConfiguration;
}
这里可以看到,Environment#getConfiguration 返回的符合保存了多个级别针对于 prefix + id 的配置。CompositeConfiguration 使用 一个 List 保存添加的配置。
checkMetadataReport
ServiceConfig#checkMetadataReport 是对元数据中心的检查,元数据中心也是 Dubbo 2.7 及以上版本提供的功能,用于保存服务的元数据信息。其实现如下:
protected void checkMetadataReport() {
// TODO get from ConfigManager first, only create if absent.
// 如果未配置元数据中心,则默认创建一个
if (metadataReportConfig == null) {
setMetadataReportConfig(new MetadataReportConfig());
}
// 刷新元数据中心的配置
metadataReportConfig.refresh();
if (!metadataReportConfig.isValid()) {
logger.warn("There's no valid metadata config found, if you are using the simplified mode of registry url, " +
"please make sure you have a metadata address configured properly.");
}
}
这里我们需要注意,metadataReportConfig.refresh(); 的实现是 AbstractConfig#refresh,这个我们在上面讲过:这里会获取Dubbo配置 (包括配置中心、系统配置,配置文件配置等)中关于元数据中心的配置,如果获取到则通过反射填充到 metadataReportConfig 中。
对泛化实现的处理
泛化实现:泛化接口实现主要用于服务提供端没有API接口类及模型类元(比如入参和出参的POJO 类)的情况下使用。消费者发起接口请求时需要将相关信息转换为 Map 传递给 提供者,由提供者根据信息找到对应的泛型实现来进行处理。
简单来说 : 泛化实现即服务提供者端启用了泛化实现,而服务消费者端则是正常调用。
而这里的代码就是在服务暴露前根据暴露接口判断是否泛化实现:如果提供者暴露的接口是 GenericService,则会被认为当前暴露的接口是泛化实现,则将泛化参数 generic 设置为 true。否则的话检查 MethodConfig、InterfaceRef 是否合法。
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
// 根据接口全路径名反射获取 Class
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 校验 MethodConfig 配置的是否是接口中的方法
checkInterfaceAndMethods(interfaceClass, methods);
// 校验 ref 引用是否是null 和是否是接口类的实现
checkRef();
// 设置泛化调用为false
generic = Boolean.FALSE.toString();
}
对 local 和 sub 属性的处理
远程服务后,客户端通常只剩下接口,而实现全在服务器端,但提供方有些时候想在客户端也执行部分逻辑,比如:做 ThreadLocal 缓存,提前验证参数,调用失败后伪造容错数据等等,此时就需要在 API 中带上 Stub,客户端生成 Proxy 实例,会把 Proxy 通过构造函数传给 Stub 1,然后把 Stub 暴露给用户,Stub 可以决定要不要去调 Proxy 如果需要使用本地存根,可以通过 其中 stub 参数可以 为true ,此时存根类默认为为 {intefaceName}+ Stub;stub 参数 也可以为 存根类路径名。此时存根类为stub 指向的类。如下:
// stub 为 ture。 存根类为 {intefaceName}+ Stub,即 com.foo.BarServiceStub
<dubbo:service interface="com.foo.BarService" stub="true" />
// 存根类为 stub 指定的类 com.foo.BarServiceStub
<dubbo:service interface="com.foo.BarService" stub="com.foo.BarServiceStub" />
需要注意,存根类有两个条件:
- 存根类也需要实现暴露的服务接口
- 存根类需要一个有参构造函数,入参为服务接口的实现实例。 那么这里的代码就很简单了: ```java //1.5 对local 和 sub 属性的检查 if (local != null) { …. 同 stub } // local 已经过时,现在都使用 stub 作为本地存根,逻辑同 local if (stub != null) { // 如果 stub 为true ,则使用 interfaceName + Stub 作为存根类 if (“true”.equals(stub)) { stub = interfaceName + “Stub”; } Class<?> stubClass; try { // 否则反射获取存根类 stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 如果存根类没有实现服务接口则抛出异常 if (!interfaceClass.isAssignableFrom(stubClass)) { throw new IllegalStateException(“The stub implementation class “ + stubClass.getName() + “ not implement interface “ + interfaceName); } } // 对 local 和 stub 的检查:是否存在入参为 Interface 的构造函数 checkStubAndLocal(interfaceClass);
## ServiceConfig#doExport
上面我们可以看到 Dubbo 的服务暴露是通过ServiceConfig#doExport 方法完成,其详细代码如下:
```java
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
// 1. 如果已经暴露则返回
if (exported) {
return;
}
exported = true;
if (path == null || path.length() == 0) {
path = interfaceName;
}
//2. ProviderModel 表示服务提供者模型,此对象中存储了与服务提供者相关的信息。
// 比如服务的配置信息,服务实例等。每个被导出的服务对应一个 ProviderModel。
// ApplicationModel 持有所有的 ProviderModel。
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
// 分组 + 版本号 + 接口 是一个服务的唯一id
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
// 3. 继续进行服务发布
doExportUrls();
}
我们这里很显然可以看到,发布工作是在 ServiceConfig#doExportUrls 中完成,其实现如下:
private void doExportUrls() {
// Dubbo 支持多协议 多注册中心
// 1. 解析出所有注册中心
List<URL> registryURLs = loadRegistries(true);
// 2. 遍历协议类型,进行服务发布
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
Dubbo 允许我们使用不同的协议导出服务,也允许我们向多个注册中心注册服务。因此 Dubbo 在 ServiceConfig#doExportUrls 方法中对多协议,多注册中心进行了支持,主要逻辑在下面两个方法:
- loadRegistries(true); :加载所有注册中心,因为Dubbo允许多协议多注册中心的实现,所以这里会解析出所有的注册中心。
- doExportUrlsFor1Protocol(protocolConfig, registryURLs); :上面解析出了所有的注册中心,现在开始遍历协议类型,对服务进行不同注册中心的不同协议的发布。
loadRegistries(true);
该方法加载所有注册中心,将注册中心解析成 URL 返回。由于一个服务可以被注册到多个服务注册中心,这里加载所有的服务注册中心对象。
下面我们来看详细代码:
// org.apache.dubbo.config.AbstractInterfaceConfig#loadRegistries
protected List<URL> loadRegistries(boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
// 如果注册中心的配置不为空
if (registries != null && !registries.isEmpty()) {
// 遍历所有注册中心
for (RegistryConfig config : registries) {
String address = config.getAddress();
// 如果注册中心地址为空,则设置为 0.0.0.0
if (StringUtils.isEmpty(address)) {
address = Constants.ANYHOST_VALUE;
}
// 排除不可用的地址 (地址信息为 N/A)
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
// 参数会拼接到map 中,最后会将map 转换成url
Map<String, String> map = new HashMap<String, String>();
/************* 1. 参数解析,将参数添加到 Map 中 **************/
// 添加 ApplicationConfig 中的字段信息到 map 中
appendParameters(map, application);
// 添加 RegistryConfig 字段信息到 map 中
appendParameters(map, config);
// 添加 path、pid,protocol 等信息到 map 中
// 设置注册中心的 path 为 RegistryService
map.put("path", RegistryService.class.getName());
// 拼接运行时参数
appendRuntimeParameters(map);
// 设置默认协议类型为 dubbo
if (!map.containsKey("protocol")) {
map.put("protocol", "dubbo");
}
/************* 2. 根据 address 和 map 将信息转化为 URL **************/
// 根据协议类型将map转化成URL
// dubbo 协议如下格式: zookeeper://localhost:2181/org.apache.dubbo.registry.RegistryService?application=Api-provider&dubbo=2.0.2&pid=24736&release=2.7.0×tamp=1615539839228
// 这里返回URL 列表,因为address 可能包含多个注册中心。address 被正则切割,每个地址对应一个URL
List<URL> urls = UrlUtils.parseURLs(address, map);
/************* 3. 对 URL 进行进一步处理 **************/
for (URL url : urls) {
// 保存服务暴露使用的注册中心的协议
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
// 设置 url 协议为 registry,表示当前URL 用于配置注册中心
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
// 通过判断条件,决定是否添加 url 到 registryList 中
// 满足两个条件会往里添加:1、是服务提供者且需要想注册中心注册;2、不是提供者但订阅了注册中心
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
// 返回封装好的注册中心 URL
return registryList;
}
这一步的目的是筛选并生成注册中心的URL,其逻辑如下
- 参数解析:解析ApplicationConfig、RegistryConfig 中的配置,用于作为注册中心的配置。这里会通过 AbstractConfig#appendParameters 将参数解析并保存到 Map 中。
- Map 转换 :经过解析后, Map 中保存的是注册中心相关的信息,这里会根据 address 和 map 将信息转化为 URL。
- 对 URL 进行进一步处理 :这里将 URL的协议类型替换为 registry,并保存了原先协议类型。
参数解析
Map<String, String> map = new HashMap<String, String>();
// 解析 ApplicationConfig 中的参数保存到 map 中
appendParameters(map, application);
// 解析 RegistryConfig 中配的配置保存到 Map 中
appendParameters(map, config);
// 设置 url 的 path 为 org.apache.dubbo.registry.RegistryService
map.put("path", RegistryService.class.getName());
// 添加运行时参数
appendRuntimeParameters(map);
// 填充 protocol
if (!map.containsKey("protocol")) {
map.put("protocol", "dubbo");
}
其中 .AbstractConfig#appendParameters 方法会获取 config 中的所有 get 方法,如下:
protected static void appendParameters(Map<String, String> parameters, Object config, String prefix) {
if (config == null) {
return;
}
// 反射获取方法
Method[] methods = config.getClass().getMethods();
for (Method method : methods) {
try {
String name = method.getName();
// 判断是 get 方法
if (ClassHelper.isGetter(method)) {
// 如果方法被 Parameter 注解修饰,则解析 Parameter 注解
Parameter parameter = method.getAnnotation(Parameter.class);
// 返回类型为 Object || (Parameter 注解不为空且被排除在为) 直接跳过
if (method.getReturnType() == Object.class || parameter != null && parameter.excluded()) {
continue;
}
// 如果 Parameter 不为空,则使用 Parameter 的key为作为属性名,否则从get 方法上解析属性名
String key;
if (parameter != null && parameter.key().length() > 0) {
key = parameter.key();
} else {
key = calculatePropertyFromGetter(name);
}
// 执行get 方法获取值
Object value = method.invoke(config);
String str = String.valueOf(value).trim();
// 如果返回值不为空则将其添加到Map中
if (value != null && str.length() > 0) {
if (parameter != null && parameter.escaped()) {
str = URL.encode(str);
}
if (parameter != null && parameter.append()) {
String pre = parameters.get(Constants.DEFAULT_KEY + "." + key);
if (pre != null && pre.length() > 0) {
str = pre + "," + str;
}
pre = parameters.get(key);
if (pre != null && pre.length() > 0) {
str = pre + "," + str;
}
}
if (prefix != null && prefix.length() > 0) {
key = prefix + "." + key;
}
parameters.put(key, str);
} else if (parameter != null && parameter.required()) {
// Parameter 标记为必须但是值为空则抛出异常
throw new IllegalStateException(config.getClass().getSimpleName() + "." + key + " == null");
}
} else if ("getParameters".equals(name)
&& Modifier.isPublic(method.getModifiers())
&& method.getParameterTypes().length == 0
&& method.getReturnType() == Map.class) {
// 对 getParameters 方法的特殊处理,由于 getParameters 方法返回的是个 Map,所以这里需要将整个Map 添加
Map<String, String> map = (Map<String, String>) method.invoke(config, new Object[0]);
if (map != null && map.size() > 0) {
String pre = (prefix != null && prefix.length() > 0 ? prefix + "." : "");
for (Map.Entry<String, String> entry : map.entrySet()) {
parameters.put(pre + entry.getKey().replace('-', '.'), entry.getValue());
}
}
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
UrlUtils#parseURLs
我们在指定多注册中心时可能通过指定格式如下:
<dubbo:registry protocol="zookeeper" address="zookeeper://localhost:2181, zookeeper://localhost:2182"/>
在这里我们需要将 zookeeper://localhost:2181, zookeeper://localhost:2182 的地址进行解析,分别生成 URL,其实现如下:
public static List<URL> parseURLs(String address, Map<String, String> defaults) {
if (address == null || address.length() == 0) {
return null;
}
// 将注册中心地址分割,因为注册中心可能会存在多个
String[] addresses = Constants.REGISTRY_SPLIT_PATTERN.split(address);
if (addresses == null || addresses.length == 0) {
return null; //here won't be empty
}
List<URL> registries = new ArrayList<URL>();
for (String addr : addresses) {
// 解析出来的每个地址和 参数 转化为 URL
registries.add(parseURL(addr, defaults));
}
return registries;
}
对 URL 进行进一步处理
经历上面几步之后,假设当前阶段解析的URL如下:
zookeeper://localhost:2181/org.apache.dubbo.registry.RegistryService?application=spring-dubbo-provider&dubbo=2.0.2&pid=10404&release=2.7.0×tamp=1628842310884
下面我们看一下是如何 对 URL 进行处理的:
for (URL url : urls) {
// 1. 保存服务暴露使用的注册中心的协议
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
// 2. 设置 url 协议为 registry,表示当前URL 用于配置注册中心
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
// 3. 通过判断条件,决定是否添加 url 到 registryList 中
// 满足两个条件会往里添加:1、是服务提供者且需要想注册中心注册;2、不是提供者但订阅了注册中心
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
第一步 和 第二步执行结束后URL 为:
registry://localhost:2181/org.apache.dubbo.registry.RegistryService?application=spring-dubbo-provider&dubbo=2.0.2&pid=10404®istry=zookeeper&release=2.7.0×tamp=1628842310884
这里可以看到第一步和第二部的目的就是 将原始 URL 的协议类型从 zookeeper 替换成 registry,并且在URL 中添加 registry=zookeeper 用于保存注册中心的协议。
这么做的目的是为了让 RegistryProtocol 来处理服务。因为在后面的代码中Dubbo会通过 Protocol#export 方法来暴露服务。而由于 Dubbo SPI机制的存在,所以 Dubbo在加载一些 SPI 接口的时候,是根据参数或者属性判断的,对于 Protocol 接口,则是通过url.getProtocol 返回的协议类型判断加载哪个实现类,而这里的协议类型是 registry,则会加载 RegistryProtocol 来处理, 而 RegistryProtocol 可以通过 registry 属性得知注册中心的真正协议 。而对于多种多样的注册中心(如 zk,nacos,redis), RegistryProtocol 会根据注册中心的实际协议类型来选择合适的 Registry 实现类来完成操作。
其中 关于 Dubbo SPI ,以 Protocol 为例, Dubbo中存在 多个实现,如 RegistryProtocol 、DubboProtocol、InjvmProtocol 等,每个实现都有一个唯一的name,如 RegistryProtocol 为 registry, DubboProtocol 为 dubbo, InjvmProtocol 为 injvm。
Dubbo 会为每个 SPI 接口生成一个适配器,用根据URL的参数来选择使用哪个实现类。如 :对于 Protocol,Protocol$Adaptive 是 Dubbo 自动生成的针对于 Protocol 接口的适配器。在调用 Protocol#export 会先调用 Protocol$Adaptive#export ,在这个方法中会根据 Url 的协议类型选择合适的 Protocol 来处理,这里协议类型为 registry, 则会选择 RegistryProtocol 来处理服务。
doExportUrlsFor1Protocol
在 loadRegistries(true); 解析出来所有的注册中心后,doExportUrlsFor1Protocol(protocolConfig, registryURLs) 开始针对不同的协议和注册中心进行服务发布。
需要注意,Dubbo 允许不通过注册中心,而是直连的方式进行调用。在这种情况下,由于没有注册中心的存在,最终暴露的URL也不相同,如下(URL 有简化):
存在注册中心:在上面我们提到了如果存在注册中心,则会使用 RegistryProtocol 来处理服务。这就导致 URL 变更为了 registry://localhost:2181/org.apache.dubbo.registry.RegistryService,但是这个URL中并没有包含暴露的接口的信息,所以URL 在暴露服务时会添加一个参数 export来记录需要暴露的服务信息。此时 URL 会变成 registry://localhost:2181/org.apache.dubbo.registry.RegistryService&export=URL.encode(“dubbo://localhost:9999/com.kingfish.service.DemoService?version=1.0.0”)。 而之后基于 Dubbo SPI的 自适应机制,根据 URL registry 协议会选择RegistryProtocol 来暴露服务,而 RegistryProtocol 只负责处理注册中心相关的内容,额外的暴露服务,会根据 export 参数指定的 URL 信息选择。这里URL 协议为 dubbo,则说明服务的暴露需要使用 Dubbo协议,则会使用 DubboProtocol 来进行服务暴露。
不存在注册中心 :不存在注册中心时,最终暴露服务的URL 为 dubbo://localhost:9999/com.kingfish.service.DemoService?version=1.0.0,此时会根据 Dubbo SPI选择 DubboProtocol中的export方法进行暴露服务端口。
在加载出所有的注册中心URL后,此时的URL并非终点,还需要继续根据不同的协议进行解析 而 ServiceConfig#doExportUrlsFor1Protocol 对不同注册中心 和 不同协议的进行了服务发布,下面我们来详细看一下其实现过程。
// org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
=========================1. 下面开始解析配置参数 ===============
// 解析各种配置参数,组成URL
// 获取当前协议的协议名,默认 dubbo,用于服务暴露
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = Constants.DUBBO;
}
// 追加参数到map中
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
appendRuntimeParameters(map);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
// 这里将 ServiceConfig 也进行的参数追加
appendParameters(map, this);
// 1.1 对 MethodConfig 的解析。MethodConfig 保存了针对服务暴露方法的配置,可以映射到 <dubbo:method />
// 对 methods 配置的解析,即是对参数的解析并保存到 map 中
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
// 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig,
// 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
// 获取 ArgumentConfig 列表
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
// 检测 type 属性是否为空,或者空串,如果指定了type 则按照 type进行解析
if (argument.getType() != null && argument.getType().length() > 0) {
// 获取接口方法
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
// 比对方法名,查找 MethodConfig 配置的方法
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
// argument.getIndex() 默认值是 -1 ,如果这里不为-1,说明用户不仅设置了type还设置了index
// 则需要校验index索引的参数类型是否是 type类型
if (argument.getIndex() != -1) {
// 检测 ArgumentConfig 中的 type 属性与方法参数列表
// 中的参数名称是否一致,不一致则抛出异常
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
// 添加 ArgumentConfig 字段信息到 map 中,
// 键前缀 = 方法名.index,比如:
// map = {"sayHello.3": true}
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
// index =-1 则需要遍历所有参数列表,获取到指定type 类型的参数
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
// 从参数类型列表中查找类型名称为 argument.type 的参数
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
// 用户未配置 type 属性,但配置了 index 属性,且 index != -1
// 添加 ArgumentConfig 字段信息到 map 中
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// 1.2 针对 泛化调用的 参数,设置泛型类型( true、bean 和 nativejava)
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// 获取服务版本信息
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// 获取 将要暴露接口 的所有方法,并使用逗号拼接在map中
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 1.3 如果接口使用了token验证,则对token处理
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
// 1.4 本地导出属性设置,本地导出,不开端口,不发起远程调用,仅与JVM 内直接关联
// LOCAL_PROTOCOL 值为 injvm
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
// 获取全局配置路径
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// 这里获取的host,port是dubbo暴露的地址端口
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 拼接URL对象
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
// 1.5 确定是否存在当前协议的扩展的ConfiguratorFactory可以用来设计自己的URL组成规则
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
=========================2. 下面开始服务导出 ===============
// 2. 服务导出, 针对 本地导出和远程导出
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
// scope 为SCOPE_NONE 则不导出服务
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// scope 不为SCOPE_NONE 则导出本地服务
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// 2.1 本地服务导出
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
//scope 不为SCOPE_LOCAL则导出远程服务
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// ... 日志打印
// 如果存在服务注册中心
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
// 是否动态,该字段标识是有自动管理服务提供者的上线和下线,若为false 则人工管理
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 加载 Dubbo 监控中心配置
URL monitorUrl = loadMonitor(registryURL);
// 如果监控中心配置存在,则添加到url中
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
// 代理配置解析
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// 2.2 远程服务导出
// 将registryURL拼接export=providerUrl参数。 为服务提供类(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 导出服务,并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 如果没有注册中心,则采用直连方式,即没有服务注册和监听的过程
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
// 元数据存储
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
ServiceConfig#doExportUrlsFor1Protocol 的代码比较长,其中较大篇幅的内容都用于配置参数解析。我们将上面的代码分为两部分
- 解析配置参数:这一部分解析的URL 是待暴露服务的URL信息。包括 方法配置解析、泛化、token 等。
- 服务导出 : 将 服务 URL 解析完成后,则开始准备暴露服务。
解析配置参数
ServiceConfig#doExportUrlsFor1Protocol 方法将大部分篇幅都花在了解析配置参数的过程中,我们关注其中部分参数的解析。
MethodConfig的解析
这部分内容比较冗长,这里并没有放出来,职责就是对 MethodConfig 配置的解析,我们以
<dubbo:reference interface="com.xxx.XxxService">
<!-- 指定方法, 超时时间为3s,重试次数为2 -->
<dubbo:method name="findXxx" timeout="3000" retries="2">
<!-- 指定第一个参数(index=0,index 默认-1) 类型为callback -->
<dubbo:argument index="0" callback="true" />
<!-- 指定参数XXX 类型的(index=0) 类型为callback, index 和 type 二选一 -->
<dubbo:argument type="com.kingfish.XXX" callback="true"/>
<dubbo:method>
</dubbo:reference>
Dubbo method 的配置有两种方式,指定类型 type 或者指定 索引 index。
泛化实现的解析
这里判断一下是否是泛化实现,如果是则添加 generic 参数。
// 判断是否是泛化调用,判断逻辑就是 generic的值是否与 true、bean、nativejava相等
if (ProtocolUtils.isGeneric(generic)) {
// 标记泛化的规则
map.put(Constants.GENERIC_KEY, generic);
// 设置匹配所有的方法 。Constants.ANY_VALUE 为 *
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
}
token属性的解析
token 作为 令牌验证,为空表示不开启,如果为true,表示随机生成动态令牌,否则使用静态令牌,令牌的作用是防止消费者绕过注册中心直接访问,保证注册中心的授权功能有效,如果使用点对点调用,需关闭令牌功能
<dubbo:service interface="" token="123"/>
代码也很简单,如下:
if (!ConfigUtils.isEmpty(token)) {
// 如果未 true 或者 default,则使用UUID
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
token 功能的实现依赖于 Dubbo的 TokenFilter 过滤器
本地服务的解析
Dubbo服务导出分为本地导出和远程导出,本地导出使用了injvm协议,这是一个伪协议,它不开启端口,不发起远程调用,只在JVM 内直接关联,但执行Dubbo的 Filter 链。
在默认情况下,Dubbo同时支持本地导出和远程协议导出,我们可以通过ServiceConfig 的setScope 方法进行配置,为none表示不导出服务,为 remote 表示只导出远程服务,为local表示只导出本地服务。
这里判断如果协议类型是 injvm 则直接设置为 本地导出服务。
// LOCAL_PROTOCOL 值为 injvm
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
// 设置当前服务不注册到注册中心
protocolConfig.setRegister(false);
// 设置不通知
map.put("notify", "false");
}
ConfiguratorFactory 的解析
这里是根据当前协议类型来获取到 SPI 接口 ConfiguratorFactory 的实现类,这里是Dubbo预留的一个扩展点,通过 ConfiguratorFactory 可以用来设计自己的URL生成策略。
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
而默认的实现为:
override=org.apache.dubbo.rpc.cluster.configurator.override.OverrideConfiguratorFactory
absent=org.apache.dubbo.rpc.cluster.configurator.absent.AbsentConfiguratorFactory
我们可以通过定义 org.apache.dubbo.rpc.cluster.ConfiguratorFactory 文件来自定义策略,如下(由于默认策略为dubbo,所以我们指定dubbo协议我们定制的ConfiguratorFactory ):
CustomConfiguratorFactory 的实现参考OverrideConfiguratorFactory 的实现,我们这里通过自定义的CustomConfiguratorFactory 策略给URL 添加了 custom 参数 value 为 demo。
public class CustomConfiguratorFactory extends AbstractConfigurator implements ConfiguratorFactory {
public CustomConfiguratorFactory(URL url) {
super(url);
}
// 返回当前定制下的 AbstractConfigurator 实现类
@Override
public Configurator getConfigurator(URL url) {
return new CustomConfiguratorFactory(url);
}
// url 添加 custom 参数
@Override
protected URL doConfigure(URL currentUrl, URL configUrl) {
return currentUrl.addParameter("custom", "demo");
}
}
服务导出
参数解析并非是重头戏,在上面一系列的解析结束后,Dubbo 开始进行服务导出的操作。我们这里假设使用zk作为注册中心,协议为dubbo。 在默认情况下,Dubbo同时支持本地导出和远程协议导出,我们可以通过ServiceConfig 的setScope 方法进行配置,scope 的取值有四种情况:
- none :表示不导出服务,
- remote :表示只导出远程服务,
- local :表示只导出本地服务。
- null :在默认情况下为null,会同时导出本地服务和远程服务
本地服务导出
本地服务导出并不需要与注册中心交互,也不需要开启远程服务,所以其实现比较简单,将某些参数限制,指定协议类型为injvm。
private void exportLocal(URL url) {
// 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
// 设置协议为 injvm
.setProtocol(Constants.LOCAL_PROTOCOL)
// 设置host为127.0.0.1
.setHost(LOCALHOST)
// 设置端口为0
.setPort(0);
// 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
// 添加到暴露的服务集合中
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
因为Dubbo SPI 的原因,这里协议设置为 injvm,会使用 InjvmProtocol 来进行服务暴露。
远程服务导出
相较于本地服务导出,远程服务导出需要考虑到协议类型,notify 通知等方面,所以逻辑就显得复杂许多。
// 如果存在服务注册中心
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
// 是否动态,该字段标识是有自动管理服务提供者的上线和下线,若为false 则人工管理
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 1. 加载 Dubbo 监控中心配置
URL monitorUrl = loadMonitor(registryURL);
// 如果监控中心配置存在,则添加注册中心的 URL。key 为 monitor
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
// 代理配置解析
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// 2. 远程服务导出
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 将暴露的服务保存到 exporters中,exporters 保存了本机暴露出的服务
exporters.add(exporter);
}
} else {
// 3. 直连方式,不存在服务注册中心,直连方式是和上面的区别就是不经过注册中心注册,并不订阅注册中心节点。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 由于没有注册中心,这里会直接根据服务指定的协议,如Dubbo,则会直接使用 DubboProtocol 来暴露服务。
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
这里需要注意Dubbo在这里区分了存在注册中心和不存在注册中心的情况,因为 Dubbo允许不使用注册中心而通过服务之间直接连接的方式来调用服务。其区别就是直连方式不会经过 Dubbo的 各种容错机制。
我们这里仍以上一篇的 URL 为例,这里为了方便描述,URL 做了简化,registryURL 和 URL 如下:
// 注册中心 URl
registryURL = registry://localhost:2181/org.apache.dubbo.registry.RegistryService®istry=zookeeper
// 要暴露的服务 URL
url = dubbo://localhost:9999/com.kingfish.service.DemoService
注册监控中心
对监控中心的处理并不复杂,通过loadMonitor 生成一个关于监控中心的 URL ,并追加到 url 上。 loadMonitor 方法是完成监控URL的生成过程,并不复杂,这里篇幅问题不再展开。
// 1. 加载 Dubbo 监控中心配置
URL monitorUrl = loadMonitor(registryURL);
// 如果监控中心配置存在,则添加
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
我们假设为存在 localhost:8080 的监控中心,则此时 URL 变为(URL 有简化) :
dubbo://localhost:9999/com.kingfish.service.DemoService
&monitor=http://localhost:8080?interface=org.apache.dubbo.monitor.MonitorService
服务的暴露
服务提供者导出服务具体使用的是下列代码:
// 1. 生成代理类
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 2. 暴露服务
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 将暴露的服务保存到 exporters中,当消费者调用时,提供者可以从中获取已经暴露的服务
exporters.add(exporter);
这里的 Invoker 是 通过JavassistProxyFactory#getInvoker 方法生成的匿名 AbstractProxyInvoker 类。从这里开始,慢慢进入了服务导出的核心内容,下面我们来继续具体分析这个过程。
生成代理对象
该部分代码如下,其中Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠拢,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
这里需要注意: proxyFactory.getInvoker 的入参URL 为 registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())。这里将 url 添加到了 registryURL 中,所以proxyFactory.getInvoker 的入参 URL 如下:
registry://localhost:2181/org.apache.dubbo.registry.RegistryService®istry=zookeeper
&export=dubbo://localhost:9999/com.kingfish.service.DemoService
&monitor=http://localhost:8080?interface=org.apache.dubbo.monitor.MonitorService
此时我们看一下 URL的结构:
registry://... : 保存了注册中心的信息, registry=zookeeper 表明使用zk作为注册中心
export=dubbo://... : 保存了要导出服务的信息
monitor=http://... : 保存了监控中心的信息
在后面我们会介绍,由于 registry:// 所以 Dubbo会选择 RegistryProtocol 来处理本地服务暴露,而 RegistryProtocol 中会将 export 的参数取出识别出 dubbo:// 选择 DubboProtocol 来进行服务暴露。
proxyFactory 和 protocol 定义如下:
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
可以看出 proxyFactory 和 protocol 都是 SPI 扩展接口的适配器类型。所以这里的 proxyFactory实际上是 ProxyFactory$Adaptive 类型,所以这里首先执行的是 ProxyFactory$Adaptive#getInvoker() 方法,而对于 ProxyFactory SPI 接口来说,默认的协议类型为 javassist,所以调用的是 JavassistProxyFactory#getInvoker 方法获取了代理类。而JavassistProxyFactory#getInvoker 代码如下 :
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 将服务实现类转换成Wrapper 类,以减少反射的调用
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
可以看到,在JavassistProxyFactory#getInvoker中将SPI 实现类动态转换成了 Wrapper,并封装成Invoker类型返回。
其中Wrapper 是在 JavassistProxyFactory#getInvoker 中动态生成的,如对于DemoService 接口来说,
public interface DemoService {
void sayMsg(String msg);
String sayHello(String name);
}
其生成的Wrapper 类如下,可以看到这里的 invokeMethod 方法通过方法名称来匹配调用的方法,从而调用ref 对应的方法。:
/*
* Decompiled with CFR.
*
* Could not load the following classes:
* com.kingfish.service.impl.DemoServiceImpl
*/
package org.apache.dubbo.common.bytecode;
import com.kingfish.service.impl.DemoServiceImpl;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.apache.dubbo.common.bytecode.ClassGenerator;
import org.apache.dubbo.common.bytecode.NoSuchMethodException;
import org.apache.dubbo.common.bytecode.NoSuchPropertyException;
import org.apache.dubbo.common.bytecode.Wrapper;
public class Wrapper1
extends Wrapper
implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
public static Class[] mts1;
public static Class[] mts2;
public static Class[] mts3;
@Override
public String[] getPropertyNames() {
return pns;
}
@Override
public boolean hasProperty(String string) {
return pts.containsKey(string);
}
public Class getPropertyType(String string) {
return (Class)pts.get(string);
}
@Override
public String[] getMethodNames() {
return mns;
}
@Override
public String[] getDeclaredMethodNames() {
return dmns;
}
@Override
public void setPropertyValue(Object object, String string, Object object2) {
try {
DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class com.kingfish.service.impl.DemoServiceImpl.").toString());
}
@Override
public Object getPropertyValue(Object object, String string) {
try {
DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class com.kingfish.service.impl.DemoServiceImpl.").toString());
}
// 这里会根据方法名匹配到对应实例到的具体方法
public Object invokeMethod(Object object, String string, Class[] classArray, Object[] objectArray) throws InvocationTargetException {
DemoServiceImpl demoServiceImpl;
try {
demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
if ("sayMsg".equals(string) && classArray.length == 1) {
demoServiceImpl.sayMsg((String)objectArray[0]);
return null;
}
if ("sayHello".equals(string) && classArray.length == 1) {
return demoServiceImpl.sayHello((String)objectArray[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.kingfish.service.impl.DemoServiceImpl.").toString());
}
}
这里就可以知道 ProxyFactory#getInvoker 返回的是由 JavassistProxyFactory#getInvoker 生成的匿名代理类 Invoker。
我们这里再来梳理一下:
当服务提供者暴露服务接口时,会调用 ProxyFactory#getInvoker 来生成一个 Invoker,此处由于 Dubbo SPI 的存在实际调用的是 JavassistProxyFactory#getInvoker。我们这里为了方便描述,把 JavassistProxyFactory#getInvoker 方法返回的 Invoker 称为 Javassist Invoker。
JavassistProxyFactory#getInvoker 方法中 第一步会生成一个 Wrapper 类来包装暴露的接口实例。如果需要调用实例的方法,则通过 Wrapper#invokeMethod 方法来调用,在 Wrapper#invokeMethod 中会根据调用的methodName 来直接调用实例的方法。如下代码:
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type)
JavassistProxyFactory#getInvoker 方法中 第二步会创建一个AbstractProxyInvoker 的匿名实现类并返回,即我们上面提到的 Javassist Invoker。而 Javassist Invoker 的doInvoke 方法具体实现会委托给 Wrapper#invokeMethod 方法来处理。
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
当消费者调用提供者服务时,提供者会调用 Javassist Invoker 的 doInvoke 方法,该方法会委托给 Wrapper#invokeMethod 来处理,而 Wrapper#invokeMethod 中会根据 methodName 来调用实例的具体方法。即整个流程简化如下:
Invoker#invoke -> AbstractProxyInvoker#doInvoke -> Wrapper#invokeMethod -> 调用 Ref 具体方法
注: Wrapper 的存在是为了减少调用 接口实例的反射。如果没有Wrapper,在 Javassist Invoker 的 doInvoke 方法中,则会根据 methodName 直接反射调用 proxy 对象。而 Wrapper 中动态生成了针对当前对象的方法,会根据方法名是否相同来直接调用实例方法,免去了反射调用 proxy 对象的过程。
服务暴露
经过了上面的过程, 暴露的接口对象的代理对象已经创建,这一步开始对该代理对象进行暴露。 该部分代码如下:
// 使用 DelegateProviderMetaDataInvoker 包装了一下 代理invoker。DelegateProviderMetaDataInvoker 中除了 Invoker 还保存了服务配置的数据
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 进行服务暴露
Exporter<?> exporter = protocol.export(wrapperInvoker);
当执行到 protocol.export(wrapperInvoker); 时,同样由于Dubbo SPI 机制,实际调用的是 Protocol$Adaptive#export() 方法。
并且由于Dubbo SPI 的扩展点使用了Wrapper自动增强,对于 Protocol 来说,存在三个Wrapper 增强。由于只有已知的这三个Wrapper,并且加载顺序不影响功能,为了方便描述,这里忽略这三个 Wrapper的排序问题。所以整个调用过程是 Protocol$Adaptive#export() => QosProtocolWrapper#export() => ProtocolListenerWrapper #export() => ProtocolFilterWrapper#export() => XxxProtocol#export() 。
对于 远程导出 的URL 来说:
- 存在注册中心:会使用 RegistryProtocol 来处理服务。这就导致 URL 变更为了 registry://localhost:2181/org.apache.dubbo.registry.RegistryService,但是这个URL中并没有包含暴露的接口的信息,所以URL 在暴露服务时会添加一个参数 export来记录需要暴露的服务信息。此时 URL 会变成 registry://localhost:2181/org.apache.dubbo.registry.RegistryService&export=URL.encode(“dubbo://localhost:9999/com.kingfish.service.DemoService?version=1.0.0”)。 而之后基于 Dubbo SPI的 自适应机制,根据 URL registry 协议会选择RegistryProtocol 来暴露服务,而 RegistryProtocol 只负责处理注册中心相关的内容,额外的暴露服务,会根据 export 参数指定的 URL 信息选择。这里URL 协议为 dubbo,则说明服务的暴露需要使用 Dubbo协议,则会使用 DubboProtocol 来进行服务暴露。
- 不存在注册中心 :不存在注册中心时,最终暴露服务的URL 为 dubbo://localhost:9999/com.kingfish.service.DemoService?version=1.0.0,此时会根据 Dubbo SPI选择 DubboProtocol中的export方法进行暴露服务端口。这里 URL 也可能是别的协议,此时会寻找对应的Protocol 来处理,我们这里还是以 Dubbo 为例。
所以这里对于 XxxProtocol,存在三种情况:
- 本地服务导出:这种情况我们在上面说了,其内部根据URL 中Protocol类型为 injvm,会选择InjvmProtocol
- 远程服务导出 & 有注册中心:其内部根据URL 中 Protocol 类型为 registry,会选择RegistryProtocol
- 远程服务导出 & 没有注册中心:根据服务协议头类型判断,我们这里假设是 dubbo ,则会选择 DubboProtocol