Dubbo启停原理
开始分析消费者调用提供者服务的过程。本文的分析会基于Main 方法的Dubbo调用
消费端启动
这里先列出来我们的消费者代码,此代码并非在Spring中运行,后面会单独开设文章分析Spring 如何集成Dubbo。
public static void main(String[] args) {
// 获取 ReferenceConfig
ReferenceConfig<DemoService> referenceConfig = DubboUtil.referenceConfig("dubbo-demo");
// 获取服务 接口
DemoService demoService = referenceConfig.get();
// 进行服务调用
String result = demoService.sayHello("demo");
System.out.println("result = " + result);
}
....
public static ReferenceConfig<DemoService> referenceConfig(String applicationName) {
ReferenceConfig<DemoService> referenceConfig = new ReferenceConfig<>();
// 设置服务名称
referenceConfig.setApplication(new ApplicationConfig(applicationName));
// 设置注册中心地址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
referenceConfig.setRegistry(registryConfig);
// 设置暴露接口
referenceConfig.setInterface(DemoService.class);
referenceConfig.setTimeout(5000);
// 设置版本号和分组 服务接口 + 服务分组 + 服务版本号确定唯一服务
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
return referenceConfig;
}
上面的代码一目了然,我们可以很自然的发现关键逻辑在下面一句中:
DemoService demoService = referenceConfig.get();
referenceConfig.get() 方法完成了对服务接口的代理,创建了服务提供者的网络服务连接。
ReferenceConfig#get
通过上面代码我们可以看到 ReferenceConfig#get 是Dubbo消费者的入口。其实现如下:
public synchronized T get() {
// 1. 检查并更新缺省配置
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
// 如果ref 为null 则说明没有初始化,进行初始化。
if (ref == null) {
//2. 服务初始化
init();
}
return ref;
}
这里可以看到两个比较明显的方法:
- checkAndUpdateSubConfigs(); : 检查并更新缺省配置
- init(); : 消费者服务的初始化,核心逻辑。 下面我们来进行分析。
ReferenceConfig#checkAndUpdateSubConfigs
服务提供者在服务暴露时也有类似的配置检查过程。ReferenceConfig#checkAndUpdateSubConfigs的实现如下:
public void checkAndUpdateSubConfigs() {
// 如果引用接口不合法直接抛出异常
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
// 按照一定优先级整合配置 application > module > consumer
completeCompoundConfigs();
// 启动配置中心
startConfigCenter();
// get consumer's global configuration
// 消费者缺省校验
checkDefault();
// 刷新配置
this.refresh();
// 设置泛化调用相关信息
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
// 获取引用的接口Class
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
// 解析映射文件赋值 ReferenceConfig#url 属性,
// 依次尝试获取系统属性中的 {接口名}、dubbo.resolve.file、dubbo-resolve.properties 配置文件,如果前一个配置存在则不会往后加载。将加载后的配置解析后赋值给 url
resolveFile();
// 缺省校验
checkApplication();
// 元数据中心校验
checkMetadataReport();
}
可以看到 ReferenceConfig#checkAndUpdateSubConfigs 的逻辑并不复杂,整个过程就是对参数的校验,与提供者的参数校验类似,这里不再进行深入分析。毕竟下面的 ReferenceConfig#init 才是关键所在。
ReferenceConfig#init
ReferenceConfig#init 创建了服务提供者的代理类,是消费者端核心代码的入口。其实现如下:
private void init() {
// 如果消费者服务已经初始化过,则直接返回
if (initialized) {
return;
}
initialized = true;
// 检查存根合法性
checkStubAndLocal(interfaceClass);
// 检查mock合法性
checkMock(interfaceClass);
// 拼接参数
Map<String, String> map = new HashMap<String, String>();
// 添加 side、协议版本信息、时间戳和进程号等信息到 map 中
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
appendRuntimeParameters(map);
// 不是泛化调用
if (!isGeneric()) {
// 获取版本
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// 获取接口方法列表,并添加到 map 中
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
// 添加参数到 map中
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
Map<String, Object> attributes = null;
// 对 <method> 标签的解析
if (methods != null && !methods.isEmpty()) {
attributes = new HashMap<String, Object>();
for (MethodConfig methodConfig : methods) {
appendParameters(map, methodConfig, methodConfig.getName());
// 重试参数解析
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
}
}
// 获取消费者要使用注册的 ip 地址,多网卡情况下需要可以指定ip
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
// 创建提供者代理
ref = createProxy(map);
// 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
// 并将 ConsumerModel 存入到 ApplicationModel 中
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), interfaceClass, ref, interfaceClass.getMethods(), attributes);
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
上面的代码很多都和服务提供者的类似,前期工作都是对配置参数的处理,关键内容在于 代理的创建 createProxy(map);,这一步创建了引用服务的代理。
下面我们来看看 ReferenceConfig#createProxy 的实现过程。
ReferenceConfig#createProxy
ReferenceConfig#createProxy 实现如下 (这一部分注释基本都是参考官网):
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
/******* 1. 服务引用判断 ******/
// 根据 isInjvm 参数判断是否指定了本地引用
if (isInjvm() == null) {
// 如果没有指定是否本地引用则 需要根据下面逻辑判断
// 在指定本地引用的时候,但是 url 配置被指定,则不当做本地引用。
// 该 url 在 ReferenceConfig#init 中 调用ReferenceConfig#resolveFile 方法解析获取。
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
} else {
// by default, reference local service if there is
// 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
// 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else {
//如果进行了本地引用配置,则按照配置的决定
isJvmRefer = isInjvm();
}
// 确定了本地引用
if (isJvmRefer) {
/******* 2. 本地服务调用 ******/
// 生成一个本地引用的URL,协议类型为 injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 调用 refer 方法构建 InjvmInvoker 实例
invoker = refprotocol.refer(interfaceClass, url);
} else {
/******* 3. 远程服务调用 ******/
// 如果 url不为空,则说明可能会进行点对点调用,即服务直连
// 这里的 url 是在 ReferenceConfig#init 中 调用 ReferenceConfig#resolveFile 方法解析获取。
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
// 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
// 设置接口全限定名为 url 路径
url = url.setPath(interfaceName);
}
// 如果 url 协议类型是 Registry,则说明需要使用指定的注册中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中,与服务暴露时的url 异曲同工
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
// 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
// 最后将合并后的配置设置为 url 查询字符串中
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// 如果没有服务直连,则需要从注册中心中获取提供者信息
// 检查注册中心配置
checkRegistry();
// 加载注册中心 URL
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// 加载监控中心
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
// 添加监控中心信息
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 到这里说明存在注册中心, 则添加 refer 参数到 url 中,并将 url 添加到 urls
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 未配置注册中心,抛出异常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 单个注册中心或服务提供者(服务直连)
if (urls.size() == 1) {
/******* 3.1 单URL场景的远程调用 ******/
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
/******* 3.2 多URL场景的远程调用 ******/
// 多个注册中心或多个服务提供者,或者两者混合
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 获取所有的 Invoker
for (URL url : urls) {
// 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
// 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
// 这里 refprotocol 是 Protocol$Adaptive 适配器类型
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's cluster is available
// 如果注册中心链接不为空,则将使用 RegistryAwareCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
// 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// 是否检查提供者的可用性
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
// 如果 为true,则消费者启动时对 invoker 进行可用性检查
if (c && !invoker.isAvailable()) {
// make it possible for consumer to retry later if provider is temporarily unavailable
initialized = false;
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
/**
* @since 2.7.0
* ServiceData Store
*/
// 2.7.0 版本后,元数据中心发布消费者服务信息
/******* 4. 元数据中心服务发布 ******/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
/******* 5. 代理类的创建 ******/
// 生成代理类
return (T) proxyFactory.getProxy(invoker);
}
我们这里可以总结为五个过程:
- 服务引用判断 :首先需要根据用户参数确定当前服务引用是本地引用还是远程引用。二者的逻辑并不相同。
- 本地服务调用 :首先根据配置检查是否为本地调用,若是,则调用 InjvmProtocol#refer 方法生成 InjvmInvoker 实例。若不是,则读取直连配置项或注册中心 url,并将读取到的 url 存储到 urls 中。
- 远程服务调用 :远程服务的调用,分为两种情况,单URL 或 多URL场景。(这里的URL 指的是用户指定的注册中心或者直连URL)。通过这一步,将 URL 转化为了 Invoker。
- 单URL场景的远程调用 :根据 urls 元素数量进行后续操作。若 urls 元素数量为1,则说明注册中心或直连URL只有一个,则直接通过 Protocol 自适应拓展类构建 Invoker 实例接口。
- 多URL的远程调用:若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker。
- 元数据中心服务发布 :Dubbo 2.7 版本之后新增了元数据中心和配置中心,其中元数据中心会存储当前暴露的服务信息。
- 代理类的创建 :在获取到Ref 的Invoker 后调用 ProxyFactory 生成代理类。 其中 :Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。
服务引用判断
这一部分的代码如下:其中注释已经解释的很清楚,这里不再赘述。
// 如果没有指定是否本地引用则 需要自己根据逻辑判断
if (isInjvm() == null) {
// 在指定本地引用的时候,但是 url 配置被指定,则不做本地引用
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
} else {
// by default, reference local service if there is
// 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
// 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else {
//如果进行了本地引用配置,则按照配置的决定
isJvmRefer = isInjvm();
}
本地服务调用
实际上,无论是本地还是远程调用,其核心方法都是 Protocol#refer 方法,区别在于由于协议的不同,调用的 Protocol 实例不同,本地服务调用调用的是 InjvmProtocol#refer。 (需要注意的是无论是远程还是本地调用,这里的调用顺序都是经过 XxxProtocolWrapper 后才真正调用到InjvmProtocol#refer)
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
InjvmProtocol#refer 的实现如下:
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}
可以看到 InjvmProtocol#refer 的实现很简单,直接包装了一个InjvmInvoker 返回。
不过需要注意的是,这里将 exporterMap 传递给了 InjvmInvoker,而exporterMap中保存了本地暴露的服务信息。当消费者进行服务调用时,会调用 InjvmInvoker#doInvoke 方法,其实现如下,我们可以看到,InjvmInvoker#doInvoke 通过 exporterMap 获取到 调用的本地服务 Exporter 再调用
@Override
public Result doInvoke(Invocation invocation) throws Throwable {
Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
if (exporter == null) {
throw new RpcException("Service [" + key + "] not found.");
}
RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0);
return exporter.getInvoker().invoke(invocation);
}
远程服务调用 远程服务调用首先要确定的就是从哪获取服务提供者。提供者的获取有两种可能
- 消费者通过配置直接指定了提供者的地址。
- 消费者通过注册中心获取提供者的地址。
下面的代码就针对这两种情况进行处理,最后获取的地址会保存到 urls 中,供后续处理。
// 如果 url不为空,则说明可能会进行点对点调用,即服务直连 // 这里的 url 是在 ReferenceConfig#init 中 调用 ReferenceConfig#resolveFile 方法解析获取。 if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { // 设置接口全限定名为 url 路径 url = url.setPath(interfaceName); } // 如果 url 协议类型是 Registry,则说明需要使用指定的注册中心 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中,与服务暴露时的url 异曲同工 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性), // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等 // 最后将合并后的配置设置为 url 查询字符串中 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration // 如果没有服务直连,则需要从注册中心中获取提供者信息 // 检查注册中心配置 checkRegistry(); // 加载注册中心 URL List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { // 加载监控中心 URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { // 添加监控中心信息 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 到这里说明存在注册中心, 则添加 refer 参数到 url 中,并将 url 添加到 urls urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } // 未配置注册中心,抛出异常 if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } }
单URL场景的远程调用 这里的单URL 场景指的是 单一注册中心或 单一直连URL(服务提供者)时 远程调用服务Invoker 的创建,会直接调用 refprotocol.refer,如下:
// 单个注册中心或服务提供者(服务直连) if (urls.size() == 1) { /******* 3.1 单URL场景的远程调用 ******/ // 调用 RegistryProtocol 的 refer 构建 Invoker 实例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); }
refprotocol.refer 的调用需要分为两种情况,
指定单一注册中心 :我们这里假设注册中心是zk,接口协议为Dubbo,则调用顺序为 :
refprotocol.refer =》 XxxProtocolWrapper#refer => RegistryProtocol#refer =》 XxxProtocolWrapper#refer => DubboProtocol#refer
当消费者进行服务进行调用时,此时会通过下面的流程挑选出合适的Invoker用于服务调用。
MockClusterInvoker#doInvoke -> FailoverClusterInvoker#doInvoke -> RegistryDirectory#list
指定单一直连URL :这里则不会出现上面注册中心的情况,一个直连 URL 即一个提供者。我们这里仍假设服务协议为Dubbo,则调用顺序为 :
refprotocol.refer =》 XxxProtocolWrapper#refer => DubboProtocol#refer
可以得知,关键的两个方法为 RegistryProtocol#refer 和 DubboProtocol#refer,而在RegistryProtocol#refer 过程中又调用了 DubboProtocol#refer 方法,RegistryProtocol#refer 主要处理和注册中心相关的内容,而 DubboProtocol#refer 而是创建与提供者的连接。