不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • MySQL

  • Redis

  • Elasticsearch

  • Kafka

  • Etcd

    • 01.etcd安装使用
    • 02.etcd原理 ✅
    • 03.go操作etcd
      • 02.租约
        • 2.1 申请租约定时查看过期
        • 2.2 自动续租
      • 03.watch功能
      • 04.通过txn实现分布式锁
      • 05.json数据操作
    • 04.etcd服务发现
    • 05.grpc负载均衡
    • 06.grpc加权轮训
    • 07.分布式锁
    • 08.事务
  • MongoDB

  • TiDB

  • RabbitMQ

  • 数据库
  • Etcd
xiaonaiqiang
2022-01-10
目录

03.go操作etcd

# 01.etcd ClientV3基础

  • 参考博客 (opens new window)

# 1.1 安装

  • 需要在项目的go.mod文件中加入下面两个
replace (
	github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.5
	github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.3.2
	google.golang.org/grpc => google.golang.org/grpc v1.26.0
)

require github.com/coreos/etcd v3.3.20+incompatible
1
2
3
4
5
6
7

解决下面报错

        github.com/coreos/bbolt: github.com/coreos/bbolt@v1.3.6: parsing go.mod:
	              module declares its path as: go.etcd.io/bbolt
                but was required as: github.com/coreos/bbolt
1
2
3

# 1.2 增删查

package main

import (
	"context"
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"time"
)


func main()  {
	config := clientv3.Config{
		Endpoints:     []string{"127.0.0.1:2379"},
		DialTimeout:   10*time.Second,
	}
	cli,_ := clientv3.New(config)
	putData(cli)
	//getData(cli)
	//delData(cli)
}

// 1、增
func putData(cli *clientv3.Client)  {
	// 实例化一个用于操作ETCD的KV
	kv := clientv3.NewKV(cli)
	putResp, _ := kv.Put(context.TODO(), "/school/class/students", "helios0", clientv3.WithPrevKV())
	fmt.Println(putResp.Header)  // 打印返回的头部信息
	fmt.Println(putResp.PrevKv)  // 打印历史版本信息(如果有)
	/*
	Header: cluster_id:14841639068965178418 member_id:10276657743932975437 revision:25 raft_term:4
	PrevKv: key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"helios0"
	*/
}

// 2、查
func getData(cli *clientv3.Client)  {
	// 实例化一个用于操作ETCD的KV
	kv := clientv3.NewKV(cli)
	getResp, _ := kv.Get(context.TODO(), "/school/class/students")
	fmt.Printf("Key:%s --> Value:%s \n", getResp.Kvs[0].Key, getResp.Kvs[0].Value)
	/*
	Key:/school/class/students --> Value:helios0
	*/
}

// 3、删
func delData(cli *clientv3.Client) {
	kv := clientv3.NewKV(cli)
	kv.Put(context.TODO(), "/school/class/students", "helios1")
	delResp, _ := kv.Delete(context.TODO(), "/school/class/students", clientv3.WithPrevKV())
	if len(delResp.PrevKvs) != 0 {
		for _, kvpair := range delResp.PrevKvs {
			fmt.Printf("delete key is: %s ---> Value: %s \n", string(kvpair.Key), string(kvpair.Value))
		}
	}
	/*
	delete key is: /school/class/students  --->  Value: helios1
	*/
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

# 1.3 op代替Get/Put/Delete

package main

import (
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"time"
	"context"
)

func main()  {
	config := clientv3.Config{
		Endpoints:     []string{"127.0.0.1:2379"},
		DialTimeout:   10*time.Second,
	}
	cli,_ := clientv3.New(config)
	// 实例化一个用于操作ETCD的KV
	kv := clientv3.NewKV(cli)

	// 1、put写入
	putOp := clientv3.OpPut("/school/class/students", "helios")
	opResp, _ := kv.Do(context.TODO(), putOp);
	fmt.Println("写入Revision:", opResp.Put().Header.Revision)

	// 2、get获取
	getOp := clientv3.OpGet("/school/class/students")
	opResp, _ = kv.Do(context.TODO(), getOp)

	fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision)
	fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
}

/*
写入Revision: 76
数据Revision: 76
数据value: helios
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 02.租约

# 2.1 申请租约定时查看过期

  • 申请一个10s的租约,定时查看是否过期
package main

import (
	"context"
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"time"
)

func main()  {
	config := clientv3.Config{
		Endpoints:     []string{"127.0.0.1:2379"},
		DialTimeout:   10*time.Second,
	}
	cli,_ := clientv3.New(config)
	applyLease(cli)
}

func applyLease(cli *clientv3.Client)  {
	// 1、申请一个10s的租约
	lease := clientv3.NewLease(cli)
	leaseGrantResp, _ := lease.Grant(context.TODO(), 10)
	leaseId := leaseGrantResp.ID

	// 2、获得kv API子集,设置key,val和过期时间
	kv := clientv3.NewKV(cli)
	kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId))

	// 3、每隔两秒检查一次是否过期
	for {
		getResp, _ := kv.Get(context.TODO(), "/school/class/students");
		if getResp.Count == 0 {
			fmt.Println("kv过期了")
			break
		}
		fmt.Println("还没过期:", getResp.Kvs)
		time.Sleep(2 * time.Second)
	}
}

/*
还没过期: [key:"/school/class/students" create_revision:29 mod_revision:29 version:1 value:"h" lease:7587859680424737869 ]
还没过期: [key:"/school/class/students" create_revision:29 mod_revision:29 version:1 value:"h" lease:7587859680424737869 ]
kv过期了
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 2.2 自动续租

package main

import (
	"context"
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"time"
)

func main()  {
	config := clientv3.Config{
		Endpoints:     []string{"127.0.0.1:2379"},
		DialTimeout:   10*time.Second,
	}
	cli,_ := clientv3.New(config)
	applyLease(cli)
}

func applyLease(cli *clientv3.Client)  {
	// 1、申请一个10s的租约
	lease := clientv3.NewLease(cli)
	leaseGrantResp, _ := lease.Grant(context.TODO(), 10)
	leaseId := leaseGrantResp.ID

	// 2、获得kv API子集,设置key,val和过期时间
	kv := clientv3.NewKV(cli)
	kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId))

	// 3、自动续租
	keepRespChan, _ := lease.KeepAlive(context.TODO(), leaseId)

	for {
		select {
		case keepResp := <- keepRespChan:
			if keepRespChan == nil {
				fmt.Println("租约已经失效了")
			} else {    // 每秒会续租一次, 所以就会受到一次应答
				fmt.Println("收到自动续租应答:", keepResp.ID)
			}
		}
	}
}

/*
收到自动续租应答: 7587859680424737888
收到自动续租应答: 7587859680424737888
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 03.watch功能

package main

import (
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"time"
	"context"
)

func main()  {
	config := clientv3.Config{
		Endpoints:     []string{"127.0.0.1:2379"},
		DialTimeout:   10*time.Second,
	}
	cli,_ := clientv3.New(config)
	testWatch(cli)
}

func testWatch(cli *clientv3.Client)  {
	kv := clientv3.NewKV(cli)

	// 1、模拟KV的变化(每隔1s添加和删除一次key)
	go func() {
		for {
			kv.Put(context.TODO(), "/school/class/students", "helios1")
			kv.Delete(context.TODO(), "/school/class/students")
			time.Sleep(1 * time.Second)
		}
	}()

	// 2、5s后退出函数执行
	ctx, cancelFunc := context.WithCancel(context.TODO())
	time.AfterFunc(5 * time.Second, func() {
		cancelFunc()
	})

	// 3、先GET到当前的值,并监听后续变化
	getResp, _ := kv.Get(context.TODO(), "/school/class/students")
	if len(getResp.Kvs) != 0 {	 // 现在key是存在的
		fmt.Println("当前值:", string(getResp.Kvs[0].Value))
	}
	watchStartRevision := getResp.Header.Revision + 1	// 获得当前revision

	// 4、创建一个watcher
	watcher := clientv3.NewWatcher(cli)
	fmt.Println("从该版本向后监听:", watchStartRevision)
	watchRespChan := watcher.Watch(ctx, "/school/class/students", clientv3.WithRev(watchStartRevision))

	// 处理kv变化事件(watch能感知到被监听key的变化)
	for watchResp := range watchRespChan {
		for _, event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
			}
		}
	}
}

/*
当前值: helios1
从该版本向后监听: 37
删除了 Revision: 37
修改为: helios1 Revision: 38 38
删除了 Revision: 39
修改为: helios1 Revision: 40 40
删除了 Revision: 41
修改为: helios1 Revision: 42 42
删除了 Revision: 43
修改为: helios1 Revision: 44 44
删除了 Revision: 45
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

# 04.通过txn实现分布式锁

  • ETCD中的txn通过简单的"If-Then-Else"实现了原子操作。

  • 实现分布式锁主要分为三个步骤:

    • 第一步:上锁,包括创建租约、自动续约、在租约时间内去抢一个key
    • 第二步:抢到锁后执行业务逻辑,没有抢到退出
    • 第三步:释放租约
package main

import (
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"time"
	"context"
)

func main()  {
	config := clientv3.Config{
		Endpoints:     []string{"127.0.0.1:2379"},
		DialTimeout:   10*time.Second,
	}
	cli,_ := clientv3.New(config)
	txnTest(cli)
}

func txnTest(cli *clientv3.Client)  {
	// 1. 上锁
	// 1.1 创建一个5s的租约
	lease := clientv3.NewLease(cli)
	leaseGrantResp, _ := lease.Grant(context.TODO(), 5)
	leaseId := leaseGrantResp.ID

	// 1.2 自动续约(创建一个可取消的租约,主要是为了退出的时候能够释放)
	ctx, cancelFunc := context.WithCancel(context.TODO())

	// 3. 释放租约
	defer cancelFunc()   // 取消的租约
	defer lease.Revoke(context.TODO(), leaseId)  // 回收对应的key

	keepRespChan, _ := lease.KeepAlive(ctx, leaseId)

	// 续约应答
	go func() {
		for {
			select {
			case keepResp := <- keepRespChan:
				if keepRespChan == nil {
					fmt.Println("租约已经失效了")
					goto END
				} else {    // 每秒会续租一次, 所以就会受到一次应答
					fmt.Println("收到自动续租应答:", keepResp.ID)
				}
			}
		}
	END:
	}()

	// 1.3 在租约时间内去抢锁(etcd里面的锁就是一个key)
	kv := clientv3.NewKV(cli)

	// 创建事物
	txn := kv.Txn(context.TODO())

	//if 不存在key, then 设置它, else 抢锁失败
	txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
		Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet("lock"))

	// 提交事务
	txnResp, _ := txn.Commit()
	if !txnResp.Succeeded {
		fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	}

	// 2. 抢到锁后执行业务逻辑,没有抢到退出
	fmt.Println("处理任务")
	time.Sleep(5 * time.Second)

	// 3. 释放锁,步骤在上面的defer,当defer租约关掉的时候,对应的key被回收了
}

/*
收到自动续租应答: 7587859680424737936
处理任务
收到自动续租应答: 7587859680424737936
收到自动续租应答: 7587859680424737936
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

# 05.json数据操作

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"time"
)

type MySQLConfig struct {
	Host         string `mapstructure:"host"`
	User         string `mapstructure:"user"`
	Password     string `mapstructure:"password"`
}

func main()  {
	config := clientv3.Config{
		Endpoints:     []string{"127.0.0.1:2379"},
		DialTimeout:   10*time.Second,
	}
	cli,_ := clientv3.New(config)
	putData(cli)
	//getData(cli)
	//delData(cli)

}

// 1、增
func putData(cli *clientv3.Client)  {
	MySQLConf := MySQLConfig{
		Host: "127.0.0.1",
		User: "root",
		Password: "1",
	}
	// 将新数据序列化
	data, _ := json.Marshal(MySQLConf)
	_, err := cli.Put(context.Background(), "/config/MySQL", string(data))
	if err != nil {
		fmt.Println("cli.Put error")
		return
	}
}

// 2、查
func getData(cli *clientv3.Client)  {
	// 实例化一个用于操作ETCD的KV
	kv := clientv3.NewKV(cli)
	//1、获取字符串形式的value
	getResp, _ := kv.Get(context.TODO(), "/config/MySQL")
	strlConf := getResp.Kvs[0].Value  // {"Host":"127.0.0.1","User":"root","Password":"1"}
	//2、将字符串转成结构体
	var objConf MySQLConfig
	json.Unmarshal(strlConf, &objConf)
	fmt.Println(objConf.Host, objConf.User, objConf.Password)  // 127.0.0.1 root 1
}

// 3、删
func delData(cli *clientv3.Client) {
	kv := clientv3.NewKV(cli)
	delResp, _ := kv.Delete(context.TODO(), "/config/MySQL", clientv3.WithPrevKV())
	if len(delResp.PrevKvs) != 0 {
		for _, kvpair := range delResp.PrevKvs {
			fmt.Printf("delete key is: %s ---> Value: %s \n", string(kvpair.Key), string(kvpair.Value))
		}
	}
	/*
		delete key is: /config/MySQL ---> Value: {"Host":"127.0.0.1","User":"root","Password":"1"}
	*/
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
上次更新: 2024/10/15 16:27:13
02.etcd原理 ✅
04.etcd服务发现

← 02.etcd原理 ✅ 04.etcd服务发现→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式