不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • GO基础

  • 面向对象

  • 并发编程

    • 01.goroutine介绍
    • 02.协程调度GRM模型
    • 03.协程基本使用
    • 04.Channel
    • 05.select
    • 06.并发安全和锁
    • 07.GO动态线程池
    • 08.goroutine编程
      • 01.goroutine池
        • 1、消费模型版
        • 2、cancel退出版
        • 3、timeout退出版
      • 02.chan使用
        • 1、打印奇数偶数
        • 2、n 协程顺序执行
      • 03.高速缓存
      • 10.其他
        • 1、协程池2
        • 2、cancel退出版
        • 3、超时控制
  • 常用库

  • 数据库操作

  • Beego框架

  • Beego商城

  • GIN框架

  • GIN论坛

  • 微服务

  • 设计模式

  • Go
  • 并发编程
xiaonaiqiang
2022-04-06
目录

08.goroutine编程

# 01.goroutine池

# 1、消费模型版

  • 本质上是生产者消费者模型

  • 在工作中我们通常会使用可以指定启动的goroutine数量–worker pool模式

  • 控制goroutine的数量,防止goroutine泄漏和暴涨

  • 实现2`

package main

import (
	"fmt"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	// 消费者消费任务
	for j := range jobs {
		fmt.Printf("worker:%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker:%d end job:%d\n", id, j)
		results <- j * 2
	}
}

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// 1)开启3个goroutine,作为消费者消费 jobs中任务
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	// 2)5个任务(生产者生产任务)
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	// 3)输出结果
	for a := 1; a <= 5; a++ {
		v := <-results
		fmt.Println(v)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 2、cancel退出版

package main

import (
	"context"
	"fmt"
	"sync"
)

func MyFunction(ctx context.Context, cancel context.CancelFunc, workers, jobs int) {
	var wg sync.WaitGroup
	jobCh := make(chan int)

	for i := 0; i < workers; i++ {
		go func(workerID int) {
			for {
				// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
				select {
				case <-ctx.Done(): // 如果 cancel 被调用,worker 退出
					return
				case job := <-jobCh:
					fmt.Println("接收到任务打印 ", job)
					wg.Done()
				}
			}
		}(i)
	}

	// 分发任务
	for j := 0; j < jobs; j++ {
		wg.Add(1)
		jobCh <- j
	}

	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // 在 main() 函数退出前,确保释放 context.WithCancel() 所分配的资源
	MyFunction(ctx, cancel, 3, 10)
	fmt.Println("All jobs done or cancelled.")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 3、timeout退出版

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// MyFunction 并发执行 jobs 个任务,使用 workers 个 worker,支持 context 取消
func MyFunction(ctx context.Context, workers, jobs int) {
	var wg sync.WaitGroup   // 用于等待所有任务完成
	jobCh := make(chan int) // 用于分发任务

	for i := 0; i < workers; i++ { // 启动指定数量的 worker goroutine
		go func(workerID int) {
			for {
				select {
				// 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
				case <-ctx.Done(): // 上下文取消时退出
					return
				case job, ok := <-jobCh: // 从任务通道中取任务
					if !ok {
						return // 通道关闭,退出 worker
					}
					fmt.Println("接收到任务打印 ", job)
					wg.Done() // 标记任务完成
				}
			}
		}(i)
	}

	// 分发任务
	for j := 0; j < jobs; j++ {
		wg.Add(1)
		jobCh <- j // 下发任务
	}

	// 另启 goroutine 等待所有任务完成后关闭通道,通知 worker 退出
	go func() {
		wg.Wait()
		close(jobCh)
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	MyFunction(ctx, 3, 10) // 启动任务处理

	time.Sleep(3 * time.Second) // 主程序等待一段时间,确保任务完成
	fmt.Println("All jobs done (or timeout)")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 02.chan使用

# 1、打印奇数偶数

  • 首先我们这里通过make(chan int),开辟的通道是一种无缓冲通道
  • 所以当对这个缓冲通道写的时候,会一直阻塞等到某个协程对这个缓冲通道读
  • 而这里我讲 ch <- true 理解为生产,他却是需要等到某个协程读了再能继续运行
package main

import (
	"fmt"
	"sync"
)

func main() {
	oddCh := make(chan struct{})  // 控制奇数协程的通道
	evenCh := make(chan struct{}) // 控制偶数协程的通道
	var wg sync.WaitGroup
	wg.Add(2) // 等待两个协程完成

	go func() { // 奇数协程
		defer wg.Done()
		for i := 1; i <= 9; i += 2 {
			fmt.Println(i)
			evenCh <- struct{}{} // 通知偶数协程
			<-oddCh              // 等待反馈信号
		}
	}()

	go func() { // 偶数协程
		defer wg.Done()
		for i := 2; i <= 10; i += 2 {
			<-evenCh // 等待触发信号
			fmt.Println(i)
			oddCh <- struct{}{} // 通知奇数协程
		}
	}()

	wg.Wait() // 等待所有协程完成
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 2、n 协程顺序执行

package main

import (
	"fmt"
	"sync"
)

func main() {
	n := 5
	chs := make([]chan struct{}, n)
	for i := range chs {
		chs[i] = make(chan struct{}) // 无缓冲通道
	}

	var wg sync.WaitGroup
	wg.Add(n)

	// 按顺序创建协程(从0到n-1)
	for i := 0; i < n; i++ {
		go func(id int) {
			<-chs[id] // 等待前一个协程的信号

			fmt.Println("执行协程", id)

			wg.Done() // 先标记完成
			if id < n-1 {
				chs[id+1] <- struct{}{} // 再触发下一个
			}

		}(i)
	}

	chs[0] <- struct{}{} // 在主协程触发第一个协程(确保所有协程都已启动)
	wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 03.高速缓存

设计一个高并发安全的内存缓存系统(简化版)

  • Set(key string, value interface{}):设置键值对
  • Get(key string) (interface{}, bool):获取键值对,若不存在返回 false
  • Delete(key string):删除键值对
  • 支持自动过期功能:每个键值可以设置过期时间,过期后自动删除
  • 要求支持高并发访问
package main

import (
	"sync"
	"time"
)

type entry struct {
	val interface{}
	exp int64 // 过期时间戳(秒)
}

type Cache struct {
	mu   sync.RWMutex
	data map[string]entry
}

func (c *Cache) Set(k string, v interface{}, ttl time.Duration) {
	c.mu.Lock()
	c.data[k] = entry{
		val: v,
		exp: time.Now().Unix() + int64(ttl.Seconds()),
	}
	c.mu.Unlock()
}

func (c *Cache) Get(k string) (interface{}, bool) {
	c.mu.RLock()
	e, ok := c.data[k]
	c.mu.RUnlock()
	if !ok || time.Now().Unix() > e.exp {
		return nil, false
	}
	return e.val, true
}

func (c *Cache) Delete(k string) {
	c.mu.Lock()
	delete(c.data, k)
	c.mu.Unlock()
}

func (c *Cache) clear(interval time.Duration) {
	ticker := time.NewTicker(interval)
	go func() {
		for range ticker.C {
			now := time.Now().Unix()
			c.mu.Lock()
			for k, v := range c.data {
				if now > v.exp {
					delete(c.data, k)
				}
			}
			c.mu.Unlock()
		}
	}()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

# 10.其他

# 1、协程池2

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	var wg sync.WaitGroup

	// 1)开启3个goroutine,作为消费者消费 jobs中任务
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- job * 2
			}
		}()
	}
	// 2)5个任务(生产者生产任务)
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)

	// 用一个额外的 goroutine 等待所有 worker 结束,然后关闭 results
	go func() {
		wg.Wait()
		close(results)
	}()

	// 3)输出结果
	for i := range results {
		fmt.Println(i)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 2、cancel退出版

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type DoOneJobFunc func(job int)

func MyFunction(ctx context.Context, cancel context.CancelFunc, workers, jobs int, doOneJob DoOneJobFunc) {
	var wg sync.WaitGroup
	jobCh := make(chan int)

	for i := 0; i < workers; i++ {
		go func(workerID int) {
			for {
        // 如果调用了 cancel(),ctx.Done() 会变为可读,当前 goroutine 立即返回
				select {
				case <-ctx.Done(): // 如果 cancel 被调用,worker 退出
					return
				case job := <-jobCh:
					// 假设某个 job 满足特殊条件,触发 cancel 提前退出
					if job == 5 {
						fmt.Println("模仿出现异常 err 提取退出, cancelling...")
						cancel()
						wg.Done()
						return
					}
					doOneJob(job)
					wg.Done()
				}
			}
		}(i)
	}

	// 分发任务
	for j := 0; j < jobs; j++ {
		select {
		case <-ctx.Done():
			break
		default:
			wg.Add(1)
			jobCh <- j
		}
	}

	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // 在 main() 函数退出前,确保释放 context.WithCancel() 所分配的资源

	doOneJob := func(job int) {
		fmt.Printf("Doing job %d\n", job)
		time.Sleep(100 * time.Millisecond)
	}

	MyFunction(ctx, cancel, 3, 10, doOneJob)
	fmt.Println("All jobs done or cancelled.")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 3、超时控制

package main

import (
	"fmt"
	"time"
)

func main() {
	workDoneCh := make(chan bool, 1)
  
	go func() {
		LongTimeWork()     //这是我们要控制超时的函数
		workDoneCh <- true // 函数正常执行结束给 chan信号正常退出
	}()
  
	select {
	case <-workDoneCh: // 当协程执行完成后会向这个 channel 发送一个数据,收到即可结束
		fmt.Println("Success!")
	case <-time.After(3 * time.Second): //timeout到来
		fmt.Println("timeout") // 3s无返回超时退出
	}
}


func LongTimeWork() {
	time.Sleep(time.Second * 2)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
上次更新: 2024/3/13 15:35:10
07.GO动态线程池
01.fmt

← 07.GO动态线程池 01.fmt→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式