不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • MySQL

  • Redis

  • Elasticsearch

  • Kafka

  • Etcd

    • 01.etcd安装使用
    • 02.etcd原理 ✅
    • 03.go操作etcd
    • 04.etcd服务发现
      • 01.前言
      • 02.服务注册及健康检查
        • 2.1 server/main.go
        • 2.2 client/main.go
        • 2.3 运行测试
    • 05.grpc负载均衡
    • 06.grpc加权轮训
    • 07.分布式锁
    • 08.事务
  • MongoDB

  • TiDB

  • RabbitMQ

  • 数据库
  • Etcd
xiaonaiqiang
2022-01-10
目录

04.etcd服务发现

# 01.前言

  • 参考博客 (opens new window)

  • 服务发现要解决的也是分布式系统中最常见的问题之一,即在同一个分布式集群中的进程或服务,要如何才能找到对方并建立连接。

  • 本质上来说,服务发现就是想要了解集群中是否有进程在监听 udp 或 tcp 端口,并且通过名字就可以查找和连接。

服务发现需要实现一下基本功能

  • 服务注册:同一service的所有节点注册到相同目录下,节点启动后将自己的信息注册到所属服务的目录中。
  • 健康检查:
    • 服务节点定时进行健康检查。
    • 注册到服务目录中的信息设置一个较短的TTL,运行正常的服务节点每隔一段时间会去更新信息的TTL ,从而达到健康检查效果。
  • 服务发现:
    • 通过服务节点能查询到服务提供外部访问的 IP 和端口号。
    • 比如网关代理服务时能够及时的发现服务中新增节点、丢弃不可用的服务节点。

# 02.服务注册及健康检查

  • 根据etcd的v3 API,当启动一个服务时候,我们把服务的地址写进etcd,注册服务。
  • 同时绑定租约(lease),并以续租约(keep leases alive)的方式检测服务是否正常运行,从而实现健康检查。
  • 项目结构如下
├── client
│   └── main.go
└── server
    └── main.go
1
2
3
4

# 2.1 server/main.go

package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

//1、ServiceRegister 创建租约注册服务
type ServiceRegister struct {
	cli     *clientv3.Client //etcd client
	leaseID clientv3.LeaseID //租约ID
	//租约keepalieve相应chan
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string //key
	val           string //value
}

//2、NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	ser := &ServiceRegister{
		cli: cli,
		key: key,
		val: val,
	}
	//申请租约设置时间keepalive
	if err := ser.putKeyWithLease(lease); err != nil {
		return nil, err
	}
	return ser, nil
}

//3、设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	//设置租约时间
	resp, err := s.cli.Grant(context.Background(), lease)
	if err != nil {
		return err
	}
	//注册服务并绑定租约
	_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//设置续租 定期发送需求请求
	leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

	if err != nil {
		return err
	}
	s.leaseID = resp.ID
	log.Println(s.leaseID)
	s.keepAliveChan = leaseRespChan
	log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
	return nil
}

//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
	for leaseKeepResp := range s.keepAliveChan {
		log.Println("续约成功", leaseKeepResp)
	}
	log.Println("关闭续租")
}

//4、Close 注销服务
func (s *ServiceRegister) Close() error {
	//撤销租约
	if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
		return err
	}
	log.Println("撤销租约")
	return s.cli.Close()
}

func RegisterServerToEtcd() {
	var endpoints = []string{"localhost:2379"}
	//ser, err := NewServiceRegister(endpoints, "/web/node1", "localhost:8000", 5)
	ser, err := NewServiceRegister(endpoints, "/gRPC/node1", "localhost:9000", 5)
	if err != nil {
		log.Fatalln(err)
	}
	//监听续租相应chan
	go ser.ListenLeaseRespChan()
	select {
	// case <-time.After(20 * time.Second):
	// 	ser.Close()
	}
}

func main() {
	RegisterServerToEtcd()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

# 2.2 client/main.go

package main

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

//1、ServiceDiscovery 服务发现
type ServiceDiscovery struct {
	cli        *clientv3.Client  //etcd client
	serverList map[string]string //服务列表
	lock       sync.Mutex
}

//2、NewServiceDiscovery  新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	return &ServiceDiscovery{
		cli:        cli,
		serverList: make(map[string]string),
	}
}

//3、WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
	//根据前缀获取现有的key
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	for _, ev := range resp.Kvs {
		s.SetServiceList(string(ev.Key), string(ev.Value))
	}

	//监视前缀,修改变更的server
	go s.watcher(prefix)
	return nil
}

//4、watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
	rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: //修改或者新增
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: //删除
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

//5、SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.serverList[key] = string(val)
	log.Println("put key :", key, "val:", val)
}

//6、DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.serverList, key)
	log.Println("del key:", key)
}

//7、Close 关闭服务
func (s *ServiceDiscovery) Close() error {
	return s.cli.Close()
}

func SubscribeService() {
	var endpoints = []string{"localhost:2379"}
	ser := NewServiceDiscovery(endpoints)
	defer ser.Close()
	_ = ser.WatchService("/web/") // 订阅前缀为 /web/ 的服务
	ser.WatchService("/gRPC/")    // 订阅前缀为 /gRPC/ 的服务
	for {
		select {
		case <-time.Tick(10 * time.Second):
			log.Println(ser.serverList)
		}
	}
}

func main() {
	SubscribeService()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

# 2.3 运行测试

  • server端将自己的服务加入到etcd中,client端订阅这个key的前缀就可以获取当前服务列表
#运行服务发现
$ go run client/main.go 
2022-01-07 13:53:04.951669 I | watching prefix:/web/ now...
2022-01-07 13:53:04.951784 I | watching prefix:/gRPC/ now...
2022-01-07 13:53:13.686900 I | put key : /web/node1 val: localhost:8000
2022-01-07 13:53:14.956426 I | map[/web/node1:localhost:8000]

#另一个终端运行服务注册
$ go run server/main.go
2022-01-07 13:56:56.038518 I | Put key:/web/node1  val:localhost:8000  success!
2022-01-07 13:56:56.041196 I | 续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:98 raft_term:4 
...
1
2
3
4
5
6
7
8
9
10
11
12
上次更新: 2024/4/1 16:53:26
03.go操作etcd
05.grpc负载均衡

← 03.go操作etcd 05.grpc负载均衡→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式