不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • python基础

  • python模块

  • django

  • flask

  • SYL

  • Celery

    • 01.celery组件与原理
    • 02.celery简单使用
    • 03.项目中使用celery
      • 01.在普通项目中使用
        • 1.1 celery_task/main.py
        • 1.2 celery_task/tasks.py
        • 1.3 celery_task/tasks2.py
        • 1.4 启动项目
        • 1.5 celery并发方式
      • 02.在django项目中使用
        • 2.1 celery_task/main.py
        • 2.2 celery_task/tasks.py
        • 2.3 celery_task/tasks2.py
    • 04.django中使用
    • 05.celery分布式部署
    • 06.supervisor管理celery
    • 07.celery集群监控
    • 08.redis和rabbitmq区别
    • 10.celery踩过的坑
  • 微服务

  • python
  • Celery
xiaonaiqiang
2021-04-29
目录

03.项目中使用celery

# 01.在普通项目中使用

pip3 install Django==2.2
pip3 install celery==4.4.7
pip3 install redis==3.5.3
1
2
3

# 1.1 celery_task/main.py

# -*- coding: utf-8 -*-
from celery import Celery


# 1.celery基本配置
app = Celery('proj',
             broker='redis://localhost:6379/14',
             backend='redis://localhost:6379/15',
             include=['celery_task.tasks',
                      'celery_task.tasks2',
                      ])

# 2.实例化时可以添加下面这个属性
app.conf.update(
   result_expires=3600,        #执行结果放到redis里,一个小时没人取就丢弃
)

# 3.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
app.conf.beat_schedule = {
    'add-every-5-seconds': {
        'task': 'celery_task.tasks.test_task_crontab',
        'schedule': 1,
        'args': (16, 16)
    },
}

# 4.添加时区配置
app.conf.timezone = 'UTC'

if __name__ == '__main__':
   app.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 1.2 celery_task/tasks.py

# -*- coding:utf8 -*-
from .main import app       #从当前目录导入app

# 1.test_task_crontab测试定时任务
@app.task
def test_task_crontab(x, y):
    print('执行定时任务')
    return x + y


# 2.测试异步发送邮件
@app.task(bind=True)
def send_sms_code(self, mobile, datas):
    return '异步发送邮件'
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 1.3 celery_task/tasks2.py

# -*- coding:utf8 -*-
from .main import app
import time,random

@app.task
def randnum(start,end):
    time.sleep(3)
    return random.randint(start,end)
1
2
3
4
5
6
7
8

# 1.4 启动项目

'''1.启动celery-worker'''
(syl) root@dev:opwf_project# celery -A celery_task.main worker -l INFO

'''2.启动celery-beat'''
(syl) root@dev:opwf_project# celery -A celery_task.main beat -l info
1
2
3
4
5

# 1.5 celery并发方式

  • Celery支持不同的并发和序列化的手段
    • 并发:Prefork, Eventlet, gevent, threads/single threaded
    • 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
  • 方法1:使用进程池并发
    • 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程
(syl) root@dev:opwf_project# celery worker -A celery_task.main --concurrency=4
1
  • 方法2:使用协程方式并发
# 安装eventlet模块
$ pip install eventlet

# 启用 Eventlet 池
$ celery -A celery_task.main worker -l info -P eventlet -c 1000
1
2
3
4
5

# 02.在django项目中使用

# 2.1 celery_task/main.py

# -*- coding: utf-8 -*-
from celery import Celery
import os,sys
import django

# # 1.添加django项目根路径
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# 2.添加django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings")
django.setup() # 读取配置


# 3.celery基本配置
app = Celery('proj',
             broker='redis://localhost:6379/14',
             backend='redis://localhost:6379/15',
             include=['celery_task.tasks',
                      'celery_task.tasks2',
                      ])

# 4.实例化时可以添加下面这个属性
app.conf.update(
   result_expires=3600,        #执行结果放到redis里,一个小时没人取就丢弃
)

# 5.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
app.conf.beat_schedule = {
    'add-every-5-seconds': {
        'task': 'celery_task.tasks.test_task_crontab',
        'schedule': 5.0,
        'args': (16, 16)
    },
}

# 6.添加时区配置
app.conf.timezone = 'UTC'

if __name__ == '__main__':
   app.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 2.2 celery_task/tasks.py

# -*- coding:utf8 -*-
from .main import app       #从当前目录导入app
import os,sys
from .main import CELERY_BASE_DIR

# 1.test_task_crontab测试定时任务
@app.task
def test_task_crontab(x, y):
    # 添加django项目路径
    sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

    from utils.rl_sms import test_crontab

    res = test_crontab(x, y)
    return x + y


# 2.测试异步发送邮件
@app.task(bind=True)
def send_sms_code(self, mobile, datas):
    sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
    # 在方法中导包
    from utils.rl_sms import send_message
    # time.sleep(5)
    try:
        # 用 res 接收发送结果, 成功是:0, 失败是:-1
        res = send_message(mobile, datas)
    except Exception as e:
        res = '-1'

    if res == '-1':
        # 如果发送结果是 -1  就重试.
        self.retry(countdown=5, max_retries=3, exc=Exception('短信发送失败'))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 2.3 celery_task/tasks2.py

# -*- coding:utf8 -*-
from .main import app
import time,random

@app.task
def randnum(start,end):
    time.sleep(3)
    return random.randint(start,end)
1
2
3
4
5
6
7
8
上次更新: 2024/3/13 15:35:10
02.celery简单使用
04.django中使用

← 02.celery简单使用 04.django中使用→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式