Dubbo集群容错

2020/02/15 Dubbo

Dubbo集群容错

为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。这样,在同一环境下的服务提供者数量会大于1。对于服务消费者来说,同一环境下出现了多个服务提供者。这时会出现一个问题,服务消费者需要决定选择哪个服务提供者进行调用。另外服务调用失败时的处理措施也是需要考虑的,是重试呢,还是抛出异常,亦或是只打印异常等。为了处理这些问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,这样服务消费者就可以专心处理远程调用相关事宜。比如发请求,接受服务提供者返回的数据等。这就是集群的作用。

工作过程

集群工作过程可分为两个阶段:

  • 第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例。
  • 第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker,可简单类比为 List< Invoker >。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

Dubbo集群组件包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。

整个集群工作流程如下:

ClusterInvoker#invoke-> Directory#list -> Router#route -> LoadBalance#select

组件介绍

Cluster & Cluster Invoker

Dubbo集群容错中存在两个概念,分别是集群接口 Cluster 和 Cluster Invoker,这两者是不同的。Cluster 是接口,而 Cluster Invoker 是一种 Invoker。服务提供者的选择逻辑,以及远程调用失败后的的处理逻辑均是封装在 Cluster Invoker 中。而 Cluster 接口和相关实现类的用途比较简单,仅用于生成 Cluster Invoker。简单来说,Cluster 就是用来创建 Cluster Invoker 的,并且一一对应。而Cluster 和 Cluster Invoker 的作用就是,在消费者进行服务调用时选择何种容错策略,如:服务调用失败后是重试、还是抛出异常亦或者返回一个空的结果集等。

Directory

Directory 即服务目录, 服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。而实际上服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。简单来说,Directory 中保存了当前可以提供服务的服务提供者列表集合。当消费者进行服务调用时,会从 Directory 中按照某些规则挑选出一个服务提供者来提供服务。

Router

服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者。服务目录在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由,筛选出符合路由规则的服务提供者。简单来说, Router制定了一些服务规则,Directory 中的服务提供者列表必须要满足 Router 规则才能作为候选服务提供者列表。

LoadBalance

当服务提供方是集群时,为了避免大量请求一直集中在一个或者几个服务提供方机器上,从而使这些机器负载很高,甚至导致服务不可用,需要做一定的负载均衡策略。Dubbo提供了多种均衡策略,默认为random,也就是每次随机调用一台服务提供者的服务。简单来说,LoadBalance 制定了某种策略,让请求可以按照某种规则(随机、hash 等)分发到服务提供者的机器上。

流程简述

  • 消费者进行服务调用,会经过 ClusterInvoker#invoke 方法,在 ClusterInvoker#invoke 方法中完成了集群容错的功能。
  • ClusterInvoker#invoke 方法中,首先会调用 Directory#list 来获取当前可用的服务列表,
  • Directory#list 会调用 Router#route,以便根据路由规则筛选出合适的服务列表。
  • ClusterInvoker#invoke 方法汇总,在获取到Directory#list 返回的服务列表后,会调用 LoadBalance#select 方法来根据负载均衡算法挑选一个服务提供者,来完成本次消费者的服务调用。需要注意的是,并非所有的容错策略都会调用负载均衡,如 BroadcastClusterInvoker 则不会再调用负载均衡组件。

Dubbo集群容错策略概述

当我们进行系统设计时,不仅要考虑正常情况下代码逻辑应该如何走,还要考虑异常情况下代码逻辑应该怎么走。当服务消费方调用服务提供方的服务出现错误时,Dubbo提供了多种容错方案,默认模式为Failover Cluster,也就是失败重试。

下面让我们看看Dubbo提供的集群容错模式。

  • Failover Cluster:失败重试 当服务消费方调用服务提供者失败后,会自动切换到其他服务提供者服务器进行重试,这通常用于读操作或者具有幂等的写操作。需要注意的是,重试会带来更长延迟。可以通过retries=”2”来设置重试次数(不含第1次)。 可以使用<dubbo:reference retries=”2”/>来进行接口级别配置的重试次数,当服务消费方调用服务失败后,此例子会再重试两次,也就是说最多会做3次调用,这里的配置对该接口的所有方法生效。 当然你也可以针对某个方法配置重试次数,比如: ```
- Failfast Cluster:快速失败
当服务消费方调用服务提供者失败后,立即报错,也就是只调用一次。通常,这种模式用于非幂等性的写操作。
- Failsafe Cluster:安全失败
当服务消费者调用服务出现异常时,直接忽略异常。这种模式通常用于写入审计日志等操作。
- Failback Cluster:失败自动恢复
当服务消费端调用服务出现异常后,在后台记录失败的请求,并按照一定的策略后期再进行重试。这种模式通常用于消息通知操作。
- Forking Cluster:并行调用
当消费方调用一个接口方法后,Dubbo Client会并行调用多个服务提供者的服务,只要其中有一个成功即返回。这种模式通常用于实时性要求较高的读操作,但需要浪费更多服务资源。
例如可通过forks="4"来设置最大并行数4。
- Broadcast Cluster:广播调用
当消费者调用一个接口方法后,Dubbo Client会逐个调用所有服务提供者,任意一台服务器调用异常则这次调用就标志失败。这种模式通常用于通知所有提供者更新缓存或日志等本地资源信息。

Dubbo本身提供了丰富的集群容错模式,但是如果你有定制化需求,可以根据Dubbo提供的扩展接口Cluster进行定制。

## Cluster 的概念
Dubbo集群容错中存在两个概念,分别是集群接口 Cluster 和 Cluster Invoker,这两者是不同的。Cluster 是接口,而 Cluster Invoker 是一种 Invoker。服务提供者的选择逻辑,以及远程调用失败后的的处理逻辑均是封装在 Cluster Invoker 中。而 Cluster 接口和相关实现类的用途比较简单,仅用于生成 Cluster Invoker。简单来说,Cluster 就是用来创建 Cluster Invoker 的,并且一一对应。而Cluster 和 Cluster Invoker 的作用就是,在消费者进行服务调用时选择何种容错策略,如:服务调用失败后是重试、还是抛出异常亦或者返回一个空的结果集等。
Cluster 接口如下:
```java
@SPI(FailoverCluster.NAME)
public interface Cluster {
	// 
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

配置方式

<dubbo:reference cluster="failsafe" />

Cluster 的种类

Cluster 接口存在多个实现对应不同的容错策略。 如下是org.apache.dubbo.rpc.cluster.Cluster 文件中针对不同协议的具体实现类:

mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
registryaware=org.apache.dubbo.rpc.cluster.support.RegistryAwareCluster

上面的每个类都 对应一个 Cluster Invoker, 其中 MockClusterWrapper 是 扩展类,对应的 Cluster Invoker 为 MockClusterInvoker,其余都是容错策略,其作用如下:

  • MockClusterInvoker : MockClusterWrapper 对应的 Cluster Invoker。完成了本地Mock 的功能。这里需要注意由于MockClusterWrapper 是 扩展类,所以 MockClusterInvoker 在最外层,即当服务调用时的顺序为 : MockClusterInvoker#invoker -> XxxClusterInvoker#invoker。关于 MockClusterInvoker 的实现,详参: Dubbo衍生篇⑦ :本地Mock 和服务降级
  • Failover Cluster:失败重试。当服务消费方调用服务提供者失败后,会自动切换到其他服务提供者服务器进行重试,这通常用于读操作或者具有幂等的写操作。需要注意的是,重试会带来更长延迟。可以通过retries=”2”来设置重试次数(不含第1次)。 可以使用<dubbo:reference retries=“2”/>来进行接口级别配置的重试次数,当服务消费方调用服务失败后,此例子会再重试两次,也就是说最多会做3次调用,这里的配置对该接口的所有方法生效。
  • Failfast Cluster:快速失败。当服务消费方调用服务提供者失败后,立即报错,也就是只调用一次。通常,这种模式用于非幂等性的写操作。
  • Failsafe Cluster:安全失败。当服务消费者调用服务出现异常时,直接忽略异常。这种模式通常用于写入审计日志等操作。
  • Failback Cluster:失败自动恢复。当服务消费端调用服务出现异常后,在后台记录失败的请求,并按照一定的策略后期再进行重试。这种模式通常用于消息通知操作。
  • Forking Cluster:并行调用。当消费方调用一个接口方法后,Dubbo Client会并行调用多个服务提供者的服务,只要其中有一个成功即返回。这种模式通常用于实时性要求较高的读操作,但需要浪费更多服务资源。如下代码可通过forks=”4”来设置最大并行数:
  • Available Cluster :可用集群调用器。前面提到doInvoke的入参有远程服务提供者的列表invokers。AvailableClusterInvoker遍历invokers,当遍历到第一个服务可用的提供者时,便访问该提供者,成功返回结果,如果访问时失败抛出异常终止遍历。
  • Mergeable Cluster :该集群容错策略是对多个服务端返回结果合并,在消费者调多个分组下的同一个服务时会指定使用该 Cluster 来合并 多个分组执行的结果。
  • Broadcast Cluster:广播调用。当消费者调用一个接口方法后,Dubbo Client会逐个调用所有服务提供者,任意一台服务器调用异常则这次调用就标志失败。这种模式通常用于通知所有提供者更新缓存或日志等本地资源信息。
  • RegistryAware Cluster :当消费者引用多个注册中心时会指定使用该策略。默认会首先引用默认的注册中心服务,如果默认注册中心服务没有提供该服务,则会从其他注册中心中寻找该服务。 Dubbo本身提供了丰富的集群容错模式,但是如果你有定制化需求,可以根据Dubbo提供的扩展接口Cluster进行定制。

上述的每个Cluster 对应一个Cluster Invoker, 如下 FailoverCluster 对应 FailoverClusterInvoker,而 FailoverClusterInvoker 才是容错逻辑实现的地方,所以下文会直接分析 Cluster Invoker。

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

由于每个Cluster Invoker 都是 AbstractClusterInvoker 的子类。例如 MockClusterInvoker#invoker 后的调用为:

MockClusterInvoker#invoker => AbstractClusterInvoker#invoke => AbstractClusterInvoker#doInvoker (AbstractClusterInvoker并未实现该方法供子类实现)

其中 MockClusterInvoker 为 MockClusterWrapper 对应 Cluster Invoker。完成了 本地mock功能。

AbstractClusterInvoker

由于上述的每个Cluster Invoker 都是 AbstractClusterInvoker 的子类。所以这里先来介绍一下AbstractClusterInvoker 中的一些公用方法。

AbstractClusterInvoker#invoke

我们上面提到过,服务消费者调用服务时会遵循如下流程:

MockClusterInvoker#invoker => AbstractClusterInvoker#invoke => AbstractClusterInvoker#doInvoker (AbstractClusterInvoker并未实现该方法供子类实现)

所以我们这里先来看看 AbstractClusterInvoker#invoke 的实现:

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        // 绑定 attachments 到 invocation 中.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
		// 列举 Invoker,即 从 Directory 中获取 Invoker 列表(路由后的列表)
		// 这里的调用顺序是 list -> AbstractDirectory#List -> RegistryDirectory#doList
        List<Invoker<T>> invokers = list(invocation);
        // 初始化负载均衡。如果 invokers 不为空,则从第一个invokers 的URL进行初始化,如果调用为空,则从默认调用LoadBalance(RandomLoadBalance)进行调用
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        // 异步调用时附加 id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 供ClusterInvoker 子类实现。
        return doInvoke(invocation, invokers, loadbalance);
    }
    
	...
	// 加载负载均衡策略。
    protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    	// 如果 invokers 不为空,则从第一个 invoker 上获取调用方法指定的 负载均衡策略,没有指定默认为 random
        if (CollectionUtils.isNotEmpty(invokers)) {
            return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
    }

整个流程比较简单:

  • list(invocation); :方法获取服务路由后的 invoker 列表。通过 Directory#list 获取。
  • initLoadBalance(invokers, invocation) : 加载负载均衡策略,加载策略为 : 如果 invokers 不为空,则从第一个 invoker 上获取调用方法指定的 负载均衡策略,没有指定默认为 random。
  • doInvoke(invocation, invokers, loadbalance); :开始真正调用逻辑。该方法 AbstractClusterInvoker 类中并未实现,供子类实现。

AbstractClusterInvoker#select

AbstractClusterInvoker#select 的作用是负载均衡策略来筛选 Invoker,AbstractClusterInvoker 的子类 在 doInvoker 中会调用该方法来应用负载均衡策略。如FailoverClusterInvoker等。但是需要注意并非所有的容错策略都会应用负载均衡策略。


	// 源码注释:使用负载平衡策略选择一个调用程序。 
	// a)首先,使用loadbalance选择一个调用程序。 如果此调用程序在先前选择的列表中,或者如果此调用程序不可用,则继续执行步骤b(重新选择),否则返回第一个选定的调用程序 
	// b)重新选择,重新选择的验证规则:selected > 可用。 此规则确保所选调用者有最小机会成为先前选择的列表中的一个,并且还保证此调用者可用。
	// 入参中的 selected 代表之前已经调用过的 invoker 列表
 	protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
        List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        // 获取调用方法名
        String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();
 		// 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的
    	// 调用同一个服务提供者,除非该提供者挂了再进行切换
        boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);

        //ignore overloaded method
        // 检测 invokers 列表是否包含 stickyInvoker,如果不包含,
        // 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        //ignore concurrency problem
        // 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含 
        // stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。
        // 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
         	// availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的 
            // isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
	    // 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。
	    // 此时继续调用 doSelect 选择 Invoker
        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
		 // 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker
        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }

这里需要注意 粘滞连接特性 :粘滞连接是指让服务消费者尽可能 调用同一个服务提供者,除非该提供者挂了再进行切换。 综上, AbstractClusterInvoker#select 方法的流程是(官方描述): 首先是获取 sticky 配置,然后再检测 invokers 列表中是否包含 stickyInvoker。如果不包含,则认为该 stickyInvoker 不可用,此时将其置空。这里的 invokers 列表可以看做是存活着的服务提供者列表,如果这个列表不包含 stickyInvoker,那自然而然的认为 stickyInvoker 挂了,所以置空。 如果 stickyInvoker 存在于 invokers 列表中,此时要进行下一项检测 — 检测 selected 中是否包含 stickyInvoker。 如果selected 中包含 stickyInvoker的话,说明 stickyInvoker 在此之前没有成功提供服务(但其仍然处于存活状态)。此时我们认为这个服务不可靠,不应该在重试期间内再次被调用,因此这个时候不会返回该 stickyInvoker。 如果 selected 不包含 stickyInvoker,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。当可用性检测通过,才可返回 stickyInvoker,否则调用 doSelect 方法选择 Invoker。如果 sticky 为 true,此时会将 doSelect 方法选出的 Invoker 赋值给 stickyInvoker。 在上面的代码中,我们发现AbstractClusterInvoker#doSelect才是真正完成负载均衡的调用。所以下面我们来看看 AbstractClusterInvoker#doSelect 的实现。

AbstractClusterInvoker#doSelect 真正完成了负载均衡的相关处理。其实现如下:


 	private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
        List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        // 如果只有一个服务提供者,不用再执行负载均衡策略,直接返回即可。
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        // 通过 负载均衡策略选举出一个 Invoker
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
        // 如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选
        // 即,如果 invoker 已经被调用过 ||  invoker 服务不可用,则进行重选
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
            	// 进行重选
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                // 如果 rinvoker 不为空,则将其赋值给 invoker
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    // rinvoker 为空,定位 invoker 在 invokers 中的位置
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        // 获取 index + 1 位置处的 Invoker,以下代码等价于:
                    	//     invoker = invokers.get((index + 1) % invokers.size());
                        invoker = invokers.get((index + 1) % invokers.size());
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }

AbstractClusterInvoker#doSelect 主要做了两件事 :

  • 通过负载均衡组件选择 Invoker。
  • 如果选出来的 Invoker 不稳定或不可用,此时需要调用 reselect 方法进行重选。若 reselect 选出来的 Invoker 为空,此时定位 invoker 在 invokers 列表中的位置 index,然后获取 index + 1 处的 invoker,这也可以看做是重选逻辑的一部分。

AbstractClusterInvoker#reselect

AbstractClusterInvoker#reselect 完成了服务的重选。当之前挑选的 Invoker 不可用,或者已经选择过(保存在 selected 集合中),则认为 invoker 不合格,进行重新筛选。

	//	重新选择,首先使用不在“ selected”中的调用者,如果所有调用者都在“ selected”中,只需使用负载均衡策略选择一个可用的调用者。
 	private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
        List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

        //Allocating one in advance, this list is certain to be used.
        List<Invoker<T>> reselectInvokers = new ArrayList<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

        // First, try picking a invoker not in `selected`.
        // 筛选出 可用 && 未被调用过的 invoker 保存到 reselectInvokers集合汇总
        for (Invoker<T> invoker : invokers) {
            if (availablecheck && !invoker.isAvailable()) {
                continue;
            }

            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
		// 如果 reselectInvokers 不为空,则再次进行负载均衡
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }

        // Just pick an available invoker using loadbalance policy
        // 到这一步说明 reselectInvokers 为空,也即是说,所有的服务要么不可用,要么被调用过。则开始挑选已经调用过的 invoker 中可用的 invoker 保存到 reselectInvokers 集合中
        if (selected != null) {
            for (Invoker<T> invoker : selected) {
                if ((invoker.isAvailable()) // available first
                        && !reselectInvokers.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        // 再次进行负载均衡调用
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }

        return null;
    }

AbstractClusterInvoker#reselect 方法的逻辑很清楚

  • 首先获取 服务提供者列表中 (未被调用过 && 可用的服务) 的 invoker列表 交由负载均衡组件筛选。
  • 如果没有 (未被调用过 && 可用的服务) 的 invoker,则退而求其次,放松筛选规则,挑选 (调用过的服务 && 可用的服务) 的 invoker列表,并交由负载均衡组件筛选。

下面我们来分析AbstractClusterInvoker 各个子类的具体实现。根据上面的分析我们知道,AbstractClusterInvoker将 doInvoke 交由子类具体实现,所以我们下面着重看各个子类 doInvoke 方法的实现 。

FailoverClusterInvoker

失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries=“2” 来设置重试次数(不含第一次)。 FailoverClusterInvoker#doInvoke 实现如下 :

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    	// 1. 获取所有服务提供者
        List<Invoker<T>> copyInvokers = invokers;
        // 检查服务提供者,如果集合为空则抛出异常
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 2. 获取指定的重试次数  retries。默认重试次数为1次,也就是会调用两次。
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        // 使用循环,失败重试
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        // 3. 开始循环调用
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            // 重试时,进行重新选择,避免重试时  invoker 列表已经发生改变 这样做的好处是,如果某个服务挂了,通过调用 list 可得到最新可用的 Invoker 列表
            // 注意: 如果 列表发生变化,则 invoked判断会失效,因为  invoker 实例已经改变
            if (i > 0) {
            	// 3.1 重试时校验
            	// 如果当前实例已经被销毁,则抛出异常
                checkWhetherDestroyed();
                // 重新获取所有服务提供者
                copyInvokers = list(invocation);
                // check again
                // 再次检查 invoker 列表是否为空
                checkInvokers(copyInvokers, invocation);
            }
            // 3.2. 选择负载均衡策略。该方法调用的是上面所说的AbstractClusterInvoker#select 方法
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            //  添加到 invoker 到 invoked 列表中,表明当前 invoker已经被调用过
            invoked.add(invoker);
            //  设置 invoked 到 RPC 上下文中
            RpcContext.getContext().setInvokers((List) invoked);
            try {
            	// 3.3具体发起远程调用
                Result result = invoker.invoke(invocation);
				// ... 日志处理
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
		//	... 若重试失败,则抛出异常
    }

FailoverClusterInvoker#doInvoke 的实现很简单:

  • 获取所有的服务提供者。
  • 获取消费者指定的重试次数,如果没有指定,则默认的重试次数为1次,那么这个接口总共调用次数=重试次数+1(1是正常调用) 开始根据重试次数,循环调用。如果调用成功,则跳出循环 重试时的校验。当服务第一次调用时,此时i=0,不会进行重试校验。而当进行重试时, i>0 条件满足,此时会进行校验。 checkWhetherDestroyed(); 检查是否有线程调用了当前ReferenceConfig的destroy()方法,销毁了当前消费者。如果当前消费者实例已经被销毁,那么重试就没意义了,所以会抛出RpcException异常。如果 消费者没有被销毁,则通过 copyInvokers = list(invocation); 重新获取当前服务提供者列表,这是因为从第一次调开始到现在可能提供者列表已经变化了。随后又通过 checkInvokers(copyInvokers, invocation); 对服务列表进行了一次校验。 通过负载均衡策略筛选出合适的 Invoker 进行具体调用。需要注意的是在具体调用时(即 invoker.invoke(invocation) )出现异常会进行重试,而在3.1,3.2 时出现异常并不会重试。因为 3.1,3.2 属于调用前的准备工作。

FailfastClusterInvoker

FailfastClusterInvoker 即快速失败,其代码很简单,调用出错即抛出异常。 快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。 FailfastClusterInvoker#doInvoke 的实现如下,代码比较简单,不再赘述:

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    	// 检查服务提供者,如果集合为空则抛出异常
        checkInvokers(invokers, invocation);
        // 通过负载均衡策略选择 Invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
        	// 服务调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
        	// 调用错误抛出异常
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
          	// ... 抛出异常
        }
    }

FailsafeClusterInvoker

失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。 安全失败,即失败并不抛出异常,而是返回一个空结果。代码也很简单,这里不再赘述 FailsafeClusterInvoker#doInvoke 的实现如下,代码比较简单,不再赘述:

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            // 出现异常返回空结果集
            return new RpcResult(); // ignore
        }
    }

FailbackClusterInvoker

失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。 FailbackClusterInvoker#doInvoke 的实现如下:

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {
            checkInvokers(invokers, invocation);
            invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
        // 如果调用过程中发生异常,此时仅打印错误日志,不抛出异常
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            // 记录调用信息,进行重试调用
            addFailed(loadbalance, invocation, invokers, invoker);
             // 返回一个空结果给服务消费者
            return new RpcResult(); // ignore
        }
    }
    // 记录调用失败信息,并进行重试
    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
    	// 创建自定义计时器实例
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    failTimer = new HashedWheelTimer(
                            new NamedThreadFactory("failback-cluster-timer", true),
                            1,
                            TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // 创建定时重试任务,每隔5秒执行一次
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }

RetryTimerTask

RetryTimerTask 由于实现了 TimerTask 接口,所以我们这里只需要关注RetryTimerTask#run 方法即可。如下

        @Override
        public void run(Timeout timeout) {
            try {
            	// 负载均衡选择
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;
                // 进行远端调用
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                // 到达重试次数则不再重试
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                	// 再次重试
                    rePut(timeout);
                }
            }
        }
        
        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }

            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }

            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }

ForkingClusterInvoker

并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=“2” 来设置最大并行数。

    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            // 1. 获取参数配置
            // 获取最大并行数。默认2
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            // 获取超时时间。默认1000
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 2. 筛选并行调用的 invoker 
             // 2.1如果 forks 配置不合理,则直接将 invokers 赋值给 selected
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                 // 2.2 循环选出 forks 个 Invoker,并添加到 selected 中
                for (int i = 0; i < forks; i++) {
                    // TODO. Add some comment here, refer chinese version for more details.
                    // 挑选 invoker。
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // 添加 invoker 到 selected 中
                    if (!selected.contains(invoker)) {
                        //Avoid add the same invoker several times.
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            // 用于记录调用出现异常的次数
            final AtomicInteger count = new AtomicInteger();
            // 创建阻塞队列
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            // 3. 进行并发调用
            for (final Invoker<T> invoker : selected) {
            	// 开启线程并发调用
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                         	// 进行远程调用
                            Result result = invoker.invoke(invocation);
                            // 将结果存在阻塞队列中。
                            ref.offer(result);
                        } catch (Throwable e) {
                        	// 3.1 value  >= selected.size() 则说明所有调用都失败了,记录错误
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
             	// 4. 从阻塞队列中取出远程调用结果并返回
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                 // 如果结果类型为 Throwable,则抛出异常
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                // 返回结果
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    }

上面的代码逻辑如下:

  • 获取参数配置 :这里获取消费者指定的forks 和 timeout 。forks 代表最大并发量, timeout 代表请求超时时间
  • 筛选并行调用的 invoker : forks <= 0   forks >= invokers.size() : 参数配置不合理,则所有的 invoker 都作为并发调用的 invoker 否则遍历通过负载均衡策略筛选出指定数量的 invoker
  • 进行并发调用:开启线程池进行并发调用。 当 value >= selected.size() 时才会将异常信息入队。当 value >= selected.size() 则说明所有的并发调用都失败了,此时需要将异常信息记录到队列中。供后面使用。
  • 从阻塞队列中取出远程调用结果并返回。这里需要注意的是,即使这里的RPC 调用返回了值,其他的并行调用还在继续。

BroadcastClusterInvoker

BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。

 @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
        // 遍历调用所有 Invoker
        for (Invoker<T> invoker : invokers) {
            try {
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }

AvailableClusterInvoker

AvailableClusterInvoker遍历invokers,当遍历到第一个服务可用的提供者时,便访问该提供者,成功返回结果,如果访问时失败抛出异常终止遍历。 AvailableClusterInvoker#doInvoke 的实现如下:

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        for (Invoker<T> invoker : invokers) {
        	// 如果服务可用,则进行调用
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        throw new RpcException("No provider available in " + invokers);
    }

MergeableClusterInvoker

当消费者引用多分组的服务提供者时,Dubbo 会指定使用 MergeableClusterInvoker 作为集群容错策略。

RegistryAwareClusterInvoker

RegistryAwareClusterInvoker 会优先选择默认的注册中心来寻找服务(通过 default 属性来指定),如果默认注册中心没有提供该服务则选择其他注册中心提供的服务。

如何基于扩展接口自定义集群容错策略

Dubbo 本身提供了丰富的集群容错策略,但是如果你有定制化需求,可以根据Dubbo提供的扩展接口Cluster进行定制。

  • 为了自定义扩展实现,创建自定义的集群类,实现Cluster接口。同时创建一个 ClusterInvoker 实现AbstractClusterInvoker类。 ```java public class SimpleCluster implements Cluster { @Override public Invoker join(Directory directory) throws RpcException { return new SimpleClusterInvoker<>(directory); } }

public class SimpleClusterInvoker extends AbstractClusterInvoker { public SimpleClusterInvoker(Directory directory) { super(directory); }

@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    return invokers.get(0).invoke(invocation);
} } ``` 通过上面的代码可知,doInvoke方法需要重写,在该方法内用户就可以实现自己的集群容错策略。 - 由于dubbo spi 机制,我们需要创建 META-INF/dubbo/org.apache.dubbo.rpc.cluster.Cluster 文件,并添加内容,其中 simple 为key,集群容错的策略。value 为对应的实现类。这里即代表着 simple 容错策略使用 SimpleCluster 来实现。 - 调用时通过 cluster 属性指定集群容错策略 ```java  @Reference(version = "1.0.0", group = "dubbo", cluster = "simple")
private SimpleDemoService simpleDemoService; ```

Directory 概念

在 Dubbo 中 存在 SPI 接口 org.apache.dubbo.rpc.cluster.Directory。即服务目录,用于存放服务提供列表。

Directory 即服务目录, 服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。在一个服务集群中,服务提供者数量并不是一成不变的,如果集群中新增了一台机器,相应地在服务目录中就要新增一条服务提供者记录。或者,如果服务提供者的配置修改了,服务目录中的记录也要做相应的更新。如果这样说,服务目录和注册中心的功能不就雷同了吗?确实如此,这里这么说是为了方便大家理解。实际上服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。Invoker 有什么用呢?看名字就知道了,这是一个具有远程调用功能的对象。讲到这大家应该知道了什么是服务目录了,它可以看做是 Invoker 集合,且这个集合中的元素会随注册中心的变化而进行动态调整。

简单来说 :Directory 中保存了当前可以提供服务的服务提供者列表集合。当消费者进行服务调用时,会从 Directory 中按照某些规则挑选出一个服务提供者来提供服务。

Directory 的种类

服务目录目前内置的实现有两个,分别为 StaticDirectory 和 RegistryDirectory,它们均是 AbstractDirectory 的子类。AbstractDirectory 实现了 Directory 接口,这个接口包含了一个重要的方法定义,即 list(Invocation),用于列举 Invoker。

Directory 继承自 Node 接口,Node 这个接口继承者比较多,像 Registry、Monitor、Invoker 等均继承了这个接口。这个接口包含了一个获取配置信息的方法 Node#getUrl,实现该接口的类可以向外提供配置信息。除此之外, RegistryDirectory 实现了 NotifyListener 接口,当注册中心节点信息发生变化后,RegistryDirectory 可以通过此接口方法得到变更信息,并根据变更信息动态调整内部 Invoker 列表。

AbstractDirectory

StaticDirectory 和 RegistryDirectory,它们均是 AbstractDirectory 的子类。所以我们这里先来看看 AbstractDirectory 中的公共逻辑。

AbstractDirectory 封装了 Invoker 列举流程,具体的列举逻辑则由子类实现,这是典型的模板模式。AbstractDirectory 的整个实现很简单

public abstract class AbstractDirectory<T> implements Directory<T> {

    // logger
    private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);
	// 当前 注册中心URL 或者 直连URL
    private final URL url;
	
    private volatile boolean destroyed = false;
	// 消费者URL
    private volatile URL consumerUrl;

    protected RouterChain<T> routerChain;

    public AbstractDirectory(URL url) {
        this(url, null);
    }

    public AbstractDirectory(URL url, RouterChain<T> routerChain) {
        this(url, url, routerChain);
    }

    public AbstractDirectory(URL url, URL consumerUrl, RouterChain<T> routerChain) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
		// 如果是注册中心的协议,则进行进一步解析
        if (url.getProtocol().equals(Constants.REGISTRY_PROTOCOL)) {
            Map<String, String> queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
            this.url = url.addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
        } else {
            this.url = url;
        }

        this.consumerUrl = consumerUrl;
        setRouterChain(routerChain);
    }
    
	// 根据调用信息获取到服务提供者列表
    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
		// 这里直接交由子类实现,也就是 StaticDirectory 和 RegistryDirectory 的实现
        return doList(invocation);
    }

    @Override
    public URL getUrl() {
        return url;
    }

    public RouterChain<T> getRouterChain() {
        return routerChain;
    }

    public void setRouterChain(RouterChain<T> routerChain) {
        this.routerChain = routerChain;
    }

    protected void addRouters(List<Router> routers) {
        routers = routers == null ? Collections.emptyList() : routers;
        routerChain.addRouters(routers);
    }

    public URL getConsumerUrl() {
        return consumerUrl;
    }

    public void setConsumerUrl(URL consumerUrl) {
        this.consumerUrl = consumerUrl;
    }

    public boolean isDestroyed() {
        return destroyed;
    }

    @Override
    public void destroy() {
        destroyed = true;
    }

    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

}

这里需要关注的就是 AbstractDirectory#list 方法。当消费者进行服务调用时,Dubbo 会将调用相关信息封装成Invocation进行调用。在调用过程中会通过 AbstractDirectory#doList 来获取服务提供者列表。而 AbstractDirectory#doList的具体实现则交由子类 StaticDirectory 和 RegistryDirectory 来实现。

StaticDirectory

StaticDirectory 即静态服务目录,顾名思义,它内部存放的 Invoker 是不会变动的。所以,理论上它和不可变 List 的功能很相似。下面我们来看一下这个类的实现。

public class StaticDirectory<T> extends AbstractDirectory<T> {
    private static final Logger logger = LoggerFactory.getLogger(StaticDirectory.class);
	// Invoker 列表,在 StaticDirectory 构造时就已经初始化
    private final List<Invoker<T>> invokers;

    public StaticDirectory(List<Invoker<T>> invokers) {
        this(null, invokers, null);
    }

    public StaticDirectory(List<Invoker<T>> invokers, RouterChain<T> routerChain) {
        this(null, invokers, routerChain);
    }

    public StaticDirectory(URL url, List<Invoker<T>> invokers) {
        this(url, invokers, null);
    }

    public StaticDirectory(URL url, List<Invoker<T>> invokers, RouterChain<T> routerChain) {
        super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routerChain);
        if (invokers == null || invokers.isEmpty())
            throw new IllegalArgumentException("invokers == null");
        this.invokers = invokers;
    }
	// 服务提供者的接口
    @Override
    public Class<T> getInterface() {
        return invokers.get(0).getInterface();
    }
	// 检测服务目录是否可用
    @Override
    public boolean isAvailable() {
        if (isDestroyed()) {
            return false;
        }
         // 只要有一个 Invoker 是可用的,就认为当前目录是可用的
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return true;
            }
        }
        return false;
    }

    @Override
    public void destroy() {
        if (isDestroyed()) {
            return;
        }
        // 调用父类销毁逻辑
        super.destroy();
         // 遍历 Invoker 列表,并执行相应的销毁逻辑
        for (Invoker<T> invoker : invokers) {
            invoker.destroy();
        }
        invokers.clear();
    }

	// 构建路由链
    public void buildRouterChain() {
        RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
        routerChain.setInvokers(invokers);
        this.setRouterChain(routerChain);
    }
	
	// 通过路由链路来进行路由,获取最终的 Invoker 列表
    @Override
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
        List<Invoker<T>> finalInvokers = invokers;
        if (routerChain != null) {
            try {
            	// 进行服务路由,筛选出满足路由规则的服务列表。
                finalInvokers = routerChain.route(getConsumerUrl(), invocation);
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
        return finalInvokers == null ? Collections.emptyList() : finalInvokers;
    }

}

StaticDirectory 的实现并不复杂,因为其是不可变的服务列表,相较于RegistryDirectory 较少了动态监听的功能。

RegistryDirectory

RegistryDirectory 是一种动态服务目录,实现了 NotifyListener 接口,也是我们最常使用的Directory。当注册中心服务配置发生变化后,RegistryDirectory 可收到与当前服务相关的变化。收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表。

RegistryDirectory 中有几个比较重要的逻辑:

  • Invoker 的列举逻辑(RegistryDirectory#doList) :服务目录功能的本质是提供服务提供者列表。而当消费者进行服务调用时,会通过Directory#list -> AbstractDirectory#doList 的方式来获取服务提供者列表。
  • 接收服务配置变更的逻辑(RegistryDirectory#notify):RegistryDirectory并非像StaticDirectory 一样服务列表不可变,这就代表着,RegistryDirectory有着接受服务变更的功能。
  • Invoker 列表的刷新逻辑(RegistryDirectory#refreshOverrideAndInvoker) :上面说了 RegistryDirectory 有接收服务列表变化的功能,那么就需要 RegistryDirectory 在接收到列表变更后刷新本地服务列表。

Invoker 的列举逻辑

RegistryDirectory#doList 的实现如下:

	@Override
    public List<Invoker<T>> doList(Invocation invocation) {
    	// 校测是否可用:没有服务提供者 || 提供者被禁用时会抛出异常
        if (forbidden) {
            // ... 抛出异常
        }
		// 如果group设置的是可以匹配多个组,则直接返回当前的 Invoker 集合
        if (multiGroup) {
            return this.invokers == null ? Collections.emptyList() : this.invokers;
        }
		// 否则通过 路由链路进行路由
        List<Invoker<T>> invokers = null;
        try {
            // Get invokers from cache, only runtime routers will be executed.
            invokers = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        }
        return invokers == null ? Collections.emptyList() : invokers;
    }

其中 RouterChain#route 的实现如下:

    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> finalInvokers = invokers;
        for (Router router : routers) {
        	// 这里将 finalInvokers 交由路由按照规则进行过滤,返回最终过滤后的 finalInvokers。
            finalInvokers = router.route(finalInvokers, url, invocation);
        }
        return finalInvokers;
    }

这里可以看到整个服务列举的过程很简单:将本地缓存的服务列表交由服务路由 router 进行筛选,将筛选后的服务列表返回。

我们这里需要注意的是,当消费者调用多分组的服务时,路由规则会失效。

接收服务配置变更的逻辑

RegistryDirectory 是一个动态服务目录,会随注册中心配置的变化进行动态调整。因此 RegistryDirectory 实现了 NotifyListener 接口,通过这个接口获取注册中心变更通知。

即服务消费者在启动时,会订阅 注册中心 providers、configurators、routers 节点,并设置回调函数为 RegistryDirectory#notify,当节点更新时,会调用 RegistryDirectory#notify 方法来进行本地的更新(在启动时会立刻调用一次该回调方法,用于同步当前节点配置)。

	// 在调用该方法之前,会调用 AbstractRegistry#notify 中将URL 按照类别划分,再分别调用 RegistryDirectory#notify 方法。
  	@Override
    public synchronized void notify(List<URL> urls) {
    	//  对 URLs 进行合法性过滤
        List<URL> categoryUrls = urls.stream()
        		// 合法性组别校验,默认 providers
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.toList());

        /**
         * TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?
         */
         // 筛选出配置信息URL 并转换成 configurators 
        this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, UrlUtils::isConfigurator))
                .orElse(configurators);
		// 筛选出路由URL 并转换成Router 添加到 AbstractDirectory#routerChain 中
		// RouterChain保存了服务提供者的URL列表转换为invoker列表和可用服务提供者对应的invokers列表和路由规则信息
        toRouters(classifyUrls(categoryUrls, UrlUtils::isRoute)).ifPresent(this::addRouters);

        // providers
        // 筛选出 提供者URL 并进行服务提供者的更新
        refreshOverrideAndInvoker(classifyUrls(categoryUrls, UrlUtils::isProvider));
    }

简述逻辑:RegistryDirectory 在接收到服务配置变化后,会按照类型进行划分(configurators、routers,providers) ,并分别进行处理。

Invoker 列表的刷新逻辑

在上面 RegistryDirectory#notify 中,对URL进行了分组后分别进行处理,对于providers 节点的刷新,即服务提供者的列表刷新是比较重要的部分,其实现在RegistryDirectory#refreshOverrideAndInvoker中 :

	private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        // 重写URL(也就是把mock=return null等信息拼接到URL中)并保存到overrideDirectoryUrl中
        overrideDirectoryUrl();
        // 刷新 服务提供者 URL,根据URL 生成Invoker
        refreshInvoker(urls);
    }
	// 刷新 服务列表
	private void refreshInvoker(List<URL> invokerUrls) {
        Assert.notNull(invokerUrls, "invokerUrls should not be null");
		// 如果只有一个 协议为 empty 的url,则表明需要销毁所有协议,因为empty 协议为空协议,个人理解就是为了防止空url存在而生成的无意义的url
		 
        if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls
                .get(0)
                .getProtocol())) {
              // 设置禁止访问
            this.forbidden = true; // Forbid to access
            // invoker设置为空集合
            this.invokers = Collections.emptyList();
            routerChain.setInvokers(this.invokers);
            destroyAllInvokers(); // Close all invokers
        } else {
        	// 设置允许访问
            this.forbidden = false; // Allow to access
            // urlInvokerMap 需要使用本地引用,因为 urlInvokerMap自身可能随时变化,可能指向 null
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls == Collections.<URL>emptyList()) {
                invokerUrls = new ArrayList<>();
            }
            // 这里防止并发更新,cachedInvokerUrls 被  volatile 修饰
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            // 将 url 转换成 Invoker,key为 url.toFullString(), value 为 Invoker
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                        .toString()));
                return;
            }
			// 转化为不可修改 list,防止并发修改
            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
            // pre-route and build cache, notice that route cache should build on original Invoker list.
            // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
            // 保存服务提供者 Invoker
            routerChain.setInvokers(newInvokers);
            //  如果匹配多个 group,则进行合并
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;

            try {
            	// 销毁无用的 Invoker
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

上面可以看到整个过程还是比较简单:

  • 如果 invokerUrls 只有一个 empty 协议,则认为需要销毁所有 Invoker,则对Invoker进行销毁。
  • 否则将 URL 转换为 Invoker ,并进行校验后赋值给 RegistryDirectory#invokers 、RegistryDirectory#urlInvokerMap、AbstractDirectory#routerChain 中。随后销毁无用的 Invoker,避免服务消费者调用已下线的服务的服务。

RegistryDirectory#toInvokers

可以看到上面的关键的逻辑 在于 URL 转换为 Invoker ,即如下,

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);

toInvokers 实现如下:

	// org.apache.dubbo.registry.integration.RegistryDirectory#toInvokers
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected	
            // 检测服务提供者协议是否被服务消费者所支持
            // 如果消费者指定了了调用服务的协议,则只使用指定的协议,即如果消费者指定提供者协议类型为 dubbo,则只会需要协议类型为dubbo的提供者。
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                   // 若服务提供者协议头不被消费者所支持,则忽略当前 providerUrl
                    continue;
                }
            }
            // 跳过 空协议
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            // 没有能够处理该协议的 Protocol实现类,则跳过
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
             	// .... 日志打印
                continue;
            }
            // 合并提供者url 顺序是 : override > -D >Consumer > Provider
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            // 避免重复解析
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            // 从缓存中获取 Invoker
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            // 获取与 url 对应的 Invoker
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            // 如果缓存中不存在当前URL对应的invoker,则进行远程引用
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    // 对 disabled 和 enabled 参数的校验,即服务是否启用
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                    	// 调用 生成invoker委托类
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                // 将 Invoker 保存到 缓存 newUrlInvokerMap 中
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        // 返回 url 转换成的 Invoker Map 
        return newUrlInvokerMap;
    }

可以看到整个逻辑

  • 如果消费者指定了服务提供者的协议类型,则按照指定协议来获取URL
  • 如果第一步获取成功,或者没有指定协议类型,则会对URL进行合并、缓存校验等过程
  • 如果缓存中不存在当前URL的Invoker ,则会通过下面这一句来创建Invoker。
    invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
    

    这里我们可以看到关键的 逻辑在 protocol.refer(serviceType, url),由于当前服务的协议是Dubbo,所以这里的调用顺序应为:

      refprotocol.refer = XxxProtocolWrapper#refer  => DubboProtocol#refer
    

DubboProtocol#refer

DubboProtocol#refer 根据URL 创建了 Invoker,在这里会建立与服务提供者的网络连接。其具体实现如下:

    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    	// 序列化优化
        optimizeSerialization(url);
        // create rpc invoker.
        // 创建
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

	...
	
	private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        // 是否共享连接
        boolean service_share_connect = false;
        // 获取连接数,默认为0,表示未配置
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
         // 如果未配置 connections,则共享连接
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
            	// 获取共享客户端 : getSharedClient 中会从缓存中获取,如果没有命中,则会调用 initClient 方法创建客户端
                clients[i] = getSharedClient(url);
            } else {
            	// 初始化新的客户端
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

这里需要注意的是在创建连接的过程中, 由于一台机器可以提供多个服务,那么消费者在引用这些服务时会考虑是与这些服务建立一个共享连接,还是与每一个服务单独建立一个连接。这里可以通过 connections 设置数量来决定创建多少客户端连接,默认是共享同一个客户端。

下面我们看一下 DubboProtocol#getSharedClient 方法的实现:

 /**
     * Get shared connection
     * 获取共享客户端
     */
    private ExchangeClient getSharedClient(URL url) {
        String key = url.getAddress();
        // 获取带有“引用计数”功能的 ExchangeClient
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            if (!client.isClosed()) {
            	// 增加引用计数
                client.incrementAndGetCount();
                return client;
            } else {
                referenceClientMap.remove(key);
            }
        }

        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            if (referenceClientMap.containsKey(key)) {
                return referenceClientMap.get(key);
            }
  			// 如果缓存没命中,则创建 ExchangeClient 客户端
            ExchangeClient exchangeClient = initClient(url);
            // 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            locks.remove(key);
            return client;
        }
    }


  /**
     * Create new connection
     * 创建一个新的连接
     */
    private ExchangeClient initClient(URL url) {

        // client type setting.
        // 从url获取客户端类型,默认为 netty
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
		// 添加编解码和心跳包参数到 url 中
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
         // 检测客户端类型是否存在,不存在则抛出异常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy
            // 获取 lazy 配置,并根据配置值决定创建的客户端类型
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            	// 创建懒加载 ExchangeClient 实例
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
            	// 创建普通 ExchangeClient 实例
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

这里我们可以发现,无论是共享连接还是非共享连接,都会调用 DubboProtocol#initClient 方法来创建客户端。这里需要注意在创建客户端的时候,可以根据 LAZY_CONNECT_KEY (默认为 lazy) 配置来决定是否懒加载客户端,如果懒加载客户端,则会在第一次调用服务时才会创建与服务端的连接(也是调用 Exchangers.connect 方法),如果不是懒加载,则在服务启动的时候便会与提供者建立连接。默认是立即加载,即消费者在启动时就会与提供者建立连接。

同时我们可以注意到 在非懒加载的情况下,当URl 转换成 Invoker 时,消费者便已经和提供者建立了链接(通过 Exchangers.connect(url, requestHandler) ),也即是说,默认情况下消费者在启动的时候将所有提供者URL转化为Invoker,即代表消费者启动时便已经和所有的提供者建立了连接。

Directory 的调用过程

经过上面的介绍,我们对 Directory 的功能了解的差不多,下面我们看看 Directory 的调用过程。根据 Directory 的功能我们就可以推测出其调用是在消费端。

Directory 的创建

当消费者服务启动时会通过 ReferenceConfig#createProxy 创建提供者的代理类。而在 ReferenceConfig#createProxy 方法中,完成了 Directory 的创建过程。

ReferenceConfig#createProxy 中会根据注册中心的数量或直连URL的数量,如果为单一URL,则创建 RegistryDirectory ,否则创建 StaticDirectory(也会创建 RegistryDirectory )。

RegistryProtocol的创建

上面讲了,无论是多URL还是单URL都会执行 RegistryProtocol#refer,而在 RegistryProtocol#refer方法中,创建了 RegistryDirectory。即是说,无论是单URL 还是多 URL 都会调用 RegistryProtocol#refer 创建RegistryDirectory。

StaticDirectory 的创建

在多注册中心或者多服务提供者时会创建StaticDirectory 作为服务目录。创建过程在ReferenceConfig#createProxy中,部分代码如下,其中 refprotocol.refer(interfaceClass, url) 即1.1 RegistryProtocol#refer 中的 RegistryProtocol#refer 方法

         List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
         URL registryURL = null;
         for (URL url : urls) {
             invokers.add(refprotocol.refer(interfaceClass, url));
             if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                 registryURL = url; // use last registry url
             }
         }
         if (registryURL != null) { // registry url is available
             // use RegistryAwareCluster only when register's cluster is available
             URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
             // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
             invoker = cluster.join(new StaticDirectory(u, invokers));
         } else { // not a registry url, must be direct invoke.
             invoker = cluster.join(new StaticDirectory(invokers));
         }

另外当消费端引用多个分组的服务时,dubbo会对每个分组创建一个对应的StaticDirectory对象。这一部分是在RegistryDirectory#toMergeInvokerList 中完成。

Diectory 的回调

Diectory 接收到服务列表变化会通过回调方法通知自己。而由于StaticDirectory是不可变列表,所以不存在回调逻辑。

Search

    微信好友

    博士的沙漏

    Table of Contents