20.machinery
# 01.异步框架machinery
# 1.1 machinery介绍
- go machinery框架类似python中常用celery框架,主要用于 异步任务和定时任务,有一下特性
- 任务重试机制
- 延迟任务支持
- 任务回调机制
- 任务结果记录
- 支持Workflow模式:Chain,Group,Chord
- 多Brokers支持:Redis, AMQP, AWS SQS (opens new window)
- 多Backends支持:Redis, Memcache, AMQP, MongoDB (opens new window)
# 1.2 架构
- 任务队列,简而言之就是一个放大的生产者消费者模型
- 用户请求会生成任务,队列的处理器程序充当消费者不断的消费任务。
- 基于这种框架设计思想,我们来看下machinery的简单设计结构图例
- Sender:业务推送模块,生成具体任务,可根据业务逻辑中,按交互进行拆分;
- Broker:存储具体序列化后的任务,machinery中目前支持到Redis, AMQP,和SQS;
- Worker:工作进程,负责消费者功能,处理具体的任务;
- Backend:后端存储,用于存储任务执行状态的数据;
# 02.machinery使用
# 2.1 异步和定时任务
package main
import (
"fmt"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/tasks"
"os"
"time"
)
func main() {
if len(os.Args) == 2 && os.Args[1] == "worker" { // 启动worker
if err := worker(); err != nil {
panic(err)
}
}
TestPeriodicTask() // 触发一个定时任务(定时任务由客户端控制,客户端退出定时就会结束)
TestAdd() // 触发一个异步任务
time.Sleep(time.Second * 1000)
}
/* 触发执行Add异步任务 */
func TestAdd() {
server, _ := startServer() // 调用异步任务 Add 函数,执行 1+4=5这个逻辑
signature := &tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 4,
},
{
Type: "int64",
Value: 1,
},
},
}
asyncResult, _ := server.SendTask(signature) // 任务可以通过将Signature的实例传递给Server实例来调用
results,_ := asyncResult.Get(time.Millisecond * 5) // 您还可以执行同步阻塞调用来等待任务结果
for _, result := range results {
fmt.Println(result.Interface())
}
}
/* 触发执行periodicTask异步任务 */
func TestPeriodicTask() {
server, _ := startServer()
signature := &tasks.Signature{
Name: "periodicTask",
Args: []tasks.Arg{
},
}
// 每分钟执行一次periodicTask函数,验证发现不支持秒级别定时任务
err := server.RegisterPeriodicTask("*/1 * * * ?", "periodic-task", signature)
if err != nil {
fmt.Println(err)
}
asyncResult, _ := server.SendTask(signature)
fmt.Println(asyncResult)
}
// 第一:配置Server并注册任务
func startServer() (*machinery.Server, error) {
cnf := &config.Config{
DefaultQueue: "machinery_tasks",
ResultsExpireIn: 3600,
Redis: &config.RedisConfig{
MaxIdle: 3,
IdleTimeout: 240,
ReadTimeout: 15,
WriteTimeout: 15,
ConnectTimeout: 15,
NormalTasksPollPeriod: 1000,
DelayedTasksPollPeriod: 500,
},
}
// 创建服务器实例
broker := redisbroker.NewGR(cnf, []string{"localhost:6379"}, 0)
backend := redisbackend.NewGR(cnf, []string{"localhost:6379"}, 0)
lock := eagerlock.New()
server := machinery.NewServer(cnf, broker, backend, lock)
// 注册异步任务
tasksMap := map[string]interface{}{
"add": Add,
"periodicTask": PeriodicTask,
}
return server, server.RegisterTasks(tasksMap)
}
// 第二步:启动Worker
func worker() error {
//消费者的标记
consumerTag := "machinery_worker"
server, err := startServer()
if err != nil {
return err
}
//第二个参数并发数, 0表示不限制
worker := server.NewWorker(consumerTag, 0)
//钩子函数
errorhandler := func(err error) {}
pretaskhandler := func(signature *tasks.Signature) {}
posttaskhandler := func(signature *tasks.Signature) {}
worker.SetPostTaskHandler(posttaskhandler)
worker.SetErrorHandler(errorhandler)
worker.SetPreTaskHandler(pretaskhandler)
return worker.Launch()
}
// 第三步:添加异步执行函数
func Add(args ...int64) (int64, error) {
println("############# 执行Add方法 #############")
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, nil
}
// 第四步:添加一个周期性任务
func PeriodicTask() error {
fmt.Println("################ 执行周期任务PeriodicTask #################")
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# 2.2 启动服务并发送任务
- go run main.go worker // 启动worker服务
- go run main.go // 发送任务到worker
# 03.gin+machinery
# 3.0 项目结构
go run main.go // 直接执行即可测试
xiaonaiqiang1@ZBMac-C02CW08SM work % tree ginWorker
ginWorker
├── main.go // 项目入库
└── pkg
└── task
├── server.go // machinery服务初始化
├── start.go // 启动异步任务入口
├── cronJobs.go // 触发周期性任务
├── sendJobs.go // 触发异任务
└── workers
└── tasks.go // 定义执行任务函数
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 3.1 main.go
package main
import (
"fmt"
"ginWorker/pkg/task"
"github.com/gin-gonic/gin"
"net/http"
)
func main() {
go task.Start() // 启动异步任务worker
go task.StartCron() // 启动定时任务
r := gin.Default()
r.GET("/add", func(c *gin.Context) {
task.TaskAdd(4,5) // 测试执行异步任务
c.String(http.StatusOK, "hello word")
})
fmt.Println("http://127.0.0.1:8000") //监听端口默认为8080
r.Run(":8000")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 3.2 pkg/task/server.go
package task
import (
"ginWorker/pkg/task/workers"
"github.com/RichardKnop/machinery/v2"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
"github.com/RichardKnop/machinery/v2/config"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/tasks"
)
var AsyncTaskCenter *machinery.Server
// 第一:配置Server并注册任务
func startServer() (*machinery.Server, error) {
cnf := &config.Config{
DefaultQueue: "machinery_tasks",
ResultsExpireIn: 3600,
Redis: &config.RedisConfig{
MaxIdle: 3,
IdleTimeout: 240,
ReadTimeout: 15,
WriteTimeout: 15,
ConnectTimeout: 15,
NormalTasksPollPeriod: 1000,
DelayedTasksPollPeriod: 500,
},
}
// 创建服务器实例
broker := redisbroker.NewGR(cnf, []string{"localhost:6379"}, 0)
backend := redisbackend.NewGR(cnf, []string{"localhost:6379"}, 0)
lock := eagerlock.New()
server := machinery.NewServer(cnf, broker, backend, lock)
tasksMap := initAsyncTaskMap()
AsyncTaskCenter = server
return server, server.RegisterTasks(tasksMap)
}
// 第二步:启动Worker
func worker() error {
consumerTag := "machinery_worker" //消费者的标记
server, err := startServer()
if err != nil {
return err
}
worker := server.NewWorker(consumerTag, 0) //第二个参数并发数, 0表示不限制
//钩子函数
errorhandler := func(err error) {}
pretaskhandler := func(signature *tasks.Signature) {}
posttaskhandler := func(signature *tasks.Signature) {}
worker.SetPostTaskHandler(posttaskhandler)
worker.SetErrorHandler(errorhandler)
worker.SetPreTaskHandler(pretaskhandler)
return worker.Launch()
}
// 第三步:注册函数
func initAsyncTaskMap() map[string]interface{} {
tasksMap := map[string]interface{}{
"add": workers.Add,
"periodicTask": workers.PeriodicTask,
}
return tasksMap
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 3.3 pkg/task/start.go
package task
func Start() {
// 启动worker
if err := worker(); err != nil {
panic(err)
}
}
// 启动周期性任务
func StartCron() {
TestPeriodicTask()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 3.4 pkg/task/cronJobs.go
package task
import (
"fmt"
"github.com/RichardKnop/machinery/v2/tasks"
)
/* 触发执行periodicTask异步任务 */
func TestPeriodicTask() {
server, _ := startServer()
signature := &tasks.Signature{
Name: "periodicTask",
Args: []tasks.Arg{
},
}
// 每分钟执行一次periodicTask函数,验证发现不支持秒级别定时任务
err := server.RegisterPeriodicTask("*/1 * * * ?", "periodic-task", signature)
if err != nil {
fmt.Println(err)
}
asyncResult, _ := server.SendTask(signature)
fmt.Println(asyncResult)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 3.5 pkg/task/sendJobs.go
package task
import (
"fmt"
"github.com/RichardKnop/machinery/v2/tasks"
)
/* 触发执行Add异步任务 */
func TaskAdd(a,b int64) {
signature := &tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: a,
},
{
Type: "int64",
Value: b,
},
},
}
_, err := AsyncTaskCenter.SendTask(signature) // 任务可以通过将Signature的实例传递给Server实例来调用
if err != nil {
fmt.Println(err)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 3.6 pkg/task/workers/tasks.go
package workers
import (
"fmt"
"time"
)
// 添加异步执行函数
func Add(args ...int64) (int64, error) {
println("############# 执行Add方法 #############")
time.Sleep(10 * time.Second) // 模拟执行耗时任务
sum := int64(0)
for _, arg := range args {
sum += arg
}
println("############# Add方法Done #############")
return sum, nil
}
// 添加一个周期性任务
func PeriodicTask() error {
fmt.Println("################ 执行周期任务PeriodicTask #################")
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 3.7 运行结果
执行周期任务:每秒执行一次
通过接口触发异步任务
- http://127.0.0.1:8000/add
上次更新: 2024/3/13 15:35:10