不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • python基础

  • python模块

  • django

  • flask

  • SYL

  • Celery

    • 01.celery组件与原理
    • 02.celery简单使用
    • 03.项目中使用celery
    • 04.django中使用
    • 05.celery分布式部署
    • 06.supervisor管理celery
    • 07.celery集群监控
    • 08.redis和rabbitmq区别
    • 10.celery踩过的坑
      • 01.队列集群内存爆了
        • 1.1 场景描述
        • 1.2 Redis机器的内网带宽跑满
        • 1.3 RabbitMQ集群内存爆了
        • 1.4 RabbitMQ流控机制
        • 1.5 其余一些问题
        • 不启用RabbitMQ的confirm机制
        • 慎用CELERYACKSLATE
        • Celery 任务分队列
        • 让Celery忽略处理结果
        • Celery内存泄露
        • ip_conntrack: table full, dropping packet
        • Inodes满了没法写文件
      • 02.celery丢失任务
      • 03.celery重复执行
        • 3.1 情况描述
        • 3.2 celery once
        • 3.3 celery once使用步骤
        • 3.3.1 第一步,安装
        • 3.3.3 第三步,修改 delay 方法
        • 3.3.4 第四步,修改 task 参数
  • 微服务

  • python
  • Celery
xiaonaiqiang
2021-05-02
目录

10.celery踩过的坑

# 01.队列集群内存爆了

# 1.1 场景描述

  • 有一个项目须要处理大量的异步任务,并须要能够快速水平扩展,增长系统吞吐量。
  • 最终基于Celery来开发这个系统
  • Celery是用Python编写的一个分布式任务队列,经过消息队列来在Client与Worker之间传递任务
  • 但Celery自己并不提供消息队列服务,须要使用第三方的消息服务做为Broker
  • 官方推荐RabbitMQ和Redis,我一开始使用了Redis。
  • 使用Celery开发很容易,只须要编写任务逻辑,调度的事情Celery就帮你完成了。
  • 而后部署到3台机器,一切都运行得很好,Worker会把执行的events发送到Broker
  • 经过events能够知道任务执行的状况,成功仍是失败等等。
  • Celery Flower提供一个web界面来查看这些监控数据。
  • 后来项目须要增长系统吞吐量,OK,把程序发布到新机器,启动Celery Worker,就完成扩展了。
  • 随着开的Worker愈来愈多,问题也出来了。

# 1.2 Redis机器的内网带宽跑满

  • 系统使用单机Redis来作Broker,当链接到Redis的Worker增长的时候,内网流量也迅速增长
  • 最后达到1G,把千兆网卡跑满了,生产者和消费者的性能立刻降了下来
  • 经过查看Redis的操做记录,发现大量的发布订阅操做,消息是json格式
  • 除了对传入任务参数的封装,还有Celery自己附带的一些信息
  • Redis不停地把这些消息发布给各个Worker,而Redis性能真的好,因而产生了每秒1G的流量
  • 查看了Celery的文档,发现json
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_MESSAGE_COMPRESSION = 'gzip'
1
2
  • 把json改为Python内置的pickle,并压缩,能够减小一点点消息的大小,但仍是跑满了。
  • 想到单机的消息队列可能会成为系统的瓶颈,因而把单机Broker改为集群
  • 可是Redis集群用在这里不太好用,看了看RabbitMQ,自己支持集群,镜像模式还能够作HA,性能也能够
  • 果断把Broker改为RabbitMQ集群。
  • 拿了两台机器(16G内存)来组成集群,使用HAProxy来作负载均衡,Celery配置加上服务器
CELERY_QUEUE_HA_POLICY = 'all'
1
  • 而后单机内网流量降了下来,单机峰值在500M/s,可是运行几个小时以后,问题又来了。

# 1.3 RabbitMQ集群内存爆了

  • RabbitMQ集群出现OOM了,一番搜索以后异步

  • RabbitMQ自带的监控插件有可能会占用大量的内存,不看web界面的时候,把插件关闭。

[root@k8s-node2 ~]#  rabbitmq-plugins disable rabbitmq_management
1
  • celery默认会发送服务器的心跳信息,这些我是不须要的,能够经过zabbix等监控,关闭发送,能够在celery启动命令中加上 --without-heartbeat
  • celery默认会发送大量的任务处理状态事件,这些事件默认是不设置过时时间的
  • 由RabbitMQ的过时时间来处理,因此会有大量的事件数据在RabbitMQ中堆积但又不会被消费。
  • 能够在celery配置中加上过时时间,如设置过时时间5s
CELERY_EVENT_QUEUE_TTL = 5
1
  • 作完这几步以后,RabbitMQ单机内存占用稳定在1~2G,并且内网流量也大幅降了下来,峰值100M/s

# 1.4 RabbitMQ流控机制

  • 这个系统生产消息有明显的高峰和低谷,观察高峰时生产者的日志
  • 发现当生产者刚启动时,队列尚未消息的时候,消息入队很快,大概2k/s
  • 而后几十秒以后,发现消息入队愈来愈慢,入队2k逐渐须要4s、8s、10s
  • 一番搜索以后,发现RabbitMQ有流量控制机制,当生产者过快,消费者来不及消费消息,消息在队列中堆积
  • RabbitMQ就会阻塞发布消息过快的连击,也就表现为入队逐渐变慢
  • 这时须要注意调整生产和消费的速率,注意RabbitMQ内存占用和内存阀值配置,以及磁盘空间。

# 1.5 其余一些问题

  • # 不启用RabbitMQ的confirm机制

    • RabbitMQ处理confirm消息占用了大量cpu资源。
    • confrim的做用在于当消息真正落地写到磁盘时,给生产者发送ack确认
    • 若生产者在收到该ack后才丢弃该消息,就能够保证消息必定不丢,这是一种很是高强度的可靠性保证。
    • 但若没有这么高的要求则能够不启用confirm机制,增长RabbitMQ的吞吐量。
  • # 慎用CELERY_ACKS_LATE

    • Celery的CELERY_ACKS_LATE=True,表示Worker在任务执行完后才向Broker发送acks
    • 告诉队列这个任务已经处理了,而不是在接收任务后执行前发送
    • 这样能够在Worker处理任务过程当中异常退出,这个任务还会被发送给别的Worker处理。
    • 可是可能的状况是,一个任务会被屡次执行,因此必定要慎用。
  • # Celery 任务分队列

    • 耗时和不耗时的任务分开,避免耗时任务阻塞队列;
    • 重要和不重要的任务分开,避免重要的任务得不到及时处理。
  • # 让Celery忽略处理结果

    • 多数状况下并不须要关注Celery处理的结果,况且在Worker里面咱们会记录其结果
    • 设置CELERY_IGNORE_RESULT = True可让Celery不要把结果发送到Broker,也能够下降内网流量和Broker内存占用。
  • # Celery内存泄露

    • 长时间运行Celery有可能发生内存泄露,能够配置CELERYD_MAX_TASKS_PER_CHILD
    • 让Worker在执行n个任务杀掉子进程再启动新的子进程,能够防止内存泄露。
    • 另外Worker执行大量任务后有僵死的状况,启动了一个crontab定时重启Worker。
  • # ip_conntrack: table full, dropping packet

    • 系统执行时会创建大量的链接,形成iptables跟踪表满了,socket拒绝链接,性能提不上去。
    • 解决方法:加大 ip_conntrack_max 值。
  • # Inodes满了没法写文件

    • 因为创建了太多的临时文件,发现磁盘没有满,但仍是没法写入文件
    • 由于Inodes被用完了,启动一个crontab定时清理临时文件

# 02.celery丢失任务

  • 修改配置如下:
task_reject_on_worker_lost = True    # 作用是当worker进程意外退出时,task会被放回到队列中
task_acks_late = True            # 作用是只有当worker完成了这个task时,任务才被标记为ack状态
1
2
  • 该配置可以保证task不丢失,中断的task在下次启动时将会重新执行。

  • 需要说明的是,backend最好使用rabbitmq等支持ACK状态的消息中间件。

# 03.celery重复执行

# 3.1 情况描述

  • celery 在执行task时有个机制,就是任务时长超过了 visibility_timeout 时还没执行完
  • 就会指定其他worker重新开始task,默认的时长是一小时.
app.conf.broker_transport_options = {‘visibility_timeout’: 3600}
1

# 3.2 celery once

  • Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类
  • 该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={'graceful': True})
1
  • 后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False

  • 那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

  • 另外如果要手动设置任务的 key,可以指定 keys 参数

@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b
1
2
3
4

# 3.3 celery once使用步骤

# 3.3.1 第一步,安装

pip install -U celery_once
1

3.3.2 第二步,增加配置

from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}
1
2
3
4
5
6
7
8
9
10
11
12

# 3.3.3 第三步,修改 delay 方法

example.delay(10)
# 修改为
result = example.apply_async(args=(10))
1
2
3

# 3.3.4 第四步,修改 task 参数

@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b
1
2
3
4
上次更新: 2024/3/13 15:35:10
08.redis和rabbitmq区别
01.grpc基本使用

← 08.redis和rabbitmq区别 01.grpc基本使用→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式