Eureka客户端源码解析
Eureka Client为了简化开发人员的开发工作,将很多与Eureka Server交互的工作隐藏起来,自主完成。
为了跟踪Eureka的运行机制,读者可以通过打开Spring Boot的Debug模式来查看更多的输出日志,如下所示:
logging:
level:
org.springframework: DEBUG
Eukeka Client通过Starter的方式引入依赖,Spring Boot将会为项目使用以下的自动配置类:
- EurekaClientAutoConfiguration:Eureke Client自动配置类,负责Eureka Client中关键Beans的配置和初始化,如ApplicationInfoManager和EurekaClientConfig等。
- RibbonEurekaAutoConfiguration:Ribbon负载均衡相关配置。
- EurekaDiscoveryClientConfiguration:配置自动注册和应用的健康检查器。
读取应用自身配置信息
通过EurekaDiscoveryClientConfiguration配置类,Spring Boot帮助Eureka Client完成很多必要Bean的属性读取和配置 下面我们对Spring Cloud中的服务发现客户端DiscoveryClient进行进一步的介绍,它是客户端进行服务发现的核心接口。 DiscoveryClient是Spring Cloud中用来进行服务发现的顶级接口,在Netflix Eureka或者Consul中都有相应的具体实现类,该接口提供的方法如下:
//DiscoveryClient.java
public interface DiscoveryClient {
// 获取实现类的描述
String description();
// 通过服务Id获取服务实例的信息
List<ServiceInstance> getInstances(String serviceId);
// 获取所有的服务实例Id
List<String> getServices();
...
}
EurekaDiscoveryClient继承了DiscoveryClient接口,但是通过查看EurekaDiscoveryClient中代码,会发现它是通过组合EurekaClien类实现接口的功能,如下为getInstance方法的实现:
//EurekaDiscoveryClient.java
@Override
public List<ServiceInstance> getInstances (String serviceId) {
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) {
instances.add(new EurekaServiceInstance(info));
}
return instances;
}
EurekaClient来自于com.netflix.discovery包中,其默认实现为com.netflix.discovery.DiscoveryClient,属于eureka-client的源代码,它提供了Eureka Client注册到Server上、续租、下线以及获取Server中注册表信息等诸多关键功能。Spring Cloud通过组合方式调用了Eureka中的服务发现方法。
服务发现客户端
为了对Eureka Client的执行原理进行讲解,首先需要对服务发现客户端com.netflix.discover.DiscoveryClient职能以及相关类进行讲解,它负责了与Eureka Server交互的关键逻辑。
DiscoveryClient职责
DiscoveryClient是Eureka Client的核心类,包括与Eureka Server交互的关键逻辑,具备了以下职能:
- 注册服务实例到Eureka Server中;
- 发送心跳更新与Eureka Server的租约;
- 在服务关闭时从Eureka Server中取消租约,服务下线;
- 查询在Eureka Server中注册的服务实例列表。
DiscoveryClient类结构
DiscoveryClient继承了LookupService接口,LookupService作用是发现活跃的服务实例,主要方法如下:
//LookupService.java
public interface LookupService<T> {
//根据服务实例注册的appName来获取封装有相同appName的服务实例信息容器
Application getApplication(String appName);
//返回当前注册表中所有的服务实例信息
Applications getApplications();
//根据服务实例的id获取服务实例信息
List<InstanceInfo> getInstancesById(String id);
...
}
Application持有服务实例信息列表,它可以理解成同一个服务的集群信息,这些服务实例都挂在同一个服务名appName下。InstanceInfo代表一个服务实例信息。Application部分代码如下:
//Application.java
public class Application {
private static Random shuffleRandom = new Random();
//服务名
private String name;
@XStreamOmitField
private volatile boolean isDirty = false;
@XStreamImplicit
private final Set<InstanceInfo> instances;
private final AtomicReference<List<InstanceInfo>> shuffledInstances;
private final Map<String, InstanceInfo> instancesMap;
...
}
为了保证原子性操作,Application中对InstanceInfo的操作都是同步操作。
Applications是注册表中所有服务实例信息的集合,里面的操作大多也是同步操作。
EurekaClient继承了LookupService接口,为DiscoveryClient提供了一个上层接口,目的是方便从Eureka 1.x到Eureka 2.x(已停止开发)的升级过渡。EurekaClient接口属于比较稳定的接口,即使在下一阶段也会被保留。
EurekaCient在LookupService的基础上扩充了更多的接口,提供了更丰富的获取服务实例的方式,主要有:
- 提供了多种方式获取InstanceInfo,例如根据区域、Eureka Server地址等获取。
- 提供了本地客户端(所处的区域、可用区等)的数据,这部分与AWS密切相关。
- 提供了为客户端注册和获取健康检查处理器的能力。
除去查询相关的接口,我们主要关注EurekaClient中以下两个接口,代码如下所示:
// EurekaClient.java
//为Eureka Client注册健康检查处理器
public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
//为Eureka Client注册一个EurekaEventListener(事件监听器)
// 监听Client服务实例信息的更新
public void registerEventListener(EurekaEventListener eventListener);
Eureka Server一般通过心跳(heartbeats)来识别一个实例的状态。Eureka Client中存在一个定时任务定时通过HealthCheckHandler检测当前Client的状态,如果Client的状态发生改变,将会触发新的注册事件,更新Eureka Server的注册表中该服务实例的相关信息。HealthCheckHandler的代码如下所示:
// HealthCheckHandler.java
public interface HealthCheckHandler {
InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);
}
HealthCheckHandler接口的代码如上所示,其在spring-cloud-netflix-eureka-client中的实现类为EurekaHealthCheckHandler,主要组合了spring-boot-actuator中的HealthAggregator和HealthIndicator,以实现对Spring Boot应用的状态检测。
Eureka中的事件模式属于观察者模式,事件监听器将监听Client的服务实例信息变化,触发对应的处理事件:
DiscoveryClient构造函数
在DiscoveryClient构造函数中,Eureka Client会执行从Eureka Server中拉取注册表信息、服务注册、初始化发送心跳、缓存刷新(重新拉取注册表信息)和按需注册定时任务等操作,可以说DiscoveryClient的构造函数贯穿了Eureka Client启动阶段的各项工作。DiscoveryClient的构造函数传入的参数如下所示:
//DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider)
ApplicationInfoManager和EurekaClientConfig在前面内容中已经做了介绍,一个是应用信息管理器,另一个是封装了Client与Server交互配置信息的类。
AbstractDiscoveryClientOptionalArgs是用于注入一些可选参数,以及一些jersey1和jersey2通用的过滤器。而BackupRegistry充当了备份注册中心的职责,当Eureka Client无法从任何一个Eureka Server中获取注册表信息时,BackupRegistry将被调用以获取注册表信息。默认的实现是NotImplementedRegistryImpl,即没有实现。
在构造方法中,忽略掉构造方法中大部分的赋值操作,我们逐步了解了配置类中的属性会对DiscoveryClient的行为造成什么影响。DiscoveryClient构造函数中的部分代码如下所示:
// DiscoveryClient.java
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
config#shouldFetchRegistry(对应配置为eureka.client.fetch-register)为true表示Eureka Client将从Eureka Server中拉取注册表信息。config#shouldRegisterWithEureka(对应配置为eureka.client.register-with-eureka)为true表示Eureka Client将注册到Eureka Server中。如果上述的两个配置均为false,那么Discovery的初始化将直接结束,表示该客户端既不进行服务注册也不进行服务发现。
接着定义一个基于线程池的定时器线程池ScheduledExecutorService,线程池大小为2,一个线程用于发送心跳,另一个线程用于缓存刷新,同时定义了发送心跳和缓存刷新线程池,代码如下所示:
//DiscoveryClient.java
scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d").setDaemon(true).build());
heartbeatExecutor = new ThreadPoolExecutor(...);
cacheRefreshExecutor = new ThreadPoolExecutor(...);
之后,初始化Eureka Client与Eureka Server进行HTTP交互的Jersey客户端,将AbstractDiscoveryClientOptionalArgs中的属性用来构建EurekaTransport,如下所示:
// DiscoveryClient.java
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
EurekaTransport是DiscoveryClient中的一个内部类,其内封装了DiscoveryClient与Eureka Server进行HTTP调用的Jersey客户端。
再接着从Eureka Server中拉取注册表信息,代码如下所示:
// DiscoveryClient.java
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
如果EurekaClientConfig#shouldFetchRegistry为true时,fetchRegistry方法将会被调用。在Eureka Client向Eureka Server注册前,需要先从Eureka Server拉取注册表中的信息,这是服务发现的前提。通过将Eureka Server中的注册表信息缓存到本地,就可以就近获取其他服务的相关信息,减少与Eureka Server的网络通信。
拉取完Eureka Server中的注册表信息后,将对服务实例进行注册,代码如下所示
// DiscoveryClient.java
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// 发起服务注册
if (!register() ) {
// 注册失败,抛出异常
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
throw new IllegalStateException(th);
}
}
initScheduledTasks(); // 初始化定时任务
在服务注册之前会进行注册预处理,Eureka没有对此提供默认实现。构造函数的最后将初始化并启动发送心跳、缓存刷新和按需注册等定时任务。
最后总结一下,在DiscoveryClient的构造函数中,主要依次做了以下的事情:
- 相关配置的赋值,类似ApplicationInfoManager、EurekaClientConfig等。
- 备份注册中心的初始化,默认没有实现。
- 拉取Eureka Server注册表中的信息。
- 注册前的预处理。
- 向Eureka Server注册自身。
- 初始化心跳定时任务、缓存刷新和按需注册等定时任务。
拉取注册表信息
在DiscoveryClient的构造函数中,调用了DiscoveryClient#fetchRegistry方法从Eureka Server中拉取注册表信息,方法执行如下所示:
//DiscoveryClient.java
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 如果增量式拉取被禁止,或者Applications为null,进行全量拉取
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1))
{
...
// 全量拉取注册表信息
getAndStoreFullRegistry();
} else {
// 增量拉取注册表信息
getAndUpdateDelta(applications);
}
// 计算应用集合一致性哈希码
applications.setAppsHashCode(applications.getReconcileHashCode());
// 打印注册表上所有服务实例的总数量
logTotalInstances();
} catch (Throwable e) {
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// 在更新远程实例状态之前推送缓存刷新事件,但是Eureka中并没有提供默认的事件监听器
onCacheRefreshed();
// 基于缓存中被刷新的数据更新远程实例状态
updateInstanceRemoteStatus();
// 注册表拉取成功,返回true
return true;
}
一般来讲,在Eureka客户端,除了第一次拉取注册表信息,之后的信息拉取都会尝试只进行增量拉取(第一次拉取注册表信息为全量拉取),下面将分别介绍拉取注册表信息的两种实现,全量拉取注册表信息DiscoveryClient#getAndStoreFullRegistry和增量式拉取注册表信息DiscoveryClient#getAndUpdateDelta。
- 全量拉取注册表信息
一般只有在第一次拉取的时候,才会进行注册表信息的全量拉取,主要在DiscoveryClient#getAndStoreFullRegistry方法中进行。代码如下所示:
// DiscoveryClient.java private void getAndStoreFullRegistry() throws Throwable { // 获取拉取的注册表的版本,防止拉取版本落后(由其他的线程引起) long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()): eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); // 获取成功 if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } if (apps == null) { // 日志 // 检查fetchRegistryGeneration的更新版本是否发生改变,无改变的话说明本次拉取是最新的 } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 从apps中筛选出状态为UP的实例,同时打乱实例的顺序,防止同一个服务的不同实例在启动时 接受流量 localRegionApps.set(this.filterAndShuffle(apps)); } else { // 日志 } }
全量拉取将从Eureka Server中拉取注册表中所有的服务实例信息(封装在Applications中),并经过处理后替换掉本地注册表缓存Applications。
通过跟踪调用链,在AbstractJerseyEurekaHttpClient#getApplicationsInternal方法中发现了相关的请求url,接口地址为/eureka/apps 该接口位于Eureka Server中,可以直接访问,用于获取当前Eureka Server中持有的所有注册表信息。
getAndStoreFullRegistry方法可能被多个线程同时调用,导致新拉取的注册表被旧的注册表覆盖(有可能出现先拉取注册表信息的线程在覆盖apps时被阻塞,被后拉取注册表信息的线程抢先设置了apps,被阻塞的线程恢复后再次设置了apps,导致apps数据版本落后),产生脏数据,对此,Eureka通过类型为AtomicLong的currentUpdateGeneration对apps的更新版本进行跟踪。如果更新版本不一致,说明本次拉取注册表信息已过时,不需要缓存到本地。拉取到注册表信息之后会对获取到的apps进行筛选,只保留状态为UP的服务实例信息。
- 增量式拉取注册表信息
增量式的拉取方式,一般发生在第一次拉取注册表信息之后,拉取的信息定义为从某一段时间之后发生的所有变更信息,通常来讲是3分钟之内注册表的信息变化。在获取到更新的delta后,会根据delta中的增量更新对本地的数据进行更新。与getAndStoreFullRegistry方法一样,也通过fetchRegistryGeneration对更新的版本进行控制。增量式拉取是为了维护Eureka Client本地的注册表信息与Eureka Server注册表信息的一致性,防止数据过久而失效,采用增量式拉取的方式减少了拉取注册表信息的通信量。Client中有一个注册表缓存刷新定时器专门负责维护两者之间信息的同步性。但是当增量式拉取出现意外时,定时器将执行全量拉取以更新本地缓存的注册表信息。具体代码如下所示:
// DiscoveryClient.java private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } // 获取增量拉取失败 if (delta == null) { // 进行全量拉取 getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { // 更新本地缓存 updateDelta(delta); // 计算应用集合一致性哈希码 reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } // 比较应用集合一致性哈希码,如果不一致将认为本次增量式拉取数据已脏,将发起全量拉 取更新本地注册表信息 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig. shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); } } ... }
同理,在相同的位置也发现了增量式更新的url,/eureka/app/delta,可以直接访问 由于更新的过程过于漫长,时间成本为O(N^2),所以通过同步代码块防止多个线程同时进行更新,污染数据。
在根据从Eureka Server拉取的delta信息更新本地缓存的时候,Eureka定义了ActionType来标记变更状态,代码位于InstanceInfo类中,如下所示:
// InstanceInfo.java
public enum ActionType {
ADDED, // 添加Eureka Server
MODIFIED, // 在Euerka Server中的信息发生改变
DELETED // 被从Eureka Server中剔除
}
根据InstanceInfo#ActionType的不同,对delta中的InstanceInfo采取不同的操作,其中ADDED和MODIFIED状态变更的服务实例信息将添加到本地注册表,DELETED状态变更的服务实例将从本地注册表中删除。具体代码如下所示:
// DiscoveryClient.java
// 变更类型为ADDED
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
// 添加到本地注册表中
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
// 变更类型为MODIFIED
} else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
// 添加到本地注册表中
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
// 变更类型为DELETE
} else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
// 从本地注册表中删除
applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
}
更新本地注册表缓存之后,Eureka Client会通过#getReconcileHashCode计算合并后的Applications的appsHashCode(应用集合一致性哈希码),和Eureka Server传递的delta上的appsHashCode进行比较(delta中携带的appsHashCode通过Eureka Server的全量注册表计算得出),比对客户端和服务端上注册表的差异。如果哈希值不一致的话将再调用一次getAndStoreFullRegistry获取全量数据保证Eureka Client与Eureka Server之间注册表数据的一致。代码如下所示:
// DiscoveryClient.java
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode);
}
reconcileAndLogDifference方法中将会执行拉取全量注册表信息操作。
appsHashCode的一般表示方式为:
appsHashCode = ${status}_${count}_
它通过将应用状态和数量拼接成字符串,表示了当前注册表中服务实例状态的统计信息。举个简单的例子,有10个应用实例的状态为UP,有5个应用实例状态为DOWN,其他状态的数量为0(不进行表示),那么appsHashCode的形式将是:
appsHashCode = UP_10_DOWN_5_
服务注册
在拉取完Eureka Server中的注册表信息并将其缓存在本地后,Eureka Client将向Eureka Server注册自身服务实例元数据,主要逻辑位于Discovery#register方法中。register方法代码如下所示:
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
throw e;
}
...
// 注册成功
return httpResponse.getStatusCode() == 204;
}
Eureka Client会将自身服务实例元数据(封装在InstanceInfo中)发送到Eureka Server中请求服务注册,当Eureka Server返回204状态码时,说明服务注册成功。
跟踪到AbstractJerseyEurekaHttpClient#register方法中,可以发现服务注册调用的接口以及传递的参数。注册接口地址为apps/${APP_NAME},传递参数为InstanceInfo,如果服务器返回204状态,则表明注册成功。
初始化定时任务
很明显,服务注册应该是一个持续的过程,Eureka Client通过定时发送心跳的方式与Eureka Server进行通信,维持自己在Server注册表上的租约。同时Eureka Server注册表中的服务实例信息是动态变化的,为了保持Eureka Client与Eureka Server的注册表信息的一致性,Eureka Client需要定时向Eureka Server拉取注册表信息并更新本地缓存。为了监控Eureka Client应用信息和状态的变化,Eureka Client设置了一个按需注册定时器,定时检查应用信息或者状态的变化,并在发生变化时向Eureka Server重新注册,避免注册表中的本服务实例信息不可用。
在DiscoveryClient#initScheduledTasks方法中初始化了三个定时器任务,一个用于向Eureka Server拉取注册表信息刷新本地缓存;一个用于向Eureka Server发送心跳;一个用于进行按需注册的操作。代码如下所示:
// DiscoveryClient.java
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// 注册表缓存刷新定时器
// 获取配置文件中刷新间隔,默认为30s,可以通过eureka.client.registry-fetch-
interval-seconds进行设置
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule(
new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor,
registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
// 发送心跳定时器,默认30秒发送一次心跳
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
// 心跳定时器
scheduler.schedule(
new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 按需注册定时器
...
}
- 缓存刷新定时任务与发送心跳定时任务
在DiscoveryClient#initScheduledTasks方法中,通过ScheduledExecutorService#schedule的方式提交缓存刷新任务和发送心跳任务,任务执行的方式为延时执行并且不循环,这两个任务的定时循环逻辑由TimedSupervisorTask提供实现。TimedSupervisorTask继承了TimerTask,提供执行定时任务的功能。它在run方法中定义执行定时任务的逻辑。具体代码如下所示:
//TimedSupervisorTask.java public class TimedSupervisorTask extends TimerTask { ... public void run() { Future future = null; try { // 执行任务 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); // 等待任务执行结果 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // 执行完成,设置下次任务执行频率(时间间隔) delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); } catch (TimeoutException e) { // 执行任务超时 timeoutCounter.increment(); // 设置下次任务执行频率(时间间隔) long currentDelay = delay.get(); long newDelay = Math.min(maxDelay, currentDelay * 2); delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { // 执行任务被拒绝 // 统计被拒绝次数 rejectedCounter.increment(); } catch (Throwable e) { // 其他的异常 // 统计异常次数 throwableCounter.increment(); } finally { // 取消未结束的任务 if (future != null) { future.cancel(true); } // 如果定时任务服务未关闭,定义下一次任务 if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } } }
run方法中存在以下的任务调度过程:
- scheduler初始化并延迟执行TimedSupervisorTask;
- TimedSupervisorTask将task提交executor中执行,task和executor在初始化TimedSupervisorTask时传入:
- 若task正常执行,TimedSupervisorTask将自己提交到scheduler,延迟delay时间后再次执行;
- 若task执行超时,计算新的delay时间(不超过maxDelay),TimedSupervisorTask将自己提交到scheduler,延迟delay时间后再次执行;
TimedSupervisorTask通过这种不断循环提交任务的方式,完成定时执行任务的要求。
在DiscoveryClient#initScheduledTasks方法中,提交缓存刷新定时任务的线程任务为CacheRefreshThread,提交发送心跳定时任务的线程为HeartbeatThread。CacheRefreshThread继承了Runnable接口,代码如下所示:
// DiscoveryClient.java class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } void refreshRegistry(){ ...//判断远程Region是否改变(即Eureka Server地址是否发生变化),决定进行全量拉取还是增量式拉取 boolean success = fetchRegistry(remoteRegionsModified); ...//打印更新注册表缓存后的变化 }
CacheRefreshThread线程任务将委托DiscoveryClient#fetchRegistry方法进行缓存刷新的具体操作。
HeartbeatThread同样继承了Runnable接口,该任务的作用是向Eureka Server发送心跳请求,维持Eureka Client在注册表中的租约。代码如下所示:
// DiscoveryClient.java
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
HeartbeatThread主要逻辑代码位于#renew方法中,代码如下所示:
//DiscovClient.java
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// 调用HTTP发送心跳到Eureka Server中维持租约
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
// Eureka Server中不存在该应用实例,
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
// 重新注册
return register();
}
// 续约成功
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
return false;
}
}
Eureka Server会根据续租提交的appName与instanceInfoId来更新注册表中的服务实例的租约。当注册表中不存在该服务实例时,将返回404状态码,发送心跳请求的Eureka Client在接收到404状态后将会重新发起注册;如果续约成功,将会返回200状态码。
跟踪到AbstractJerseyEurekaHttpClient#sendHeartBeat方法中,可以发现服务续租调用的接口以及传递的参数 续租的接口地址为apps/${APP_NAME}/${INSTANCE_INFO_ID},HTTP方法为put,参数主要有status(当前服务的状态)、lastDirtyTimestamp(上次数据变化时间)以及overriddenStatus。
- 按需注册定时任务
按需注册定时任务的作用是当Eureka Client中的InstanceInfo或者status发生变化时,重新向Eureka Server发起注册请求,更新注册表中的服务实例信息,保证Eureka Server注册表中服务实例信息有效和可用。按需注册定时任务的代码如下:
// DiscoveryClient.java // 定时检查刷新服务实例信息,检查是否有变化,是否需要重新注册 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // 监控应用的status变化,发生变化即可发起重新注册 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { ... instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { // 注册应用状态改变监控器 applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 启动定时按需注册定时任务 instanceInfoReplicator.start(clientConfig. getInitialInstanceInfoReplicationIntervalSeconds()); }
按需注册分为两部分,一部分是定义了一个定时任务,定时刷新服务实例的信息和检查应用状态的变化,在服务实例信息发生改变的情况下向Eureka Server重新发起注册操作;另一部分是注册了状态改变监控器,在应用状态发生变化时,刷新服务实例信息,在服务实例信息发生改变的情况下向Eureka Server重新发起注册操作。InstanceInfoReplicator中的定时任务逻辑位于#run方法中,如下所示:
// InstanceInfoReplicator.java public void run() { try { // 刷新了InstanceInfo中的服务实例信息 discoveryClient.refreshInstanceInfo(); // 如果数据发生更改,则返回数据更新时间 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { // 注册服务实例 discoveryClient.register(); // 重置更新状态 instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { // 执行下一个延时任务 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
DiscoveryClient中刷新本地服务实例信息和检查服务状态变化的代码如下:
// DiscoveryClient.java void refreshInstanceInfo() { // 刷新服务实例信息 applicationInfoManager.refreshDataCenterInfoIfRequired(); // 更新租约信息 applicationInfoManager.refreshLeaseInfoIfRequired(); InstanceStatus status; try { // 调用healthCheckHandler检查服务实例的状态变化 status = getHealthCheckHandler().getStatus(instanceInfo.getStatus()); } catch (Exception e) { status = InstanceStatus.DOWN; } if (null != status) { applicationInfoManager.setInstanceStatus(status); } }
run方法首先调用了discoveryClient#refreshInstanceInfo方法刷新当前的服务实例信息,查看当前服务实例信息和服务状态是否发生变化,如果当前服务实例信息或者服务状态发生变化将向Eureka Server重新发起服务注册操作。最后声明了下一个延时任务,用于再次调用run方法,继续检查服务实例信息和服务状态的变化,在服务实例信息发生变化的情况下重新发起注册。
如果Eureka Client的状态发生变化(在Spring Boot通过Actuator对服务状态进行监控,具体实现为EurekaHealthCheckHandler),注册在ApplicationInfoManager的状态改变监控器将会被触发,从而调用InstanceInfoReplicator#onDemandUpdate方法,检查服务实例信息和服务状态的变化,可能会引发按需注册任务。代码如下所示:
//InstanceInfoReplicator.java
public boolean onDemandUpdate() {
// 控制流量,当超过限制时,不能进行按需更新
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
scheduler.submit(new Runnable() {
@Override
public void run() {
Future latestPeriodic = scheduledPeriodicRef.get();
// 取消上次#run任务
if (latestPeriodic != null && !latestPeriodic.isDone()) {
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
return false;
}
}
InstanceInfoReplicator#onDemandUpdate方法调用InstanceInfoReplicator#run方法检查服务实例信息和服务状态的变化,并在服务实例信息发生变化的情况下向Eureka Server发起重新注册的请求。为了防止重复执行run方法,onDemandUpdate方法还会取消执行上次已提交且未完成的run方法,执行最新的按需注册任务。
服务下线
一般情况下,应用服务在关闭的时候,Eureka Client会主动向Eureka Server注销自身在注册表中的信息。DiscoveryClient中对象销毁前执行的清理方法如下所示:
// DiscoveryClient.java
@PreDestroy
@Override
public synchronized void shutdown() {
// 同步方法
if (isShutdown.compareAndSet(false, true)) {
// 原子操作,确保只会执行一次
if (statusChangeListener != null && applicationInfoManager != null) {
// 注销状态监听器
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// 取消定时任务
cancelScheduledTasks();
if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
// 服务下线
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
// 关闭Jersy客户端
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
// 关闭相关Monitor
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
}
}
在销毁DiscoveryClient之前,会进行一系列清理工作,包括注销ApplicationInfoManager中的StatusChangeListener、取消定时任务、服务下线和关闭Jersey客户端等。我们主要关注unregister服务下线方法,其实现代码如下所示:
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
} catch (Exception e) {
...
}
}
}
跟踪到AbstractJerseyEurekaHttpClient#cancel方法中,可以发现服务下线调用的接口以及传递的参数,代码如下所示:
@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath). getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder.delete(ClientResponse.class);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf (response)).build();
} finally {
if (response != null) {
response.close();
}
}
}
服务下线的接口地址为apps/${APP_NAME}/${INSTANCE_INFO_ID},传递参数为服务名和服务实例id,HTTP方法为delete。