05.grpc负载均衡
# 01.轮训负载均衡
# 1.1 项目结构
.
├── client // grpc客户端
│ └── client.go
└── server // grpc服务端
└── server.go
├── proto // grpc通讯协议定义
│ └── simple.proto
├── etcdv3 // etcd注册和发现服务
│ ├── discovery.go
│ └── register.go
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 1.2 实现负载均衡流程
- gRPC已提供了简单的负载均衡策略(如:Round Robin),我们只需实现它提供的
Builder
和Resolver
接口,就能完成gRPC客户端负载均衡 Builder
接口:创建一个resolver
(本文称之服务发现),用于监视名称解析更新。Build
方法:为给定目标创建一个新的resolver
,当调用grpc.Dial()
时执行。- 在Build方法中我们可以订阅需要监听的服务列表,并修改变更最新的server
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
Scheme() string
}
1
2
3
4
2
3
4
Scheme
方法:返回此resolver
支持的方案- Resolver
接口:监视指定目标的更新,包括地址更新和服务配置更新。
- ResolveNow
方法:被 gRPC 调用,以尝试再次解析目标名称。只用于提示,可忽略该方法。
- Close
方法:关闭
resolver
- Resolver
type Resolver interface {
ResolveNow(ResolveNowOption)
Close()
}
1
2
3
4
2
3
4
- 根据以上两个接口,我们把服务发现的功能写在
Build
方法中 - 把获取到的负载均衡服务地址返回到客户端,并监视服务更新情况,以修改客户端连接。
# 1.3 测试负载均衡
- 在 8000 端口注册grpc服务(轮训的方式获取请求)
$ go run server/server.go
2022-01-09 15:12:55.793803 I | localhost:8000 net.Listing...
2022-01-09 15:12:55.818423 I | Put key:/grpclb/simple_grpc/localhost:8000 val:localhost:8000 success!
2022-01-09 15:13:07.884302 I | receive: grpc 1
2022-01-09 15:13:09.891356 I | receive: grpc 3
2022-01-09 15:13:11.902094 I | receive: grpc 5
2022-01-09 15:13:13.912325 I | receive: grpc 7
1
2
3
4
5
6
7
2
3
4
5
6
7
- 在 8001 端口注册grpc服务(轮训的方式获取请求)
$ go run server/server.go
2022-01-09 15:12:39.884699 I | localhost:8001 net.Listing...
2022-01-09 15:12:39.909653 I | Put key:/grpclb/simple_grpc/localhost:8001 val:localhost:8001 success!
2022-01-09 15:13:06.878625 I | receive: grpc 0
2022-01-09 15:13:08.887312 I | receive: grpc 2
2022-01-09 15:13:10.896725 I | receive: grpc 4
2022-01-09 15:13:12.907039 I | receive: grpc 6
1
2
3
4
5
6
7
2
3
4
5
6
7
- 模拟客户端发型8个请求给服务端
$ go run client/client.go
2022-01-09 15:13:06.873669 I | Build
2022-01-09 15:13:06.876690 I | put key : /grpclb/simple_grpc/localhost:8000 val: localhost:8000
2022-01-09 15:13:06.876733 I | put key : /grpclb/simple_grpc/localhost:8001 val: localhost:8001
2022-01-09 15:13:06.877101 I | watching prefix:/grpclb/simple_grpc/ now...
2022-01-09 15:13:06.878828 I | code:200 value:"hello grpc 0"
2022-01-09 15:13:07.884560 I | code:200 value:"hello grpc 1"
2022-01-09 15:13:08.887767 I | code:200 value:"hello grpc 2"
2022-01-09 15:13:09.891792 I | code:200 value:"hello grpc 3"
2022-01-09 15:13:10.897160 I | code:200 value:"hello grpc 4"
2022-01-09 15:13:11.902347 I | code:200 value:"hello grpc 5"
2022-01-09 15:13:12.907339 I | code:200 value:"hello grpc 6"
2022-01-09 15:13:13.912571 I | code:200 value:"hello grpc 7"
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 02.代码实现
# 2.1 server/server.go
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
"days/etcdv3"
pb "days/proto"
)
// SimpleService 定义我们的服务
type SimpleService struct{}
const (
// Address 监听地址
Address string = "localhost:8000"
// Network 网络通信协议
Network string = "tcp"
// SerName 服务名称
SerName string = "simple_grpc"
)
// EtcdEndpoints etcd地址
var EtcdEndpoints = []string{"localhost:2379"}
func main() {
// 1、注册 gRPC 服务
listener, _ := net.Listen(Network, Address) // gRPC 监听本地端口
log.Println(Address + " net.Listing...")
grpcServer := grpc.NewServer() // 新建gRPC服务器实例
pb.RegisterSimpleServer(grpcServer, &SimpleService{}) // 在gRPC服务器注册我们的服务
// 2、将 gRPC服务注册到etcd
//把服务注册到etcd
ser, _ := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 5)
defer ser.Close()
// 3、启动gRPC服务
//用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用
grpcServer.Serve(listener)
}
// gRPC: Route 实现Route方法(proto协议定义的方法实现)
func (s *SimpleService) Route(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
log.Println("receive: " + req.Data)
res := pb.SimpleResponse{
Code: 200,
Value: "hello " + req.Data,
}
return &res, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 2.2 client/client.go
package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
"days/etcdv3"
pb "days/proto"
)
var (
// EtcdEndpoints etcd地址
EtcdEndpoints = []string{"localhost:2379"}
// SerName 服务名称
SerName = "simple_grpc"
grpcClient pb.SimpleClient
)
func main() {
//1、新建etcd发现服务(r=resolver.Builder)
r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
resolver.Register(r) // 注册自定义的Resolver,用于监视名称解析更新
// 2、从etcd订阅服务ip:port 连接服务器
// 当调用grpc.Dial()时执行Build方法:监视前缀,修改变更的server
conn, _ := grpc.Dial(
fmt.Sprintf("%s:///%s", r.Scheme(), SerName),
grpc.WithBalancerName("round_robin"),
grpc.WithInsecure(),
)
defer conn.Close()
// 3、建立gRPC连接
grpcClient = pb.NewSimpleClient(conn)
// 4、模拟grpc客户端发送请求给grpc服务
for i := 0; i < 100; i++ {
route(i)
time.Sleep(1 * time.Second)
}
}
// route 调用服务端Route方法
func route(i int) {
// 创建发送结构体
req := pb.SimpleRequest{
Data: "grpc " + strconv.Itoa(i),
}
// 调用我们的服务(Route方法)
// 同时传入了一个 context.Context ,在有需要时可以让我们改变RPC的行为,比如超时/取消一个正在运行的RPC
res, err := grpcClient.Route(context.Background(), &req)
if err != nil {
log.Fatalf("Call Route err: %v", err)
}
// 打印返回值
log.Println(res)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 2.3 etcdv3/register.go
package etcdv3
import (
"context"
"log"
"time"
"github.com/coreos/etcd/clientv3"
)
//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client //etcd client
leaseID clientv3.LeaseID //租约ID
//租约keepalieve相应chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
cli, _ := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
ser := &ServiceRegister{
cli: cli,
key: "/" + schema + "/" + serName + "/" + addr,
val: addr,
}
//申请租约设置时间keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//设置租约时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
//注册服务并绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
//设置续租 定期发送需求请求
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
//撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# 2.4 etcdv3/discovery.go
package etcdv3
import (
"context"
"log"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"google.golang.org/grpc/resolver"
)
const schema = "grpclb"
//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client //etcd client
cc resolver.ClientConn
serverList sync.Map //服务列表
}
//NewServiceDiscovery 新建发现服务
func NewServiceDiscovery(endpoints []string) resolver.Builder {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
}
}
//Build 为给定目标创建一个新的`resolver`,当调用`grpc.Dial()`时执行
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
log.Println("Build")
s.cc = cc
prefix := "/" + target.Scheme + "/" + target.Endpoint + "/"
//根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
//监视前缀,修改变更的server
go s.watcher(prefix)
return s, nil
}
// ResolveNow 监视目标更新
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
log.Println("ResolveNow")
}
//Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
return schema
}
//Close 关闭
func (s *ServiceDiscovery) Close() {
log.Println("Close")
s.cli.Close()
}
// watcher 监听前缀(订阅服务)
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //新增或修改
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.serverList.Store(key, resolver.Address{Addr: val})
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("put key :", key, "val:", val)
}
//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.serverList.Delete(key)
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("del key:", key)
}
//GetServices 获取服务地址
func (s *ServiceDiscovery) getServices() []resolver.Address {
addrs := make([]resolver.Address, 0, 10)
s.serverList.Range(func(k, v interface{}) bool {
addrs = append(addrs, v.(resolver.Address))
return true
})
return addrs
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# 2.5 proto/simple.proto
生成gRPC文件:
protoc --go_out=plugins=grpc:./ *.proto
syntax = "proto3"; // 协议为proto3
package proto;
// 定义发送请求信息
message SimpleRequest{
// 定义发送的参数,采用驼峰命名方式,小写加下划线,如:student_name
// 参数类型 参数名 标识号(不可重复)
string data = 1;
}
// 定义响应信息
message SimpleResponse{
// 定义接收的参数
// 参数类型 参数名 标识号(不可重复)
int32 code = 1;
string value = 2;
}
// 定义我们的服务(可定义多个服务,每个服务可定义多个接口)
service Simple{
rpc Route (SimpleRequest) returns (SimpleResponse){};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上次更新: 2024/4/1 16:53:26