Eureka服务端源码

2020/09/03 Eureka

Eureka服务端源码

Eureka Server提供服务注册服务,各个节点启动后,会在Eureka Server中进行注册,这样EurekaServer中的服务注册表中将会存储所有可用服务节点的信息。

Eureka Server简介

Eureka Server作为一个开箱即用的服务注册中心,提供了以下的功能,用以满足与Eureka Client交互的需求:

  • 服务注册
  • 接受服务心跳
  • 服务剔除
  • 服务下线
  • 集群同步
  • 获取注册表中服务实例信息 需要注意的是,Eureka Server同时也是一个Eureka Client,在不禁止Eureka Server的客户端行为时,它会向它配置文件中的其他Eureka Server进行拉取注册表、服务注册和发送心跳等操作。

服务实例注册表

InstanceRegistry是Eureka Server中注册表管理的核心接口。

  • InstanceRegistry对Eureka Server的注册表实现类PeerAwareInstanceRegistryImpl进行了继承和扩展,使其适配Spring Cloud的使用环境,主要实现由PeerAwareInstanceRegistryImpl提供。
  • InstanceRegistry是Eureka Server注册表的最核心接口,其职责是在内存中管理注册到Eureka Server中的服务实例信息。

InstanceRegistry分别继承了LeaseManager和LookupService接口。LeaseManager接口的功能是对注册到Eureka Server中的服务实例租约进行管理。而LookupService提供对服务实例进行检索的功能,在Eureka Client的源码解析中已进行介绍,在此不对其接口进行展示。 LeaseManager接口提供的方法代码如下所示:

//LeaseManager.java
public interface LeaseManager<T> {
    void register(T r, int leaseDuration, boolean isReplication);
    boolean cancel(String appName, String id, boolean isReplication);
    boolean renew(String appName, String id, boolean isReplication);
    void evict();
}

LeaseManager接口作用是对注册到Eureka Server中的服务实例租约进行管理,分别有服务注册、服务下线、服务租约更新以及服务剔除等操作。

LeaseManager中管理的对象是Lease,Lease代表一个Eureka Client服务实例信息的租约,它提供了对其内持有的类的时间有效性操作。Lease持有的类是代表服务实例信息的InstanceInfo。Lease中定义了租约的操作类型,分别是注册、下线、更新,同时提供了对租约中时间属性的各项操作。租约默认有效时长(duration)为90秒。

InstanceRegistry在继承LeaseManager和LookupService接口的基础上,还添加了一些特有的方法,可以更为简单地管理服务实例租约和查询注册表中的服务实例信息。可以通过AbstractInstanceRegistry查看InstanceRegistry接口方法的具体实现。

PeerAwareInstanceRegistry继承了InstanceRegistry接口,在其基础上添加了Eureka Server集群同步的操作,其实现类PeerAwareInstanceRegistryImpl继承了AbstractInstanceRegistry的实现,在对本地注册表操作的基础上添加了对其peer节点的同步复制操作,使得Eureka Server集群中的注册表信息保持一致。

服务注册

我们了解到Eureka Client在发起服务注册时会将自身的服务实例元数据封装在InstanceInfo中,然后将InstanceInfo发送到Eureka Server。Eureka Server在接收到Eureka Client发送的InstanceInfo后将会尝试将其放到本地注册表中以供其他Eureka Client进行服务发现。

服务注册的主要实现位于AbstractInstanceRegistry#registry方法中,代码如下所示:

//AbstractInstanceRegistry.java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // 获取读锁
        read.lock();
        // 这里的registry是ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
           registry根据appName对服务实例集群进行分类
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
        // 这里有一个比较严谨的操作,防止在添加新的服务实例集群租约时把已有的其他线程添加的集群
           租约覆盖掉如果存在该键值直接返回已存在的值否则添加该键值对返回null
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        //根据instanceId获取实例的租约
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            // 如果该实例的租约已经存在,比较最后更新时间戳的大小,取最大值的注册信息为有效
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                registrant = existingLease.getHolder();
            }

在register中,服务实例的InstanceInfo保存在Lease中,Lease在AbstractInstanceRegistry中统一通过ConcurrentHashMap保存在内存中。在服务注册过程中,会先获取一个读锁,防止其他线程对registry注册表进行数据操作,避免数据的不一致。然后从resgitry查询对应的InstanceInfo租约是否已经存在注册表中,根据appName划分服务集群,使用InstanceId唯一标记服务实例。如果租约存在,比较两个租约中的InstanceInfo的最后更新时间lastDirtyTimestamp,保留时间戳大的服务实例信息InstanceInfo。如果租约不存在,意味这是一次全新的服务注册,将会进行自我保护的统计,创建新的租约保存InstanceInfo。接着将租约放到resgitry注册表中。

之后将进行一系列缓存操作并根据覆盖状态规则设置服务实例的状态,缓存操作包括将InstanceInfo加入用于统计Eureka Client增量式获取注册表信息的recentlyChangedQueue和失效responseCache中对应的缓存。最后设置服务实例租约的上线时间用于计算租约的有效时间,释放读锁并完成服务注册。代码如下所示:

// AbstractInstanceRegistry.java
        } else {
            // 如果租约不存在,这是一个新的注册实例
            synchronized (lock) {
            if (this.expectedNumberOfRenewsPerMin > 0) {
            // 自我保护机制
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
              }
          }
        }
        // 创建新的租约
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // 如果租约存在,继承租约的服务上线初始时间
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 保存租约
        gMap.put(registrant.getId(), lease);
        // 添加最近注册队列
        // private final CircularQueue<Pair<Long, String>> recentRegisteredQueue
        // 用来统计最近注册服务实例的数据
        synchronized (recentRegisteredQueue) {
        recentRegisteredQueue.add(new Pair<Long, String>(
            System.currentTimeMillis(),
            registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        ...
        // 根据覆盖状态规则得到服务实例的最终状态,并设置服务实例的当前状态
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // 如果服务实例状态为UP,设置租约的服务上线时间,只有第一次设置有效
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        // 添加最近租约变更记录队列,标识ActionType为ADDED
        // 这将用于Eureka Client增量式获取注册表信息
        // private ConcurrentLinkedQueue<RecentlyChangedItem> 
        recentlyChangedQueue
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // 设置服务实例信息更新时间
        registrant.setLastUpdatedTimestamp();
        // 设置response缓存过期,这将用于Eureka Client全量获取注册表信息
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant. getSecureVipAddress());
    } finally {
        // 释放锁
        read.unlock();
    }
}

在register方法中有诸多的同步操作,为了防止数据被错误地覆盖,有兴趣的读者可以细细研究一下,在此不再展开讲述。

接受服务心跳

在Eureka Client完成服务注册之后,它需要定时向Eureka Server发送心跳请求(默认30秒一次),维持自己在Eureka Server中租约的有效性。

Eureka Server处理心跳请求的核心逻辑位于AbstractInstanceRegistry#renew方法中。renew方法是对Eureka Client位于注册表中的租约的续租操作,不像register方法需要服务实例信息,仅根据服务实例的服务名和服务实例id即可更新对应租约的有效时间。具体代码如下所示:

// AbstractInstanceRegistry.java
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // 根据appName获取服务集群的租约集合
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
    leaseToRenew = gMap.get(id);
    }
    // 租约不存在,直接返回false
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
           // 根据覆盖状态规则得到服务实例的最终状态
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                // 如果得到的服务实例最后状态是UNKNOWN,取消续约
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                instanceInfo.setStatus(overriddenInstanceStatus);
            }
        }
        // 统计每分钟续租的次数,用于自我保护
        renewsLastMin.increment();
        // 更新租约中的有效时间
        leaseToRenew.renew();
        return true;
    }
}

在#renew方法中,不关注InstanceInfo,仅关注于租约本身以及租约的服务实例状态。如果根据服务实例的appName和instanceInfoId查询出服务实例的租约,并且根据#getOverriddenInstanceStatus方法得到的instanceStatus不为InstanceStatus.UNKNOWN,那么更新租约中的有效时间,即更新租约Lease中的lastUpdateTimestamp,达到续约的目的;如果租约不存在,那么返回续租失败的结果。

服务剔除

如果Eureka Client在注册后,既没有续约,也没有下线(服务崩溃或者网络异常等原因),那么服务的状态就处于不可知的状态,不能保证能够从该服务实例中获取到回馈,所以需要服务剔除AbstractInstanceRegistry#evict方法定时清理这些不稳定的服务,该方法会批量将注册表中所有过期租约剔除。实现代码如下所示:

// AbstractInstanceRegistry.java
@Override
public void evict() {
    evict(0l);
}
public void evict(long additionalLeaseMs) {
    // 自我保护相关,如果出现该状态,不允许剔除服务
    if (!isLeaseExpirationEnabled()) {
        return;
    }
    // 遍历注册表register,一次性获取所有的过期租约
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.
        entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.
                entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // 1
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // 计算最大允许剔除的租约的数量,获取注册表租约总数
    int registrySize = (int) getLocalRegistrySize();
    // 计算注册表租约的阀值,与自我保护相关
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    // 计算剔除租约的数量
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        Random random = new Random(System.currentTimeMillis());
        // 逐个随机剔除
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            // 逐个剔除
            internalCancel(appName, id, false);
        }
    }
}

服务剔除将会遍历registry注册表,找出其中所有的过期租约,然后根据配置文件中续租百分比阀值和当前注册表的租约总数量计算出最大允许的剔除租约的数量(当前注册表中租约总数量减去当前注册表租约阀值),分批次剔除过期的服务实例租约。对过期的服务实例租约调用AbstractInstanceRegistry#internalCancel服务下线的方法将其从注册表中清除掉。

服务剔除#evict方法中有很多限制,都是为了保证Eureka Server的可用性:

  • 自我保护时期不能进行服务剔除操作。
  • 过期操作是分批进行。
  • 服务剔除是随机逐个剔除,剔除均匀分布在所有应用中,防止在同一时间内同一服务集群中的服务全部过期被剔除,以致大量剔除发生时,在未进行自我保护前促使了程序的崩溃。

服务剔除是一个定时的任务,所以AbstractInstanceRegistry中定义了一个EvictionTask用于定时执行服务剔除,默认为60秒一次。服务剔除的定时任务一般在AbstractInstanceRegistry初始化结束后进行,按照执行频率evictionIntervalTimerInMs的设定,定时剔除过期的服务实例租约。

自我保护机制主要在Eureka Client和Eureka Server之间存在网络分区的情况下发挥保护作用,在服务器端和客户端都有对应实现。假设在某种特定的情况下(如网络故障),Eureka Client和Eureka Server无法进行通信,此时Eureka Client无法向Eureka Server发起注册和续约请求,Eureka Server中就可能因注册表中的服务实例租约出现大量过期而面临被剔除的危险,然而此时的Eureka Client可能是处于健康状态的(可接受服务访问),如果直接将注册表中大量过期的服务实例租约剔除显然是不合理的。

针对这种情况,Eureka设计了“自我保护机制”。在Eureka Server处,如果出现大量的服务实例过期被剔除的现象,那么该Server节点将进入自我保护模式,保护注册表中的信息不再被剔除,在通信稳定后再退出该模式;在Eureka Client处,如果向Eureka Server注册失败,将快速超时并尝试与其他的Eureka Server进行通信。“自我保护机制”的设计大大提高了Eureka的可用性。

服务下线

Eureka Client在应用销毁时,会向Eureka Server发送服务下线请求,清除注册表中关于本应用的租约,避免无效的服务调用。在服务剔除的过程中,也是通过服务下线的逻辑完成对单个服务实例过期租约的清除工作。

服务下线的主要实现代码位于AbstractInstanceRegistry#internalCancel方法中,仅需要服务实例的服务名和服务实例id即可完成服务下线。具体代码如下所示:

// AbstractInstanceRegistry.java
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
    return internalCancel(appName, id, isReplication);
}

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        // 获取读锁,防止被其他线程进行修改
        read.lock();
        CANCEL.increment(isReplication);
        // 根据appName获取服务实例的集群
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        // 移除服务实例的租约
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        // 将服务实例信息添加到最近下线服务实例统计队列
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        // 租约不存在,返回false
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            return false;
        } else {
           // 设置租约的下线时间
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            ...
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                // 添加最近租约变更记录队列,标识ActionType为DELETED
                // 这将用于Eureka Client增量式获取注册表信息
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
            }
            // 设置response缓存过期
            invalidateCache(appName, vip, svip);
            // 下线成功
            return true;
        }
    } finally {
        // 释放锁
        read.unlock();
    }
}

internalCancel方法与register方法的行为过程很类似,首先通过registry根据服务名和服务实例id查询关于服务实例的租约Lease是否存在,统计最近请求下线的服务实例用于Eureka Server主页展示。如果租约不存在,返回下线失败;如果租约存在,从registry注册表中移除,设置租约的下线时间,同时在最近租约变更记录队列中添加新的下线记录,以用于Eureka Client的增量式获取注册表信息,最后设置repsonse缓存过期。

internalCancel方法中同样通过读锁保证registry注册表中数据的一致性,避免脏读。

集群同步

如果Eureka Server是通过集群的方式进行部署,那么为了维护整个集群中Eureka Server注册表数据的一致性,势必需要一个机制同步Eureka Server集群中的注册表数据。

Eureka Server集群同步包含两个部分,一部分是Eureka Server在启动过程中从它的peer节点中拉取注册表信息,并将这些服务实例的信息注册到本地注册表中;另一部分是Eureka Server每次对本地注册表进行操作时,同时会将操作同步到它的peer节点中,达到集群注册表数据统一的目的。

  • Eureka Server初始化本地注册表信息 在Eureka Server启动的过程中,会从它的peer节点中拉取注册表来初始化本地注册表,这部分主要通过PeerAwareInstanceRegistry#syncUp方法完成,它将从可能存在的peer节点中,拉取peer节点中的注册表信息,并将其中的服务实例信息注册到本地注册表中,如下所示:
    // PeerAwareInstanceRegistry.java
    public int syncUp() {
      // 从临近的peer中复制整个注册表
      int count = 0;
      // 如果获取不到,线程等待
      for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); 
          i++) {
          if (i > 0) {
              try {
                  Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
              } catch (InterruptedException e) {
                  break;
              }
          }
          // 获取所有的服务实例
          Applications apps = eurekaClient.getApplications();
          for (Application app : apps.getRegisteredApplications()) {
              for (InstanceInfo instance : app.getInstances()) {
                  try {
                      // 判断是否可注册,主要用于AWS环境下进行,若部署在其他的环境,直接返回true
                      if (isRegisterable(instance)) {
                          // 注册到自身的注册表中
                          register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                          count++;
                      }
                  } catch (Throwable t) {
                      logger.error("During DS init copy", t);
                  }
              }
          }
      }
      return count;
    }
    

    Eureka Server也是一个Eureka Client,在启动的时候也会进行DiscoveryClient的初始化,会从其对应的Eureka Server中拉取全量的注册表信息。在Eureka Server集群部署的情况下,Eureka Server从它的peer节点中拉取到注册表信息后,将遍历这个Applications,将所有的服务实例通过AbstractRegistry#register方法注册到自身注册表中。

在初始化本地注册表时,Eureka Server并不会接受来自Eureka Client的通信请求(如注册、或者获取注册表信息等请求)。在同步注册表信息结束后会通过PeerAwareInstanceRegistryImpl#openForTraffic方法允许该Server接受流量。代码如下所示:

// PeerAwareInstanceRegistryImpl.java
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // 初始化自我保护机制统计参数
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());

    this.startupTime = System.currentTimeMillis();
    // 如果同步的应用实例数量为0,将在一段时间内拒绝Client获取注册信息
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    // 判断是否是AWS运行环境,此处忽略
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        primeAwsReplicas(applicationInfoManager);
    }
    // 修改服务实例的状态为健康上线,可以接受流量
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

在Eureka Server中有一个StatusFilter过滤器,用于检查Eureka Server的状态,当Server的状态不为UP时,将拒绝所有的请求。在Client请求获取注册表信息时,Server会判断此时是否允许获取注册表中的信息。上述做法是为了避免Eureka Server在#syncUp方法中没有获取到任何服务实例信息时(Eureka Server集群部署的情况下),Eureka Server注册表中的信息影响到Eureka Client缓存的注册表中的信息。如果Eureka Server在#syncUp方法中没有获得任何的服务实例信息,它将把peerInstancesTransferEmptyOnStartup设置为true,这时该Eureka Server在WaitTimeInMsWhenSyncEmpty(可以通过eureka.server.wait-time-in-ms-when-sync-empty设置,默认是5分钟)时间后才能被Eureka Client访问获取注册表信息。

  • Eureka Server之间注册表信息的同步复制 为了保证Eureka Server集群运行时注册表信息的一致性,每个Eureka Server在对本地注册表进行管理操作时,会将相应的操作同步到所有peer节点中。

在PeerAwareInstanceRegistryImpl中,对Abstractinstanceregistry中的#register、#cancel和#renew等方法都添加了同步到peer节点的操作,使Server集群中注册表信息保持最终一致性,如下所示:

//PeerAwareInstanceRegistryImpl.java
@Override
public boolean cancel(final String appName, final String id, final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        // 同步下线状态
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        ...
   }
   ...
}
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    // 同步注册状态
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        // 同步续约状态
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

同步的操作主要有:

public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
    ...
}

对此需要关注replicateToPeers方法,它将遍历Eureka Server中peer节点,向每个peer节点发送同步请求。代码如下所示:

//PeerAwareInstanceRegistryImpl.java
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // 如果peer集群为空,或者这本来就是复制操作,那么就不再复制,防止造成循环复制
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // 向peer集群中的每一个peer进行同步
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // 如果peer节点是自身的话,不进行同步复制
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 根据Action调用不同的同步请求
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

PeerEurekaNode代表一个可同步共享数据的Eureka Server。在PeerEurekaNode中,具有register、cancel、heartbeat和statusUpdate等诸多用于向peer节点同步注册表信息的操作。

在replicateInstanceActionsToPeers方法中将根据action的不同,调用PeerEurekaNode的不同方法进行同步复制,代码如下所示:

//PeerAwareInstanceRegistryImpl.java
private void replicateInstanceActionsToPeers(Action action, String appName,
    String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
            // 同步下线
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // 同步心跳
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // 同步注册
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // 同步状态更新
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}

PeerEurekaNode中的每一个同步复制都是通过批任务流的方式进行操作,同一时间段内相同服务实例的相同操作将使用相同的任务编号,在进行同步复制的时候根据任务编号合并操作,减少同步操作的数量和网络消耗,但是同时也造成同步复制的延时性,不满足CAP中的C(强一致性)。

通过Eureka Server在启动过程中初始化本地注册表信息和Eureka Server集群间的同步复制操作,最终达到了集群中Eureka Server注册表信息一致的目的。

获取注册表中服务实例信息

Eureka Server中获取注册表的服务实例信息主要通过两个方法实现:AbstractInstanceRegistry#getApplicationsFromMultipleRegions从多地区获取全量注册表数据,AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions从多地区获取增量式注册表数据。

  • getApplicationsFromMultipleRegions getApplicationsFromMultipleRegions方法将会从多个地区中获取全量注册表信息,并封装成Applications返回,实现代码如下所示:
    //AbstractInstanceRegistry.java
    public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
      boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
      Applications apps = new Applications();
      apps.setVersion(1L);
      // 从本地registry获取所有的服务实例信息InstanceInfo
      for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
          Application app = null;
          if (entry.getValue() != null) {
              for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.
                  getValue().entrySet()) {
                  Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                  if (app == null) {
                      app = new Application(lease.getHolder().getAppName());
                  }
                  app.addInstance(decorateInstanceInfo(lease));
              }
          }
          if (app != null) {
              apps.addApplication(app);
          }
      }
      if (includeRemoteRegion) {
          // 获取远程Region中的Eureka Server中的注册表信息
          ...
      }
      apps.setAppsHashCode(apps.getReconcileHashCode());
      return apps;
    }
    

    它首先会将本地注册表registry中的所有服务实例信息提取出来封装到Applications中,再根据是否需要拉取远程Region中的注册表信息,将远程Region的Eureka Server注册表中的服务实例信息添加到Applications中。最后将封装了全量注册表信息的Applications返回给Client。

  • getApplicationDeltasFromMultipleRegions getApplicationDeltasFromMultipleRegions方法将会从多个地区中获取增量式注册表信息,并封装成Applications返回,实现代码如下所示:
    //AbstractInstanceRegistry.java
    public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
      if (null == remoteRegions) {
          remoteRegions = allKnownRemoteRegions; // null means all remote regions.
      }
    
      boolean includeRemoteRegion = remoteRegions.length != 0;
      Applications apps = new Applications();
      apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
      Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
      try {
          write.lock();// 开启写锁
          // 遍历recentlyChangedQueue队列获取最近变化的服务实例信息InstanceInfo
          Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
          while (iter.hasNext()) {
              //...
          }
          if (includeRemoteRegion) {
              // 获取远程Region中的Eureka Server的增量式注册表信息
             ...
          } finally {
          write.unlock();
      }
      // 计算应用集合一致性哈希码,用以在Eureka Client拉取时进行对比
      apps.setAppsHashCode(apps.getReconcileHashCode());
      return apps;
    }
    

    获取增量式注册表信息将会从recentlyChangedQueue中获取最近变化的服务实例信息。recentlyChangedQueue中统计了近3分钟内进行注册、修改和剔除的服务实例信息,在服务注册AbstractInstanceRegistry#registry、接受心跳请求AbstractInstanceRegistry#renew和服务下线AbstractInstanceRegistry#internalCancel等方法中均可见到recentlyChangedQueue对这些服务实例进行登记,用于记录增量式注册表信息。#getApplicationsFromMultipleRegions方法同样提供了从远程Region的Eureka Server获取增量式注册表信息的能力。

Search

    微信好友

    博士的沙漏

    Table of Contents