发布网友 发布时间:2024-09-28 09:45
共1个回答
热心网友 时间:2024-11-15 21:57
背景在工作学习中使用gRPC的地方比较多,通常我们都使用的是自带的负载均衡算法,但是在某些场景下我们需要对服务的版本进行控制比如[appV2只能去链接userV3],在这样的情况下就只能选自定义负载均衡策略
目标实现基于版本(version)的grpc负载均衡器,了解过程后可自己实现更多的负载均衡功能
注册中心
EtcdLease是一种检测客户端存活状况的机制。群集授予具有生存时间的租约。如果etcd群集在给定的TTL时间内未收到keepAlive,则租约到期。为了将租约绑定到键值存储中,每个key最多可以附加一个租约
服务注册(注册服务)
定时把本地服务(APP)地址,版本等信息注册到服务器
服务发现(客户端发起服务解析请求(APP))
查询注册中心(APP)下有那些服务
并向所有的服务建立HTTP2长链接
通过Etcdwatch监听服务(APP),通过变化更新链接
负载均衡(客户端发起请求(APP))
负载均衡选择合适的服务(APPHTTP2长链接)
发起调用
服务注册(注册服务)源码register.go
funcNewRegister(opt...RegisterOptions)(*Register,error){s:=&Register{opts:newOptions(opt...),}varctx,cancel=context.WithTimeout(context.Background(),time.Duration(s.opts.RegisterTtl)*time.Second)defercancel()data,err:=json.Marshal(s.opts)iferr!=nil{returnnil,err}etcdCli,err:=clientv3.New(s.opts.EtcdConf)iferr!=nil{returnnil,err}s.etcdCli=etcdCli//申请租约resp,err:=etcdCli.Grant(ctx,s.opts.RegisterTtl)iferr!=nil{returns,err}s.name=fmt.Sprintf("%s/%s",s.opts.Node.Path,s.opts.Node.Id)//注册节点_,err=etcdCli.Put(ctx,s.name,string(data),clientv3.WithLease(resp.ID))iferr!=nil{returns,err}//续约租约s.keepAliveChan,err=etcdCli.KeepAlive(context.Background(),resp.ID)iferr!=nil{returns,err}returns,nil}在etcd里面我们可以看到如下信息APPv1版本服务在节点的key/hwholiday/srv/app/app-beb3cb56-eb61-11eb-858d-2cf05dc7c711
{"node":{"name":"app","path":"/hwholiday/srv/app","id":"app-beb3cb56-eb61-11eb-858d-2cf05dc7c711","version":"v1","address":"172.12.12.188:8089"}}APPv2版本服务在节点的key/hwholiday/srv/app/app-beb3cb56-eb61-11eb-858d-2cf05dc7c711
{"node":{"name":"app","path":"/hwholiday/srv/app","id":"app-19980562-eb63-11eb-99c0-2cf05dc7c711","version":"v2","address":"172.12.12.188:8088"},}服务发现(客户端发起服务解析请求(APP))源码discovery.go实现grpc内的resolver.Builder接口(Builder创建一个解析器,用于监视名称解析更新)
funcNewDiscovery(opt...ClientOptions)resolver.Builder{s:=&Discovery{opts:newOptions(opt...),}etcdCli,err:=clientv3.New(s.opts.EtcdConf)iferr!=nil{panic(err)}s.etcdCli=etcdClireturns}//Build当调用`grpc.Dial()`时执行func(d*Discovery)Build(targetresolver.Target,ccresolver.ClientConn,optsresolver.BuildOptions)(resolver.Resolver,error){d.cc=ccres,err:=d.etcdCli.Get(context.Background(),d.opts.SrvName,clientv3.WithPrefix())iferr!=nil{returnnil,err}for_,v:=rangeres.Kvs{iferr=d.AddNode(v.Key,v.Value);err!=nil{log.Println(err)continue}}gofunc(dd*Discovery){dd.watcher()}(d)returnd,err}//根据官方的建议我们把从注册中心拿到的服务信息储存到Attributes中//Attributescontainsarbitrarydataabouttheresolverintendedfor//consumptionbytheloadbalancingpolicy.//属性包含有关供负载平衡策略使用的解析器的任意数据。//Attributes*attributes.Attributesfunc(d*Discovery)AddNode(key,val[]byte)error{vardata=new(register.Options)err:=json.Unmarshal(val,data)iferr!=nil{returnerr}addr:=resolver.Address{Addr:data.Node.Address}addr=SetNodeInfo(addr,data)d.Node.Store(string(key),addr)returnd.cc.UpdateState(resolver.State{Addresses:d.GetAddress()})}负载均衡(客户端发起请求(APP))源码version_balancer.go
gRPC提供了PickerBuilder和Picker接口让我们实现自己的负载均衡策略
//PickerBuilder创建balancer.Picker。typePickerBuilderinterface{//Build返回一个选择器,gRPC将使用它来选择一个SubConn。Build(infoPickerBuildInfo)balancer.Picker}//gRPC使用Picker来选择一个SubConn来发送RPC。//每次平衡器的内部状态发生变化时,它都会从它的快照中生成一个新的选择器。//gRPC使用的选择器可以通过ClientConn.UpdateState()更新。typePickerinterface{//选择合适的子链接发送请求Pick(infoPickInfo)(PickResult,error)}从上面得知我们可以干事的地方在Build方法或者Pick方法(调用gRPC方法时先执行Build再执行Pick)
Build(infoPickerBuildInfo)balancer.Pickerinfo里面有服务的链接,和链接对应的刚刚通过AddNode方法存入的服务信息这里我们可以基于grpc-client层面来做负载,比如(加权随机负载)
Pick(infoPickInfo)(PickResult,error)info里面有调用的方法名和context.Context通过context.Context我们可以获得这个来获取发起请求的时候填入的参数,这样我们可以很灵活的针对每个方法进行不同的负载这里我们可以基于grpc-client-api层面来做负载
func(*rrPickerBuilder)Build(infobase.PickerBuildInfo)balancer.Picker{iflen(info.ReadySCs)==0{returnbase.NewErrPicker(balancer.ErrNoSubConnAvailable)}varscs=make(map[balancer.SubConn]*register.Options,len(info.ReadySCs))forconn,addr:=rangeinfo.ReadySCs{nodeInfo:=GetNodeInfo(addr.Address)ifnodeInfo!=nil{scs[conn]=nodeInfo}}iflen(scs)==0{returnbase.NewErrPicker(balancer.ErrNoSubConnAvailable)}return&rrPicker{node:scs,}}func(p*rrPicker)Pick(infobalancer.PickInfo)(balancer.PickResult,error){p.mu.Lock()deferp.mu.Unlock()version:=info.Ctx.Value("version")varsubConns[]balancer.SubConnforconn,node:=rangep.node{ifversion!=""{ifnode.Node.Version==version.(string){subConns=append(subConns,conn)}}}iflen(subConns)==0{returnbalancer.PickResult{},errors.New("nomatchfoundconn")}index:=rand.Intn(len(subConns))sc:=subConns[index]returnbalancer.PickResult{SubConn:sc},nil}客户的使用我们定义的version负载均衡策略r:=discovery.NewDiscovery(discovery.SetName("hwholiday.srv.app"),discovery.SetEtcdConf(clientv3.Config{Endpoints:[]string{"172.12.12.165:2379"},DialTimeout:time.Second*5,}))resolver.Register(r)//连接服务器conn,err:=grpc.Dial("hwholiday.srv.app",//没有使用这个参数grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy":"%s"}`,"version")),grpc.WithInsecure(),)iferr!=nil{log.Fatalf("net.Connecterr:%v",err)}deferconn.Close()//调用服务apiClient:=api.NewApiClient(conn)ctx:=context.WithValue(context.Background(),"version","v1")_,err=apiClient.ApiTest(ctx,&api.Request{Input:"v1v1v1v1v1"})iferr!=nil{fmt.Println(err)}运行效果测试源码
运行APP服务v1,调用grpc-client使用v1
APP打印
启动成功===>0.0.0.0:8089
input:"v1v1v1v1v1"
grpc-client打印
===RUNTestClient
v1v1v1v1v1v1v1v1v1v1
运行APP服务v1,调用grpc-client使用v2
APP打印
启动成功===>0.0.0.0:8089
grpc-client打印
===RUNTestClient
rpcerror:code=Unavailabledesc=nomatchfoundconn
总结详情介绍地址
源码地址:https://github.com/hwholiday/learning_tools/tree/master/etcd
通过学习我们可以实现基于version的负载策略,这里只是提供一种思路怎么去实现可能我的这个例子不太适合这个,但是提供了一种思路,欢迎一起讨论。