04.django中使用
# 1. 安装相关包 与 管理命令
# 1.1 安装包
pip3 install Django==2.2
pip3 install celery==4.4.7
pip3 install redis==3.5.3
pip3 install django-celery==3.1.17
1
2
3
4
2
3
4
# 1.2 celery管理
celery -A celery_task worker -l INFO # 单线程
celery multi start w1 w2 -A celery_pro -l info #一次性启动w1,w2两个worker
celery -A celery_pro status #查看当前有哪些worker在运行
celery multi stop w1 w2 -A celery_pro #停止w1,w2两个worker
# 1.项目中启动celery worker
celery multi start celery_task -A celery_task -l debug --autoscale=50,10 # celery并发数:最多50个,最少5个
# 2.在项目中关闭celery worker
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9 # 关闭所有celery进程
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 1.3 django_celery_beat管理
# 1.普通测试启动celery beat
celery -A celery_task beat -l info
# 2.在项目中后台启动celery beat
celery -A celery_task beat -l debug >> /aaa/Scheduler.log 2>&1 &
# 3.停止celery beat
ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' | xargs kill -TERM &> /dev/null # 杀死心跳所有进程
1
2
3
4
5
6
2
3
4
5
6
# 2.安装相关包 与 管理命令
# 2.1 在与项目同名的目录下创建celery.py
import os
from celery import Celery
# 只要是想在自己的脚本中访问Django的数据库等文件就必须配置Django的环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'opwf.settings')
# app名字
app = Celery('celery_test')
# 配置celery
class Config:
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
app.config_from_object(Config)
# 到各个APP里自动发现tasks.py文件
app.autodiscover_tasks()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 2.2 在与项目同名的目录下的 init.py 文件中添加下面内容
# -*- coding:utf8 -*-
# 告诉Django在启动时别忘了检测我的celery文件
from .celery import app as celery_app
__all__ = ['celery_app']
1
2
3
4
5
2
3
4
5
# 2.3 创建workorder/tasks.py文件
# -*- coding:utf8 -*-
from celery import shared_task
import time
# 这里不再使用@app.task,而是用@shared_task,是指定可以在其他APP中也可以调用这个任务
@shared_task
def add(x,y):
print('########## running add #####################')
return x + y
@shared_task
def minus(x,y):
time.sleep(30)
print('########## running minus #####################')
return x - y
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 2.4 启动
- 保证启动了redis-server
- 启动一个celery的worker
celery -A opwf worker -l INFO
celery multi start w1 w2 -A celery_pro -l info #一次性启动w1,w2两个worker
celery -A celery_pro status #查看当前有哪些worker在运行
celery multi stop w1 w2 -A celery_pro #停止w1,w2两个worker
celery multi start celery_test -A celery_test -l debug --autoscale=50,5 # celery并发数:最多50个,最少5个
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9 # 关闭所有celery进程
1
2
3
4
5
6
7
2
3
4
5
6
7
# 2.5 测试celery
./manage.py shell
import tasks
t1 = tasks.minus.delay(5,3)
t2 = tasks.add.delay(3,4)
t1.get()
t2.get()
1
2
3
4
5
6
2
3
4
5
6
上次更新: 2024/3/13 15:35:10