08.goroutine编程
# 01.goroutine池
# 1、消费模型版
本质上是生产者消费者模型
在工作中我们通常会使用可以指定启动的goroutine数量–
worker pool
模式控制
goroutine
的数量,防止goroutine
泄漏和暴涨实现2`
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
// 消费者消费任务
for j := range jobs {
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 1)开启3个goroutine,作为消费者消费 jobs中任务
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 2)5个任务(生产者生产任务)
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 3)输出结果
for a := 1; a <= 5; a++ {
v := <-results
fmt.Println(v)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 2、cancel退出版
package main
import (
"context"
"fmt"
"sync"
)
func MyFunction(ctx context.Context, cancel context.CancelFunc, workers, jobs int) {
var wg sync.WaitGroup
jobCh := make(chan int)
for i := 0; i < workers; i++ {
go func(workerID int) {
for {
// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
select {
case <-ctx.Done(): // 如果 cancel 被调用,worker 退出
return
case job := <-jobCh:
fmt.Println("接收到任务打印 ", job)
wg.Done()
}
}
}(i)
}
// 分发任务
for j := 0; j < jobs; j++ {
wg.Add(1)
jobCh <- j
}
wg.Wait()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 在 main() 函数退出前,确保释放 context.WithCancel() 所分配的资源
MyFunction(ctx, cancel, 3, 10)
fmt.Println("All jobs done or cancelled.")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 3、timeout退出版
package main
import (
"context"
"fmt"
"sync"
"time"
)
// MyFunction 并发执行 jobs 个任务,使用 workers 个 worker,支持 context 取消
func MyFunction(ctx context.Context, workers, jobs int) {
var wg sync.WaitGroup // 用于等待所有任务完成
jobCh := make(chan int) // 用于分发任务
for i := 0; i < workers; i++ { // 启动指定数量的 worker goroutine
go func(workerID int) {
for {
select {
// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
case <-ctx.Done(): // 上下文取消时退出
return
case job, ok := <-jobCh: // 从任务通道中取任务
if !ok {
return // 通道关闭,退出 worker
}
fmt.Println("接收到任务打印 ", job)
wg.Done() // 标记任务完成
}
}
}(i)
}
// 分发任务
for j := 0; j < jobs; j++ {
wg.Add(1)
jobCh <- j // 下发任务
}
// 另启 goroutine 等待所有任务完成后关闭通道,通知 worker 退出
go func() {
wg.Wait()
close(jobCh)
}()
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
MyFunction(ctx, 3, 10) // 启动任务处理
time.Sleep(3 * time.Second) // 主程序等待一段时间,确保任务完成
fmt.Println("All jobs done (or timeout)")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 02.chan使用
# 1、打印奇数偶数
- 首先我们这里通过make(chan int),开辟的通道是一种无缓冲通道
- 所以当对这个缓冲通道写的时候,
会一直阻塞等到某个协程对这个缓冲通道读
- 而这里我讲
ch <- true 理解为生产
,他却是需要等到某个协程读了再能继续运行
package main
import (
"fmt"
"sync"
)
func main() {
oddCh := make(chan struct{}) // 控制奇数协程的通道
evenCh := make(chan struct{}) // 控制偶数协程的通道
var wg sync.WaitGroup
wg.Add(2) // 等待两个协程完成
go func() { // 奇数协程
defer wg.Done()
for i := 1; i <= 9; i += 2 {
fmt.Println(i)
evenCh <- struct{}{} // 通知偶数协程
<-oddCh // 等待反馈信号
}
}()
go func() { // 偶数协程
defer wg.Done()
for i := 2; i <= 10; i += 2 {
<-evenCh // 等待触发信号
fmt.Println(i)
oddCh <- struct{}{} // 通知奇数协程
}
}()
wg.Wait() // 等待所有协程完成
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 2、n 协程顺序执行
package main
import (
"fmt"
"sync"
)
func main() {
n := 5
chs := make([]chan struct{}, n)
for i := range chs {
chs[i] = make(chan struct{}) // 无缓冲通道
}
var wg sync.WaitGroup
wg.Add(n)
// 按顺序创建协程(从0到n-1)
for i := 0; i < n; i++ {
go func(id int) {
<-chs[id] // 等待前一个协程的信号
fmt.Println("执行协程", id)
wg.Done() // 先标记完成
if id < n-1 {
chs[id+1] <- struct{}{} // 再触发下一个
}
}(i)
}
chs[0] <- struct{}{} // 在主协程触发第一个协程(确保所有协程都已启动)
wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 03.高速缓存
设计一个高并发安全的内存缓存系统(简化版)
- Set(key string, value interface{}):设置键值对
- Get(key string) (interface{}, bool):获取键值对,若不存在返回 false
- Delete(key string):删除键值对
- 支持自动过期功能:每个键值可以设置过期时间,过期后自动删除
- 要求支持高并发访问
package main
import (
"sync"
"time"
)
type entry struct {
val interface{}
exp int64 // 过期时间戳(秒)
}
type Cache struct {
mu sync.RWMutex
data map[string]entry
}
func (c *Cache) Set(k string, v interface{}, ttl time.Duration) {
c.mu.Lock()
c.data[k] = entry{
val: v,
exp: time.Now().Unix() + int64(ttl.Seconds()),
}
c.mu.Unlock()
}
func (c *Cache) Get(k string) (interface{}, bool) {
c.mu.RLock()
e, ok := c.data[k]
c.mu.RUnlock()
if !ok || time.Now().Unix() > e.exp {
return nil, false
}
return e.val, true
}
func (c *Cache) Delete(k string) {
c.mu.Lock()
delete(c.data, k)
c.mu.Unlock()
}
func (c *Cache) clear(interval time.Duration) {
ticker := time.NewTicker(interval)
go func() {
for range ticker.C {
now := time.Now().Unix()
c.mu.Lock()
for k, v := range c.data {
if now > v.exp {
delete(c.data, k)
}
}
c.mu.Unlock()
}
}()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 10.其他
# 1、协程池2
package main
import (
"fmt"
"sync"
)
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
var wg sync.WaitGroup
// 1)开启3个goroutine,作为消费者消费 jobs中任务
for w := 1; w <= 3; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
results <- job * 2
}
}()
}
// 2)5个任务(生产者生产任务)
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 用一个额外的 goroutine 等待所有 worker 结束,然后关闭 results
go func() {
wg.Wait()
close(results)
}()
// 3)输出结果
for i := range results {
fmt.Println(i)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 2、cancel退出版
package main
import (
"context"
"fmt"
"sync"
"time"
)
type DoOneJobFunc func(job int)
func MyFunction(ctx context.Context, cancel context.CancelFunc, workers, jobs int, doOneJob DoOneJobFunc) {
var wg sync.WaitGroup
jobCh := make(chan int)
for i := 0; i < workers; i++ {
go func(workerID int) {
for {
// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
select {
case <-ctx.Done(): // 如果 cancel 被调用,worker 退出
return
case job := <-jobCh:
// 假设某个 job 满足特殊条件,触发 cancel 提前退出
if job == 5 {
fmt.Println("模仿出现异常 err 提取退出, cancelling...")
cancel()
wg.Done()
return
}
doOneJob(job)
wg.Done()
}
}
}(i)
}
// 分发任务
for j := 0; j < jobs; j++ {
select {
case <-ctx.Done():
break
default:
wg.Add(1)
jobCh <- j
}
}
wg.Wait()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 在 main() 函数退出前,确保释放 context.WithCancel() 所分配的资源
doOneJob := func(job int) {
fmt.Printf("Doing job %d\n", job)
time.Sleep(100 * time.Millisecond)
}
MyFunction(ctx, cancel, 3, 10, doOneJob)
fmt.Println("All jobs done or cancelled.")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 3、超时控制
package main
import (
"fmt"
"time"
)
func main() {
workDoneCh := make(chan bool, 1)
go func() {
LongTimeWork() //这是我们要控制超时的函数
workDoneCh <- true // 函数正常执行结束给 chan信号正常退出
}()
select {
case <-workDoneCh: // 当协程执行完成后会向这个 channel 发送一个数据,收到即可结束
fmt.Println("Success!")
case <-time.After(3 * time.Second): //timeout到来
fmt.Println("timeout") // 3s无返回超时退出
}
}
func LongTimeWork() {
time.Sleep(time.Second * 2)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
上次更新: 2024/3/13 15:35:10