08.goroutine编程
# 01.goroutine池
# 1、消费模型版
本质上是生产者消费者模型
在工作中我们通常会使用可以指定启动的goroutine数量–
worker pool
模式控制
goroutine
的数量,防止goroutine
泄漏和暴涨实现2`
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
// 消费者消费任务
for j := range jobs {
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 1)开启3个goroutine,作为消费者消费 jobs中任务
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 2)5个任务(生产者生产任务)
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 3)输出结果
for a := 1; a <= 5; a++ {
v := <-results
fmt.Println(v)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 2、cancel退出版
package main
import (
"context"
"fmt"
"sync"
)
func MyFunction(ctx context.Context, cancel context.CancelFunc, workers, jobs int) {
var wg sync.WaitGroup
jobCh := make(chan int)
for i := 0; i < workers; i++ {
go func(workerID int) {
for {
// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
select {
case <-ctx.Done(): // 如果 cancel 被调用,worker 退出
return
case job := <-jobCh:
fmt.Println("接收到任务打印 ", job)
wg.Done()
}
}
}(i)
}
// 分发任务
for j := 0; j < jobs; j++ {
wg.Add(1)
jobCh <- j
}
wg.Wait()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 在 main() 函数退出前,确保释放 context.WithCancel() 所分配的资源
MyFunction(ctx, cancel, 3, 10)
fmt.Println("All jobs done or cancelled.")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 3、timeout退出版
package main
import (
"context"
"fmt"
"sync"
"time"
)
// MyFunction 并发执行 jobs 个任务,使用 workers 个 worker,支持 context 取消
func MyFunction(ctx context.Context, workers, jobs int) {
var wg sync.WaitGroup // 用于等待所有任务完成
jobCh := make(chan int) // 用于分发任务
for i := 0; i < workers; i++ { // 启动指定数量的 worker goroutine
go func(workerID int) {
for {
select {
// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
case <-ctx.Done(): // 上下文取消时退出
return
case job, ok := <-jobCh: // 从任务通道中取任务
if !ok {
return // 通道关闭,退出 worker
}
fmt.Println("接收到任务打印 ", job)
wg.Done() // 标记任务完成
}
}
}(i)
}
// 分发任务
for j := 0; j < jobs; j++ {
wg.Add(1)
jobCh <- j // 下发任务
}
// 另启 goroutine 等待所有任务完成后关闭通道,通知 worker 退出
go func() {
wg.Wait()
close(jobCh)
}()
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
MyFunction(ctx, 3, 10) // 启动任务处理
time.Sleep(3 * time.Second) // 主程序等待一段时间,确保任务完成
fmt.Println("All jobs done (or timeout)")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 02.chan使用
# 1、打印奇数偶数
- 首先我们这里通过make(chan int),开辟的通道是一种无缓冲通道
- 所以当对这个缓冲通道写的时候,
会一直阻塞等到某个协程对这个缓冲通道读
- 而这里我讲
ch <- true 理解为生产
,他却是需要等到某个协程读了再能继续运行
package main
import (
"fmt"
"sync"
)
func main() {
oddCh := make(chan struct{}) // 控制奇数协程的通道
evenCh := make(chan struct{}) // 控制偶数协程的通道
var wg sync.WaitGroup
wg.Add(2) // 等待两个协程完成
go func() { // 奇数协程
defer wg.Done()
for i := 1; i <= 9; i += 2 {
fmt.Println(i)
evenCh <- struct{}{} // 通知偶数协程
<-oddCh // 等待反馈信号
}
}()
go func() { // 偶数协程
defer wg.Done()
for i := 2; i <= 10; i += 2 {
<-evenCh // 等待触发信号
fmt.Println(i)
oddCh <- struct{}{} // 通知奇数协程
}
}()
wg.Wait() // 等待所有协程完成
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 2、n 协程顺序执行
package main
import (
"fmt"
"sync"
)
func main() {
n := 5
chs := make([]chan struct{}, n)
for i := range chs {
chs[i] = make(chan struct{}) // 无缓冲通道
}
var wg sync.WaitGroup
wg.Add(n)
// 按顺序创建协程(从0到n-1)
for i := 0; i < n; i++ {
go func(id int) {
<-chs[id] // 等待前一个协程的信号
fmt.Println("执行协程", id)
wg.Done() // 先标记完成
if id < n-1 {
chs[id+1] <- struct{}{} // 再触发下一个
}
}(i)
}
chs[0] <- struct{}{} // 在主协程触发第一个协程(确保所有协程都已启动)
wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 03.高速缓存
设计一个高并发安全的内存缓存系统(简化版)
- Set(key string, value interface{}):设置键值对
- Get(key string) (interface{}, bool):获取键值对,若不存在返回 false
- Delete(key string):删除键值对
- 支持自动过期功能:每个键值可以设置过期时间,过期后自动删除
- 要求支持高并发访问
package main
import (
"sync"
"time"
)
type entry struct {
val interface{}
exp int64 // 过期时间戳(秒)
}
type Cache struct {
mu sync.RWMutex
data map[string]entry
}
func (c *Cache) Set(k string, v interface{}, ttl time.Duration) {
c.mu.Lock()
c.data[k] = entry{
val: v,
exp: time.Now().Unix() + int64(ttl.Seconds()),
}
c.mu.Unlock()
}
func (c *Cache) Get(k string) (interface{}, bool) {
c.mu.RLock()
e, ok := c.data[k]
c.mu.RUnlock()
if !ok || time.Now().Unix() > e.exp {
return nil, false
}
return e.val, true
}
func (c *Cache) Delete(k string) {
c.mu.Lock()
delete(c.data, k)
c.mu.Unlock()
}
func (c *Cache) clear(interval time.Duration) {
ticker := time.NewTicker(interval)
go func() {
for range ticker.C {
now := time.Now().Unix()
c.mu.Lock()
for k, v := range c.data {
if now > v.exp {
delete(c.data, k)
}
}
c.mu.Unlock()
}
}()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 04.WorkerPool
写一个 worker 处理器,可以接收用户输入的定义处理函数,提供的方法
初始化函数: 传入 worker 的 handlerFunc,以及 worker 的数量
调用函数:用户输入需要处理的请求参数(用 string 表示),调用之后worker 会异步执行
查询请求状态:可以根据请求查询当前请求的执行状态(pending\running\completed)
查询请求结果:如果是 completed 状态,可以查询请求的执行结果
【扩展功能】增加 autoscaling 功能,根据请求流量,对 worker 数量进行自动扩容和缩容
package main
import (
"fmt"
"strings"
"sync"
"time"
)
type TaskStatus string
const (
Pending TaskStatus = "pending"
Running TaskStatus = "running"
Completed TaskStatus = "completed"
)
type Task struct {
ID string
Input string
Result string
Status TaskStatus
}
type WorkerPool struct {
handlerFunc func(string) string
taskQueue chan *Task
tasks map[string]*Task // Task是指针类型,修改即可改变全局
mu sync.RWMutex
workerNum int
baseWorkerNum int // 最小保留 worker 数量
stopChans []chan struct{} // 用于通知 worker 停止
}
// NewWorkerPool 初始化协程池
func NewWorkerPool(handler func(string) string, workerNum int) *WorkerPool {
wp := &WorkerPool{
handlerFunc: handler,
taskQueue: make(chan *Task, 100),
tasks: make(map[string]*Task),
workerNum: 0,
baseWorkerNum: workerNum,
stopChans: make([]chan struct{}, 0),
}
for i := 0; i < workerNum; i++ {
wp.addWorker() // 启动指定数量 消费 协程
}
go wp.autoScale() // 启动一个协程扩容 缩容
return wp
}
// addWorker 启动 消费 协程
func (wp *WorkerPool) addWorker() {
stop := make(chan struct{})
wp.stopChans = append(wp.stopChans, stop) // 用于通知 worker 停止
wp.workerNum++
go wp.worker(stop) // 调用 work 执行方法
}
// worker 真实 协程消费者
func (wp *WorkerPool) worker(stopChan chan struct{}) {
for {
select {
case task := <-wp.taskQueue: // 消费任务队列
wp.mu.Lock()
task.Status = Running
wp.mu.Unlock()
result := wp.handlerFunc(task.Input)
wp.mu.Lock()
task.Status = Completed
task.Result = result
wp.mu.Unlock()
case <-stopChan: // 用于通知 worker 停止
fmt.Println("Worker exited")
return
}
}
}
// Submit 提交任务
func (wp *WorkerPool) Submit(input string) string {
id := fmt.Sprintf("%d", time.Now().UnixNano())
task := &Task{
ID: id,
Input: input,
Status: Pending,
}
wp.mu.Lock()
wp.tasks[id] = task
wp.mu.Unlock()
wp.taskQueue <- task
return id
}
func (wp *WorkerPool) GetStatus(id string) TaskStatus {
wp.mu.RLock()
defer wp.mu.RUnlock()
if task, ok := wp.tasks[id]; ok {
return task.Status
}
return "not_found"
}
func (wp *WorkerPool) GetResult(id string) string {
wp.mu.RLock()
defer wp.mu.RUnlock()
if task, ok := wp.tasks[id]; ok && task.Status == Completed {
return task.Result
}
return ""
}
// removeWorker 关闭 stopChans 中最后一个 goroutine
func (wp *WorkerPool) removeWorker() {
if wp.workerNum <= wp.baseWorkerNum || len(wp.stopChans) == 0 {
return
}
stop := wp.stopChans[len(wp.stopChans)-1]
wp.stopChans = wp.stopChans[:len(wp.stopChans)-1]
// 广播通知:关闭通道会向所有正在等待从这个通道接收数据的 goroutine 发送一个"零值"
close(stop)
wp.workerNum--
}
// 自动扩缩容逻辑:每 2 秒检查队列长度
func (wp *WorkerPool) autoScale() {
ticker := time.NewTicker(2 * time.Second)
for range ticker.C {
queueLen := len(wp.taskQueue)
if queueLen > 10 && wp.workerNum < 50 {
wp.addWorker()
fmt.Println("Auto-scaled: +1 worker")
} else if queueLen == 0 && wp.workerNum > wp.baseWorkerNum {
wp.removeWorker()
fmt.Println("Auto-scaled: -1 worker")
}
}
}
func main() {
handler := func(s string) string {
time.Sleep(1 * time.Second) // 模拟处理耗时
return strings.ToUpper(s)
}
wp := NewWorkerPool(handler, 3)
id := wp.Submit("submit hello world")
fmt.Println("Submitted:", id)
time.Sleep(2 * time.Second)
fmt.Println("Status:", wp.GetStatus(id))
fmt.Println("Result:", wp.GetResult(id))
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# 10.其他
# 1、协程池2
package main
import (
"fmt"
"sync"
)
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
var wg sync.WaitGroup
// 1)开启3个goroutine,作为消费者消费 jobs中任务
for w := 1; w <= 3; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
results <- job * 2
}
}()
}
// 2)5个任务(生产者生产任务)
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 用一个额外的 goroutine 等待所有 worker 结束,然后关闭 results
go func() {
wg.Wait()
close(results)
}()
// 3)输出结果
for i := range results {
fmt.Println(i)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 2、cancel退出版
package main
import (
"context"
"fmt"
"sync"
"time"
)
type DoOneJobFunc func(job int)
func MyFunction(ctx context.Context, cancel context.CancelFunc, workers, jobs int, doOneJob DoOneJobFunc) {
var wg sync.WaitGroup
jobCh := make(chan int)
for i := 0; i < workers; i++ {
go func(workerID int) {
for {
// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
select {
case <-ctx.Done(): // 如果 cancel 被调用,worker 退出
return
case job := <-jobCh:
// 假设某个 job 满足特殊条件,触发 cancel 提前退出
if job == 5 {
fmt.Println("模仿出现异常 err 提取退出, cancelling...")
cancel()
wg.Done()
return
}
doOneJob(job)
wg.Done()
}
}
}(i)
}
// 分发任务
for j := 0; j < jobs; j++ {
select {
case <-ctx.Done():
break
default:
wg.Add(1)
jobCh <- j
}
}
wg.Wait()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 在 main() 函数退出前,确保释放 context.WithCancel() 所分配的资源
doOneJob := func(job int) {
fmt.Printf("Doing job %d\n", job)
time.Sleep(100 * time.Millisecond)
}
MyFunction(ctx, cancel, 3, 10, doOneJob)
fmt.Println("All jobs done or cancelled.")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 3、超时控制
package main
import (
"fmt"
"time"
)
func main() {
workDoneCh := make(chan bool, 1)
go func() {
LongTimeWork() //这是我们要控制超时的函数
workDoneCh <- true // 函数正常执行结束给 chan信号正常退出
}()
select {
case <-workDoneCh: // 当协程执行完成后会向这个 channel 发送一个数据,收到即可结束
fmt.Println("Success!")
case <-time.After(3 * time.Second): //timeout到来
fmt.Println("timeout") // 3s无返回超时退出
}
}
func LongTimeWork() {
time.Sleep(time.Second * 2)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
上次更新: 2024/3/13 15:35:10