nacos-sdk-go 1.x Service Discovery: A Source Code Walkthrough

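nacos-sdk-go is the Go SDK for Nacos. Its master branch corresponds to the 1.x line; the 2.x line lives on the 2.0.0 branch and is currently in beta. This walkthrough covers the implementation on master and does not include the gRPC part.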

The service discovery code lives under the clients/naming_client/ directory.

API

The SDK's API is defined in the INamingClient interface. Any concrete naming_client has to implement the following methods:

// Register an instance
RegisterInstance(param vo.RegisterInstanceParam) (bool, error)

// Deregister an instance
DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error)

// Update an instance
UpdateInstance(param vo.UpdateInstanceParam) (bool, error)

// Get service information
GetService(param vo.GetServiceParam) (model.Service, error)

// Get the list of all instances
SelectAllInstances(param vo.SelectAllInstancesParam) ([]model.Instance, error)

// Get a list of instances
SelectInstances(param vo.SelectInstancesParam) ([]model.Instance, error)

// Get one healthy instance (weighted random selection)
SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error)

// Subscribe to service changes
Subscribe(param *vo.SubscribeParam) error

// Unsubscribe from service changes
Unsubscribe(param *vo.SubscribeParam) error

// Get the list of service names
GetAllServicesInfo(param vo.GetAllServiceInfoParam) (model.ServiceList, error)

With this layer of abstraction, a concrete naming_client can be implemented over either HTTP or gRPC.

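Compared with the Open API, the interface defined by the SDK differs mainly in the following ways:

  1. SelectOneHealthyInstance can be seen as load balancing implemented on the client side.
  2. There is no directly exposed heartbeat method, so we need to find out how heartbeats are implemented.
  3. Subscribe / Unsubscribe are used to start (or stop) watching a service for changes. They should answer the question left open when I read through the Open API earlier.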

Source analysis

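Of the differences from the Open API listed above, the first concerns load balancing and is set aside for now. With questions 2 and 3 in mind, let's look at the relevant source code.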

Heartbeat

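Start with point 2: how heartbeats are implemented. The naming_client package of the Go SDK defines a BeatReactor struct with two exported methods, AddBeatInfo and RemoveBeatInfo. Here is the implementation: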

func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo model.BeatInfo) {
        logger.Infof("adding beat: <%s> to beat map", util.ToJsonString(beatInfo))
        k := buildKey(serviceName, beatInfo.Ip, beatInfo.Port)
        defer br.mux.Unlock()
        br.mux.Lock()
        if data, ok := br.beatMap.Get(k); ok {
                beatInfo := data.(*model.BeatInfo)
                atomic.StoreInt32(&beatInfo.State, int32(model.StateShutdown))
                br.beatMap.Remove(k)
        }
        br.beatMap.Set(k, &beatInfo)
        go br.sendInstanceBeat(k, &beatInfo)
}


func (br *BeatReactor) RemoveBeatInfo(serviceName string, ip string, port uint64) {
        logger.Infof("remove beat: %s@%s:%d from beat map", serviceName, ip, port)
        k := buildKey(serviceName, ip, port)
        defer br.mux.Unlock()
        br.mux.Lock()
        data, exist := br.beatMap.Get(k)
        if exist {
                beatInfo := data.(*model.BeatInfo)
                atomic.StoreInt32(&beatInfo.State, int32(model.StateShutdown))
        }
        br.beatMap.Remove(k)
}

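AddBeatInfo adds the beatInfo to beatMap and then starts a goroutine running sendInstanceBeat to keep the heartbeat going. RemoveBeatInfo removes that beatInfo from beatMap.

In both methods, if beatMap already holds a beatInfo for the key, its State is first set to StateShutdown, which stops the heartbeat goroutine of the entry that is about to be removed (or replaced).

Next, the implementation of sendInstanceBeat: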

func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *model.BeatInfo) {
        for {
                br.beatThreadSemaphore.Acquire()
                // stop the heartbeat if the instance has been deregistered
                if atomic.LoadInt32(&beatInfo.State) == int32(model.StateShutdown) {
                        logger.Infof("instance[%s] stop heartBeating", k)
                        br.beatThreadSemaphore.Release()
                        return
                }

                // send the heartbeat
                beatInterval, err := br.serviceProxy.SendBeat(*beatInfo)
                if err != nil {
                        logger.Errorf("beat to server return error:%+v", err)
                        br.beatThreadSemaphore.Release()
                        t := time.NewTimer(beatInfo.Period)
                        <-t.C
                        continue
                }
                if beatInterval > 0 {
                        beatInfo.Period = time.Duration(time.Millisecond.Nanoseconds() * beatInterval)
                }

                br.beatRecordMap.Set(k, util.CurrentMillis())
                br.beatThreadSemaphore.Release()

                t := time.NewTimer(beatInfo.Period)
                <-t.C
        }
}

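sendInstanceBeat is a for loop. Each iteration first acquires the semaphore and then checks whether the instance has been shut down; if so, the heartbeat stops and the goroutine exits. Otherwise it sends a heartbeat. If sending fails, it waits beatInfo.Period and tries again. If it succeeds, it stores the beatInterval returned by the server (when positive) in beatInfo.Period, records the time of the heartbeat, then starts a timer and waits for the next round.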

Searching the code shows that RegisterInstance in clients/naming_client/naming_client.go checks whether the instance being registered is ephemeral, and if it is, adds a beat for it.

// RegisterInstance
if instance.Ephemeral {
    sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
}


// DeregisterInstance
sc.beatReactor.RemoveBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port)


// UpdateInstance
if param.Ephemeral {
    // Update the heartbeat information first to prevent the information
    // from being flushed back to the original information after reconnecting
    sc.beatReactor.RemoveBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port)
    beatInfo := model.BeatInfo{
        Ip:          param.Ip,
        Port:        param.Port,
        Metadata:    param.Metadata,
        ServiceName: util.GetGroupName(param.ServiceName, param.GroupName),
        Cluster:     param.ClusterName,
        Weight:      param.Weight,
        Period:      util.GetDurationWithDefault(param.Metadata, constant.HEART_BEAT_INTERVAL, time.Second*5),
        State:       model.StateRunning,
    }
    sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
}

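Similarly, deregistering an instance removes its beat, and updating an ephemeral instance removes the beat first and then adds it back.

This answers question 2: although the INamingClient interface in the Go SDK does not expose a heartbeat method directly, in the actual implementation heartbeat keep-alive is built into instance registration and deregistration.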

Subscription

Now for question 3: how the subscription logic is implemented.

// Subscribe to a service
func (sc *NamingClient) Subscribe(param *vo.SubscribeParam) error {
        if len(param.GroupName) == 0 {
                param.GroupName = constant.DEFAULT_GROUP
        }
        serviceParam := vo.GetServiceParam{
                ServiceName: param.ServiceName,
                GroupName:   param.GroupName,
                Clusters:    param.Clusters,
        }

        sc.subCallback.AddCallbackFuncs(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)
        svc, err := sc.GetService(serviceParam)
        if err != nil {
                return err
        }
        if !sc.hostReactor.serviceProxy.clientConfig.NotLoadCacheAtStart {
                sc.subCallback.ServiceChanged(&svc)
        }
        return nil
}

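Subscribe is fairly simple. It takes a SubscribeParam struct, which must contain ServiceName and SubscribeCallback. It first registers the callback in subCallback; the subCallback struct holds a ConcurrentMap that maps a key built from serviceName and clusters to the list of callbacks registered for it.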
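
The callback bookkeeping can be pictured with a small registry like the one below. This is a sketch under assumptions rather than the SDK's SubscribeCallback type: the registry, callback and Service types are hypothetical, and the "name@@clusters" key format is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Service is a hypothetical, trimmed-down stand-in for model.Service.
type Service struct {
	Name     string
	Clusters string
	Hosts    []string
}

type callback func(hosts []string)

// registry maps a key built from service name and clusters to the
// callbacks registered for it.
type registry struct {
	mu  sync.RWMutex
	cbs map[string][]callback
}

func newRegistry() *registry { return &registry{cbs: map[string][]callback{}} }

// cacheKey uses an assumed "name@@clusters" format.
func cacheKey(name, clusters string) string { return name + "@@" + clusters }

// add registers one more callback under the key for (name, clusters).
func (r *registry) add(name, clusters string, cb callback) {
	r.mu.Lock()
	defer r.mu.Unlock()
	k := cacheKey(name, clusters)
	r.cbs[k] = append(r.cbs[k], cb)
}

// serviceChanged invokes every callback registered for the service and
// returns how many were called.
func (r *registry) serviceChanged(s Service) int {
	r.mu.RLock()
	cbs := append([]callback(nil), r.cbs[cacheKey(s.Name, s.Clusters)]...)
	r.mu.RUnlock()
	// Callbacks run outside the lock so they may register further callbacks.
	for _, cb := range cbs {
		cb(s.Hosts)
	}
	return len(cbs)
}

func main() {
	r := newRegistry()
	r.add("DEFAULT_GROUP@@demo", "", func(hosts []string) {
		fmt.Println("hosts changed:", hosts)
	})
	r.serviceChanged(Service{Name: "DEFAULT_GROUP@@demo", Hosts: []string{"10.0.0.1:8080"}})
}
```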

It then calls GetService directly, which is really just a wrapper around hostReactor.GetServiceInfo. The code is as follows:

// Get the service
func (sc *NamingClient) GetService(param vo.GetServiceParam) (model.Service, error) {
        if len(param.GroupName) == 0 {
                param.GroupName = constant.DEFAULT_GROUP
        }
        service, err := sc.hostReactor.GetServiceInfo(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
        return service, err
}

type HostReactor struct {
        serviceInfoMap       cache.ConcurrentMap
        cacheDir             string
        updateThreadNum      int
        serviceProxy         NamingProxy
        pushReceiver         PushReceiver
        subCallback          SubscribeCallback
        updateTimeMap        cache.ConcurrentMap
        updateCacheWhenEmpty bool
}

func (hr *HostReactor) GetServiceInfo(serviceName string, clusters string) (model.Service, error) {
        key := util.GetServiceCacheKey(serviceName, clusters)
        cacheService, ok := hr.serviceInfoMap.Get(key)
        if !ok {
                hr.updateServiceNow(serviceName, clusters)
                if cacheService, ok = hr.serviceInfoMap.Get(key); !ok {
                        return model.Service{}, errors.New("get service info failed")
                }
        }

        return cacheService.(model.Service), nil
}


func (hr *HostReactor) updateServiceNow(serviceName, clusters string) {
        result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)

        if err != nil {
                logger.Errorf("QueryList return error!serviceName:%s cluster:%s err:%+v", serviceName, clusters, err)
                return
        }
        if result == "" {
                logger.Errorf("QueryList result is empty!serviceName:%s cluster:%s", serviceName, clusters)
                return
        }
        hr.ProcessServiceJson(result)
}

func (hr *HostReactor) ProcessServiceJson(result string) {
        service := util.JsonToService(result)
        if service == nil {
                return
        }
        cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters)

        oldDomain, ok := hr.serviceInfoMap.Get(cacheKey)
        if ok && !hr.updateCacheWhenEmpty {
                //if instance list is empty,not to update cache
                if service.Hosts == nil || len(service.Hosts) == 0 {
                        logger.Errorf("do not have useful host, ignore it, name:%s", service.Name)
                        return
                }
        }
        hr.updateTimeMap.Set(cacheKey, uint64(util.CurrentMillis()))
        hr.serviceInfoMap.Set(cacheKey, *service)
        if !ok || ok && !reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {
                if !ok {
                        logger.Info("service not found in cache " + cacheKey)
                } else {
                        logger.Info("service key:%s was updated to:%s", cacheKey, util.ToJsonString(service))
                }
                cache.WriteServicesToFile(*service, hr.cacheDir)
                hr.subCallback.ServiceChanged(service)
        }
}

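hostReactor is an instance of HostReactor, and GetServiceInfo is a method it exposes. It first looks the service up in the local cache. On a miss it triggers updateServiceNow, which queries the instance list for the given serviceName through the service proxy's QueryList and hands the result to ProcessServiceJson. ProcessServiceJson decides whether to update the cache, and when the service is new to the cache or its host list has changed, it fires the callbacks registered for that service.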

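One more detail: the endpoint that hr.serviceProxy.QueryList actually calls is exactly the same one the Open API uses to list the instances of a serviceName, namely /v1/ns/instance/list. The difference is that pushReceiver.port is passed as the third argument, which corresponds to udpPort. This parameter is not documented in the Open API, and neither is an additional app parameter.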

pushReceiver is a member of HostReactor and is initialized along with it in NewHostReactor:

func NewHostReactor(serviceProxy NamingProxy, cacheDir string, updateThreadNum int, notLoadCacheAtStart bool, subCallback SubscribeCallback, updateCacheWhenEmpty bool) HostReactor {
        if updateThreadNum <= 0 {
                updateThreadNum = Default_Update_Thread_Num
        }
        hr := HostReactor{
                serviceProxy:         serviceProxy,
                cacheDir:             cacheDir,
                updateThreadNum:      updateThreadNum,
                serviceInfoMap:       cache.NewConcurrentMap(),
                subCallback:          subCallback,
                updateTimeMap:        cache.NewConcurrentMap(),
                updateCacheWhenEmpty: updateCacheWhenEmpty,
        }
        pr := NewPushReceiver(&hr)
        hr.pushReceiver = *pr
        if !notLoadCacheAtStart {
                hr.loadCacheFromDisk()
        }
        go hr.asyncUpdateService()
        return hr
}

pushReceiver is initialized by NewPushReceiver. The code is as follows:

func NewPushReceiver(hostReactor *HostReactor) *PushReceiver {
        pr := PushReceiver{
                hostReactor: hostReactor,
        }
        pr.startServer()
        return &pr
}


func (us *PushReceiver) startServer() {
        conn := us.getConn()
        go func() {
                defer func() {
                        if conn != nil {
                                conn.Close()
                        }
                }()
                for {
                        us.handleClient(conn)
                }
        }()
}

func (us *PushReceiver) getConn() *net.UDPConn {
        var conn *net.UDPConn
        for i := 0; i < 3; i++ {
                r := rand.New(rand.NewSource(time.Now().UnixNano()))
                port := r.Intn(1000) + 54951
                us.port = port
                conn1, ok := us.tryListen()

                if ok {
                        conn = conn1
                        logger.Infof("udp server start, port: " + strconv.Itoa(port))
                        return conn
                }

                if !ok && i == 2 {
                        logger.Errorf("failed to start udp server after trying 3 times.")
                }
        }
        return nil
}

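As you can see, NewPushReceiver starts a UDP server and then waits for incoming UDP packets. The Nacos server knows where to push service change messages from the clientIP and udpPort passed to the /v1/ns/instance/list endpoint.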

func (us *PushReceiver) handleClient(conn *net.UDPConn) {
        if conn == nil {
                time.Sleep(time.Second * 5)
                conn = us.getConn()
                if conn == nil {
                        return
                }
        }

        data := make([]byte, 4024)
        n, remoteAddr, err := conn.ReadFromUDP(data)
        if err != nil {
                logger.Errorf("failed to read UDP msg because of %+v", err)
                return
        }

        s := TryDecompressData(data[:n])
        logger.Info("receive push: "+s+" from: ", remoteAddr)

        var pushData PushData
        err1 := json.Unmarshal([]byte(s), &pushData)
        if err1 != nil {
                logger.Infof("failed to process push data.err:%+v", err1)
                return
        }
        ack := make(map[string]string)

        if pushData.PushType == "dom" || pushData.PushType == "service" {
                us.hostReactor.ProcessServiceJson(pushData.Data)

                ack["type"] = "push-ack"
                ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10)
                ack["data"] = ""

        } else if pushData.PushType == "dump" {
                ack["type"] = "dump-ack"
                ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10)
                ack["data"] = util.ToJsonString(us.hostReactor.serviceInfoMap)
        } else {
                ack["type"] = "unknow-ack"
                ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10)
                ack["data"] = ""
        }

        bs, _ := json.Marshal(ack)
        c, err := conn.WriteToUDP(bs, remoteAddr)
        if err != nil {
                logger.Errorf("WriteToUDP failed,return:%d,err:%+v", c, err)
        }
}

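The actual logic is in handleClient above. When PushType is dom or service, the payload is also handed to ProcessServiceJson, which decides whether the cache needs updating and, as before, fires the callbacks registered for the service if it does.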

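At this point the earlier question can largely be answered. When a naming_client is created, the Nacos Go SDK starts a UDP server to receive pushes. Subscribing to changes of a serviceName on the server side is done by attaching clientIP and udpPort to the Open API endpoint /v1/ns/instance/list.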

Verification

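To check the analysis above, I wrote a UDP server demo modeled on the SDK code.

First call /v1/ns/instance/list with clientIP and udpPort, then register and deregister a service and watch the log output. Note that a few special request headers are required, because the Nacos server only pushes over UDP to specific SDK versions.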

#!/bin/bash
curl -H "Client-Version: Nacos-Go-Client:v1.0.1" \
  -H "User-Agent: Nacos-Go-Client:v1.0.1" \
  -H "Request-Module: Naming" \
  -H "Content-Type: application/x-www-form-urlencoded;charset=utf-8" \
  -X GET \
  'http://172.16.57.42:8848/nacos/v1/ns/instance/list?serviceName=nacos.test.3&udpPort=65013&clientIP=172.16.57.42'

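The demo does receive the pushed messages. It does not reply with an ack yet, so the server may be retrying the send, but this is enough to show that the subscription flow works.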

Start udp server at  172.16.57.42:65013
Recv data:  {"type":"dom","data":"{\"name\":\"DEFAULT_GROUP@@nacos.test.3\",\"clusters\":\"\",\"cacheMillis\":10000,\"hosts\":[],\"lastRefTime\":1634608400805,\"checksum\":\"\",\"allIPs\":false,\"reachProtectionThreshold\":false,\"valid\":true}","lastRefTime":1271435940556204}
Recv data:  {"type":"dom","data":"{\"name\":\"DEFAULT_GROUP@@nacos.test.3\",\"clusters\":\"\",\"cacheMillis\":10000,\"hosts\":[],\"lastRefTime\":1634608400805,\"checksum\":\"\",\"allIPs\":false,\"reachProtectionThreshold\":false,\"valid\":true}","lastRefTime":1271435940556204}
Recv data:  {"type":"dom","data":"{\"name\":\"DEFAULT_GROUP@@nacos.test.3\",\"clusters\":\"\",\"cacheMillis\":10000,\"hosts\":[{\"instanceId\":\"172.16.57.42#8849#DEFAULT#DEFAULT_GROUP@@nacos.test.3\",\"ip\":\"172.16.57.42\",\"port\":8849,\"weight\":1.0,\"healthy\":true,\"enabled\":true,\"ephemeral\":true,\"clusterName\":\"DEFAULT\",\"serviceName\":\"DEFAULT_GROUP@@nacos.test.3\",\"metadata\":{},\"instanceHeartBeatInterval\":5000,\"instanceHeartBeatTimeOut\":15000,\"ipDeleteTimeout\":30000,\"instanceIdGenerator\":\"simple\"}],\"lastRefTime\":1634608413716,\"checksum\":\"\",\"allIPs\":false,\"reachProtectionThreshold\":false,\"valid\":true}","lastRefTime":1271448808976688}
Recv data:  {"type":"dom","data":"{\"name\":\"DEFAULT_GROUP@@nacos.test.3\",\"clusters\":\"\",\"cacheMillis\":10000,\"hosts\":[{\"instanceId\":\"172.16.57.42#8849#DEFAULT#DEFAULT_GROUP@@nacos.test.3\",\"ip\":\"172.16.57.42\",\"port\":8849,\"weight\":1.0,\"healthy\":true,\"enabled\":true,\"ephemeral\":true,\"clusterName\":\"DEFAULT\",\"serviceName\":\"DEFAULT_GROUP@@nacos.test.3\",\"metadata\":{},\"instanceHeartBeatInterval\":5000,\"instanceHeartBeatTimeOut\":15000,\"ipDeleteTimeout\":30000,\"instanceIdGenerator\":\"simple\"}],\"lastRefTime\":1634608413716,\"checksum\":\"\",\"allIPs\":false,\"reachProtectionThreshold\":false,\"valid\":true}","lastRefTime":1271448808976688}
