今天看啥  ›  专栏  ›  BlackSwift

EurekaClient的RPC与心跳分析

BlackSwift  · 简书  ·  · 2018-03-02 00:56

Eureka是SpringCloud中推荐的NamingServer,自然就少不了心跳。如果要是来类比的话,我认为DDNS与本文所讲比较类似。

本文结论

Eureka的客户端启动后开了2个轮询线程池

  • 通过定时put发送实例心跳
  • 通过定时GET获取增量信息
  • 上述心跳与增量信息均不是原子操作,但是有最终一致性

说完了结论,我们现在就可以开始动手抓包了

抓包准备工作

本文直接上Wireshark进行抓包分析(再发个感慨,Eureka的几个HTTP包相比LTE通信中的RRC等信令抓包实在是太简单了,同样也是服务发现,通信专业比所谓的微服务难很多,但工资却不如IT)

准备工作

  • 配置netflix的日志级别为DEBUG的多台Client/Server程序
  • Wireshark抓包工具,并配置好监听的网卡(如果你用的是localhost,一般是选择Loopback)

术语概念

appID is the name of the application and instanceID is the unique id associated with the instance. In AWS cloud, instanceID is the instance id of the instance and in other data centers, it is the hostname of the instance.

心跳抓包

抓包准备

启动EurekaServer,打开Wireshark,选择你要监听的网卡(如果你用的是localhost,一般是选择Loopback),然后配置如下过滤器

http&&tcp.port==8761

单机Client启动与心跳(RENEW)场景

如下是Server早已经启动后,Client从零开始发送的所有请求

Time Info
26.609128 GET /eureka/apps/ HTTP/1.1
26.610332 HTTP/1.1 200 (application/json)
26.78631 POST /eureka/apps/API-PROD-SZ1 HTTP/1.1 (application/json)
56.406603 HTTP/1.1 204
56.673179 GET /eureka/apps/ HTTP/1.1
56.67615 HTTP/1.1 200 (application/json)
56.676931 PUT /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1?status=UP&lastDirtyTimestamp=1519311933974 HTTP/1.1
56.679674 HTTP/1.1 200
86.688146 PUT /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1?status=UP&lastDirtyTimestamp=1519311933974 HTTP/1.1
86.691646 HTTP/1.1 200
86.725409 GET /eureka/apps/delta HTTP/1.1
86.729934 HTTP/1.1 200 (application/json)

其中获取增量更新Delta的返回如下,也就是一个Diff操作

{"applications":{"versions__delta":"11","apps__hashcode":"UP_1_","application":[]}}

而心跳就更简单了,只是一个PUT操作更新实例

节点的正常上线

在当前负载下额外启动一个Client,可以发现如下现象

  • 新增Client通过POST发送当前实例信息给Server
  • 其它Client通过GET增量信息接受新增Client的信息
  • 当POST发布实例操作没有完成时,其它Client获取的delta是空白的;操作完成后,其它Client获取的delta有了新增的实例

节点的正常下线

清空Wireshark日志,然后Kill 15关闭Eureka的某个Client,可以发现有如下请求发出

首先POST报文将JSON中的状态配置为DOWN

POST /eureka/apps/API-PROD-SZ1 HTTP/1.1  (application/json)

接着删除了此APP的实例(无论剩下的Client有几个都发送了)

DELETE /eureka/apps/API-PROD-SZ1/10.0.0.4:api-prod-sz1 HTTP/1.1 

其中

  • appID: API-PROD-SZ1
  • instanceID: 10.0.0.4:api-prod-sz1

节点的异常下线

异常下线后,判断逻辑肯定在Server端,本文暂时不分析。当然默认是90s后自动下线。

抓包结论

由上面可以得出如下结论

  • POST发布app全量实例的操作不是一个(阻塞的)原子操作
  • 通过轮询全量或增量同步应用信息,但是Eureka不保证各个节点的Consistence(也就是CAP的C没法保证),但是在多次轮询后可以达到最终一致性
  • 心跳本身很简单,只是PUT应用的实例信息

Java侧请求

在EurekaClient的构造函数中,主要有两步操作:第一步反序列化配置文件,第二步启动定时线程池(心跳与更新缓存),下文简要提供相关断点位置

首先进入构造函数

com.netflix.discovery.DiscoveryClient#DiscoveryClient

通过分析,可以发现在本地缓存如下

// 本地通过CAS实现
private final AtomicReference<Applications> localRegionApps = new AtomicReference<>();

通过REST接口反查第一次RPC请求断点位于,将获取全量的APP信息

com.netflix.discovery.DiscoveryClient#fetchRegistry

后续将通过轮询进行增量更新与心跳

全量更新

当本地缓存为空时,将进行全量更新

com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry

通过CAS保证本地线程安全

 if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
     // 此处的过滤操作类似于groupBy操作符过滤出状态为up的实例
     localRegionApps.set(this.filterAndShuffle(apps));
     logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
 } else {
     logger.warn("Not updating applications as another thread is updating it already");
 }

增量更新

当通过定时轮询从服务端获取到增量更新(Applications对象)后,将在本地CAS锁(ReentrantLock)更新

//com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
    String reconcileHashCode = "";
    if (fetchRegistryUpdateLock.tryLock()) {
        try {
            // 通过ActionType更新hashSet,此处内部也有锁
            updateDelta(delta);
            reconcileHashCode = getReconcileHashCode(applications);
        } finally {
            fetchRegistryUpdateLock.unlock();
        }
    } else {
        logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    }
}

updateDelta

此部分比较简单,通过遍历并进行ActionType模式匹配更新Application列表中的状态

本地Hash校验

此处虽然有个所谓的一致性Hash的唬人名字,实际上就是本地与远程的对比,类似于分段下载完iso文档后进行CheckSum校验

举一个例子,比如当前有如下的机器

[
    "vip1": [UP, UP]
    "vip2": [UP, UP]
    "appHashCode": "UP_4_"
]

其中某一台挂了,服务端返回如下

[
    "vip1": [UP, UP]
    "vip2": [UP, DONW]
    "appHashCode": "DOWN_1_UP_3_"
]

本地增量更新远程的状态(updateDelta)后,也将进行Hash计算(getReconcileHashCode),并与远程的计算结果相对比。

此处校验算法虽然有一堆for循环,但是它实际上是类似Groovy中函数式编程countBy的实现,伪代码如下

// getReconcileHashCode 的伪代码
// 本地增量后的 localRegionApps
def localRegionApps = ["UP",'UP','DOWN','DOWN','UP','UP','UP','UP'];
getReconcileHashCode(list){
    def map = list.countBy {it}
    //=>{UP=6, DOWN=2}
    def list = map.collect { k, v -> k + "_" + v + "_" }
    //=>[UP_6_, DOWN_2_]
    // 注意,此处仅仅为伪代码,因为真实使用KetSet遍历的是基于TreeMap(按照Key)进行排序
    def hash = list.inject("") { old, it -> it.concat(old) }
    //=>DOWN_2_UP_6_
}
getReconcileHashCode(localRegionApps)
// => DOWN_2_UP_6_

如果hashCode相同,那么此次更新就成功了;如果hashCode不相同(我还没有见过,肯定是在本地合并的那一步),将会进行全量更新

Hash碰撞特例

上面的Hash算法太简单了,比如

[
    "vip1": [UP, UP]
    "vip2": [UP, DONW]
    "appHashCode": "DOWN_1_UP_3_"
]

变成了

[
    "vip1": [UP, DONW]
    "vip2": [UP, UP]
    "appHashCode": "DOWN_1_UP_3_"
]

此时本地updateDelta也更新失败的话,那么这次增量更新校验却被认为是更新成功了,这里的就存在碰撞问题。

此处待确认。

附录

服务端代码位置

此部分的服务端代码在如下位置

eureka-core-1.8.6.jar!/com/netflix/eureka/resources

RPC实现

Eureka在内部均采用了sun的jersey作为HTTP请求客户端,你可以把它类比为OkHttp或者HttpClient

例如获取Application就调用了如下

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications

注意这里的Java代码也是通过Override闭包回掉的形式来实现分层的,与ServiceComb有点类似,因此读者在打断点时要明白代码并不一定是从上到下走的

如果你第一次看Eureka的源码,建议将下的所有AbstractJerseyEurekaHttpClient下的Jersey HTTP字符串日志相关的行都打上断点,先分析再把断点读薄。

参考

聊聊分布式散列表(DHT)的原理——以 Kademlia(Kad) 和 Chord 为例




原文地址:访问原文地址
快照地址: 访问文章快照