不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • GO基础

  • 面向对象

  • 并发编程

    • 01.goroutine介绍
    • 02.协程调度GRM模型
    • 03.协程基本使用
    • 04.Channel
    • 05.select
    • 06.并发安全和锁
    • 07.GO动态线程池
    • 08.goroutine编程
      • 01.goroutine池
        • 1、消费模型版
        • 2、cancel退出版
        • 3、timeout退出版
      • 02.chan使用
        • 1、打印奇数偶数
        • 2、n 协程顺序执行
      • 03.高速缓存
      • 04.WorkerPool
      • 10.其他
        • 1、协程池2
        • 2、cancel退出版
        • 3、超时控制
  • 常用库

  • 数据库操作

  • Beego框架

  • Beego商城

  • GIN框架

  • GIN论坛

  • 微服务

  • 设计模式

  • Go
  • 并发编程
xiaonaiqiang
2022-04-06
目录

08.goroutine编程

# 01.goroutine池

# 1、消费模型版

  • 本质上是生产者消费者模型

  • 在工作中我们通常会使用可以指定启动的goroutine数量–worker pool模式

  • 控制goroutine的数量,防止goroutine泄漏和暴涨

  • 实现2`

package main

import (
	"fmt"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	// 消费者消费任务
	for j := range jobs {
		fmt.Printf("worker:%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker:%d end job:%d\n", id, j)
		results <- j * 2
	}
}

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// 1)开启3个goroutine,作为消费者消费 jobs中任务
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	// 2)5个任务(生产者生产任务)
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	// 3)输出结果
	for a := 1; a <= 5; a++ {
		v := <-results
		fmt.Println(v)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 2、cancel退出版

package main

import (
	"context"
	"fmt"
	"sync"
)

func MyFunction(ctx context.Context, cancel context.CancelFunc, workers, jobs int) {
	var wg sync.WaitGroup
	jobCh := make(chan int)

	for i := 0; i < workers; i++ {
		go func(workerID int) {
			for {
				// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
				select {
				case <-ctx.Done(): // 如果 cancel 被调用,worker 退出
					return
				case job := <-jobCh:
					fmt.Println("接收到任务打印 ", job)
					wg.Done()
				}
			}
		}(i)
	}

	// 分发任务
	for j := 0; j < jobs; j++ {
		wg.Add(1)
		jobCh <- j
	}

	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // 在 main() 函数退出前,确保释放 context.WithCancel() 所分配的资源
	MyFunction(ctx, cancel, 3, 10)
	fmt.Println("All jobs done or cancelled.")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 3、timeout退出版

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// MyFunction 并发执行 jobs 个任务,使用 workers 个 worker,支持 context 取消
func MyFunction(ctx context.Context, workers, jobs int) {
	var wg sync.WaitGroup   // 用于等待所有任务完成
	jobCh := make(chan int) // 用于分发任务

	for i := 0; i < workers; i++ { // 启动指定数量的 worker goroutine
		go func(workerID int) {
			for {
				select {
				// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
				case <-ctx.Done(): // 上下文取消时退出
					return
				case job, ok := <-jobCh: // 从任务通道中取任务
					if !ok {
						return // 通道关闭,退出 worker
					}
					fmt.Println("接收到任务打印 ", job)
					wg.Done() // 标记任务完成
				}
			}
		}(i)
	}

	// 分发任务
	for j := 0; j < jobs; j++ {
		wg.Add(1)
		jobCh <- j // 下发任务
	}

	// 另启 goroutine 等待所有任务完成后关闭通道,通知 worker 退出
	go func() {
		wg.Wait()
		close(jobCh)
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	MyFunction(ctx, 3, 10) // 启动任务处理

	time.Sleep(3 * time.Second) // 主程序等待一段时间,确保任务完成
	fmt.Println("All jobs done (or timeout)")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 02.chan使用

# 1、打印奇数偶数

  • 首先我们这里通过make(chan int),开辟的通道是一种无缓冲通道
  • 所以当对这个缓冲通道写的时候,会一直阻塞等到某个协程对这个缓冲通道读
  • 而这里我讲 ch <- true 理解为生产,他却是需要等到某个协程读了再能继续运行
package main

import (
	"fmt"
	"sync"
)

func main() {
	oddCh := make(chan struct{})  // 控制奇数协程的通道
	evenCh := make(chan struct{}) // 控制偶数协程的通道
	var wg sync.WaitGroup
	wg.Add(2) // 等待两个协程完成

	go func() { // 奇数协程
		defer wg.Done()
		for i := 1; i <= 9; i += 2 {
			fmt.Println(i)
			evenCh <- struct{}{} // 通知偶数协程
			<-oddCh              // 等待反馈信号
		}
	}()

	go func() { // 偶数协程
		defer wg.Done()
		for i := 2; i <= 10; i += 2 {
			<-evenCh // 等待触发信号
			fmt.Println(i)
			oddCh <- struct{}{} // 通知奇数协程
		}
	}()

	wg.Wait() // 等待所有协程完成
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 2、n 协程顺序执行

package main

import (
	"fmt"
	"sync"
)

func main() {
	n := 5
	chs := make([]chan struct{}, n)
	for i := range chs {
		chs[i] = make(chan struct{}) // 无缓冲通道
	}

	var wg sync.WaitGroup
	wg.Add(n)

	// 按顺序创建协程(从0到n-1)
	for i := 0; i < n; i++ {
		go func(id int) {
			<-chs[id] // 等待前一个协程的信号

			fmt.Println("执行协程", id)

			wg.Done() // 先标记完成
			if id < n-1 {
				chs[id+1] <- struct{}{} // 再触发下一个
			}

		}(i)
	}

	chs[0] <- struct{}{} // 在主协程触发第一个协程(确保所有协程都已启动)
	wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 03.高速缓存

设计一个高并发安全的内存缓存系统(简化版)

  • Set(key string, value interface{}):设置键值对
  • Get(key string) (interface{}, bool):获取键值对,若不存在返回 false
  • Delete(key string):删除键值对
  • 支持自动过期功能:每个键值可以设置过期时间,过期后自动删除
  • 要求支持高并发访问
package main

import (
	"sync"
	"time"
)

type entry struct {
	val interface{}
	exp int64 // 过期时间戳(秒)
}

type Cache struct {
	mu   sync.RWMutex
	data map[string]entry
}

func (c *Cache) Set(k string, v interface{}, ttl time.Duration) {
	c.mu.Lock()
	c.data[k] = entry{
		val: v,
		exp: time.Now().Unix() + int64(ttl.Seconds()),
	}
	c.mu.Unlock()
}

func (c *Cache) Get(k string) (interface{}, bool) {
	c.mu.RLock()
	e, ok := c.data[k]
	c.mu.RUnlock()
	if !ok || time.Now().Unix() > e.exp {
		return nil, false
	}
	return e.val, true
}

func (c *Cache) Delete(k string) {
	c.mu.Lock()
	delete(c.data, k)
	c.mu.Unlock()
}

func (c *Cache) clear(interval time.Duration) {
	ticker := time.NewTicker(interval)
	go func() {
		for range ticker.C {
			now := time.Now().Unix()
			c.mu.Lock()
			for k, v := range c.data {
				if now > v.exp {
					delete(c.data, k)
				}
			}
			c.mu.Unlock()
		}
	}()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

# 04.WorkerPool

写一个 worker 处理器,可以接收用户输入的定义处理函数,提供的方法

  • 初始化函数: 传入 worker 的 handlerFunc,以及 worker 的数量

  • 调用函数:用户输入需要处理的请求参数(用 string 表示),调用之后worker 会异步执行

  • 查询请求状态:可以根据请求查询当前请求的执行状态(pending\running\completed)

  • 查询请求结果:如果是 completed 状态,可以查询请求的执行结果

  • 【扩展功能】增加 autoscaling 功能,根据请求流量,对 worker 数量进行自动扩容和缩容

package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

type TaskStatus string

const (
	Pending   TaskStatus = "pending"
	Running   TaskStatus = "running"
	Completed TaskStatus = "completed"
)

type Task struct {
	ID     string
	Input  string
	Result string
	Status TaskStatus
}

type WorkerPool struct {
	handlerFunc   func(string) string
	taskQueue     chan *Task
	tasks         map[string]*Task // Task是指针类型,修改即可改变全局
	mu            sync.RWMutex
	workerNum     int
	baseWorkerNum int             // 最小保留 worker 数量
	stopChans     []chan struct{} // 用于通知 worker 停止
}

// NewWorkerPool 初始化协程池
func NewWorkerPool(handler func(string) string, workerNum int) *WorkerPool {
	wp := &WorkerPool{
		handlerFunc:   handler,
		taskQueue:     make(chan *Task, 100),
		tasks:         make(map[string]*Task),
		workerNum:     0,
		baseWorkerNum: workerNum,
		stopChans:     make([]chan struct{}, 0),
	}

	for i := 0; i < workerNum; i++ {
		wp.addWorker() // 启动指定数量 消费 协程
	}

	go wp.autoScale() // 启动一个协程扩容 缩容

	return wp
}

// addWorker 启动  消费 协程
func (wp *WorkerPool) addWorker() {
	stop := make(chan struct{})
	wp.stopChans = append(wp.stopChans, stop) // 用于通知 worker 停止
	wp.workerNum++
	go wp.worker(stop) // 调用 work 执行方法
}

// worker 真实 协程消费者
func (wp *WorkerPool) worker(stopChan chan struct{}) {
	for {
		select {
		case task := <-wp.taskQueue: // 消费任务队列
			wp.mu.Lock()
			task.Status = Running
			wp.mu.Unlock()

			result := wp.handlerFunc(task.Input)

			wp.mu.Lock()
			task.Status = Completed
			task.Result = result
			wp.mu.Unlock()
		case <-stopChan: // 用于通知 worker 停止
			fmt.Println("Worker exited")
			return
		}
	}
}

// Submit 提交任务
func (wp *WorkerPool) Submit(input string) string {
	id := fmt.Sprintf("%d", time.Now().UnixNano())
	task := &Task{
		ID:     id,
		Input:  input,
		Status: Pending,
	}
	wp.mu.Lock()
	wp.tasks[id] = task
	wp.mu.Unlock()

	wp.taskQueue <- task
	return id
}

func (wp *WorkerPool) GetStatus(id string) TaskStatus {
	wp.mu.RLock()
	defer wp.mu.RUnlock()
	if task, ok := wp.tasks[id]; ok {
		return task.Status
	}
	return "not_found"
}

func (wp *WorkerPool) GetResult(id string) string {
	wp.mu.RLock()
	defer wp.mu.RUnlock()
	if task, ok := wp.tasks[id]; ok && task.Status == Completed {
		return task.Result
	}
	return ""
}

// removeWorker 关闭 stopChans 中最后一个 goroutine
func (wp *WorkerPool) removeWorker() {
	if wp.workerNum <= wp.baseWorkerNum || len(wp.stopChans) == 0 {
		return
	}
	stop := wp.stopChans[len(wp.stopChans)-1]
	wp.stopChans = wp.stopChans[:len(wp.stopChans)-1]
	// 广播通知:关闭通道会向所有正在等待从这个通道接收数据的 goroutine 发送一个"零值"
	close(stop)
	wp.workerNum--
}

// 自动扩缩容逻辑:每 2 秒检查队列长度
func (wp *WorkerPool) autoScale() {
	ticker := time.NewTicker(2 * time.Second)
	for range ticker.C {
		queueLen := len(wp.taskQueue)

		if queueLen > 10 && wp.workerNum < 50 {
			wp.addWorker()
			fmt.Println("Auto-scaled: +1 worker")
		} else if queueLen == 0 && wp.workerNum > wp.baseWorkerNum {
			wp.removeWorker()
			fmt.Println("Auto-scaled: -1 worker")
		}
	}
}

func main() {
	handler := func(s string) string {
		time.Sleep(1 * time.Second) // 模拟处理耗时
		return strings.ToUpper(s)
	}

	wp := NewWorkerPool(handler, 3)

	id := wp.Submit("submit hello world")
	fmt.Println("Submitted:", id)

	time.Sleep(2 * time.Second)
	fmt.Println("Status:", wp.GetStatus(id))
	fmt.Println("Result:", wp.GetResult(id))
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161

# 10.其他

# 1、协程池2

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	var wg sync.WaitGroup

	// 1)开启3个goroutine,作为消费者消费 jobs中任务
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- job * 2
			}
		}()
	}
	// 2)5个任务(生产者生产任务)
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)

	// 用一个额外的 goroutine 等待所有 worker 结束,然后关闭 results
	go func() {
		wg.Wait()
		close(results)
	}()

	// 3)输出结果
	for i := range results {
		fmt.Println(i)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 2、cancel退出版

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type DoOneJobFunc func(job int)

func MyFunction(ctx context.Context, cancel context.CancelFunc, workers, jobs int, doOneJob DoOneJobFunc) {
	var wg sync.WaitGroup
	jobCh := make(chan int)

	for i := 0; i < workers; i++ {
		go func(workerID int) {
			for {
        // 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
				select {
				case <-ctx.Done(): // 如果 cancel 被调用,worker 退出
					return
				case job := <-jobCh:
					// 假设某个 job 满足特殊条件,触发 cancel 提前退出
					if job == 5 {
						fmt.Println("模仿出现异常 err 提取退出, cancelling...")
						cancel()
						wg.Done()
						return
					}
					doOneJob(job)
					wg.Done()
				}
			}
		}(i)
	}

	// 分发任务
	for j := 0; j < jobs; j++ {
		select {
		case <-ctx.Done():
			break
		default:
			wg.Add(1)
			jobCh <- j
		}
	}

	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // 在 main() 函数退出前,确保释放 context.WithCancel() 所分配的资源

	doOneJob := func(job int) {
		fmt.Printf("Doing job %d\n", job)
		time.Sleep(100 * time.Millisecond)
	}

	MyFunction(ctx, cancel, 3, 10, doOneJob)
	fmt.Println("All jobs done or cancelled.")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 3、超时控制

package main

import (
	"fmt"
	"time"
)

func main() {
	workDoneCh := make(chan bool, 1)
  
	go func() {
		LongTimeWork()     //这是我们要控制超时的函数
		workDoneCh <- true // 函数正常执行结束给 chan信号正常退出
	}()
  
	select {
	case <-workDoneCh: // 当协程执行完成后会向这个 channel 发送一个数据,收到即可结束
		fmt.Println("Success!")
	case <-time.After(3 * time.Second): //timeout到来
		fmt.Println("timeout") // 3s无返回超时退出
	}
}


func LongTimeWork() {
	time.Sleep(time.Second * 2)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
上次更新: 2024/3/13 15:35:10
07.GO动态线程池
01.fmt

← 07.GO动态线程池 01.fmt→

最近更新
01
05.快递Agent智能体
06-04
02
200.AI Agent核心概念
06-04
03
105.Agent智能体梳理
06-04
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式