03.go操作etcd
# 01.etcd ClientV3基础
# 1.1 安装
- 需要在项目的go.mod文件中加入下面两个
replace (
github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.5
github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.3.2
google.golang.org/grpc => google.golang.org/grpc v1.26.0
)
require github.com/coreos/etcd v3.3.20+incompatible
1
2
3
4
5
6
7
2
3
4
5
6
7
解决下面报错
github.com/coreos/bbolt: github.com/coreos/bbolt@v1.3.6: parsing go.mod:
module declares its path as: go.etcd.io/bbolt
but was required as: github.com/coreos/bbolt
1
2
3
2
3
# 1.2 增删查
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10*time.Second,
}
cli,_ := clientv3.New(config)
putData(cli)
//getData(cli)
//delData(cli)
}
// 1、增
func putData(cli *clientv3.Client) {
// 实例化一个用于操作ETCD的KV
kv := clientv3.NewKV(cli)
putResp, _ := kv.Put(context.TODO(), "/school/class/students", "helios0", clientv3.WithPrevKV())
fmt.Println(putResp.Header) // 打印返回的头部信息
fmt.Println(putResp.PrevKv) // 打印历史版本信息(如果有)
/*
Header: cluster_id:14841639068965178418 member_id:10276657743932975437 revision:25 raft_term:4
PrevKv: key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"helios0"
*/
}
// 2、查
func getData(cli *clientv3.Client) {
// 实例化一个用于操作ETCD的KV
kv := clientv3.NewKV(cli)
getResp, _ := kv.Get(context.TODO(), "/school/class/students")
fmt.Printf("Key:%s --> Value:%s \n", getResp.Kvs[0].Key, getResp.Kvs[0].Value)
/*
Key:/school/class/students --> Value:helios0
*/
}
// 3、删
func delData(cli *clientv3.Client) {
kv := clientv3.NewKV(cli)
kv.Put(context.TODO(), "/school/class/students", "helios1")
delResp, _ := kv.Delete(context.TODO(), "/school/class/students", clientv3.WithPrevKV())
if len(delResp.PrevKvs) != 0 {
for _, kvpair := range delResp.PrevKvs {
fmt.Printf("delete key is: %s ---> Value: %s \n", string(kvpair.Key), string(kvpair.Value))
}
}
/*
delete key is: /school/class/students ---> Value: helios1
*/
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 1.3 op代替Get/Put/Delete
package main
import (
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
"context"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10*time.Second,
}
cli,_ := clientv3.New(config)
// 实例化一个用于操作ETCD的KV
kv := clientv3.NewKV(cli)
// 1、put写入
putOp := clientv3.OpPut("/school/class/students", "helios")
opResp, _ := kv.Do(context.TODO(), putOp);
fmt.Println("写入Revision:", opResp.Put().Header.Revision)
// 2、get获取
getOp := clientv3.OpGet("/school/class/students")
opResp, _ = kv.Do(context.TODO(), getOp)
fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
}
/*
写入Revision: 76
数据Revision: 76
数据value: helios
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 02.租约
# 2.1 申请租约定时查看过期
申请一个10s的租约,定时查看是否过期
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10*time.Second,
}
cli,_ := clientv3.New(config)
applyLease(cli)
}
func applyLease(cli *clientv3.Client) {
// 1、申请一个10s的租约
lease := clientv3.NewLease(cli)
leaseGrantResp, _ := lease.Grant(context.TODO(), 10)
leaseId := leaseGrantResp.ID
// 2、获得kv API子集,设置key,val和过期时间
kv := clientv3.NewKV(cli)
kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId))
// 3、每隔两秒检查一次是否过期
for {
getResp, _ := kv.Get(context.TODO(), "/school/class/students");
if getResp.Count == 0 {
fmt.Println("kv过期了")
break
}
fmt.Println("还没过期:", getResp.Kvs)
time.Sleep(2 * time.Second)
}
}
/*
还没过期: [key:"/school/class/students" create_revision:29 mod_revision:29 version:1 value:"h" lease:7587859680424737869 ]
还没过期: [key:"/school/class/students" create_revision:29 mod_revision:29 version:1 value:"h" lease:7587859680424737869 ]
kv过期了
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 2.2 自动续租
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10*time.Second,
}
cli,_ := clientv3.New(config)
applyLease(cli)
}
func applyLease(cli *clientv3.Client) {
// 1、申请一个10s的租约
lease := clientv3.NewLease(cli)
leaseGrantResp, _ := lease.Grant(context.TODO(), 10)
leaseId := leaseGrantResp.ID
// 2、获得kv API子集,设置key,val和过期时间
kv := clientv3.NewKV(cli)
kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId))
// 3、自动续租
keepRespChan, _ := lease.KeepAlive(context.TODO(), leaseId)
for {
select {
case keepResp := <- keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
}
/*
收到自动续租应答: 7587859680424737888
收到自动续租应答: 7587859680424737888
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 03.watch功能
package main
import (
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"time"
"context"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10*time.Second,
}
cli,_ := clientv3.New(config)
testWatch(cli)
}
func testWatch(cli *clientv3.Client) {
kv := clientv3.NewKV(cli)
// 1、模拟KV的变化(每隔1s添加和删除一次key)
go func() {
for {
kv.Put(context.TODO(), "/school/class/students", "helios1")
kv.Delete(context.TODO(), "/school/class/students")
time.Sleep(1 * time.Second)
}
}()
// 2、5s后退出函数执行
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5 * time.Second, func() {
cancelFunc()
})
// 3、先GET到当前的值,并监听后续变化
getResp, _ := kv.Get(context.TODO(), "/school/class/students")
if len(getResp.Kvs) != 0 { // 现在key是存在的
fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}
watchStartRevision := getResp.Header.Revision + 1 // 获得当前revision
// 4、创建一个watcher
watcher := clientv3.NewWatcher(cli)
fmt.Println("从该版本向后监听:", watchStartRevision)
watchRespChan := watcher.Watch(ctx, "/school/class/students", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件(watch能感知到被监听key的变化)
for watchResp := range watchRespChan {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
}
}
}
}
/*
当前值: helios1
从该版本向后监听: 37
删除了 Revision: 37
修改为: helios1 Revision: 38 38
删除了 Revision: 39
修改为: helios1 Revision: 40 40
删除了 Revision: 41
修改为: helios1 Revision: 42 42
删除了 Revision: 43
修改为: helios1 Revision: 44 44
删除了 Revision: 45
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 04.通过txn实现分布式锁
ETCD中的txn通过简单的"If-Then-Else"实现了原子操作。
实现分布式锁主要分为三个步骤:
- 第一步:上锁,包括创建租约、自动续约、在租约时间内去抢一个key
- 第二步:抢到锁后执行业务逻辑,没有抢到退出
- 第三步:释放租约
package main
import (
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
"context"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10*time.Second,
}
cli,_ := clientv3.New(config)
txnTest(cli)
}
func txnTest(cli *clientv3.Client) {
// 1. 上锁
// 1.1 创建一个5s的租约
lease := clientv3.NewLease(cli)
leaseGrantResp, _ := lease.Grant(context.TODO(), 5)
leaseId := leaseGrantResp.ID
// 1.2 自动续约(创建一个可取消的租约,主要是为了退出的时候能够释放)
ctx, cancelFunc := context.WithCancel(context.TODO())
// 3. 释放租约
defer cancelFunc() // 取消的租约
defer lease.Revoke(context.TODO(), leaseId) // 回收对应的key
keepRespChan, _ := lease.KeepAlive(ctx, leaseId)
// 续约应答
go func() {
for {
select {
case keepResp := <- keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()
// 1.3 在租约时间内去抢锁(etcd里面的锁就是一个key)
kv := clientv3.NewKV(cli)
// 创建事物
txn := kv.Txn(context.TODO())
//if 不存在key, then 设置它, else 抢锁失败
txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("lock"))
// 提交事务
txnResp, _ := txn.Commit()
if !txnResp.Succeeded {
fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2. 抢到锁后执行业务逻辑,没有抢到退出
fmt.Println("处理任务")
time.Sleep(5 * time.Second)
// 3. 释放锁,步骤在上面的defer,当defer租约关掉的时候,对应的key被回收了
}
/*
收到自动续租应答: 7587859680424737936
处理任务
收到自动续租应答: 7587859680424737936
收到自动续租应答: 7587859680424737936
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# 05.json数据操作
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)
type MySQLConfig struct {
Host string `mapstructure:"host"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
}
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10*time.Second,
}
cli,_ := clientv3.New(config)
putData(cli)
//getData(cli)
//delData(cli)
}
// 1、增
func putData(cli *clientv3.Client) {
MySQLConf := MySQLConfig{
Host: "127.0.0.1",
User: "root",
Password: "1",
}
// 将新数据序列化
data, _ := json.Marshal(MySQLConf)
_, err := cli.Put(context.Background(), "/config/MySQL", string(data))
if err != nil {
fmt.Println("cli.Put error")
return
}
}
// 2、查
func getData(cli *clientv3.Client) {
// 实例化一个用于操作ETCD的KV
kv := clientv3.NewKV(cli)
//1、获取字符串形式的value
getResp, _ := kv.Get(context.TODO(), "/config/MySQL")
strlConf := getResp.Kvs[0].Value // {"Host":"127.0.0.1","User":"root","Password":"1"}
//2、将字符串转成结构体
var objConf MySQLConfig
json.Unmarshal(strlConf, &objConf)
fmt.Println(objConf.Host, objConf.User, objConf.Password) // 127.0.0.1 root 1
}
// 3、删
func delData(cli *clientv3.Client) {
kv := clientv3.NewKV(cli)
delResp, _ := kv.Delete(context.TODO(), "/config/MySQL", clientv3.WithPrevKV())
if len(delResp.PrevKvs) != 0 {
for _, kvpair := range delResp.PrevKvs {
fmt.Printf("delete key is: %s ---> Value: %s \n", string(kvpair.Key), string(kvpair.Value))
}
}
/*
delete key is: /config/MySQL ---> Value: {"Host":"127.0.0.1","User":"root","Password":"1"}
*/
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
上次更新: 2024/10/15 16:27:13