70 lines
1.6 KiB
Go
70 lines
1.6 KiB
Go
package rpc
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
consulClient "github.com/rpcxio/rpcx-consul/client"
|
|
"github.com/smallnest/rpcx/client"
|
|
)
|
|
|
|
var (
|
|
xclientMap = make(map[string]client.XClient)
|
|
xclientMu sync.RWMutex
|
|
)
|
|
|
|
// CallWithConsul 基于Consul服务发现调用RPC
|
|
func CallWithConsul(ctx context.Context, serviceName string, args, reply interface{}) error {
|
|
xclient, err := getOrCreatXClient(serviceName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = xclient.Call(ctx, "Mul", args, reply)
|
|
if err != nil {
|
|
// 调用失败,清理失效客户端
|
|
removeXClient(serviceName)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func removeXClient(serviceName string) {
|
|
xclientMu.Lock()
|
|
defer xclientMu.Unlock()
|
|
if c, ok := xclientMap[serviceName]; ok {
|
|
c.Close()
|
|
delete(xclientMap, serviceName)
|
|
}
|
|
}
|
|
|
|
func getOrCreatXClient(serviceName string) (client.XClient, error) {
|
|
// 第一次:读锁,快速判断是否存在
|
|
xclientMu.RLock()
|
|
if c, ok := xclientMap[serviceName]; ok {
|
|
xclientMu.RUnlock()
|
|
return c, nil
|
|
}
|
|
xclientMu.RUnlock()
|
|
|
|
// 没找到,加写锁准备创建
|
|
xclientMu.Lock()
|
|
defer xclientMu.Unlock()
|
|
|
|
// 第二次:双重检查,防止刚被别人创建完
|
|
if c, ok := xclientMap[serviceName]; ok {
|
|
return c, nil
|
|
}
|
|
|
|
consulAddr := g.Cfg().MustGet(nil, "consul.address").String()
|
|
|
|
d, err := consulClient.NewConsulDiscovery("rpcx", serviceName, []string{consulAddr}, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
|
|
xclientMap[serviceName] = c
|
|
return c, nil
|
|
}
|