03.项目中使用celery
# 01.在普通项目中使用
pip3 install Django==2.2
pip3 install celery==4.4.7
pip3 install redis==3.5.3
1
2
3
2
3
# 1.1 celery_task/main.py
# -*- coding: utf-8 -*-
from celery import Celery
# 1.celery基本配置
app = Celery('proj',
broker='redis://localhost:6379/14',
backend='redis://localhost:6379/15',
include=['celery_task.tasks',
'celery_task.tasks2',
])
# 2.实例化时可以添加下面这个属性
app.conf.update(
result_expires=3600, #执行结果放到redis里,一个小时没人取就丢弃
)
# 3.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
app.conf.beat_schedule = {
'add-every-5-seconds': {
'task': 'celery_task.tasks.test_task_crontab',
'schedule': 1,
'args': (16, 16)
},
}
# 4.添加时区配置
app.conf.timezone = 'UTC'
if __name__ == '__main__':
app.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 1.2 celery_task/tasks.py
# -*- coding:utf8 -*-
from .main import app #从当前目录导入app
# 1.test_task_crontab测试定时任务
@app.task
def test_task_crontab(x, y):
print('执行定时任务')
return x + y
# 2.测试异步发送邮件
@app.task(bind=True)
def send_sms_code(self, mobile, datas):
return '异步发送邮件'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# 1.3 celery_task/tasks2.py
# -*- coding:utf8 -*-
from .main import app
import time,random
@app.task
def randnum(start,end):
time.sleep(3)
return random.randint(start,end)
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 1.4 启动项目
'''1.启动celery-worker'''
(syl) root@dev:opwf_project# celery -A celery_task.main worker -l INFO
'''2.启动celery-beat'''
(syl) root@dev:opwf_project# celery -A celery_task.main beat -l info
1
2
3
4
5
2
3
4
5
# 1.5 celery并发方式
- Celery支持不同的并发和序列化的手段
- 并发:Prefork, Eventlet, gevent, threads/single threaded
- 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
方法1:使用进程池并发
- 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程
(syl) root@dev:opwf_project# celery worker -A celery_task.main --concurrency=4
1
方法2:使用协程方式并发
# 安装eventlet模块
$ pip install eventlet
# 启用 Eventlet 池
$ celery -A celery_task.main worker -l info -P eventlet -c 1000
1
2
3
4
5
2
3
4
5
# 02.在django项目中使用
# 2.1 celery_task/main.py
# -*- coding: utf-8 -*-
from celery import Celery
import os,sys
import django
# # 1.添加django项目根路径
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
# 2.添加django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings")
django.setup() # 读取配置
# 3.celery基本配置
app = Celery('proj',
broker='redis://localhost:6379/14',
backend='redis://localhost:6379/15',
include=['celery_task.tasks',
'celery_task.tasks2',
])
# 4.实例化时可以添加下面这个属性
app.conf.update(
result_expires=3600, #执行结果放到redis里,一个小时没人取就丢弃
)
# 5.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
app.conf.beat_schedule = {
'add-every-5-seconds': {
'task': 'celery_task.tasks.test_task_crontab',
'schedule': 5.0,
'args': (16, 16)
},
}
# 6.添加时区配置
app.conf.timezone = 'UTC'
if __name__ == '__main__':
app.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 2.2 celery_task/tasks.py
# -*- coding:utf8 -*-
from .main import app #从当前目录导入app
import os,sys
from .main import CELERY_BASE_DIR
# 1.test_task_crontab测试定时任务
@app.task
def test_task_crontab(x, y):
# 添加django项目路径
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
from utils.rl_sms import test_crontab
res = test_crontab(x, y)
return x + y
# 2.测试异步发送邮件
@app.task(bind=True)
def send_sms_code(self, mobile, datas):
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
# 在方法中导包
from utils.rl_sms import send_message
# time.sleep(5)
try:
# 用 res 接收发送结果, 成功是:0, 失败是:-1
res = send_message(mobile, datas)
except Exception as e:
res = '-1'
if res == '-1':
# 如果发送结果是 -1 就重试.
self.retry(countdown=5, max_retries=3, exc=Exception('短信发送失败'))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 2.3 celery_task/tasks2.py
# -*- coding:utf8 -*-
from .main import app
import time,random
@app.task
def randnum(start,end):
time.sleep(3)
return random.randint(start,end)
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
上次更新: 2024/3/13 15:35:10