分布式调度系统
# 01.服务设计要点
# 0、项目介绍
# 1)自我介绍
您好,我是 xxx,毕业于郑州大学,计算机通信方向
这几年我主要做了四方面的工作:蔚来自动驾驶团队的分布式调度系统、
京东的 DevOps 平台、机器学习平台,以及运维自动化开发
主要使用的开发语言是 Golang 和 Python,Java 这块主要是负责一些老项目的维护
前端的话,以前用 Vue 做过一些运维开发项目,
数据库和中间件这类技术也在工作中都接触过,基本使用和原理都没问题
目前,我在蔚来汽车 负责自动驾驶团队的分布式调度系统开发
这套系统每天要处理任务规模2000万,最大并发任务量20万左右,
支撑着自动驾驶团队的整个研发和测试流程,
相比传统的互联网任务调度,自动驾驶的任务调度复杂得多
我们不仅要处理 K8S 环境,还要支持测试车、台架等多终端设备
同时,由于很多设备是在外网或弱网环境下,还要应对设备经常离线的问题
基本情况就是这些,如果您对某些细节感兴趣,我们可以进一步聊聊
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2)项目介绍
计算资源大概12万核
目前每天处理两千万左右任务量
当前业务最大并发任务20万左右
系统所有节点都是可以支持动态扩容的,理论上只要机器资源满足,调度性能完全没有问题
2
3
4
# 背景
蔚来汽车通用调度服务,每天要处理任务规模2000万以上,最大并发任务量20万左右,
支撑着自动驾驶团队的整个研发和测试流程
相比传统的互联网任务调度,自动驾驶的任务调度复杂得多
我们不仅要处理 K8S 环境,还要支持测试车、台架等多终端设备
同时,由于很多设备是在外网或弱网环境下,还要应对设备经常离线的问题
系统采用分层架构,核心部分分为三层,
# 第一层,配置管理层(Web-Server)
提供友好的界面,用户通过拖拽可快速完成DAG任务的编排,轻松实现串并行任务定制
支持多种方式触发调度实例,包括GitLab、Webhook、定时触发和人工触发
提供资源配置(K8S、测试车、台架等)、仓库配置、通知管理等功能,满足任务执行的多种需求
# 第二层,调度引擎层
在实例创建后,通过TiDB事务将实例ID推送至RabbitMQ的实例ID队列
调度引擎根据实例的优先级和DAG配置,将需要执行的任务(原子任务)
批量推送至RabbitMQ的原子ID队列中,不同优先级的任务分配到各自的队列
预处理服务会根据实例ID、原子ID获取任务配置和执行命令,
并生成最终执行的结构体(如K8S Job的YAML文件)推送至原子封装队列
# 第三层,任务执行层(GRPC服务层)
任务的实际执行通过gRPC服务与各终端设备(如测试车、台架等)通信
gRPC服务会根据客户端的Tag标记将任务分发给合适的执行器
系统支持去中心化调度,自动故障转移机制确保任务在弱网或设备故障情况下能够重新调度执行
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 1、分布式一致性(去中心化)
# 1)设计概述
设计目标
每个调度服务实例都具备任务调度能力,避免单点故障
任务在多个实例消费时,不会被重复处理
确保即使调度服务实例故障,消息仍然不会丢失
能够水平扩展,支持通过增加调度服务实例来提高并发处理能力
各调度实例合理分担任务,避免某些实例过载
系统流程
1. 每个调度服务实例启动后注册到 ETCD,维护服务列表和健康状态
2. 所有调度服务实例并行监听 RabbitMQ 队列,尝试抢占消费任务
3. 成功消费任务后,调度服务获取 Redis 锁(任务ID为 key)防止重复消费
4. 如果锁获取成功,调度服务执行任务;如果锁获取失败,丢弃该任务
5. 任务执行完成后,调度服务发送 RabbitMQ `ack`,并更新任务状态到 Redis
6. 如果任务执行失败,RabbitMQ 重新将任务放回队列,等待其他实例消费
7. 如果多次执行失败,任务会写入到redis中,由监控告警人工介入处理
2
3
4
5
6
7
# 2)去中心化调度
多实例消费 RabbitMQ
,所有调度服务实例可以同时从同一个 RabbitMQ 队列中消费任务抢占式消费机制
,- RabbitMQ 的特性允许多个消费者从同一个队列并行抢占式消费消息,但每条消息只能被一个消费者成功消费
消息确认机制
:- 每个调度实例通过GrpcServer分发给GrpcClient后,会使用 RabbitMQ 的
ack
确认机制 - 如果未能成功执行,消息会重新放回队列,其他实例可继续消费
- 每个调度实例通过GrpcServer分发给GrpcClient后,会使用 RabbitMQ 的
redis去重机制
- 在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态
- 消费端在获取任务后,将任务加入“正在执行”Set
- 如果任务执行失败,则从“正在执行”Set移除
- 若执行成功,则将其移入“已执行成功”Set
- 消费消息前会先在这两个Set中进行检查,若任务已经存在,则自动放弃调度,以避免重复执行
- 在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态
# 2、容错机制
- 为了在 gRPC 客户端网络抖动时,既能保证任务被可靠调度,又能避免过度敏感导致重复调度
- 需要设计合理的**
失联检测
和任务重调度
**机制
客户端心跳机制
- 客户端定期发送心跳包(如每10秒)以保持连接
- 服务端缓存心跳时间戳,更新最后接收时间
- 若60秒内未收到心跳,服务端将触发失联检测
- 启动任务重调度逻辑,确保系统稳定性和可靠性
任务超时机制
客户端在执行任务期间,每隔 3 分钟 发送心跳请求到服务端,告知任务仍在执行
服务端收到心跳后,更新该任务的最后执行时间戳
如果超过一定时间(9分钟)未收到心跳,才会认为任务可能失败(网络抖动处理)
任务重调度策略
在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态
若任务已经存在,则自动放弃调度,以避免重复执行
若判断当前任务需要重新调度,则从“正在执行”Set移除;并将这些任务重新分配给其他可用客户端
故障转移
在系统发生故障时,
任务引擎服务
会接管调度功能通过Etcd获取其他可用的gRPC服务端进行任务派发,确保任务不中断
系统在每次任务重新调度时,会增加其TTL值当TTL值超过预设限度时
系统会将TTL超时状态上报到RabbitMQ,以便及时处理异常情况
任务失败自动重试
系统支持为任务设置失败后的重试次数,任务失败时系统会根据预设的重试策略自动进行重试
同时设置重试最大次数,以避免系统资源浪费
# 3、高效调度
- 任务发布:用户提交任务,通过RabbitMQ发布,包含优先级和组信息,任务被路由到对应队列
- 调度:RabbitMQ根据优先级进行任务调度,Redis实时监控调度状态并动态调整资源分配
- 执行:Worker节点接收任务并反馈结果至RabbitMQ,结果异步写入TiDB实现持久化存储
RabbitMQ 优先级队列
优先级范围:可以设置一个优先级范围,通常为 [0-9] 或 [0-255],0 为最低优先级,最大值为最高优先级
任务的分配:当多个任务同时到达时,优先级高的任务会优先被调度和执行
分组队列机制
每个用户组对应一个队列(例如,
GroupA
有GroupA-PriorityQueue
,GroupB
有GroupB-PriorityQueue
)不同组的队列消息根据业务规则分配不同的优先级区间,例如
0-9
优先级使用
basic_qos
方法为每个队列设置不同的预取值,控制每次消费者从队列中取出消息的频率预取值计算公式:
权重 * 总处理能力
(假设系统的总任务处理能力为100次/秒
)GroupA
的预取值应设为0.1 * 100 = 10
GroupB
的预取值应设为0.9 * 100 = 90
动态分片
系统支持将一次任务中的多个并行原子任务打包并放入队列中
调度服务会优先将这些任务分配给不同的Agent执行器随着用户执行器数量的增加,任务分片的数量也会相应增加
从而加快任务处理速度这种机制提高了任务的并行度和执行效率
路由策略
每个原子任务可以设置Tag标记,用于指引任务分配到特定的执行器节点这有助于优化资源分配,提升系统调度效率
支持用户部署私有Agent,用户只需具备足够资源,即可保证任务的高效调度此设计增强了系统的灵活性和定制化能力
# 4、计算加速
# 1)缓存与镜像加速
NIOFS、Ceph、Alluxio,高成本(需要三份数据备份)、复杂的运维和部署
Dragonfly:
- 通过利用 P2P 技术,减少跨机房和跨境带宽成本
- 无缝支持容器镜像分发,容易集成
- 支持机器整体带宽管理,提升稳定性
- 提高下载稳定性,支持高度一致性校验
Dragonfly 具体实现
在 Kubernetes 集群中的每个 Node 节点上安装配置
dfdaemon
代理sudo vim /etc/docker/daemon.json # 添加以下内容,将 Docker 的镜像拉取代理指向 dfdaemon: { "registry-mirrors": ["http://127.0.0.1:65001"] } sudo systemctl restart docker # dfdaemon 启动:确保 dfdaemon 在每个节点上监听 65001 端口,拦截 Docker 的镜像拉取请求
1
2
3
4
5
6
7
8
9
# 2)计算加速与调度优化
实现过程:
- 计算加速组件首先识别处于待处理状态(pending)的计算任务(即 step instance)
- 每个任务通常会有其所需的数据集(例如,模型文件、输入数据等)
- 这些数据集合需要被识别并聚合在一起,以便后续处理
示例: 假设有一个机器学习任务,它有多个步骤(step),每个步骤需要不同的数据
Step A 需要数据集
dataset_A
Step B 需要模型文件
model_B
Step C 需要特征文件
features_C
计算加速组件会将这些数据收集起来,形成一个待处理的数据集合
{ "pending_steps": ["Step A", "Step B", "Step C"], "data_requirements": [ "dataset_A", "model_B", "features_C" ] }
1
2
3
4
5
6
7
8
利用缓存加速模块有序并发下载 Data 集合
- 计算加速组件会利用缓存加速模块,根据用户配置的缓存空间和跨域下载需求,进行有序的并发下载
- 如果某个数据已经被缓存,则可以直接从缓存中获取,而不需要再次下载
- 缓存加速模块会将数据写入到pod高性能 pvc挂载盘中,缓存结束后会把缓存映射路径写入到TiDB
- 当原子被调度运行起来时检查是否有映射,有就直接使用无需再次下载
# 5、海量日志处理
# 1)日志收集流程
日志收集流程
(2000万任务量的日志量)grpc client通过长连接将日志传送给grpc Server,grpc server将日志写入Kafka
再由分布客户端消费Kafka中日志数据,将日志写入到ES中
当实例执行结束后,将ES中日志写入到对象存储S3中,并删除ES中日志数据
任务重新调度处理
每个任务派发时会生成一个唯一TaskID,将日志写入Kafka时使用TaskID作为标识
每次任务重试,不管多少次,都会将执行事件写入到TiDB中,每次对应一个唯一TaskID
任务被重新调度,TaskID不会改变,即使被重新调度,也可以将两次调度日志合并到一起
# 2)日志数据流设计
Kafka 分区与 TaskID 映射
Kafka 可以基于
TaskID
进行哈希分区,保证同一个任务的所有日志会写入到同一个分区,方便后续的消费和处理Kafka 的生产者可以通过设置批量发送日志,减少频繁的网络交互,提升吞吐量
日志批量消费与存储
可以设置多个消费者实例来并行消费不同分区的日志,保证高并发任务的日志能够实时处理
同时,消费日志时可以选择批量拉取,减轻对下游存储系统的压力
日志存储与查询
实时日志存储 (ES)
,能够支持基于TaskID
查询日志,并且可以根据时间、节点等条件进行过滤查询归档日志存储 (S3)
,任务执行结束后,ES 中的日志会按批次推送到 S3 或 OSS 对象存储中
# 6、跨网络调度
跨网段通信
- 通过gRPC Stream的双向流式传输,保持客户端与服务端的持续连接,减少频繁创建连接的开销
- 外网Agent利用OpenSSL自建CA和特定算法,采用外网私钥加密、内网公钥解密,确保建联安全
gRPC通信和负载均衡
- 当客户端断开连接时,系统自动重连,迅速恢复连接。
- 采用轮询、最小连接数等算法,结合Ingress的负载均衡,动态分配流量,并将连接信息存储在内存中和ETCD
- 通过心跳监控连接状态,若多次失败则移除失效连接
- 并将AgentID和未完成任务记录在Redis中,超时未重连则重新派发任务,同时保证任务幂等性
# 02.服务架构
# 1、RabbitMQ
- 副本数量:考虑到高并发需求,建议至少使用 3 个副本(
RabbitMQ 3.9.x
)- CPU: 32 核心
- Memory: 128GB
- 存储: 512GB SSD
# 2、Kafka
Broker 节点数量:建议至少 7 个 Broker 节点,以提高高可用性和分担负载 (
Kafka 3.5.x
)- CPU: 16 核心
- 内存: 64GB
- 存储: 2TB SSD
Partition 数量:对于 2000 万任务的高吞吐量,建议 Partition 数量设置为 200,以确保消息流量均匀分配
优化 ISR 和控制器:使用 Kafka 的控制器自动管理分区和副本,确保集群在节点故障时能够快速恢复
# 3、ES 8.x
主节点Mater:
至少三台单独机器(node.master: true )CPU: 4 核心
内存: 16GB(分配 50% 的内存给 JVM)
硬盘:SSD,大小根据需求,通常配置 256GB 或以上
协调节点Coordinating:
至少三台独立服务器(node.data: true )CPU: 8 核心
内存: 32GB(分配 50% 的内存给 JVM)
硬盘:SSD,大小建议 256GB 或以上
数据节点Data:
至少4台,后续不够用可以随时扩容,数据节点最好不要超过100台CPU: 16 核心
内存: 64GB(分配 50% 的内存给 JVM)
存储:SSD,建议 1TB 或以上
# 3、ETCD
- 节点数量:建议至少使用 5 个节点,以增强容错能力(
ETCD 3.5.x
)- CPU: 8 核心
- 内存: 32GB
- 存储: 256GB SSD
# 4、Redis
- 节点数量:使用 Redis Cluster,建议至少 6 个节点配置(
Redis 7.x
)- CPU: 16 核心
- 内存: 128GB
- 存储: 512GB SSD
# 5、TiDB
- TiDB Server:使用 5 个实例配置(
TiDB 7.x
)- CPU: 32 核心
- Memory: 128GB
- PD Server:3 个实例,每个实例配置
- CPU: 8 核心
- Memory: 32GB
- TiKV:至少 7 个实例,每个实例配置
- CPU: 32 核心
- Memory: 128GB
- 存储: 2TB SSD
# 03.项目概述
# 1、项目
技术架构:
- 技术栈: Python、Golang、Shell、gRPC、Gin、Gorm、Client-Go、OpenSSL、TiDB、Redis、RabbitMQ、Kafka、Etcd、ES、K8S、Apollo、Artifactory、S3
- 架构设计:系统采用
分布式微服务架构
,通过消息队列、DAG任务编排
、gRPC通信
等核心技术,构建高可用
、高性能
的任务调度与执行平台
项目背景:
- 传统调度主要针对内网K8S环境,对车载设备、工业台架等多终端跨网络环境无法支持
- 项目旨在构建一种支持
跨网段、多终端
模式的分布式
调度系统
项目介绍:
- 通用调度服务是一个高效、可靠的分布式调度系统,每日处理超过2000万任务
- 系统设计充分考虑到
去中心化
、容错机制
、高效调度
、计算加速
等核心能力
核心职责与贡献:
- 分布式一致性与去中心化
- 系统采用去中心化架构,确保每个调度实例具备独立调度能力,避免单点故障
- 充分利用RabbitMQ可靠性,结合Redis去重机制和分布式锁,保证调度一致性
- 容错机制
- 设计了健壮的 心跳服务、故障转移与重调度机制
- 确保在网络波动或设备故障时任务能够可靠地重新分配
- 高效调度与资源优化
- 系统利用RabbitMQ优先级队列和Redis动态调度策略
- 根据任务优先级和资源使用状况进行实时调度
- 支持高达20万的并发任务处理,确保计算资源的最大化利用
- 计算加速
- 利用Dragonfly实现P2P文件分发,降低带宽成本并提高下载稳定性
- 有效识别并聚合计算任务所需数据,支持有序并发下载,优化执行效率
- 海量日志处理
- 通过gRPC长连接将日志传输至Kafka,再由分布客户端写入ES,最终归档至S3
- 利用雪花算法唯一性,确保日志关联性与重调度后的数据合并
业务影响与成果:
- 调度效率提升:智能化调度,系统
调度效率提升超过50%
,实现了千万任务的高效管理 - 维护成本降低:模块化、可扩展的设计减少系统的开发与维护成本,维护成本降低超过40%
- 资源利用率提高:优化资源管理,设备
利用率提升50%以上
,为企业节省数千万的设备投资
# 2、优先级队列代码
- 配置 RabbitMQ 优先级队列
# 要创建优先级队列,需要在定义队列时设置 `x-max-priority` 属性例如,创建一个最大优先级为 10 的队列
channel.queue_declare(queue='task_queue', durable=True, arguments={'x-max-priority': 10})
# 消息发送时,可以为每个消息指定优先级
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message_body,
properties=pika.BasicProperties(
priority=message_priority # message_priority 是消息优先级 (0-9)
)
)
2
3
4
5
6
7
8
9
10
11
12
- 设置队列的消息获取频率:
权重设定:
GroupA
权重为0.1,GroupB
权重为0.9,表示GroupB
的调度频率远高于GroupA
预取值计算公式:
权重 * 总处理能力
(假设系统的总任务处理能力为100次/秒
)GroupA
的预取值应设为0.1 * 100 = 10
GroupB
的预取值应设为0.9 * 100 = 90
channel.queue_declare(queue='GroupA_task_queue')
channel.queue_declare(queue='GroupB_task_queue')
channel.basic_publish(
exchange='task_exchange',
routing_key='GroupA',
body=message_body
)
2
3
4
5
6
7
8
分组队列与调度权重
在分组调度中,每个用户组的调度权重决定了该组的任务消费频率
例如:GroupA 权重为 0.1,GroupB 权重为 0.9,这意味着在资源紧张时,GroupB 的任务调度频率更高
通过 RabbitMQ 的
basic_qos
方法限制每个组的消费者从队列中获取的消息数
,从而实现根据权重进行资源调度
channel.basic_qos(prefetch_count=int(10 * weight)) # weight 为该组的调度
import pika
import time
# RabbitMQ 连接参数
RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USER = 'guest'
RABBITMQ_PASSWORD = 'guest'
# 权重设定
weights = {
'GroupA': 0.1,
'GroupB': 0.9
}
# 总处理能力
TOTAL_CAPACITY = 100
# 创建连接和频道
def create_connection():
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
credentials=credentials
))
return connection
# 定义消费者类
class Consumer:
def __init__(self, group_name, queue_name, prefetch_count):
self.group_name = group_name
self.queue_name = queue_name
self.prefetch_count = prefetch_count
self.connection = create_connection()
self.channel = self.connection.channel()
# 声明队列
self.channel.queue_declare(queue=self.queue_name, durable=True)
# 设置 basic_qos, 控制每次获取的消息数量
self.channel.basic_qos(prefetch_count=self.prefetch_count)
# 开始消费
self.channel.basic_consume(queue=self.queue_name,
on_message_callback=self.callback,
auto_ack=False)
# 处理消息
def callback(self, ch, method, properties, body):
print(f"{self.group_name} received message: {body.decode()}")
time.sleep(1) # 模拟任务处理时间
ch.basic_ack(delivery_tag=method.delivery_tag)
# 启动消费者
def start_consuming(self):
print(f"Starting consumer for {self.group_name} with prefetch count {self.prefetch_count}")
self.channel.start_consuming()
# 启动消费者
if __name__ == "__main__":
consumers = []
# 为每个组创建消费者
for group, weight in weights.items():
prefetch_count = int(weight * TOTAL_CAPACITY)
queue_name = f'{group}-PriorityQueue'
consumer = Consumer(group, queue_name, prefetch_count)
consumers.append(consumer)
# 启动每个消费者
for consumer in consumers:
consumer.start_consuming()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74