不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • MySQL

  • Redis

  • Elasticsearch

  • Kafka

  • Etcd

    • 01.etcd安装使用
    • 02.etcd原理 ✅
    • 03.go操作etcd
    • 04.etcd服务发现
    • 05.grpc负载均衡
    • 06.grpc加权轮训
      • 01.加权轮训负载均衡
        • 1.1 项目结构
        • 1.2 实现负载均衡方法
        • 1.3 权重信息存储
        • 1.4 加权轮训算法原理
        • 1.5 测试加权负载均衡
      • 02.代码实现
        • 2.1 server/server.go
        • 2.2 client/client.go
        • 2.3 etcdv3/register.go
        • 2.4 etcdv3/discovery.go
        • 2.5 balancer/weight/weight.go
        • 2.6 proto/simple.proto
    • 07.分布式锁
    • 08.事务
  • MongoDB

  • TiDB

  • RabbitMQ

  • 数据库
  • Etcd
xiaonaiqiang
2022-01-10
目录

06.grpc加权轮训

# 01.加权轮训负载均衡

  • 参考博客 (opens new window)

# 1.1 项目结构

.
├── balancer  // 设置权重
│   └── weight
│       └── weight.go
├── client    // grpc客户端
│   └── client.go
└── server   // grpc服务端
    └── server.go
├── proto    // grpc通讯协议定义
│   └── simple.proto
├── etcdv3   // etcd注册和发现服务
│   ├── discovery.go
│   └── register.go
1
2
3
4
5
6
7
8
9
10
11
12
13

# 1.2 实现负载均衡方法

  • gRPC提供了V2PickerBuilder和V2Picker接口让我们实现自己的负载均衡策略。
    • V2PickerBuilder接口:创建V2版本的子连接选择器
type V2PickerBuilder interface {
	Build(info PickerBuildInfo) balancer.V2Picker
}
1
2
3
  • Build方法:返回一个V2选择器,将用于gRPC选择子连接。
Copytype V2Picker interface {
	Pick(info PickInfo) (PickResult, error)
}
1
2
3
  • V2Picker接口:用于gRPC选择子连接去发送请求。
  • Pick方法:子连接选择

# 1.3 权重信息存储

  • 问题来了,我们需要把服务器地址的权重添加进去,但是地址resolver.Address并没有提供权重的属性。

  • 官方给的答复是:把权重存储到地址的元数据metadata中。

  • 定义AddrInfo结构体并添加权重Weight属性,Set方法把Weight存储到resolver.Address中

  • Get方法从resolver.Address获取Weight。

# 1.4 加权轮训算法原理

  • 首先实现V2PickerBuilder接口,返回子连接选择器
  • 加权随机法中,我使用空间换时间的方式,把权重转成地址个数(例如addr1的权重是3,那么添加3个子连接到切片中
  • addr2权重为1,则添加1个子连接;
  • 选择子连接时候,按子连接切片长度生成随机数,以随机数作为下标就是选中的子连接),避免重复计算权重。
  • 考虑到内存占用,权重定义从1到5权重。
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker {
	grpclog.Infof("weightPicker: newPicker called with info: %v", info)
	if len(info.ReadySCs) == 0 {
		return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
	}
	var scs []balancer.SubConn
	for subConn, addr := range info.ReadySCs {
		node := GetAddrInfo(addr.Address)
		if node.Weight <= 0 {
			node.Weight = minWeight
		} else if node.Weight > 5 {
			node.Weight = maxWeight
		}
		for i := 0; i < node.Weight; i++ {
			scs = append(scs, subConn)
		}
	}
	return &rrPicker{
		subConns: scs,
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 1.5 测试加权负载均衡

  • 在 8000 端口注册grpc服务,权重为1
$  go  run server/server.go
2022-01-09 15:40:56.879149 I | localhost:8000 net.Listing...
2022-01-09 15:40:56.943401 I | Put key:/grpclb/simple_grpc/localhost:8000  weight:1  success!
2022-01-09 15:41:21.543788 I | receive: grpc 0
2022-01-09 15:41:27.563448 I | receive: grpc 6
1
2
3
4
5
  • 在 8001 端口注册grpc服务权重为4
$  go run server/server.go
2022-01-09 15:41:16.963703 I | localhost:8001 net.Listing...
2022-01-09 15:41:17.047668 I | Put key:/grpclb/simple_grpc/localhost:8001  weight:4  success!
2022-01-09 15:41:22.544977 I | receive: grpc 1
2022-01-09 15:41:23.550037 I | receive: grpc 2
2022-01-09 15:41:24.550847 I | receive: grpc 3
2022-01-09 15:41:25.556705 I | receive: grpc 4
2022-01-09 15:41:26.558660 I | receive: grpc 5
2022-01-09 15:41:28.566574 I | receive: grpc 7
2022-01-09 15:41:29.571889 I | receive: grpc 8
2022-01-09 15:41:31.583354 I | receive: grpc 10
1
2
3
4
5
6
7
8
9
10
11
  • 模拟客户端发型10个请求给服务端
$  go run client/client.go
2022-01-09 15:41:21.537362 I | Build
2022-01-09 15:41:21.541488 I | put key : /grpclb/simple_grpc/localhost:8000 wieght: 1
2022-01-09 15:41:21.541587 I | put key : /grpclb/simple_grpc/localhost:8001 wieght: 4
2022-01-09 15:41:21.541981 I | watching prefix:/grpclb/simple_grpc/ now...
2022-01-09 15:41:21.543981 I | code:200 value:"hello grpc 0" 
2022-01-09 15:41:22.545373 I | code:200 value:"hello grpc 1" 
2022-01-09 15:41:23.550337 I | code:200 value:"hello grpc 2" 
2022-01-09 15:41:24.551192 I | code:200 value:"hello grpc 3" 
2022-01-09 15:41:25.556976 I | code:200 value:"hello grpc 4" 
...
1
2
3
4
5
6
7
8
9
10
11

# 02.代码实现

# 2.1 server/server.go

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"

	"days/etcdv3"
	pb "days/proto"
)

// SimpleService 定义我们的服务
type SimpleService struct{}

const (
	// Address 监听地址
	Address string = "localhost:8000"
	// Network 网络通信协议
	Network string = "tcp"
	// SerName 服务名称
	SerName string = "simple_grpc"
)

// EtcdEndpoints etcd地址
var EtcdEndpoints = []string{"localhost:2379"}

func main() {
	// 监听本地端口
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " net.Listing...")
	// 新建gRPC服务器实例
	grpcServer := grpc.NewServer()
	// 在gRPC服务器注册我们的服务
	pb.RegisterSimpleServer(grpcServer, &SimpleService{})
	//把服务注册到etcd
	ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName+"/"+Address, "1", 5)
	if err != nil {
		log.Fatalf("register service err: %v", err)
	}
	defer ser.Close()
	//用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}

// Route 实现Route方法
func (s *SimpleService) Route(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
	log.Println("receive: " + req.Data)
	res := pb.SimpleResponse{
		Code:  200,
		Value: "hello " + req.Data,
	}
	return &res, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 2.2 client/client.go

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"

	"days/etcdv3"
	pb "days/proto"
)

var (
	// EtcdEndpoints etcd地址
	EtcdEndpoints = []string{"localhost:2379"}
	// SerName 服务名称
	SerName    = "simple_grpc"
	grpcClient pb.SimpleClient
)

func main() {
	r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
	resolver.Register(r)
	// 连接服务器
	conn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", r.Scheme(), SerName),
		grpc.WithBalancerName("weight"),
		grpc.WithInsecure(),
	)
	if err != nil {
		log.Fatalf("net.Connect err: %v", err)
	}
	defer conn.Close()

	// 建立gRPC连接
	grpcClient = pb.NewSimpleClient(conn)
	for i := 0; i < 100; i++ {
		route(i)
		time.Sleep(1 * time.Second)
	}
}

// route 调用服务端Route方法
func route(i int) {
	// 创建发送结构体
	req := pb.SimpleRequest{
		Data: "grpc " + strconv.Itoa(i),
	}
	// 调用我们的服务(Route方法)
	// 同时传入了一个 context.Context ,在有需要时可以让我们改变RPC的行为,比如超时/取消一个正在运行的RPC
	res, err := grpcClient.Route(context.Background(), &req)
	if err != nil {
		log.Fatalf("Call Route err: %v", err)
	}
	// 打印返回值
	log.Println(res)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 2.3 etcdv3/register.go

package etcdv3

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
	cli     *clientv3.Client //etcd client
	leaseID clientv3.LeaseID //租约ID
	//租约keepalieve相应chan
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string //key
	weight        string //value
}

//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, addr, weigit string, lease int64) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	ser := &ServiceRegister{
		cli:    cli,
		key:    "/" + schema + "/" + addr,
		weight: weigit,
	}

	//申请租约设置时间keepalive
	if err := ser.putKeyWithLease(lease); err != nil {
		return nil, err
	}

	return ser, nil
}

//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	//设置租约时间
	resp, err := s.cli.Grant(context.Background(), lease)
	if err != nil {
		return err
	}
	//注册服务并绑定租约
	_, err = s.cli.Put(context.Background(), s.key, s.weight, clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//设置续租 定期发送需求请求
	leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

	if err != nil {
		return err
	}
	s.leaseID = resp.ID
	s.keepAliveChan = leaseRespChan
	log.Printf("Put key:%s  weight:%s  success!", s.key, s.weight)
	return nil
}

//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
	for leaseKeepResp := range s.keepAliveChan {
		log.Println("续约成功", leaseKeepResp)
	}
	log.Println("关闭续租")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
	//撤销租约
	if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
		return err
	}
	log.Println("撤销租约")
	return s.cli.Close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

# 2.4 etcdv3/discovery.go

package etcdv3

import (
	"context"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"days/balancer/weight"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"google.golang.org/grpc/resolver"
)

const schema = "grpclb"

//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
	cli        *clientv3.Client //etcd client
	cc         resolver.ClientConn
	serverList sync.Map //服务列表
	prefix     string   //监视的前缀
}

//NewServiceDiscovery  新建发现服务
func NewServiceDiscovery(endpoints []string) resolver.Builder {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	return &ServiceDiscovery{
		cli: cli,
	}
}

//Build 为给定目标创建一个新的`resolver`,当调用`grpc.Dial()`时执行
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	log.Println("Build")
	s.cc = cc
	s.prefix = "/" + target.Scheme + "/" + target.Endpoint + "/"
	//根据前缀获取现有的key
	resp, err := s.cli.Get(context.Background(), s.prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	for _, ev := range resp.Kvs {
		s.SetServiceList(string(ev.Key), string(ev.Value))
	}
	s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
	//监视前缀,修改变更的server
	go s.watcher()
	return s, nil
}

// ResolveNow 监视目标更新
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
	log.Println("ResolveNow")
}

//Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
	return schema
}

//Close 关闭
func (s *ServiceDiscovery) Close() {
	log.Println("Close")
	s.cli.Close()
}

//watcher 监听前缀
func (s *ServiceDiscovery) watcher() {
	rch := s.cli.Watch(context.Background(), s.prefix, clientv3.WithPrefix())
	log.Printf("watching prefix:%s now...", s.prefix)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: //新增或修改
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: //删除
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

//SetServiceList 设置服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	//获取服务地址
	addr := resolver.Address{Addr: strings.TrimPrefix(key, s.prefix)}
	//获取服务地址权重
	nodeWeight, err := strconv.Atoi(val)
	if err != nil {
		//非数字字符默认权重为1
		nodeWeight = 1
	}
	//把服务地址权重存储到resolver.Address的元数据中
	addr = weight.SetAddrInfo(addr, weight.AddrInfo{Weight: nodeWeight})
	s.serverList.Store(key, addr)
	s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
	log.Println("put key :", key, "wieght:", val)
}

//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.serverList.Delete(key)
	s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
	log.Println("del key:", key)
}

//GetServices 获取服务地址
func (s *ServiceDiscovery) getServices() []resolver.Address {
	addrs := make([]resolver.Address, 0, 10)
	s.serverList.Range(func(k, v interface{}) bool {
		addrs = append(addrs, v.(resolver.Address))
		return true
	})
	return addrs
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127

# 2.5 balancer/weight/weight.go

package weight

import (
	"math/rand"
	"sync"

	"google.golang.org/grpc/attributes"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/resolver"
)

// Name is the name of weight balancer.
const Name = "weight"

var (
	minWeight = 1
	maxWeight = 5
)

// attributeKey是用于存储AddrInfo在resolver.Address的Attributes字段中的键的类型
type attributeKey struct{}

// 为了使用加权均衡器,AddrInfo将被存储在Address元数据中
type AddrInfo struct {
	Weight int
}

// SetAddrInfo返回addr的副本,其中Attributes字段被用addrInfo更新。
func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
	addr.Attributes = attributes.New()
	addr.Attributes = addr.Attributes.WithValues(attributeKey{}, addrInfo)
	return addr
}

// GetAddrInfo返回存储在addr的Attributes字段中的AddrInfo.
func GetAddrInfo(addr resolver.Address) AddrInfo {
	v := addr.Attributes.Value(attributeKey{})
	ai, _ := v.(AddrInfo)
	return ai
}

//  NewBuilder创建了一个新的权重平衡器生成器.
func newBuilder() balancer.Builder {
	return base.NewBalancerBuilderV2(Name, &rrPickerBuilder{}, base.Config{HealthCheck: false})
}

func init() {
	balancer.Register(newBuilder())
}

type rrPickerBuilder struct{}

func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker {
	grpclog.Infof("weightPicker: newPicker called with info: %v", info)
	if len(info.ReadySCs) == 0 {
		return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
	}
	var scs []balancer.SubConn
	for subConn, addr := range info.ReadySCs {
		node := GetAddrInfo(addr.Address)
		if node.Weight <= 0 {
			node.Weight = minWeight
		} else if node.Weight > 5 {
			node.Weight = maxWeight
		}
		for i := 0; i < node.Weight; i++ {
			scs = append(scs, subConn)
		}
	}
	return &rrPicker{
		subConns: scs,
	}
}

type rrPicker struct {
	//subConns是roundrobin平衡器创建时的快照。切片是不可变的
	//每个Get()将对它进行轮询选择,并返回选中的SubConn
	subConns []balancer.SubConn
	mu       sync.Mutex
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.Lock()
	index := rand.Intn(len(p.subConns))
	sc := p.subConns[index]
	p.mu.Unlock()
	return balancer.PickResult{SubConn: sc}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90

# 2.6 proto/simple.proto

生成gRPC文件: protoc --go_out=plugins=grpc:./ *.proto

syntax = "proto3";// 协议为proto3

package proto;

// 定义发送请求信息
message SimpleRequest{
    // 定义发送的参数,采用驼峰命名方式,小写加下划线,如:student_name
    // 参数类型 参数名 标识号(不可重复)
    string data = 1;
}

// 定义响应信息
message SimpleResponse{
    // 定义接收的参数
    // 参数类型 参数名 标识号(不可重复)
    int32 code = 1;
    string value = 2;
}

// 定义我们的服务(可定义多个服务,每个服务可定义多个接口)
service Simple{
    rpc Route (SimpleRequest) returns (SimpleResponse){};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上次更新: 2024/4/1 16:53:26
05.grpc负载均衡
07.分布式锁

← 05.grpc负载均衡 07.分布式锁→

最近更新
01
300.整体设计
06-10
02
06.LangGraph
06-09
03
202.AI销售智能体
06-07
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式