Dubbo启停原理

2020/02/13 Dubbo

Dubbo启停原理

开始分析消费者调用提供者服务的过程。本文的分析会基于Main 方法的Dubbo调用

消费端启动

这里先列出来我们的消费者代码,此代码并非在Spring中运行,后面会单独开设文章分析Spring 如何集成Dubbo。

    public static void main(String[] args) {
    	// 获取 ReferenceConfig
        ReferenceConfig<DemoService> referenceConfig = DubboUtil.referenceConfig("dubbo-demo");
        // 获取服务 接口
        DemoService demoService = referenceConfig.get();
        // 进行服务调用
        String result = demoService.sayHello("demo");
        System.out.println("result = " + result);
    }
	....
    public static ReferenceConfig<DemoService> referenceConfig(String applicationName) {
        ReferenceConfig<DemoService> referenceConfig = new ReferenceConfig<>();
        // 设置服务名称
        referenceConfig.setApplication(new ApplicationConfig(applicationName));
        // 设置注册中心地址
        RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
        referenceConfig.setRegistry(registryConfig);
        // 设置暴露接口
        referenceConfig.setInterface(DemoService.class);
        referenceConfig.setTimeout(5000);
        // 设置版本号和分组 服务接口 + 服务分组 + 服务版本号确定唯一服务
        referenceConfig.setVersion("1.0.0");
        referenceConfig.setGroup("dubbo");
        return referenceConfig;
    }

上面的代码一目了然,我们可以很自然的发现关键逻辑在下面一句中:

 DemoService demoService = referenceConfig.get();

referenceConfig.get() 方法完成了对服务接口的代理,创建了服务提供者的网络服务连接。

ReferenceConfig#get

通过上面代码我们可以看到 ReferenceConfig#get 是Dubbo消费者的入口。其实现如下:

    public synchronized T get() {
    	// 1. 检查并更新缺省配置
        checkAndUpdateSubConfigs();

        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        // 如果ref 为null 则说明没有初始化,进行初始化。
        if (ref == null) {
        	//2. 服务初始化
            init();
        }
        return ref;
    }

这里可以看到两个比较明显的方法:

  • checkAndUpdateSubConfigs(); : 检查并更新缺省配置
  • init(); : 消费者服务的初始化,核心逻辑。 下面我们来进行分析。

ReferenceConfig#checkAndUpdateSubConfigs

服务提供者在服务暴露时也有类似的配置检查过程。ReferenceConfig#checkAndUpdateSubConfigs的实现如下:

    public void checkAndUpdateSubConfigs() {
    	// 如果引用接口不合法直接抛出异常
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        }
        // 按照一定优先级整合配置 application > module > consumer
        completeCompoundConfigs();
        // 启动配置中心
        startConfigCenter();
        // get consumer's global configuration
        // 消费者缺省校验
        checkDefault();
        // 刷新配置
        this.refresh();
        // 设置泛化调用相关信息
        if (getGeneric() == null && getConsumer() != null) {
            setGeneric(getConsumer().getGeneric());
        }
        // 获取引用的接口Class
        if (ProtocolUtils.isGeneric(getGeneric())) {
            interfaceClass = GenericService.class;
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            checkInterfaceAndMethods(interfaceClass, methods);
        }
        // 解析映射文件赋值 ReferenceConfig#url 属性,
        // 依次尝试获取系统属性中的 {接口名}、dubbo.resolve.file、dubbo-resolve.properties 配置文件,如果前一个配置存在则不会往后加载。将加载后的配置解析后赋值给 url
        resolveFile();
        // 缺省校验
        checkApplication();
        // 元数据中心校验
        checkMetadataReport();
    }

可以看到 ReferenceConfig#checkAndUpdateSubConfigs 的逻辑并不复杂,整个过程就是对参数的校验,与提供者的参数校验类似,这里不再进行深入分析。毕竟下面的 ReferenceConfig#init 才是关键所在。

ReferenceConfig#init

ReferenceConfig#init 创建了服务提供者的代理类,是消费者端核心代码的入口。其实现如下:

 	private void init() {
 		// 如果消费者服务已经初始化过,则直接返回
        if (initialized) {
            return;
        }
        initialized = true;
        // 检查存根合法性
        checkStubAndLocal(interfaceClass);
        // 检查mock合法性
        checkMock(interfaceClass);
        // 拼接参数
        Map<String, String> map = new HashMap<String, String>();
		// 添加 side、协议版本信息、时间戳和进程号等信息到 map 中
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        appendRuntimeParameters(map);
        // 不是泛化调用
        if (!isGeneric()) {
        	// 获取版本
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
			 // 获取接口方法列表,并添加到 map 中
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        map.put(Constants.INTERFACE_KEY, interfaceName);
        // 添加参数到 map中
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        Map<String, Object> attributes = null;
        // 对 <method> 标签的解析
        if (methods != null && !methods.isEmpty()) {
            attributes = new HashMap<String, Object>();
            for (MethodConfig methodConfig : methods) {
                appendParameters(map, methodConfig, methodConfig.getName());
                // 重试参数解析
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
            }
        }
		 // 获取消费者要使用注册的 ip 地址,多网卡情况下需要可以指定ip
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
		// 创建提供者代理
        ref = createProxy(map);
		// 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
    	// 并将 ConsumerModel 存入到 ApplicationModel 中
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), interfaceClass, ref, interfaceClass.getMethods(), attributes);
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }

上面的代码很多都和服务提供者的类似,前期工作都是对配置参数的处理,关键内容在于 代理的创建 createProxy(map);,这一步创建了引用服务的代理。

下面我们来看看 ReferenceConfig#createProxy 的实现过程。

ReferenceConfig#createProxy

ReferenceConfig#createProxy 实现如下 (这一部分注释基本都是参考官网):

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        /*******  1. 服务引用判断  ******/
        // 根据 isInjvm 参数判断是否指定了本地引用
        if (isInjvm() == null) {
        	// 如果没有指定是否本地引用则 需要根据下面逻辑判断
        	 // 在指定本地引用的时候,但是 url 配置被指定,则不当做本地引用。
        	 // 该 url 在 ReferenceConfig#init 中 调用ReferenceConfig#resolveFile 方法解析获取。
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else {
                // by default, reference local service if there is
                // 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
        		// 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
                isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
            }
        } else {
        	//如果进行了本地引用配置,则按照配置的决定
            isJvmRefer = isInjvm();
        }
		// 确定了本地引用
        if (isJvmRefer) {
        	/*******  2. 本地服务调用  ******/
        	// 生成一个本地引用的URL,协议类型为 injvm
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 调用 refer 方法构建 InjvmInvoker 实例
            invoker = refprotocol.refer(interfaceClass, url);
        } else {
        	/*******  3. 远程服务调用  ******/
        	// 如果 url不为空,则说明可能会进行点对点调用,即服务直连
        	// 这里的 url 是在 ReferenceConfig#init 中 调用 ReferenceConfig#resolveFile 方法解析获取。
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
            // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                    	
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                        	// 设置接口全限定名为 url 路径
                            url = url.setPath(interfaceName);
                        }
                        // 如果 url 协议类型是 Registry,则说明需要使用指定的注册中心
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        	// 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中,与服务暴露时的url 异曲同工
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                         	// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
	                        // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
	                        // 最后将合并后的配置设置为 url 查询字符串中
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
            	// 如果没有服务直连,则需要从注册中心中获取提供者信息
            	// 检查注册中心配置
                checkRegistry();
                // 加载注册中心 URL
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                    	// 加载监控中心
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                        	// 添加监控中心信息
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 到这里说明存在注册中心, 则添加 refer 参数到 url 中,并将 url 添加到 urls 
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                // 未配置注册中心,抛出异常
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
			// 单个注册中心或服务提供者(服务直连)
            if (urls.size() == 1) {
            	/*******  3.1 单URL场景的远程调用  ******/
            	 // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
	            /*******  3.2 多URL场景的远程调用  ******/
            	// 多个注册中心或多个服务提供者,或者两者混合
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                // 获取所有的 Invoker
                for (URL url : urls) {
                	// 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
                	// 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
                	// 这里 refprotocol 是 Protocol$Adaptive 适配器类型
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use RegistryAwareCluster only when register's cluster is available
                     // 如果注册中心链接不为空,则将使用 RegistryAwareCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
                    // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
		// 是否检查提供者的可用性
        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        // 如果 为true,则消费者启动时对 invoker 进行可用性检查
        if (c && !invoker.isAvailable()) {
            // make it possible for consumer to retry later if provider is temporarily unavailable
            initialized = false;
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
         // 2.7.0 版本后,元数据中心发布消费者服务信息
         /*******  4. 元数据中心服务发布  ******/
        MetadataReportService metadataReportService = null;
        if ((metadataReportService = getMetadataReportService()) != null) {
            URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
            metadataReportService.publishConsumer(consumerURL);
        }
        // create service proxy
        /*******  5. 代理类的创建  ******/
        // 生成代理类
        return (T) proxyFactory.getProxy(invoker);
    }

我们这里可以总结为五个过程:

  • 服务引用判断 :首先需要根据用户参数确定当前服务引用是本地引用还是远程引用。二者的逻辑并不相同。
  • 本地服务调用 :首先根据配置检查是否为本地调用,若是,则调用 InjvmProtocol#refer 方法生成 InjvmInvoker 实例。若不是,则读取直连配置项或注册中心 url,并将读取到的 url 存储到 urls 中。
  • 远程服务调用 :远程服务的调用,分为两种情况,单URL 或 多URL场景。(这里的URL 指的是用户指定的注册中心或者直连URL)。通过这一步,将 URL 转化为了 Invoker。
  • 单URL场景的远程调用 :根据 urls 元素数量进行后续操作。若 urls 元素数量为1,则说明注册中心或直连URL只有一个,则直接通过 Protocol 自适应拓展类构建 Invoker 实例接口。
  • 多URL的远程调用:若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker。
  • 元数据中心服务发布 :Dubbo 2.7 版本之后新增了元数据中心和配置中心,其中元数据中心会存储当前暴露的服务信息。
  • 代理类的创建 :在获取到Ref 的Invoker 后调用 ProxyFactory 生成代理类。 其中 :Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。

服务引用判断

这一部分的代码如下:其中注释已经解释的很清楚,这里不再赘述。

 		// 如果没有指定是否本地引用则 需要自己根据逻辑判断
        if (isInjvm() == null) {
        	 // 在指定本地引用的时候,但是 url 配置被指定,则不做本地引用
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else {
                // by default, reference local service if there is
                // 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
        		// 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
                isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
            }
        } else {
        	//如果进行了本地引用配置,则按照配置的决定
            isJvmRefer = isInjvm();
        }

本地服务调用

实际上,无论是本地还是远程调用,其核心方法都是 Protocol#refer 方法,区别在于由于协议的不同,调用的 Protocol 实例不同,本地服务调用调用的是 InjvmProtocol#refer。 (需要注意的是无论是远程还是本地调用,这里的调用顺序都是经过 XxxProtocolWrapper 后才真正调用到InjvmProtocol#refer)

   URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
     invoker = refprotocol.refer(interfaceClass, url);

InjvmProtocol#refer 的实现如下:

    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
    }

可以看到 InjvmProtocol#refer 的实现很简单,直接包装了一个InjvmInvoker 返回。

不过需要注意的是,这里将 exporterMap 传递给了 InjvmInvoker,而exporterMap中保存了本地暴露的服务信息。当消费者进行服务调用时,会调用 InjvmInvoker#doInvoke 方法,其实现如下,我们可以看到,InjvmInvoker#doInvoke 通过 exporterMap 获取到 调用的本地服务 Exporter 再调用

    @Override
    public Result doInvoke(Invocation invocation) throws Throwable {
        Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
        if (exporter == null) {
            throw new RpcException("Service [" + key + "] not found.");
        }
        RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0);
        return exporter.getInvoker().invoke(invocation);
    }

远程服务调用 远程服务调用首先要确定的就是从哪获取服务提供者。提供者的获取有两种可能

  • 消费者通过配置直接指定了提供者的地址。
  • 消费者通过注册中心获取提供者的地址。 下面的代码就针对这两种情况进行处理,最后获取的地址会保存到 urls 中,供后续处理。
     	// 如果 url不为空,则说明可能会进行点对点调用,即服务直连
     	// 这里的 url 是在 ReferenceConfig#init 中 调用 ReferenceConfig#resolveFile 方法解析获取。
         if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
         // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
             String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
             if (us != null && us.length > 0) {
                 for (String u : us) {
                   	
                     URL url = URL.valueOf(u);
                     if (url.getPath() == null || url.getPath().length() == 0) {
                     	// 设置接口全限定名为 url 路径
                         url = url.setPath(interfaceName);
                     }
                     // 如果 url 协议类型是 Registry,则说明需要使用指定的注册中心
                     if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                     	// 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中,与服务暴露时的url 异曲同工
                         urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                     } else {
                      	// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
                      // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
                      // 最后将合并后的配置设置为 url 查询字符串中
                         urls.add(ClusterUtils.mergeUrl(url, map));
                     }
                 }
             }
         } else { // assemble URL from register center's configuration
         	// 如果没有服务直连,则需要从注册中心中获取提供者信息
         	// 检查注册中心配置
             checkRegistry();
             // 加载注册中心 URL
             List<URL> us = loadRegistries(false);
             if (us != null && !us.isEmpty()) {
                 for (URL u : us) {
                 	// 加载监控中心
                     URL monitorUrl = loadMonitor(u);
                     if (monitorUrl != null) {
                     	// 添加监控中心信息
                         map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                     }
                     // 到这里说明存在注册中心, 则添加 refer 参数到 url 中,并将 url 添加到 urls 
                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                 }
             }
             // 未配置注册中心,抛出异常
             if (urls.isEmpty()) {
                 throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
             }
         }
    

    单URL场景的远程调用 这里的单URL 场景指的是 单一注册中心或 单一直连URL(服务提供者)时 远程调用服务Invoker 的创建,会直接调用 refprotocol.refer,如下:

              // 单个注册中心或服务提供者(服务直连)
              if (urls.size() == 1) {
              	/*******  3.1 单URL场景的远程调用  ******/
              	 // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
                  invoker = refprotocol.refer(interfaceClass, urls.get(0));
              }
    

    refprotocol.refer 的调用需要分为两种情况,

指定单一注册中心 :我们这里假设注册中心是zk,接口协议为Dubbo,则调用顺序为 :

refprotocol.refer = XxxProtocolWrapper#refer  => RegistryProtocol#refer =  XxxProtocolWrapper#refer  => DubboProtocol#refer

当消费者进行服务进行调用时,此时会通过下面的流程挑选出合适的Invoker用于服务调用。

 MockClusterInvoker#doInvoke -> FailoverClusterInvoker#doInvoke -> RegistryDirectory#list

指定单一直连URL :这里则不会出现上面注册中心的情况,一个直连 URL 即一个提供者。我们这里仍假设服务协议为Dubbo,则调用顺序为 :

refprotocol.refer = XxxProtocolWrapper#refer  => DubboProtocol#refer

可以得知,关键的两个方法为 RegistryProtocol#refer 和 DubboProtocol#refer,而在RegistryProtocol#refer 过程中又调用了 DubboProtocol#refer 方法,RegistryProtocol#refer 主要处理和注册中心相关的内容,而 DubboProtocol#refer 而是创建与提供者的连接。

多URL场景的远程调用

Search

    微信好友

    博士的沙漏

    Table of Contents