07.GO动态线程池
# 01.GO动态线程池
# 1、动态线程池概述
任务队列
(taskQueue
):用于存储待处理任务工作线程
(worker
):负责从任务队列中取出任务并执行扩容与缩容逻辑
:根据任务队列的长度和当前线程数量调整线程池的大小
- 动态线程池是一个可以根据任务负载动态调整工作线程数量的系统
- 线程池在启动时,会创建一个固定数量的最小工作线程(
minWorkers
) - 这些
线程在独立的协程中运行,监听任务队列,持续执行任务
- 提交任务时,线程池会触发调整逻辑,检查是否需要扩容
- 当任务队列长度小于当前线程数,且线程数多于最小线程限制(
minWorkers
)时,移除空闲线程 - 工作线程代码
// execWorker 运行工作者,持续从任务队列中获取任务并执行
func (w *worker) execWorker() {
for {
select {
case task := <-w.pool.taskQueue: // 从任务队列中获取任务
task() // 执行任务
case <-w.stop: // 接收到停止信号,停止当前worker线程
return
}
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 2、代码实现
package main
import (
"fmt"
"sync"
"time"
)
type Task func() // 用于线程池中执行的任务
// ThreadPool 表示动态线程池结构
type ThreadPool struct {
taskQueue chan Task // 用于存储任务的队列
workers []*worker // 当前线程池中的工作者列表
maxWorkers int // 最大工作者数量
minWorkers int // 最小工作者数量
mutex sync.Mutex // 互斥锁,用于保护线程池的并发操作
resizeSignal chan struct{} // 调整信号,用于触发线程池调整
}
// worker 结构体表示线程池中的一个工作者,每个工作者在独立的 goroutine 中运行
type worker struct {
pool *ThreadPool // 所属的线程池
stop chan struct{} // 用于停止工作者的信号通道
}
// NewThreadPool 创建一个新的动态线程池
func NewThreadPool(minWorkers, maxWorkers int) *ThreadPool {
pool := &ThreadPool{
taskQueue: make(chan Task, 100), // 初始化任务队列,缓冲大小为 100
maxWorkers: maxWorkers, // 最大工作者数量
minWorkers: minWorkers, // 最小工作者数量
resizeSignal: make(chan struct{}, 1), // 调整信号通道,容量为 1 避免阻塞
}
for i := 0; i < minWorkers; i++ { // 初始化最小数量的工作者
pool.addWorker()
}
go pool.monitorResize() // 启动调整线程池大小的监控协程
return pool
}
// Submit 向线程池提交任务
func (p *ThreadPool) Submit(task Task) {
p.taskQueue <- task // 将任务添加到队列
p.Resize() // 自动触发线程池调整逻辑
}
// monitorResize 监控线程池的调整信号
func (p *ThreadPool) monitorResize() {
for {
select {
case <-p.resizeSignal:
p.adjustWorkerCount()
}
}
}
// addWorker 创建并启动一个新的工作者
func (p *ThreadPool) addWorker() {
worker := &worker{
pool: p, // 关联线程池
stop: make(chan struct{}), // 初始化停止信号通道
}
p.workers = append(p.workers, worker) // 将工作者添加到工作者列表
go worker.execWorker() // 启动工作者协程
}
// removeWorker 停止并移除一个工作者
func (p *ThreadPool) removeWorker() {
if len(p.workers) == 0 {
return // 如果没有工作者,直接返回
}
worker := p.workers[len(p.workers)-1] // 获取最后一个工作者
p.workers = p.workers[:len(p.workers)-1] // 从工作者列表中移除
worker.stop <- struct{}{} // 发送停止信号
}
// adjustWorkerCount 根据任务队列的长度和当前工作者数量调整线程池大小
func (p *ThreadPool) adjustWorkerCount() {
p.mutex.Lock() // 加锁保护并发操作
defer p.mutex.Unlock()
queueLen := len(p.taskQueue) // 获取任务队列的当前长度
workerCount := len(p.workers) // 获取当前工作者数量
// 如果任务队列长度大于当前工作者数量,且未达到最大工作者限制,扩容
if queueLen > workerCount && workerCount < p.maxWorkers {
p.addWorker()
fmt.Println("扩容 工作线程数量:", len(p.workers))
// 如果任务队列长度小于当前工作者数量,且当前工作者数量大于最小工作者限制,缩容
} else if queueLen < workerCount && workerCount > p.minWorkers {
p.removeWorker()
fmt.Println("缩容 工作线程数量:", len(p.workers))
}
}
// Resize 触发线程池调整逻辑,通知线程池检查并调整工作者数量
func (p *ThreadPool) Resize() {
select {
case p.resizeSignal <- struct{}{}: // 非阻塞发送信号
default: // 如果信号通道已满,则忽略
}
}
// execWorker 运行工作者,持续从任务队列中获取任务并执行
func (w *worker) execWorker() {
for {
select {
case task := <-w.pool.taskQueue: // 从任务队列中获取任务
task() // 执行任务
case <-w.stop: // 接收到停止信号
return
}
}
}
func main() {
// 创建一个线程池,最小工作者数量为2,最大工作者数量为5
pool := NewThreadPool(2, 5)
// 定义一个任务生成函数,用于模拟任务提交
submitTasks := func(taskCount int, delay time.Duration) {
for i := 0; i < taskCount; i++ {
taskID := i // 捕获当前任务的ID
pool.Submit(func() {
fmt.Printf("func exec workID = %d\n", taskID)
time.Sleep(1 * time.Second) // 每个任务执行1秒
})
// 模拟任务提交的延迟
time.Sleep(delay)
}
}
fmt.Println("第一步:提交大量任务以触发扩容")
go submitTasks(10, 100*time.Millisecond)
time.Sleep(5 * time.Second)
fmt.Println("第二步:任务提交暂停,观察是否触发缩容")
time.Sleep(10 * time.Second) // 停止提交任务,观察线程池的缩容行为
fmt.Println("第三步:再次提交任务,观察扩容行为")
go submitTasks(5, 200*time.Millisecond)
time.Sleep(15 * time.Second) // 等待所有任务完成
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
上次更新: 2024/12/19 17:28:11