06.线程
# 01.线程
# 1.1 什么是线程
- 1)线程是操作系统调度的最小单位
- 2)线程是进程正真的执行者,是一些指令的集合(进程资源的拥有者)
- 3)同一个进程下的多个
线程共享内存空间
,数据直接访问(数据共享) - 4)为了保证数据安全,必须使用
线程锁
说明:
下面利用for循环同时启动50个线程并行执行,执行时间是3秒而不是所有线程执行时间的总和
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
for i in range(50):
t = threading.Thread(target=sayhi,args=('t-%s'%i,))
t.start()
2
3
4
5
6
7
8
9
# 1.2 GIL锁和线程锁
- GIL全局解释器锁
- 在python全局解释器下,保证同一时间只有一个线程运行
- 防止多个线程都修改数据
- 实质:
- 每个线程在执行时候都需要先获取GIL,保证同一时刻只有一个线程可以执行代码
- 即同一时刻只有一个线程使用CPU,也就是说多线程并不是真正意义上的同时执行
- 线程锁(互斥锁)
- GIL锁只能保证同一时间只能有一个线程对某个资源操作,但当上一个线程还未执行完毕时可能就会释放GIL,其他线程就可以操作了
- 线程锁本质把线程中的数据加了一把互斥锁
- 加上线程锁之后所有其他线程,读都不能读这个数据
- 有了GIL全局解释器锁为什么还需要线程锁
- 因为cpu是分时使用的
在有GIL的情况下执行 count = count + 1 会出错原因解析,用线程锁解决方法
# 1)第一步:count = 0 count初始值为0
# 2)第二步:线程1要执行对count加1的操作首先申请GIL全局解释器锁
# 3)第三步:调用操作系统原生线程在操作系统中执行
# 4)第四步:count加1还未执行完毕,时间到了被要求释放GIL
# 5)第五步:线程1释放了GIL后线程2此时也要对count进行操作,此时线程1还未执行完,所以count还是0
# 6)第六步:线程2此时拿到count = 0后也要对count进行加1操作,假如线程2执行很快,一次就完成了
# count加1的操作,那么count此时就从0变成了1
# 7)第七步:线程2执行完加1后就赋值count=1并释放GIL
# 8)第八步:线程2执行完后cpu又交给了线程1,线程1根据上下文继续执行count加1操作,先拿到GIL
# 锁,完成加1操作,由于线程1先拿到的数据count=0,执行完加1后结果还是1
# 9)第九步:线程1将count=1在次赋值给count并释放GIL锁,此时连个线程都对数据加1,但是值最终是1
2
3
4
5
6
7
8
9
10
11
死锁定义
- 两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去
# 1.3 为什么python中必需要GIL
- CPython的实现中,GIL难以有效的去除的原因之一就是为了迁就引用计数。
- CPython采用的引用计数是最朴素的实现方式:局部变量、全局变量和对象字段都参与到引用计数中
- 当一个Python线程在运行时,它会获取GIL以保证它对对象引用计数的更新是全局同步的。
- 由于引用计数的实现细节被CPython的C API暴露到了外部的C扩展
- 要保持这些扩展完全兼容,就得维持或模拟CPython引用计数的实现,这就麻烦了…
- 举例说明
- 当两个线程同时提高同一个对象的引用计数时,(如果没有 GIL 锁)那么引用计数只会被提高了 1 次而不是 2 次。
- 大家注意我这两段应用中的
指针
和引用计数
。 - 其中指针是 C 语言的概念,Python 没有指针,引用计数是 Python 底层的概念。
- 你平时写的 Python 代码,引用计数是在你调用变量的时候自动增加的,不需要你去手动加 1.
# 1.4 为什么java中虚拟机没有GIL
# 1.4.1 java中没有GIL,如何做的
JVM(至少是HotSpot)确实有一个与“GIL”类似的概念,它的锁粒度要细得多,其中大部分来自更高级的GC热点。
例如,查看HotSpot代码,一旦到了
安全点(SafePoint)
,整个VM就停止了,就像python VM停止在GIL上一样。在Java世界中,这种VM暂停事件被称为“停止世界”
在这种情况下,只有绑定到特定条件的本地代码才是自由运行的,VM的其余部分已经停止。
# 1.4.2 Stop-the-world以及安全点
引用计数漏报则比较麻烦,因为垃圾回收器可能回收了仍被引用的对象,怎么解决这个问题呢?
Stop-the-world
- 在 Java 虚拟机里,传统的垃圾回收算法采用的是一种简单粗暴的方式,那便是 Stop-the-world
- 停止其他非垃圾回收线程的工作,直到完成垃圾回收。
- 这也就造成了垃圾回收所谓的暂停时间(GC pause)。
安全点(safepoint)
- Java 虚拟机中的 Stop-the-world 是通过安全点(safepoint)机制来实现的。
- 当 Java 虚拟机收到 Stop-the-world 请求,它便会等待所有的线程都到达安全点,才会停止所有线程
- 并允许请求Stop-the-world的那个线程进行独占的工作。
当然也并非蛮横的强制停止,毕竟多线程情况下,啥事都可能发生。
安全点的初始目的并不是让其他线程停下,而是找到一个稳定的执行状态。
在这个执行状态下,Java 虚拟机的堆栈不会发生变化。
# 1.4.3 java解决引用计数问题-可达性分析
- 引用计数法,思路很简单,但是如果出现循环引用,即:A 引用 B,B 又引用 A,这种情况下就不好办了
- 所以 JVM 中使用了另一种称为“可达性分析”的判断方法
- 关键不是在于 A、B 之间是否有引用,而是 A、B 是否可以一直向上追溯到 GC Roots。
- 如果与 GC Roots 没有关联,则会被回收,否则将继续存活。
# 02.join()和setDaemon()
# 2.1 join()
实现所有线程都执行结束后再执行主线程
import threading
import time
start_time = time.time()
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(30)
t_objs = [] #将进程实例对象存储在这个列表中
for i in range(50):
t = threading.Thread(target=sayhi,args=('t-%s'%i,))
t.start() #启动一个线程,程序不会阻塞
t_objs.append(t)
print(threading.active_count()) #打印当前活跃进程数量
for t in t_objs: #利用for循环等待上面50个进程全部结束
t.join() #阻塞某个程序
print(threading.current_thread()) #打印执行这个命令进程
print("----------------all threads has finished.....")
print(threading.active_count())
print('cost time:',time.time() - start_time)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 2.2 setDaemon()
守护线程,主线程退出时,需要子线程随主线程退出
import threading
import time
start_time = time.time()
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
for i in range(50):
t = threading.Thread(target=sayhi,args=('t-%s'%i,))
t.setDaemon(True) #把当前线程变成守护线程,必须在t.start()前设置
t.start() #启动一个线程,程序不会阻塞
print('cost time:',time.time() - start_time)
2
3
4
5
6
7
8
9
10
11
12
# 03.Python中使用过的线程模块?
# 3.1 threading
- Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。
- thread和threading模块允许程序员创建和管理线程。
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
for i in range(50):
t = threading.Thread(target=sayhi,args=('t-%s'%i,))
t.start()
2
3
4
5
6
7
8
9
10
# 3.2 concurrent.futures
-
- 1、Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码
- 2、但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。
- 3、但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,
- 4、实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。
2、Executor和Future
1. Executor
concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。
但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用
我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
2. Future
- Future你可以把它理解为一个在未来完成的操作,这是异步编程的基础,
- 传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情
- 而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
concurrent.futures.ThreadPoolExecutor 抓取网页
import requests
from concurrent.futures import ThreadPoolExecutor
def fetch_request(url):
result = requests.get(url)
print(result.text)
url_list = [
'https://www.baidu.com',
'https://www.google.com/', #google页面会卡住,知道页面超时后这个进程才结束
'http://dig.chouti.com/', #chouti页面内容会直接返回,不会等待Google页面的返回
]
pool = ThreadPoolExecutor(10) # 创建一个线程池,最多开10个线程
for url in url_list:
pool.submit(fetch_request,url) # 去线程池中获取一个线程,线程去执行fetch_request方法
pool.shutdown(True) # 主线程自己关闭,让子线程自己拿任务执行
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 04.线程锁
# 4.1 线程锁说明
- GIL锁只能保证同一时间只能有一个线程对某个资源操作
- 但当上一个线程还未执行完毕时可能就会释放GIL,其他线程就可以操作了
- 线程锁本质把线程中的数据加了一把互斥锁
- 加上线程锁之后所有其他线程,读都不能读这个数据
- 有了GIL全局解释器锁为什么还需要线程锁
- 因为cpu是分时使用的
# 4.2 线程锁的使用
第一步:lock = threading.Lock() #定义一把锁
第二步:lock.acquire() #对数据操作前加锁防止数据被另一线程操作
第三步:lock.release() #对数据操作完成后释放锁
2
3
import time
import threading
lock = threading.Lock() #1 生成全局锁
def addNum():
global num #2 在每个线程中都获取这个全局变量
print('--get num:',num )
time.sleep(1)
lock.acquire() #3 修改数据前加锁
num -= 1 #4 对此公共变量进行-1操作
lock.release() #5 修改后释放
2
3
4
5
6
7
8
9
10
# 05.死锁
# 5.1 死锁定义
- 两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象
- 若无外力作用,它们都将无法推进下去。
# 5.2 死锁举例
- 1.启动5个线程,执行run方法,假如thread1首先抢到了A锁,此时thread1没有释放A锁,紧接着执行代码mutexB.acquire(),抢到了B锁, 在抢B锁时候,没有其他线程与thread1争抢,因为A锁没有释放,其他线程只能等待
- 2.thread1执行完func1函数,然后执行func2函数,此时thread1拿到B锁,然后执行time.sleep(2),此时不会释放B锁
- 3.在thread1执行func2的同时thread2开始执行func1获取到了A锁,然后继续要获取B锁
- 4.不幸的是B锁还被thread1占用,thread1占用B锁时还需要同时获取A锁才能向下执行,但是此时发现A锁已经被thread2暂用,这样就死锁了
# 5.3 产生死锁代码
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('\033[41m%s 拿到A锁\033[0m' %self.name)
mutexB.acquire()
print('\033[42m%s 拿到B锁\033[0m' %self.name)
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print('\033[43m%s 拿到B锁\033[0m' %self.name)
time.sleep(2)
mutexA.acquire()
print('\033[44m%s 拿到A锁\033[0m' %self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(2):
t=MyThread()
t.start()
# 运行结果:输出下面结果后程序卡死,不再向下进行了
# Thread-1 拿到A锁
# Thread-1 拿到B锁
# Thread-1 拿到B锁
# Thread-2 拿到A锁
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 5.4 递归锁
- lock = threading.RLock() 解决死锁问题
- 1.递归锁的作用是同一线程中多次请求同一资源,但是不会参数死锁。
- 2.这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。
- 3.直到一个线程所有的acquire都被release,其他的线程才能获得资源。
from threading import Thread,Lock,RLock
import time
mutexA=mutexB=RLock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 拿到A锁' %self.name)
mutexB.acquire()
print('%s 拿到B锁' %self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print('%s 拿到B锁' % self.name)
time.sleep(0.1)
mutexA.acquire()
print('%s 拿到A锁' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(5):
t=MyThread()
t.start()
# 下面是运行结果:不会产生死锁
# Thread-1 拿到A锁
# Thread-1 拿到B锁
# Thread-1 拿到B锁
# Thread-1 拿到A锁
# Thread-2 拿到A锁
# Thread-2 拿到B锁
# Thread-2 拿到B锁
# Thread-2 拿到A锁
# Thread-4 拿到A锁
# Thread-4 拿到B锁
# Thread-4 拿到B锁
# Thread-4 拿到A锁
# Thread-3 拿到A锁
# Thread-3 拿到B锁
# Thread-3 拿到B锁
# Thread-3 拿到A锁
# Thread-5 拿到A锁
# Thread-5 拿到B锁
# Thread-5 拿到B锁
# Thread-5 拿到A锁
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 06.Semaphore(信号量)
- 1.互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据
- 2.比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去
- 3.作用就是同一时刻允许运行的线程数量
import threading,time
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s\n" %n)
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
for i in range(22):
t = threading.Thread(target=run,args=(i,))
t.start()
while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('----all threads done---')
# 代码结果说明:这里可以清晰看到运行时0-4是同时运行的没有顺序,而且是前五个,
# 表示再semaphore这个信号量的定义下程序同时仅能执行5个线程
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 07.events事件
# 7.1 event事件
1.event.set() : 设置标志位
2.event.clear() : 清除标志位
3.event.wait() : 等待标志被设定
4.event.is_set() : 判断标志位是否被设定
# 7.2 event(红绿灯例子)
import time,threading
event = threading.Event()
#第一:写一个红绿灯的死循环
def lighter():
count = 0
event.set() #1先设置为绿灯
while True:
if count > 5 and count <10: #2改成红灯
event.clear() #3把标志位清了
print("red light is on.....")
elif count > 10:
event.set() #4再设置标志位,变绿灯
count = 0
else:
print("green light is on.....")
time.sleep(1)
count += 1
#第二:写一个车的死循环
def car(name):
while True:
if event.is_set(): #设置了标志位代表绿灯
print("[%s] is running"%name)
time.sleep(1)
else:
print('[%s] sees red light, waiting......'%name)
event.wait()
print('[%s] green light is on,start going.....'%name)
light = threading.Thread(target=lighter,)
light.start()
car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34