nacos-sdk-go is the Go SDK for Nacos. Its master branch tracks the 1.x servers; for 2.x you need to switch to the 2.0.0 branch, which is still in beta. This post analyzes the implementation on master and does not cover the gRPC parts.
The service discovery code lives under the clients/naming_client/ directory.
API
The SDK's API is defined by the INamingClient interface; any concrete naming_client must implement the following methods:
// Register an instance
RegisterInstance(param vo.RegisterInstanceParam) (bool, error)
// Deregister an instance
DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error)
// Update an instance
UpdateInstance(param vo.UpdateInstanceParam) (bool, error)
// Get service information
GetService(param vo.GetServiceParam) (model.Service, error)
// Get all instances of a service
SelectAllInstances(param vo.SelectAllInstancesParam) ([]model.Instance, error)
// Get a filtered list of instances
SelectInstances(param vo.SelectInstancesParam) ([]model.Instance, error)
// Get one healthy instance (weighted random selection)
SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error)
// Subscribe to service changes
Subscribe(param *vo.SubscribeParam) error
// Unsubscribe from service changes
Unsubscribe(param *vo.SubscribeParam) error
// Get the list of service names
GetAllServicesInfo(param vo.GetAllServiceInfoParam) (model.ServiceList, error)
This layer of abstraction makes it possible to back a concrete naming_client with either HTTP or gRPC.
Compared with the OPEN-API, the SDK interface differs mainly in the following ways:
- SelectOneHealthyInstance can be seen as client-side load balancing
- There is no directly exposed heartbeat method, so we need to figure out how heartbeats are implemented
- The Subscribe / Unsubscribe pair (un)registers listeners for service changes, which should answer the question left open when reading the open-api earlier
Source Code Analysis
Of the differences from the open-api listed above, the first is about load balancing and can be set aside for now. Let's dig into the source with questions 2 and 3 in mind.
Heartbeat
Starting with question 2: how the heartbeat is implemented. The naming_client package in the Go SDK defines a BeatReactor struct that exposes two public methods, AddBeatInfo and RemoveBeatInfo. Here is the implementation:
func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo model.BeatInfo) {
	logger.Infof("adding beat: <%s> to beat map", util.ToJsonString(beatInfo))
	k := buildKey(serviceName, beatInfo.Ip, beatInfo.Port)
	defer br.mux.Unlock()
	br.mux.Lock()
	if data, ok := br.beatMap.Get(k); ok {
		beatInfo := data.(*model.BeatInfo)
		atomic.StoreInt32(&beatInfo.State, int32(model.StateShutdown))
		br.beatMap.Remove(k)
	}
	br.beatMap.Set(k, &beatInfo)
	go br.sendInstanceBeat(k, &beatInfo)
}
func (br *BeatReactor) RemoveBeatInfo(serviceName string, ip string, port uint64) {
	logger.Infof("remove beat: %s@%s:%d from beat map", serviceName, ip, port)
	k := buildKey(serviceName, ip, port)
	defer br.mux.Unlock()
	br.mux.Lock()
	data, exist := br.beatMap.Get(k)
	if exist {
		beatInfo := data.(*model.BeatInfo)
		atomic.StoreInt32(&beatInfo.State, int32(model.StateShutdown))
	}
	br.beatMap.Remove(k)
}
AddBeatInfo adds the beatInfo to beatMap and then starts a goroutine running sendInstanceBeat to keep the heartbeat going; RemoveBeatInfo removes the beatInfo from beatMap.
In both methods, if a beatInfo already exists under the same key, its State is flipped first, so that the goroutine of the beat about to be removed (or replaced) stops sending heartbeat requests.
Now let's look at the implementation of sendInstanceBeat:
func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *model.BeatInfo) {
	for {
		br.beatThreadSemaphore.Acquire()
		// If the instance has been deregistered, stop the heartbeat.
		if atomic.LoadInt32(&beatInfo.State) == int32(model.StateShutdown) {
			logger.Infof("instance[%s] stop heartBeating", k)
			br.beatThreadSemaphore.Release()
			return
		}
		// Send the heartbeat.
		beatInterval, err := br.serviceProxy.SendBeat(*beatInfo)
		if err != nil {
			logger.Errorf("beat to server return error:%+v", err)
			br.beatThreadSemaphore.Release()
			t := time.NewTimer(beatInfo.Period)
			<-t.C
			continue
		}
		if beatInterval > 0 {
			beatInfo.Period = time.Duration(time.Millisecond.Nanoseconds() * beatInterval)
		}
		br.beatRecordMap.Set(k, util.CurrentMillis())
		br.beatThreadSemaphore.Release()
		t := time.NewTimer(beatInfo.Period)
		<-t.C
	}
}
sendInstanceBeat is an infinite for loop. Each iteration first acquires the semaphore, then checks whether the instance has been stopped; if so, the heartbeat stops and the goroutine exits. Otherwise a heartbeat is sent: on failure, it retries after beatInfo.Period; on success, it stores the beatInterval returned by the server into beatInfo.Period, records the heartbeat timestamp, then starts a timer and waits for the next round.
Searching the code shows that clients/naming_client/naming_client.go, when registering an instance via RegisterInstance, checks whether the instance is ephemeral, and if so adds a beat for it:
// RegisterInstance
if instance.Ephemeral {
	sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
}

// DeregisterInstance
sc.beatReactor.RemoveBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port)

// UpdateInstance
if param.Ephemeral {
	// Update the heartbeat information first to prevent the information
	// from being flushed back to the original information after reconnecting
	sc.beatReactor.RemoveBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port)
	beatInfo := model.BeatInfo{
		Ip:          param.Ip,
		Port:        param.Port,
		Metadata:    param.Metadata,
		ServiceName: util.GetGroupName(param.ServiceName, param.GroupName),
		Cluster:     param.ClusterName,
		Weight:      param.Weight,
		Period:      util.GetDurationWithDefault(param.Metadata, constant.HEART_BEAT_INTERVAL, time.Second*5),
		State:       model.StateRunning,
	}
	sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
}
Similarly, deregistering an instance removes its beat, and updating an ephemeral instance first removes the beat and then adds it back.
So question 2 is answered: although the Go SDK's INamingClient interface does not expose a heartbeat method directly, heartbeat keep-alive is built into the instance registration/deregistration flow of the actual implementation.
Subscription
Next, question 3: how the subscription logic is implemented.
// Subscribe to service changes
func (sc *NamingClient) Subscribe(param *vo.SubscribeParam) error {
	if len(param.GroupName) == 0 {
		param.GroupName = constant.DEFAULT_GROUP
	}
	serviceParam := vo.GetServiceParam{
		ServiceName: param.ServiceName,
		GroupName:   param.GroupName,
		Clusters:    param.Clusters,
	}
	sc.subCallback.AddCallbackFuncs(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)
	svc, err := sc.GetService(serviceParam)
	if err != nil {
		return err
	}
	if !sc.hostReactor.serviceProxy.clientConfig.NotLoadCacheAtStart {
		sc.subCallback.ServiceChanged(&svc)
	}
	return nil
}
The implementation of Subscribe is fairly simple. The input is a SubscribeParam struct, which must contain ServiceName and SubscribeCallback. The callback is first registered into subCallback, whose ConcurrentMap maps a key derived from serviceName and clusters to the list of callbacks registered for it.
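The subCallback idea can be sketched roughly like this (all names here are illustrative; the SDK's SubscribeCallback uses a ConcurrentMap and its own key helpers):

```go
package main

import "strings"

// Sketch of a callback registry: callbacks are stored under a key
// built from the service name and clusters, and every callback
// registered for that key fires when the service changes.
type service struct {
	Name  string
	Hosts []string
}

type callbackRegistry struct {
	funcs map[string][]func(*service)
}

// cacheKey is an illustrative key format, not the SDK's exact one.
func cacheKey(serviceName string, clusters []string) string {
	return serviceName + "@" + strings.Join(clusters, ",")
}

func (r *callbackRegistry) add(serviceName string, clusters []string, cb func(*service)) {
	k := cacheKey(serviceName, clusters)
	r.funcs[k] = append(r.funcs[k], cb)
}

// serviceChanged invokes every callback registered for the service.
func (r *callbackRegistry) serviceChanged(serviceName string, clusters []string, svc *service) {
	for _, cb := range r.funcs[cacheKey(serviceName, clusters)] {
		cb(svc)
	}
}
```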
It then calls GetService directly, which is really just a wrapper around hostReactor.GetServiceInfo:
// Get the service
func (sc *NamingClient) GetService(param vo.GetServiceParam) (model.Service, error) {
	if len(param.GroupName) == 0 {
		param.GroupName = constant.DEFAULT_GROUP
	}
	service, err := sc.hostReactor.GetServiceInfo(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
	return service, err
}
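Judging from the pushed payloads shown later in this post (names like DEFAULT_GROUP@@nacos.test.3), util.GetGroupName appears to join group and service with "@@". A sketch of that assumption:

```go
package main

// getGroupName mirrors what util.GetGroupName appears to produce,
// based on the "DEFAULT_GROUP@@nacos.test.3" names seen in the push
// payloads: group and service joined by "@@". This is an assumption
// about the SDK's internals, not its actual source.
func getGroupName(serviceName, groupName string) string {
	return groupName + "@@" + serviceName
}
```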
type HostReactor struct {
	serviceInfoMap       cache.ConcurrentMap
	cacheDir             string
	updateThreadNum      int
	serviceProxy         NamingProxy
	pushReceiver         PushReceiver
	subCallback          SubscribeCallback
	updateTimeMap        cache.ConcurrentMap
	updateCacheWhenEmpty bool
}
func (hr *HostReactor) GetServiceInfo(serviceName string, clusters string) (model.Service, error) {
	key := util.GetServiceCacheKey(serviceName, clusters)
	cacheService, ok := hr.serviceInfoMap.Get(key)
	if !ok {
		hr.updateServiceNow(serviceName, clusters)
		if cacheService, ok = hr.serviceInfoMap.Get(key); !ok {
			return model.Service{}, errors.New("get service info failed")
		}
	}
	return cacheService.(model.Service), nil
}
func (hr *HostReactor) updateServiceNow(serviceName, clusters string) {
	result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
	if err != nil {
		logger.Errorf("QueryList return error!serviceName:%s cluster:%s err:%+v", serviceName, clusters, err)
		return
	}
	if result == "" {
		logger.Errorf("QueryList result is empty!serviceName:%s cluster:%s", serviceName, clusters)
		return
	}
	hr.ProcessServiceJson(result)
}
func (hr *HostReactor) ProcessServiceJson(result string) {
	service := util.JsonToService(result)
	if service == nil {
		return
	}
	cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters)
	oldDomain, ok := hr.serviceInfoMap.Get(cacheKey)
	if ok && !hr.updateCacheWhenEmpty {
		// if the instance list is empty, do not update the cache
		if service.Hosts == nil || len(service.Hosts) == 0 {
			logger.Errorf("do not have useful host, ignore it, name:%s", service.Name)
			return
		}
	}
	hr.updateTimeMap.Set(cacheKey, uint64(util.CurrentMillis()))
	hr.serviceInfoMap.Set(cacheKey, *service)
	if !ok || ok && !reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {
		if !ok {
			logger.Info("service not found in cache " + cacheKey)
		} else {
			logger.Infof("service key:%s was updated to:%s", cacheKey, util.ToJsonString(service))
		}
		cache.WriteServicesToFile(*service, hr.cacheDir)
		hr.subCallback.ServiceChanged(service)
	}
}
hostReactor is an instance of HostReactor, and GetServiceInfo is its exposed method. The flow is: look the service up in the local cache first; on a miss, trigger updateServiceNow, which queries the instance list of the given serviceName through the service proxy's QueryList. The result is handed to ProcessServiceJson, which decides whether the cache needs to be updated, and if it does, fires the callbacks registered for that service.
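The cache-miss path can be condensed into a small sketch (hypothetical types; the query function stands in for serviceProxy.QueryList):

```go
package main

import "errors"

// Sketch of the GetServiceInfo flow: return the cached service, or
// fetch-and-cache on a miss, failing only if the fetch did not
// populate the cache. Names here are illustrative, not the SDK's.
type svcCache struct {
	services map[string]string                        // key -> service JSON
	query    func(serviceName string) (string, error) // stand-in for serviceProxy.QueryList
}

func (c *svcCache) get(serviceName string) (string, error) {
	if s, ok := c.services[serviceName]; ok {
		return s, nil
	}
	if result, err := c.query(serviceName); err == nil && result != "" {
		// ProcessServiceJson does this, plus change detection and callbacks.
		c.services[serviceName] = result
	}
	s, ok := c.services[serviceName]
	if !ok {
		return "", errors.New("get service info failed")
	}
	return s, nil
}
```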
One more detail: the request behind hr.serviceProxy.QueryList hits exactly the same endpoint as the open-api call for listing a serviceName's instances, /v1/ns/instance/list. The difference is the third argument, pushReceiver.port, which becomes a udpPort parameter that the open-api does not document; there is also an undocumented app parameter.
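What such a request might look like, with the parameter names discussed here (the SDK's actual request building may differ):

```go
package main

import (
	"net/url"
	"strconv"
)

// buildInstanceListQuery assembles the query string for
// /v1/ns/instance/list, including the udpPort/clientIP pair through
// which the server learns where to push service changes.
func buildInstanceListQuery(serviceName, clusters, clientIP string, udpPort int) string {
	params := url.Values{}
	params.Set("serviceName", serviceName)
	if clusters != "" {
		params.Set("clusters", clusters)
	}
	params.Set("udpPort", strconv.Itoa(udpPort))
	params.Set("clientIP", clientIP)
	return "/nacos/v1/ns/instance/list?" + params.Encode()
}
```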
pushReceiver is a member of HostReactor and is initialized along with it in NewHostReactor:
func NewHostReactor(serviceProxy NamingProxy, cacheDir string, updateThreadNum int, notLoadCacheAtStart bool, subCallback SubscribeCallback, updateCacheWhenEmpty bool) HostReactor {
	if updateThreadNum <= 0 {
		updateThreadNum = Default_Update_Thread_Num
	}
	hr := HostReactor{
		serviceProxy:         serviceProxy,
		cacheDir:             cacheDir,
		updateThreadNum:      updateThreadNum,
		serviceInfoMap:       cache.NewConcurrentMap(),
		subCallback:          subCallback,
		updateTimeMap:        cache.NewConcurrentMap(),
		updateCacheWhenEmpty: updateCacheWhenEmpty,
	}
	pr := NewPushReceiver(&hr)
	hr.pushReceiver = *pr
	if !notLoadCacheAtStart {
		hr.loadCacheFromDisk()
	}
	go hr.asyncUpdateService()
	return hr
}
pushReceiver is initialized via NewPushReceiver:
func NewPushReceiver(hostReactor *HostReactor) *PushReceiver {
	pr := PushReceiver{
		hostReactor: hostReactor,
	}
	pr.startServer()
	return &pr
}

func (us *PushReceiver) startServer() {
	conn := us.getConn()
	go func() {
		defer func() {
			if conn != nil {
				conn.Close()
			}
		}()
		for {
			us.handleClient(conn)
		}
	}()
}
func (us *PushReceiver) getConn() *net.UDPConn {
	var conn *net.UDPConn
	for i := 0; i < 3; i++ {
		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		port := r.Intn(1000) + 54951
		us.port = port
		conn1, ok := us.tryListen()
		if ok {
			conn = conn1
			logger.Infof("udp server start, port: " + strconv.Itoa(port))
			return conn
		}
		if !ok && i == 2 {
			logger.Errorf("failed to start udp server after trying 3 times.")
		}
	}
	return nil
}
As you can see, NewPushReceiver starts a UDP server and then loops handling incoming UDP packets. The nacos server knows where to push service-change messages precisely from the clientIP and udpPort sent in the /v1/ns/instance/list request.
func (us *PushReceiver) handleClient(conn *net.UDPConn) {
	if conn == nil {
		time.Sleep(time.Second * 5)
		conn = us.getConn()
		if conn == nil {
			return
		}
	}
	data := make([]byte, 4024)
	n, remoteAddr, err := conn.ReadFromUDP(data)
	if err != nil {
		logger.Errorf("failed to read UDP msg because of %+v", err)
		return
	}
	s := TryDecompressData(data[:n])
	logger.Info("receive push: "+s+" from: ", remoteAddr)
	var pushData PushData
	err1 := json.Unmarshal([]byte(s), &pushData)
	if err1 != nil {
		logger.Infof("failed to process push data.err:%+v", err1)
		return
	}
	ack := make(map[string]string)
	if pushData.PushType == "dom" || pushData.PushType == "service" {
		us.hostReactor.ProcessServiceJson(pushData.Data)
		ack["type"] = "push-ack"
		ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10)
		ack["data"] = ""
	} else if pushData.PushType == "dump" {
		ack["type"] = "dump-ack"
		ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10)
		ack["data"] = util.ToJsonString(us.hostReactor.serviceInfoMap)
	} else {
		ack["type"] = "unknow-ack"
		ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10)
		ack["data"] = ""
	}
	bs, _ := json.Marshal(ack)
	c, err := conn.WriteToUDP(bs, remoteAddr)
	if err != nil {
		logger.Errorf("WriteToUDP failed,return:%d,err:%+v", c, err)
	}
}
The actual handling happens in handleClient above. The main branch is when PushType is dom or service: the pushed data is again passed to ProcessServiceJson, which decides whether the cache needs updating and, if so, fires the callbacks registered for that service.
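The ack handling condenses into a small helper; the field names type/lastRefTime/data and the ack types come directly from the code above:

```go
package main

import (
	"encoding/json"
	"strconv"
)

// buildAck mirrors the ack map built in handleClient: the ack type
// depends on the push type, and only a "dump" ack carries data.
func buildAck(pushType string, lastRefTime int64, dump string) ([]byte, error) {
	ack := map[string]string{
		"lastRefTime": strconv.FormatInt(lastRefTime, 10),
		"data":        "",
	}
	switch pushType {
	case "dom", "service":
		ack["type"] = "push-ack"
	case "dump":
		ack["type"] = "dump-ack"
		ack["data"] = dump
	default:
		ack["type"] = "unknow-ack" // the SDK spells it this way
	}
	return json.Marshal(ack)
}
```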
At this point the remaining question is answered: when the nacos Go SDK creates a naming_client, it starts a UDP server to receive pushes, and subscribing to changes of a serviceName is done by attaching clientIP and udpPort to the open-api /v1/ns/instance/list request.
Verification
To verify the analysis above, I wrote a UDP server demo modeled on the SDK code.
First call /v1/ns/instance/list with clientIP and udpPort, then register and deregister a service while watching the demo's output. Note that a few specific request headers are required, because the nacos server only pushes over UDP to recognized SDK versions:
#!/bin/bash
curl -H "Client-Version: Nacos-Go-Client:v1.0.1" \
-H "User-Agent: Nacos-Go-Client:v1.0.1" \
-H "Request-Module: Naming" \
-H "Content-Type: application/x-www-form-urlencoded;charset=utf-8" \
-X GET \
'http://172.16.57.42:8848/nacos/v1/ns/instance/list?serviceName=nacos.test.3&udpPort=65013&clientIP=172.16.57.42'
As the output below shows, the pushed messages do arrive. The demo does not send an ack yet, so the server may retry the push, but this is enough to show the subscription flow works end to end.
Start udp server at 172.16.57.42:65013
Recv data: {"type":"dom","data":"{\"name\":\"DEFAULT_GROUP@@nacos.test.3\",\"clusters\":\"\",\"cacheMillis\":10000,\"hosts\":[],\"lastRefTime\":1634608400805,\"checksum\":\"\",\"allIPs\":false,\"reachProtectionThreshold\":false,\"valid\":true}","lastRefTime":1271435940556204}
Recv data: {"type":"dom","data":"{\"name\":\"DEFAULT_GROUP@@nacos.test.3\",\"clusters\":\"\",\"cacheMillis\":10000,\"hosts\":[],\"lastRefTime\":1634608400805,\"checksum\":\"\",\"allIPs\":false,\"reachProtectionThreshold\":false,\"valid\":true}","lastRefTime":1271435940556204}
Recv data: {"type":"dom","data":"{\"name\":\"DEFAULT_GROUP@@nacos.test.3\",\"clusters\":\"\",\"cacheMillis\":10000,\"hosts\":[{\"instanceId\":\"172.16.57.42#8849#DEFAULT#DEFAULT_GROUP@@nacos.test.3\",\"ip\":\"172.16.57.42\",\"port\":8849,\"weight\":1.0,\"healthy\":true,\"enabled\":true,\"ephemeral\":true,\"clusterName\":\"DEFAULT\",\"serviceName\":\"DEFAULT_GROUP@@nacos.test.3\",\"metadata\":{},\"instanceHeartBeatInterval\":5000,\"instanceHeartBeatTimeOut\":15000,\"ipDeleteTimeout\":30000,\"instanceIdGenerator\":\"simple\"}],\"lastRefTime\":1634608413716,\"checksum\":\"\",\"allIPs\":false,\"reachProtectionThreshold\":false,\"valid\":true}","lastRefTime":1271448808976688}
Recv data: {"type":"dom","data":"{\"name\":\"DEFAULT_GROUP@@nacos.test.3\",\"clusters\":\"\",\"cacheMillis\":10000,\"hosts\":[{\"instanceId\":\"172.16.57.42#8849#DEFAULT#DEFAULT_GROUP@@nacos.test.3\",\"ip\":\"172.16.57.42\",\"port\":8849,\"weight\":1.0,\"healthy\":true,\"enabled\":true,\"ephemeral\":true,\"clusterName\":\"DEFAULT\",\"serviceName\":\"DEFAULT_GROUP@@nacos.test.3\",\"metadata\":{},\"instanceHeartBeatInterval\":5000,\"instanceHeartBeatTimeOut\":15000,\"ipDeleteTimeout\":30000,\"instanceIdGenerator\":\"simple\"}],\"lastRefTime\":1634608413716,\"checksum\":\"\",\"allIPs\":false,\"reachProtectionThreshold\":false,\"valid\":true}","lastRefTime":1271448808976688}