10.celery踩过的坑
# 01.队列集群内存爆了
# 1.1 场景描述
- 有一个项目须要处理大量的异步任务,并须要能够快速水平扩展,增长系统吞吐量。
- 最终基于Celery来开发这个系统
- Celery是用Python编写的一个分布式任务队列,经过消息队列来在Client与Worker之间传递任务
- 但Celery自己并不提供消息队列服务,须要使用第三方的消息服务做为Broker
- 官方推荐RabbitMQ和Redis,我一开始使用了Redis。
- 使用Celery开发很容易,只须要编写任务逻辑,调度的事情Celery就帮你完成了。
- 而后部署到3台机器,一切都运行得很好,Worker会把执行的events发送到Broker
- 经过events能够知道任务执行的状况,成功仍是失败等等。
- Celery Flower提供一个web界面来查看这些监控数据。
- 后来项目须要增长系统吞吐量,OK,把程序发布到新机器,启动Celery Worker,就完成扩展了。
- 随着开的Worker愈来愈多,问题也出来了。
# 1.2 Redis机器的内网带宽跑满
- 系统使用单机Redis来作Broker,当链接到Redis的Worker增长的时候,内网流量也迅速增长
- 最后达到1G,把千兆网卡跑满了,生产者和消费者的性能立刻降了下来
- 经过查看Redis的操做记录,发现大量的发布订阅操做,消息是json格式
- 除了对传入任务参数的封装,还有Celery自己附带的一些信息
- Redis不停地把这些消息发布给各个Worker,而Redis性能真的好,因而产生了每秒1G的流量
- 查看了Celery的文档,发现json
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_MESSAGE_COMPRESSION = 'gzip'
1
2
2
- 把json改为Python内置的pickle,并压缩,能够减小一点点消息的大小,但仍是跑满了。
- 想到单机的消息队列可能会成为系统的瓶颈,因而把单机Broker改为集群
- 可是Redis集群用在这里不太好用,看了看RabbitMQ,自己支持集群,镜像模式还能够作HA,性能也能够
- 果断把Broker改为RabbitMQ集群。
- 拿了两台机器(16G内存)来组成集群,使用HAProxy来作负载均衡,Celery配置加上服务器
CELERY_QUEUE_HA_POLICY = 'all'
1
- 而后单机内网流量降了下来,单机峰值在500M/s,可是运行几个小时以后,问题又来了。
# 1.3 RabbitMQ集群内存爆了
RabbitMQ集群出现OOM了,一番搜索以后异步
RabbitMQ自带的监控插件有可能会占用大量的内存,不看web界面的时候,把插件关闭。
[root@k8s-node2 ~]# rabbitmq-plugins disable rabbitmq_management
1
- celery默认会发送服务器的心跳信息,这些我是不须要的,能够经过zabbix等监控,关闭发送,能够在celery启动命令中加上 --without-heartbeat
- celery默认会发送大量的任务处理状态事件,这些事件默认是不设置过时时间的
- 由RabbitMQ的过时时间来处理,因此会有大量的事件数据在RabbitMQ中堆积但又不会被消费。
- 能够在celery配置中加上过时时间,如设置过时时间5s
CELERY_EVENT_QUEUE_TTL = 5
1
- 作完这几步以后,RabbitMQ单机内存占用稳定在1~2G,并且内网流量也大幅降了下来,峰值100M/s
# 1.4 RabbitMQ流控机制
- 这个系统生产消息有明显的高峰和低谷,观察高峰时生产者的日志
- 发现当生产者刚启动时,队列尚未消息的时候,消息入队很快,大概2k/s
- 而后几十秒以后,发现消息入队愈来愈慢,入队2k逐渐须要4s、8s、10s
- 一番搜索以后,发现RabbitMQ有流量控制机制,当生产者过快,消费者来不及消费消息,消息在队列中堆积
- RabbitMQ就会阻塞发布消息过快的连击,也就表现为入队逐渐变慢
- 这时须要注意调整生产和消费的速率,注意RabbitMQ内存占用和内存阀值配置,以及磁盘空间。
# 1.5 其余一些问题
# 不启用RabbitMQ的confirm机制
- RabbitMQ处理confirm消息占用了大量cpu资源。
- confrim的做用在于当消息真正落地写到磁盘时,给生产者发送ack确认
- 若生产者在收到该ack后才丢弃该消息,就能够保证消息必定不丢,这是一种很是高强度的可靠性保证。
- 但若没有这么高的要求则能够不启用confirm机制,增长RabbitMQ的吞吐量。
# 慎用CELERY_ACKS_LATE
- Celery的CELERY_ACKS_LATE=True,表示Worker在任务执行完后才向Broker发送acks
- 告诉队列这个任务已经处理了,而不是在接收任务后执行前发送
- 这样能够在Worker处理任务过程当中异常退出,这个任务还会被发送给别的Worker处理。
- 可是可能的状况是,一个任务会被屡次执行,因此必定要慎用。
# Celery 任务分队列
- 耗时和不耗时的任务分开,避免耗时任务阻塞队列;
- 重要和不重要的任务分开,避免重要的任务得不到及时处理。
# 让Celery忽略处理结果
- 多数状况下并不须要关注Celery处理的结果,况且在Worker里面咱们会记录其结果
- 设置CELERY_IGNORE_RESULT = True可让Celery不要把结果发送到Broker,也能够下降内网流量和Broker内存占用。
# Celery内存泄露
- 长时间运行Celery有可能发生内存泄露,能够配置CELERYD_MAX_TASKS_PER_CHILD
- 让Worker在执行n个任务杀掉子进程再启动新的子进程,能够防止内存泄露。
- 另外Worker执行大量任务后有僵死的状况,启动了一个crontab定时重启Worker。
# ip_conntrack: table full, dropping packet
- 系统执行时会创建大量的链接,形成iptables跟踪表满了,socket拒绝链接,性能提不上去。
- 解决方法:加大 ip_conntrack_max 值。
# Inodes满了没法写文件
- 因为创建了太多的临时文件,发现磁盘没有满,但仍是没法写入文件
- 由于Inodes被用完了,启动一个crontab定时清理临时文件
# 02.celery丢失任务
- 修改配置如下:
task_reject_on_worker_lost = True # 作用是当worker进程意外退出时,task会被放回到队列中
task_acks_late = True # 作用是只有当worker完成了这个task时,任务才被标记为ack状态
1
2
2
该配置可以保证task不丢失,中断的task在下次启动时将会重新执行。
需要说明的是,backend最好使用rabbitmq等支持ACK状态的消息中间件。
# 03.celery重复执行
# 3.1 情况描述
- celery 在执行task时有个机制,就是任务时长超过了 visibility_timeout 时还没执行完
- 就会指定其他worker重新开始task,默认的时长是一小时.
app.conf.broker_transport_options = {‘visibility_timeout’: 3600}
1
# 3.2 celery once
- Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类
- 该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={'graceful': True})
1
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False
那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
另外如果要手动设置任务的 key,可以指定 keys 参数
@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
sleep(30)
return a + b
1
2
3
4
2
3
4
# 3.3 celery once使用步骤
# 3.3.1 第一步,安装
pip install -U celery_once
1
3.3.2 第二步,增加配置
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 3.3.3 第三步,修改 delay 方法
example.delay(10)
# 修改为
result = example.apply_async(args=(10))
1
2
3
2
3
# 3.3.4 第四步,修改 task 参数
@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
sleep(30)
return a + b
1
2
3
4
2
3
4
上次更新: 2024/3/13 15:35:10