10.redis锁代码
# 01.redis分布式锁
https://www.cnblogs.com/xiaonq/p/12328934.html
# 1.1 redis事物介绍
# 1、事物介绍
1.redis事物是可以一次执行多个命令,本质是一组命令的集合。
2.一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入
**作用:**一个队列中,一次性、顺序性、排他性的执行一系列命令
# Multi 命令用于标记一个事务块的开始事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性( atomic )地执行
> multi # 开始一个redis事物
incr books
incr books
> exec # 执行事物
> discard # 丢弃事物
2
3
4
5
6
- Redis事务提供以下几个命令
MULTI # 标记一个事务块的开始
EXEC # 执行所有事务块内的命令。
DISCARD # 取消事务,放弃执行事务块内的所有命令。
WATCH # 监视一个或多个key,如果在事务执行前这些key的值发生了改变,事务将被打断
2
3
4
# 2、如何解决 Redis 事务的缺陷
Redis 从 2.6 版本开始支持执行 Lua 脚本,它的功能和事务非常类似
我们可以利用 Lua 脚本来批量执行多条 Redis 命令
一段 Lua 脚本可以视作一条命令执行
一段 Lua 脚本执行过程中不会有其他脚本或 Redis 命令同时执行,保证了操作不会被其他指令插入或打扰
不过,如果 Lua 脚本运行时出错并中途结束,出错之后的命令是不会被执行的
并且,出错之前执行的命令是无法被撤销的,无法实现类似关系型数据库执行失败可以回滚的那种原子性效果
因此, 严格来说的话,通过 Lua 脚本来批量执行 Redis 命令实际也是不完全满足原子性的
如果想要让 Lua 脚本中的命令全部执行,必须保证语句语法和命令都是对的
# 1.2 watch乐观锁
实质:
WATCH 只会在数据被其他客户端抢先修改了的情况下通知执行命令的这个客户端
(通过 WatchError 异常)但不会阻止其他客户端对数据的修改
1)原理
1.当用户购买时,通过 WATCH 监听用户库存,如果库存在
watch监听后发生改变
,就会捕获异常而放弃对库存减一操作
2.如果库存没有监听到变化并且数量大于1,则库存数量减一,并执行任务
2)弊端
- 1.Redis 在尝试完成一个事务的时候,可能会因为事务的失败而重复尝试重新执行
- 2.保证商品的库存量正确是一件很重要的事情,但是单纯的使用 WATCH 这样的机制对服务器压力过大
使用reids的 watch + multi 指令实现超卖
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
def sale(rs):
while True:
with rs.pipeline() as p:
try:
p.watch('apple') # 监听key值为apple的数据数量改变
count = int(rs.get('apple'))
print('拿取到了苹果的数量: %d' % count)
p.multi() # 事务开始
if count> 0 : # 如果此时还有库存
p.set('apple', count - 1)
p.execute() # 执行事务
p.unwatch()
break # 当库存成功减一或没有库存时跳出执行循环
except Exception as e: # 当出现watch监听值出现修改时,WatchError异常抛出
print('[Error]: %s' % e)
continue # 继续尝试执行
rs = redis.Redis(host='127.0.0.1', port=6379) # 连接redis
rs.set('apple',1000) # 首先在redis中设置某商品apple 对应数量value值为1000
sale(rs)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1.3 setnx分布式锁
1.分布式锁本质是占一个坑,当别的进程也要来占坑时发现已经被占,就会放弃或者稍后重试
2.占坑一般使用 setnx(set if not exists)指令,只允许一个客户端占坑
3.先来先占,用完了在调用del指令释放坑
> setnx lock:codehole true
.... do something critical ....
> del lock:codehole
2
3
# 1、分布式锁解决超买
- setnx+watch+multi解决超卖问题
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import uuid
import time
# 1.初始化连接函数
def get_conn(host,port=6379):
rs = redis.Redis(host=host, port=port)
return rs
# 2. 构建redis锁
def acquire_lock(rs, lock_name, expire_time=10):
'''
rs: 连接对象
lock_name: 锁标识
acquire_time: 过期超时时间
return -> False 获锁失败 or True 获锁成功
'''
identifier = str(uuid.uuid4())
end = time.time() + expire_time
while time.time() < end:
# 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
if rs.setnx(lock_name, identifier): # 尝试取得锁
return identifier
time.sleep(0.001)
return False
# 3. 释放锁
def release_lock(rs, lockname, identifier):
'''
rs: 连接对象
lockname: 锁标识
identifier: 锁的value值,用来校验
'''
pipe = rs.pipeline(True)
try:
pipe.watch(lockname)
if rs.get(lockname).decode() == identifier: # 防止其他进程同名锁被误删
pipe.multi() # 开启事务
pipe.delete(lockname)
pipe.execute()
return True # 删除锁
pipe.unwatch() # 取消事务
except Exception as e:
pass
return False # 删除失败
'''在业务函数中使用上面的锁'''
def sale(rs):
start = time.time() # 程序启动时间
with rs.pipeline() as p:
'''
通过管道方式进行连接
多条命令执行结束,一次性获取结果
'''
while True:
lock = acquire_lock(rs, 'lock')
if not lock: # 持锁失败
continue
try:
count = int(rs.get('apple')) # 取量
p.set('apple', count-1) # 减量
p.execute()
print('当前库存量: %s' % count)
break
finally:
release_lock(rs, 'lock', lock)
print('[time]: %.2f' % (time.time() - start))
rs = redis.Redis(host='127.0.0.1', port=6379) # 连接redis
rs.set('apple',1000) # 首先在redis中设置某商品apple 对应数量value值为1000
sale(rs)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# 2、优化
- 给分布式锁加超时时间防止死锁
def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
'''
rs: 连接对象
lock_name: 锁标识
acquire_time: 过期超时时间
locked_time: 锁的有效时间
return -> False 获锁失败 or True 获锁成功
'''
identifier = str(uuid.uuid4())
end = time.time() + expire_time
while time.time() < end:
# 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
if rs.setnx(lock_name, identifier): # 尝试取得锁
# print('锁已设置: %s' % identifier)
rs.expire(lock_name, locked_time)
return identifier
time.sleep(.001)
return False
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 02.redis分布式锁不可靠
# 1、redis分布式锁不可靠产生的原因
锁超时: 线程 A 获取了锁,并设置了超时时间,由于执行时间长,导致锁的超时时间到期
锁释放:
线程 A 的锁超时,系统自动将锁释放
,但是此时线程 A 的业务逻辑可能还没有执行完成其他线程获取锁: 在锁被释放的瞬间,其他
线程 B 获取了锁
线程 A继续执行: 线程 A 完成后释放锁,由于锁已经被线程 B 获取,
线程 A 误释放了线程 B 持有的锁
# 2、解决误释放其他线程锁的方法
- 加锁时指定锁标识
- 是否锁时判断当前持有的锁是否是自己的锁标识
import redis
import uuid
import time
class DistributedLock:
def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0, lock_key='distributed_lock'):
self.redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db)
self.lock_key = lock_key
self.lock_value = None
def acquire_lock(self, timeout=10):
identifier = str(uuid.uuid4()) # 锁标识
end_time = time.time() + timeout # 过期超时时间
while time.time() < end_time:
# 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
if self.redis_conn.set(self.lock_key, identifier, nx=True, ex=timeout):
self.lock_value = identifier
return True
time.sleep(0.1)
return False
def release_lock(self):
# redis.call("get", KEYS[1]) 获取锁当前的标识符
# ARGV[1]是当前标识符,如果相同则表示这是持有锁的正确客户端
# 如果标识符不同,返回 0 表示释放锁失败,因为这个客户端不是持有锁的正确客户端
# 使用 Lua 脚本可以确保上述操作是原子性的,从而避免了竞态条件
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
try:
self.redis_conn.eval(lua_script, 1, self.lock_key, self.lock_value)
except redis.exceptions.RedisError:
pass
if __name__ == "__main__":
# 使用示例
lock = DistributedLock()
try:
if lock.acquire_lock():
# 执行需要加锁的操作
print("Lock acquired successfully")
time.sleep(5)
else:
print("Failed to acquire lock within timeout")
except Exception as e:
print(f"An error occurred: {e}")
finally:
lock.release_lock()
print("Lock released")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 3、抢购业务中如何解决此问题
进入业务后业务先将库存减1存入缓存(就是先假设商品能卖成功)
分布式锁不仅判断锁也判断库存,如果库存不够也不准进入
.举例:
- 比如库存10个,每个请求进来后都将缓存库存-1,这时12个请求过来,第11、12个是进不来的
- 但这个时候卖失败两个,缓存库存+2
- 后面的第13、14个请求就能进来了。要注意的是库存+-要是安全的
主要利用 redis 在哈希表中的字段上执行增加或减少操作的原子性来实现
# 创建一个名为 key:info 的哈希表,并设置其中的 stock 字段的初始值为 10
HSET key:info count 10
# 对名为 key:info 的哈希表中的 count 字段递减 1操作
HINCRBY key:info count -1
2
3
4