不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • MySQL

  • Redis

  • Elasticsearch

  • Kafka

  • Etcd

    • 01.etcd安装使用
    • 02.etcd原理 ✅
    • 03.go操作etcd
    • 04.etcd服务发现
    • 05.grpc负载均衡
    • 06.grpc加权轮训
    • 07.分布式锁
      • 01.分布式锁设计原理
        • 1.1 排他性原理
        • 1.2 避免死锁原理
        • 1.3 分布式锁应用
    • 08.事务
  • MongoDB

  • TiDB

  • RabbitMQ

  • 数据库
  • Etcd
xiaonaiqiang
2022-01-10
目录

07.分布式锁

# 01.分布式锁设计原理

  • 参考博客 (opens new window)

# 1.1 排他性原理

  • 排他性:任意时刻,只能有一个机器的一个线程能获取到锁。

  • 通过在etcd中存入key值来实现上锁,删除key实现解锁,参考下面伪代码

func Lock(key string, cli *clientv3.Client) error {
    //获取key,判断是否存在锁
	resp, err := cli.Get(context.Background(), key)
	if err != nil {
		return err
	}
	//锁存在,返回上锁失败
	if len(resp.Kvs) > 0 {
		return errors.New("lock fail")
	}
	_, err = cli.Put(context.Background(), key, "lock")
	if err != nil {
		return err
	}
	return nil
}
//删除key,解锁
func UnLock(key string, cli *clientv3.Client) error {
	_, err := cli.Delete(context.Background(), key)
	return err
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  • 当发现已上锁时,直接返回lock fail,也可以处理成等待解锁,解锁后竞争锁。
//等待key删除后再竞争锁
func waitDelete(key string, cli *clientv3.Client) {
	rch := cli.Watch(context.Background(), key)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.DELETE: //删除
				return
			}
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
  • 容错性:只要分布式锁服务集群节点大部分存活,client就可以进行加锁解锁操作。
    • etcd基于Raft算法,确保集群中数据一致性。

# 1.2 避免死锁原理

  • 避免死锁:分布式锁一定能得到释放,即使client在释放之前崩溃。

  • 上面分布式锁设计有缺陷,假如client获取到锁后程序直接崩了,没有解锁,那其他线程也无法拿到锁,导致死锁出现。

  • 通过给key设定leases来避免死锁,但是leases过期时间设多长呢?

  • 假如设了30秒,而上锁后的操作比30秒大,会导致以下问题

    • 操作没完成,锁被别人占用了,不安全
    • 操作完成后,进行解锁,这时候把别人占用的锁解开了
  • 解决方案:给key添加过期时间后,以Keep leases alive方式延续leases当client正常持有锁时,锁不会过期;

  • 当client程序崩掉后,程序不能执行Keep leases alive,从而让锁过期,避免死锁。

//1、上锁
func Lock(key string, cli *clientv3.Client) error {
    //获取key,判断是否存在锁
	resp, err := cli.Get(context.Background(), key)
	if err != nil {
		return err
	}
	//锁存在,等待解锁后再竞争锁
	if len(resp.Kvs) > 0 {
		waitDelete(key, cli)
		return Lock(key)
	}
  //设置key过期时间
	resp, err := cli.Grant(context.TODO(), 30)
	if err != nil {
		return err
	}
	//设置key并绑定过期时间
	_, err = cli.Put(context.Background(), key, "lock", clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//延续key的过期时间
	_, err = cli.KeepAlive(context.TODO(), resp.ID)
	if err != nil {
		return err
	}
	return nil
}

//2、通过让key值过期来解锁
func UnLock(resp *clientv3.LeaseGrantResponse, cli *clientv3.Client) error {
	_, err := cli.Revoke(context.TODO(), resp.ID)
	return err
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 1.3 分布式锁应用

  • 经过以上步骤,我们初步完成了分布式锁设计。
  • 其实官方已经实现了分布式锁,它大致原理和上述有出入,接下来我们看下如何使用官方的分布式锁。
package main
import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

func main()  {
	// 1、链接etcd
	config := clientv3.Config{
		Endpoints:     []string{"127.0.0.1:2379"},
		DialTimeout:   10*time.Second,
	}
	cli,_ := clientv3.New(config)
	defer cli.Close()

	go runS1(cli)
	go runS2(cli)
	time.Sleep(time.Second * 20)
}


func runS1(cli *clientv3.Client)  {
	//1、为锁竞争创建单独的会话
	s1, _ := concurrency.NewSession(cli)
	defer s1.Close()
	m1 := concurrency.NewMutex(s1, "/my-lock/")  // 创建锁

	//2、获取锁
	fmt.Println("s1尝试获取锁")
	if err := m1.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("s1获取锁成功")
	time.Sleep(time.Second * 5)  // 模拟执行业务逻辑

	//3、释放锁
	if err := m1.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("s1执行完成释放锁")
}

func runS2(cli *clientv3.Client)  {
	//1、为锁竞争创建单独的会话
	s2, _ := concurrency.NewSession(cli)
	defer s2.Close()
	m2 := concurrency.NewMutex(s2, "/my-lock/")
	fmt.Println("---> s2尝试获取锁")

	//2、获取锁
	if err := m2.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("---> s2获取到锁,开始执行")

	//3、释放锁
	if err := m2.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("s2执行完成释放锁")
}

/*
	s1尝试获取锁
	---> s2尝试获取锁
	s1获取锁成功
	s1执行完成释放锁
	---> s2获取到锁,开始执行
	s2执行完成释放锁
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
上次更新: 2024/4/1 16:53:26
06.grpc加权轮训
08.事务

← 06.grpc加权轮训 08.事务→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式