分布式调度系统
# 01.服务设计要点
# 0、项目介绍
# 1)自我介绍
您好,我是 xxx,毕业于 xx 大学,专业是计算机通信方向
这几年我主要在做一些平台侧的系统开发,比如自动驾驶的分布式调度系统
京东的 DevOps 平台,以及机器学习平台 和 运维自动化 工作
现在我在xx汽车,主要负责自动驾驶团队的调度系统,
这个系统每天要处理差不多 2000 万个任务,峰值并发在 20 万左右
除了要调度 K8s,还要支持测试车、台架这些终端设备,
很多设备还跑在外网或者弱网环境,所以调度的容错、稳定性要求会更高
系统整体是去中心化设计,支持横向扩展,也做了很多幂等控制和故障转移的机制
我用得比较多的语言是 Go 和 Python,也做过一些 Java 项目的维护,
数据库、消息队列 这些常用组件的使用和原理 都还OK
主要情况是这些,如果可以的话,我先简单介绍一下目前这个调度系统的设计
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2)系统设计
我主要负责的项目是xx汽车 自动驾驶团队的分布式调度系统
这个系统主要解决的是——如何在一个
分布式、多终端、跨网络环境中
,高效可靠地
调度(大规模计算任务
)主要
挑战
包括:
- 1)任务量大、设备多,如何
横向扩展调度能力
,避免单点瓶颈- 2)网络复杂,如何做到
稳定、高可用调度
- 3)如何保证
多节点任务调度
中的分布式一致性问题
- 4)终端资源有限,如何
提升任务执行效率与资源利用率
- 5)日志处理复杂,如何
确保海量日志不影响系统性能
为了应对上面问题,我们在设计上融入了六个关键优化
① 去中心化
- 在架构上我设计了一套
去中心化调度模型
- 通过
队列抢占式消费
,让每个调度节点具备独立调度能力,避免单点瓶颈
- 在架构上我设计了一套
② 高效调度+分布式一致性
- 任务下发通过**
RabbitMQ + Redis
** 实现任务分布式一致性
- 利用 RabbitMQ
分组机制和优先级队列
,实现任务优先级高效调度能力
- 在 K8s 层面采用
优先级和抢占式调度策略
,确保高优任务在资源紧张时能够抢占低优任务资源
- 任务下发通过**
③ 容错机制
- 引入
心跳上报机制
,实时监控各终端设备状态 - 借助
失败检测与自动重试机制
,设备掉线或执行失败时能自动重新分配任务,保证关键任务不会丢失
- 引入
④ 计算加速
- 为了提升调度效率,我们引入了
Dragonfly
([ˈdræɡənflaɪ]) 做 P2P 文件分发,加快任务依赖资源的下发 - 对计算任务涉及的数据集进行预处理,加快数据加载和处理速度
- 为了提升调度效率,我们引入了
⑤ 海量日志
- 日志链路方面,我们做了从 gRPC 实时上传 → Kafka 聚合 → ES 检索 → S3 存档的全链路日志系统
⑥ 高可用
- 各节点均支持水平扩展,消除单点故障隐患,保证系统在高并发情况下依然具备优异的可用性
- 这个项目的主要设计思路是这样的,如果有兴趣,我可以详细介绍一下项目的分层架构
# 3)项目分层介绍
资源说明
计算资源大概12万核
目前每天处理两千万左右任务量
当前业务最大并发任务20万左右
系统所有节点都是可以支持动态扩容的,理论上只要机器资源满足,调度性能完全没有问题
2
3
4
背景
# 背景
这个项目是我在 xx 汽车主要开发的一套通用分布式调度系统,
支撑自动驾驶团队从研发到测试的全流程任务调度
系统每天处理任务量超过 2000 万,最大并发可达 20 万,不仅需要调度 K8s 环境任务,
还要覆盖弱网环境下的测试车、台架等物理终端
这样的场景对系统的稳定性、扩展性和容错能力提出了非常高的要求
2
3
4
5
6
项目介绍
# 我们采用了分层架构设计,整个系统核心分为三层:
# 第一层,配置管理层
提供友好的界面,用户通过拖拽可快速完成DAG任务的编排,轻松实现串并行任务定制
支持多种方式触发调度实例,包括GitLab、Webhook、定时触发和人工触发
提供资源配置(K8S、测试车、台架等)、仓库配置、通知管理等功能,满足任务执行的多种需求
# 第二层,调度引擎层(核心逻辑)
实例创建后,我们通过 TiDB 事务将实例 ID 推送到 RabbitMQ 的队列中,确保任务投递过程具备强一致性和事务保障
调度引擎会根据实例的优先级和 DAG 拓扑结构,把待调度的原子任务打散并按照优先级分类放入不同的 RabbitMQ 队列
我们引入了一个预处理服务,它会解析任务参数、生成可执行结构体(如 K8s Job 的 YAML 文件),
再把这些封装好的任务放入下游的原子封装队列
这一层的亮点包括:
优先级调度队列隔离:保障高优任务不被低优任务拖慢;
批量任务拆分与合并处理:提升整体吞吐;
调度链路解耦 + 高可观测性设计:便于跟踪调度状态和链路瓶颈
# 第三层,任务执行层
这一层通过 gRPC 与各类终端设备通信
执行节点带有 Tag 标识,比如“测试车”、“K8s”、“台架”等,任务下发时会自动根据标签匹配合适的设备
系统具备去中心化调度能力,调度节点之间可横向扩展,且支持自动容错
比如在设备掉线、弱网延迟等情况下,调度器可以检测失败状态并自动重新分配任务,确保关键任务不会丢失或卡死
我们还设计了任务状态回传机制,确保每个任务状态都能完整上报、可追踪,方便故障定位和 SLA 保障
# 服务保障层(正常情况可以不说)
1.原子状态同步
2.原子执行器
3.监控服务
4.弹性扩容服务
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 4)带着伤疤反思
① 然后事故是怎么发生的呢
- 有一天,高峰期突然涌入十万级的 K8s 任务,因为模型镜像大、数据要跨机房拉
- 专线(20G)带宽一下子就被拉满了,北京到合肥的链路延迟狂飙,丢包、抖动特别严重
- RabbitMQ 消费确认 ACK丢失,立马又往队列里重发了一遍
- 原子执行状态未能及时回传上报,导致误判超时失败重新调度
- 一下子多了几倍的突增流量,结果一下子就把 MQ 打爆了(任务大量积压)
- 告警爆炸:原子超时执行告警,数据库告警,告警电话都打爆了
② 到底出了哪些幺蛾子?
RabbitMQ 队列大量积压
- RabbitMQ 瞬间从 5 千条/秒飙到 5 万条/秒,CPU、磁盘都撑不住
重复执行灾难:Redis 去重失灵
- 网络延时导致部分任务没能及时标记为“已执行”,导致重复调度
故障排查痛点:看不到全链路
- 没有统一的 TraceID,怀来和合肥两地日志分散
- 排查一个 ACK 丢失到底是哪条消息得靠人工翻多套日志
③ 后来我们怎么解决的?
专线隔离
- 在北京怀来机房,我们Agent划分到独立VLAN,使用独立专线通信
- 这样我们调度任务,心跳,状态同步不会受到其他业务流量影响
动态限流 & 调度熔断
入队速率超过 1000 条/秒或队列长度超过 10000 条时,触发流量控制机制(禁止低优先级提交)
动态调整消费者的
prefetch_count
,以控制每个消费者同时处理的消息数量,防止消费端过载当失败率超过阈值时,暂停低优先级任务的
派发和重试
1–2 分钟,以减轻系统负担实施
滑动窗口统计失败率
,当 1 分钟内失败率 >20% 时,暂停低优先级任务的派发和重试
Redis 写入失败的
容错与降级处理
策略当 Agent 启动 Pod 执行任务时,如 Redis 写入失败,系统会触发容错机制
任务消息将 不被 RabbitMQ 确认消费(ACK),确保任务状态未误标
同时,Pod 名称和相关信息会 记录到 ES(Elasticsearch),便于后续分析
后台服务定时 扫描异常 Pod 记录,并进行自动清理与回收,确保系统健康
告警收敛
同类告警合并:同一任务类型/服务模块/异常原因的告警,
在单位时间内合并为1条告警
指数退避策略:重复告警频率按照
1min ➜ 5min ➜ 15min
等递增推迟告警推送频率,避免刷屏缩略通知:当短时间内告警量过大,仅发送统计摘要(如「30秒内有 342 条原子任务失败」)
新告警数据实时新到redis进行缓存,在页面中进行告实时警统计和展示
全链路可观测
- 调度系统中通过 SkyWalking 注入链路追踪,在关键节点打上追踪点
- 每条任务从下发到状态同步都带同一 TraceID
- 并将 traceID/spanID 写入日志,从而实现任务全流程的可观测、可追踪、可回溯
④ 最后总结一句
- “这场雪崩让我明白,高并发调度不是只看算法多快,而是要管住每一个状态流
- 状态同步要万无一失,执行要绝对
幂等,链路要可观测,异常要能马上熔断
救场 - 只有这样,才能扛住百万级的压力,把系统守住、跑稳!
# 5)架构图
# 6)Q & A
面试官可能追问 | 应对角度及说明 |
---|---|
异构资源如何统一抽象调度的? | 讲解如何利用统一的 Tag 机制和分层执行器模型,将 K8s、测试车、台架等不同终端抽象成统一调度对象 |
多终端设备如何自动发现和注册? | 基于 gRPC 的心跳机制和动态注册表,每个终端启动时自动注册并定期上报状态,确保设备状态实时可见 |
弱网环境下如何保障任务不丢? | 依靠心跳监控、超时检测和自动重调度机制,以及任务状态的幂等处理,保证设备离线或网络波动时任务可以重新分配 |
为什么选择 TiDB + RabbitMQ? | TiDB 保证了分布式事务一致性;RabbitMQ 具备强大的多消费者并发处理、优先级队列及可靠消息机制,满足高并发任务投递需求 |
如何实现调度的监控与可观测? | 采用链路追踪、任务状态上报,再结合 Prometheus 和 Grafana 实现实时监控、报警以及全链路性能分析 |
去中心化调度具体如何实现? | 说明通过多实例调度、队列抢占式消费和无状态设计,实现了调度节点间的横向扩展和分布式调度,无单点故障 |
海量日志如何保证处理效率与实时性? | 通过独立的日志处理服务,实现异步日志收集和分布式存储,日志采集与主业务流程解耦,并利用 ELK/ClickHouse 建立快速查询索引 |
计算加速部分如何优化容器镜像及数据处理? | 引入镜像加速方案(例如使用 dfdaemon),提前对计算任务数据集做预处理,降低任务启动时的数据加载开销 |
跨数据中心调度如何设计? | 基于分布式架构设计,利用全局一致性协议和分布式事务确保数据一致性,同时采用延时容忍机制应对跨区域通信延迟 |
如何确保系统数据和任务状态的一致性? | 通过分布式事务、心跳同步和任务状态上报(幂等处理),利用 TiDB 保证强一致性,再辅以状态对比修正机制 |
DAG任务编排如何避免循环依赖问题? | 在任务提交环节进行依赖检查和校验,利用 DAG 算法保证任务之间的关系是有向无环的,必要时采用人工审批或回滚机制 |
当 RabbitMQ 队列堵塞时如何应对? | 采用限流、消息回退及队列监控策略,必要时结合自动扩容机制,及时发现并调整队列负载,确保任务持续流畅 |
如何调试和排查分布式调度系统中的问题? | 通过全链路日志和分布式追踪系统,对各节点的调用链、状态上报及异常日志进行集中收集与分析,从而快速定位瓶颈或故障点 |
对于任务优先级分配,你们是如何设计任务打分的? | 讨论如何根据业务权重、任务重要性动态调整优先级,并结合 RabbitMQ 分组机制和 Kubernetes 的抢占式调度,保障高优先级任务先行 |
系统高可用性的设计中有哪些关键考虑? | 强调所有节点支持水平扩展、无状态设计、自动故障检测与重调度机制,以及跨区域冗余和网络负载均衡策略 |
# 1、分布式一致性(去中心化)
# 1)设计概述
- 基于
RabbitMQ + Redis + ETCD
构建的去中心化、高可靠、支持横向扩展的分布式任务调度系统- 核心通过Redis分布式锁实现任务幂等消费,确保在多实例并发下也能保持一致性
- 并通过
RabbitMQ的ack机制
确保任务消息不会丢失
设计目标
去中心化
调度服务水平扩展,避免单点故障水平扩展
支持通过增加调度服务实例来提高并发处理能力任务幂等
任务在多个实例消费时,不会被重复处理(redis去重机制)高可靠性
确保即使调度服务实例故障,消息仍然不会丢失(MQ Ack,Redis缓存检测)负载均衡
各调度实例合理分担任务,避免某些实例过载
系统流程
所有调度服务实例
并行监听 RabbitMQ 队列
,尝试抢占消费任务
调度服务获取 Redis 锁
(任务ID为 key)防止重复消费
Redis去重机制
在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态如果多次执行失败,任务会写入到Redis中,由监控告警人工介入处理
# 2)去中心化调度
多实例消费 RabbitMQ
,所有调度服务实例可以同时从同一个 RabbitMQ 队列中消费任务抢占式消费机制
,- RabbitMQ 的特性允许多个消费者从同一个队列并行抢占式消费消息,但每条消息只能被一个消费者成功消费
消息确认机制
:- 每个调度实例通过GrpcServer分发给GrpcClient后,会使用 RabbitMQ 的
ack
确认机制 - 如果未能成功执行,消息会重新放回队列,其他实例可继续消费
- 每个调度实例通过GrpcServer分发给GrpcClient后,会使用 RabbitMQ 的
Redis去重机制
- 在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态
- 消费端在获取任务后,将任务加入“正在执行”Set
- 如果任务执行失败,则从“正在执行”Set移除
- 若执行成功,则将其移入“已执行成功”Set
- 消费消息前会先在这两个Set中进行检查,若任务已经存在,则自动放弃调度,以避免重复执行
- 在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态
# 2、容错机制
- 为了在 gRPC 客户端网络抖动时,既能保证任务被可靠调度,又能避免过度敏感导致重复调度
- 需要设计合理的
失联检测
和任务重调度
机制
客户端心跳机制
- 客户端定期
发送心跳包
(如每10秒)以保持连接 - 服务端缓存心跳时间戳,更新最后接收时间
- 若60秒内未收到心跳,服务端将
触发失联检测
- 失联 Agent任务会有
超时机制接管
- 客户端定期
任务超时机制
客户端在执行任务期间,每隔 1 分钟 发送心跳请求到服务端,
告知任务仍在执行
服务端收到心跳后,
更新该任务的最后执行时间戳
如果超过一定时间(3分钟)未收到心跳,才会认为任务可能失败(网络抖动处理)
任务重调度策略
在
消费者端
维护一个全局Redis Set
,分别记录“正在执行”和“已执行成功”的任务状态若任务已经存在,则自动放弃调度,以避免重复执行
若判断当前任务需要重新调度,则从“正在执行”Set移除;并将这些任务重新分配给其他可用客户端
故障转移
在系统发生故障时,
任务引擎服务
会接管调度功能通过Etcd获取其他可用的gRPC服务端进行任务派发,确保任务不中断
系统在每次任务重新调度时,会增加其TTL值当TTL值超过预设限度时
系统会将TTL超时状态上报到RabbitMQ,以便及时处理异常情况
任务失败自动重试
系统支持为任务设置失败后的重试次数,任务失败时系统会根据预设的重试策略自动进行重试
同时设置重试最大次数,以避免系统资源浪费
# 3、高效调度
- 任务发布:用户提交任务,通过RabbitMQ发布,包含优先级和组信息,任务被路由到对应队列
- 调度:RabbitMQ根据优先级进行任务调度,Redis实时监控调度状态并动态调整资源分配
- 执行:Worker节点接收任务并反馈结果至RabbitMQ,结果异步写入TiDB实现持久化存储
RabbitMQ 优先级队列
优先级范围:可以设置一个优先级范围,通常为 [0-9] 或 [0-255],0 为最低优先级,最大值为最高优先级
任务的分配:当多个任务同时到达时,优先级高的任务会优先被调度和执行
分组队列机制
每个用户组对应一个队列(例如,
GroupA
有GroupA-PriorityQueue
,GroupB
有GroupB-PriorityQueue
)- 不同组的队列消息根据业务规则分配不同的优先级区间,例如
0-9
优先级 - 使用
basic_qos
方法为每个队列设置不同的预取值,控制每次消费者从队列中取出消息的频率 - 预取值计算公式:
权重 * 总处理能力
(假设系统的总任务处理能力为100次/秒
)GroupA
的预取值应设为0.1 * 100 = 10
GroupB
的预取值应设为0.9 * 100 = 90
- 不同组的队列消息根据业务规则分配不同的优先级区间,例如
动态分片
系统支持将一次任务中的多个并行原子任务打包并放入队列中
调度服务会优先将这些任务分配给不同的Agent执行器随着用户执行器数量的增加,任务分片的数量也会相应增加
从而加快任务处理速度这种机制提高了任务的并行度和执行效率
路由策略
每个原子任务可以设置Tag标记,用于指引任务分配到特定的执行器节点这有助于优化资源分配,提升系统调度效率
支持用户部署私有Agent,用户只需具备足够资源,即可保证任务的高效调度此设计增强了系统的灵活性和定制化能力
# 4、计算加速
# 1)缓存与镜像加速
NIOFS、Ceph、Alluxio,高成本(需要三份数据备份)、复杂的运维和部署
Dragonfly:
- 通过利用 P2P 技术,
减少跨机房和跨境带宽成本
- 无缝支持容器镜像分发,容易集成
- 支持机器整体带宽管理,提升稳定性
- 提高下载稳定性,支持高度一致性校验
- 通过利用 P2P 技术,
Dragonfly 具体实现
在 Kubernetes 集群中的每个 Node 节点上安装配置
dfdaemon
代理sudo vim /etc/docker/daemon.json # 添加以下内容,将 Docker 的镜像拉取代理指向 dfdaemon: { "registry-mirrors": ["http://127.0.0.1:65001"] } sudo systemctl restart docker # dfdaemon 启动:确保 dfdaemon 在每个节点上监听 65001 端口,拦截 Docker 的镜像拉取请求
1
2
3
4
5
6
7
8
9
# 2)计算加速与调度优化
实现过程:
- 计算加速组件首先
识别处于待处理状态(pending)的计算任务
(即 step instance) - 每个任务通常会有其
所需的数据集
(例如,模型文件、输入数据等) - 这些数据集合需要被识别并聚合在一起,以便后续处理
- 计算加速组件首先
示例: 假设有一个机器学习任务,它有多个步骤(step),每个步骤需要不同的数据
Step A 需要数据集
dataset_A
Step B 需要模型文件
model_B
Step C 需要特征文件
features_C
计算加速组件会将这些数据收集起来,形成一个
待处理的数据集合
{ "pending_steps": ["Step A", "Step B", "Step C"], "data_requirements": [ "dataset_A", "model_B", "features_C" ] }
1
2
3
4
5
6
7
8
利用
缓存加速模块有序并发下载 Data 集合
- 计算加速组件会利用缓存加速模块,根据用户配置的缓存空间和跨域下载需求,进行有序的并发下载
- 如果
某个数据已经被缓存,则可以直接从缓存中获取,而不需要再次下载
- 缓存加速模块会将数据写入到
pod高性能 pvc挂载盘中
,缓存结束后会把缓存映射路径写入到TiDB
- 当原子被调度运行起来时
检查是否有映射
,有就直接使用无需再次下载
# 5、海量日志处理
# 1)日志收集流程
① K8S内网Job日志收集
k8s集群名/Job名称
采集写入 ClickHouse
② 外网测试车日志收集
- grpc client通过长连接将日志传送给grpc Server,grpc server将日志写入Kafka
- 再由分布客户端消费Kafka中日志数据,将日志写入到ClickHouse中
③ 任务重新调度日志处理
每个任务派发时会生成一个唯一TaskID,将日志写入Kafka时使用TaskID作为标识
每次任务重试,不管多少次,都会将执行事件写入到TiDB中,每次对应一个唯一TaskID
任务被重新调度,TaskID不会改变,即使被重新调度,也可以将两次调度日志合并到一起
# 2)日志数据流设计
Kafka 分区与 TaskID 映射
Kafka 可以基于
TaskID
进行哈希分区,保证同一个任务的所有日志会写入到同一个分区,方便后续的消费和处理Kafka 的生产者可以通过设置批量发送日志,减少频繁的网络交互,提升吞吐量
日志批量消费与存储
可以设置多个消费者实例来并行消费不同分区的日志,保证高并发任务的日志能够实时处理
同时,消费日志时可以选择批量拉取,减轻对下游存储系统的压力
日志存储与查询
实时日志存储 (ES)
,能够支持基于TaskID
查询日志,并且可以根据时间、节点等条件进行过滤查询归档日志存储 (S3)
,任务执行结束后,ES 中的日志会按批次推送到 S3 或 OSS 对象存储中
# 6、跨网络调度
跨网段通信
- 通过gRPC
Stream的双向流式传输
,保持客户端与服务端的持续连接,减少频繁创建连接的开销 - 外网Agent利用OpenSSL
自建CA
和特定算法,采用外网私钥加密、内网公钥解密
,确保建联安全- 外网测试车首次启动gRPC Agent,Agent生成公钥 并 上报到内网 gRPC服务端(人工审批)
- 通过gRPC
gRPC通信和负载均衡
- 当客户端断开连接时,
系统自动重连,迅速恢复连接
- 采用轮询,结合
Ingress的负载均衡
,动态分配流量,并将连接信息存储在内存中和ETCD - 通过
心跳监控连接状态
,若多次失败则移除失效连接
- 并将
AgentID和未完成任务记录在Redis中
,超时未重连则重新派发任务
,同时保证任务幂等性
- 当客户端断开连接时,
# 02.项目模块说明
# 1、配置管理层
# 1)配置管理
- 团队管理,项目管理(流水线最终归属一个项目里)
- 通知管理(飞书通知、邮件通知、通知模版定义)
- Webhook管理(回调URL,Header、Body)
- 仓库配置(仓库地址,认证token)
- 制品库配置(制品地址、认证秘钥)
- 资源配置(K8S、测试车、台架、实机、虚机)
- 应用配置(打通与Apollo配置中心数据拉取覆盖,区分dev、prod环境,用户部署时将参数设置到原子环境变量中,或者k8s secret中)
- 扩展属性配置(支持用户自定义实例页面属性显示)
# 2)流水线管理
- 流水线配置
- 配置 实例变量
- 触发规则(GitLab触发、Webhook触发、定时触发、人工触发)
- 重试规则(重试条件
变量=某一个值或者正则时
,重试间隔,最多多少次) - 并发规则(流水线最多能够启动多少实例),终止最早实例 或 禁止新实例触发
- 通知规则(实例成功,实例失败,变量符合指定值或正则)
- 回调规则(header、body数据根据规则替换成环境变量数据)
- 管理权限(可编辑人、可触发人、运营权限)
# 3)原子配置
- 原子配置
- 源原子(指定要拉取的Gitlab仓库地址,分支名,Tag)
- 自定义原子(支持Python、Shell、Java等多种环境)
- 远程原子
- 启动原子(URL、Method、Header、Body)
- 终止原子(URL、Method、Header、Body)
- 原子终止回调(支持变量替换)
- 子流程原子
- 原子触发子流程实例创建(将父流水线参数携带到子流程中)
- 重试策略(创建新实例,重试旧实例)
- 支持任意节点重试(调度服务逐级启动父实例,原子逐级启动子实例)
- Docker构建原子
- 关联源原子、镜像仓库、镜像名称、镜像Tag
- Dockerfile、Dockerfile路径(Kaniko构建完成自动推送Harbor)
- 部署原子
- 关联“应用参数”+环境“dev/prod”等(原子部署时会引用 替换标记)
- 指定 “集群资源”,命名空间、部署名称
- 或者指定gitops路径参数执行部署
- 流控原子(指定哪些原子不执行直接跳过)
- 人工原子(重要步骤用来人工审批节点,对接内部工单系统)
# 4)DAG编排和触发
- DAG任务编排
- 实例包含多个阶段,阶段包含多个原子,原子根据依赖关系编排并行串行
- 编排好的任务最终生成 DAG(有向无环图)
- 触发流水线
- 提供用户接口,用于触发调度实例(GitLab触发、Webhook触发、定时触发、人工触发)
- 在实例创建后,通过 TiDB 事务将实例ID推送至 RabbitMQ 的
实例ID队列
中
# 5)效能大盘
流水线效能大盘
- 开始时间、结束时间、调度等待耗时、业务执行耗时、业务等待耗时、业务执行时间
效能统计(统计大盘、实时大盘、组大盘)
- 实例数量、原子数量、耗时
- CPU、GPU、内存 使用量 使用率
- 车辆、台架 资源使情况
资源Quota
- 1.流水线实例, 2.原子实例, 3.CPU requests, 4.CPU limits, 5.MEM requests, 6.MEM limits, 7.GPU', 台架数量,测试车数量
# 2、调度引擎层
# 1)派发引擎服务
在实例创建后,通过 TiDB 事务将实例ID推送至 RabbitMQ 的
实例ID队列
中RabbitMQ 优先级队列
优先级范围:可以设置一个优先级范围,通常为 [0-9] 或 [0-255],0 为最低优先级,最大值为最高优先级
任务的分配:当多个任务同时到达时,优先级高的任务会优先被调度和执行
分组队列机制
每个用户组对应一个队列(例如,
GroupA
有GroupA-PriorityQueue
,GroupB
有GroupB-PriorityQueue
)不同组的队列消息根据业务规则分配不同的优先级区间,例如
0-9
优先级使用
basic_qos
方法为每个队列设置不同的预取值,控制每次消费者从队列中取出消息的频率预取值计算公式:
权重 * 总处理能力
(假设系统的总任务处理能力为100次/秒
)GroupA
的预取值应设为0.1 * 100 = 10
GroupB
的预取值应设为0.9 * 100 = 90
预处理服务
根据实例ID、原子ID,获取原子详细配置和要执行的命令参数
- 调度集群信息 or 车辆类型 or 台架类型(根据Agent Tag标记识别)
根据上面参数生成包括创建K8S Job的YAML文件等最终执行结构体,并推送到
原子封装队列
# 2)调度服务
- 去中心化设计(大型项目)
1、每个调度服务实例启动后注册到 ETCD,维护服务列表和健康状态
2、所有调度服务实例并行监听 RabbitMQ 队列,尝试抢占消费任务
3、成功消费任务后,调度服务获取 Redis 锁(任务ID为 key)防止重复消费
4、如果锁获取成功,调度服务执行任务;如果锁获取失败,丢弃该任务
5、任务执行完成后,调度服务发送 RabbitMQ `ack`,并更新任务状态到 Redis
6、如果任务执行失败,RabbitMQ 重新将任务放回队列,等待其他实例消费
7、如果多次执行失败,任务会写入到redis中,由监控告警人工介入处理
2
3
4
5
6
7
# 3、任务执行层(GRPC服务层)
# 1)GRPC Server(任务引擎服务)
- 任务下发
gRPC服务端
接收来自调度服务的任务,根据内存中 gRPC客户端 上报的Tag标记,将任务发送给匹配的客户端
- 去中心化调度&故障转移
- 如果检查发现自己没有可用的指定Tag客户端,或者客户端异常无法正常执行
任务引擎服务
会担当调度服务
的功能,从etct中获取其他可用的gRPC服务端派发- 每次被重新调度TTL值加1,当超过预设TTL值时会上报TTL超时状态到RabbitMQ
- 客户端连接管理
- 客户端启动后会根据公网域名和路由策略分配到指定gRPC服务端,并建立gRPC长连接
- 服务端根据客户端ID把连接写入到内存中,并同步到Redis中(服务端、客户端、Tag标记对应关系)
- 客户端会与服务端保持心跳机制,服务端更新客户端信息到ETCD中,ETCD中有过期策略,心跳3次失败就会自动过期
- 日志收集
- 客户端使用gRPC长连接把执行日志上报给服务端,服务端根据
实例ID/原子ID
存储日志到挂载盘 - 原子结束后把原子日志文件上报到OSS存储服务中,并将存储URL替换TiDB中的挂载盘路径
- 运行时日志通过挂载盘直接读取给客户端,超大日志只读取指定长度日志,同时提供存储地址,用户可自行下载
- 客户端使用gRPC长连接把执行日志上报给服务端,服务端根据
- 原子状态同步
- 客户端将原子成功 & 失败 等状态同步到服务端,服务端将状态写入RabbitMQ中,并同步到Kafka中,方便用户订阅
# 2)GRPC Client(执行器Agent)
- 接收任务
- 客户端接收并解析来自 Server 的原子任务,并根据任务类型执行任务
- K8S任务:K8S Job任务,使用上面封装好的K8S Job结构体直接启动执行
- 脚本任务:直接在本机执行(台架和测试车等都是脚本任务)
- 将执行任务POD Name或者台架信息上报到服务端,服务端更新绑定信息到TiDB和Redis缓存中
- 客户端接收并解析来自 Server 的原子任务,并根据任务类型执行任务
- 上报日志
- K8S任务日志:直接根据POD NAME从K8S接口中拉取数据流发送给服务端
- 脚本执行日志:通过gRPC长连接实时同步到服务端,并同步写入到本地(失败重传)
- 执行状态上报
- 客户端根据执行成功失败情况将原子执行状态上报到服务端
- 建立连接
- gRPC客户端启动后会尝试与服务端建立连接,如果失败会一直重试直到建联成功
- 客户端根据直接执行能力上报Tag标记(测试车、P1台架、P2台架、K8S等)
- 建联成功后会有一个goroutine一直与服务端保持心跳服务
- 公网认证服务
- 如果是车端Agent或者外网台架,客户端通过Openssl生成公钥私钥对
- 通过认证Token认证,调用http外网域名接口上报到云端,云端审批后即可完成认证
- 完成认证后客户端使用私钥对URL参数加密,云端解密,成功后即可完成gRPC连接
# 4、服务保障层
# 1)原子状态同步服务
- 原子状态同步,负责同步任务执行的状态
- 消费 RabbitMQ 的状态队列,将任务的执行状态更新至 TiDB 数据库中,确保任务状态的实时监控
- GRPC Server将原子执行状态上报到 RabbitMQ 的状态队列
- priorityAtomSync JOB 消费状态队列,并将状态更新到TiDB
- 原子状态同步充分考虑了幂等性问题
- 原子状态上报时携带当时上报时间的毫秒级时间撮,保证久状态不会覆盖更新新状态
# 2)原子执行器
- 原子执行器由执行器和代码片段两部分组成,代码片段可以是官方的,也支持用户自定义
- 通过挂载盘或 wget 动态下载的方式,执行指定路径下的 GO 二进制文件或脚本
- 执行脚本时,会根据环境变量传递的实例和原子ID,获取并注入任务参数,实现原子任务之间的数据共享
- 执行完成后,将产物变量信息上报至平台,原子Agent封装了执行器中的逻辑,可以直接执行
- 官方代码片段主要包括
- 源原子(指定要拉取的Gitlab仓库地址,分支名,Tag)
- 远程原子(定义启动,终止的调用URL,类似与Postman,串联业务)
- 子流程原子(用来触发其他子流程,实时同步执行百分比)
- Docker构建原子(利用Kaniko构建镜像并推送到Harbor)
- 部署原子(使用K8S配置将服务部署到K8S中)
- 流控原子(指定哪些原子不执行直接跳过)
- 人工原子(重要步骤用来人工审批节点,对接内部工单系统)
- 命令行执行,自定义原子(支持Python、Shell、Java等多种环境)
- 比如用户可以git clone下载一个python脚本,执行python脚本
- 也可以直接执行Shell脚本等
# 3)监控服务
告警服务
实例超时启动告警
原子超时启动告警
资源使用率超过预定阀值告警
原子执行时长超过遇到值报警
报警数据展示、报警收敛、报警升级
重新调度服务(主要用来做兜底)
- 日志再收集:
- 定时任务检查原子执行结束但没有日志的原子
- K8S Job类型:重新从K8S中拉取数据流,上传到OSS,更新日志链接
- 脚本类型:告警,并等待测试车、台架等在线后重新上传(会在本地留存日志数据)
- 原子重新调度
- 检查最近2小时内失败原子配置,如果设置了重试策略,根据重试策略重新调度
- 日志再收集:
# 4)效能看板
流水线效能大盘
- 开始时间、结束时间、调度等待耗时、业务执行耗时、业务等待耗时、业务执行时间
效能统计(统计大盘、实时大盘、组大盘)
- 实例数量、原子数量、耗时
- CPU、GPU、内存 使用量 使用率
- 车辆、台架 资源使情况
资源Quota
- 1.流水线实例, 2.原子实例, 3.CPU requests, 4.CPU limits, 5.MEM requests, 6.MEM limits, 7.GPU', 台架数量,测试车数量
任务执行监控
任务状态追踪:实时跟踪各个实例的执行状态,包括正在执行、已完成、失败、等待中等状态每个任务的执行进度可以在看板上清晰展示
任务详情展示:对于每个实例和原子任务,提供详细的执行日志和步骤信息,帮助用户快速定位和解决问题
性能指标监控
- 系统负载:展示调度系统的当前负载情况,包括消息队列的长度、服务器的CPU和内存使用率等关键资源的使用情况
- 执行延迟:分析和展示任务从触发到完成的延迟时间,帮助用户识别和解决潜在的性能瓶颈
- 吞吐量:展示单位时间内系统处理的任务数量,帮助用户评估系统的处理能力
资源利用情况
- Kubernetes 资源监控:对于采用 Kubernetes 作为执行环境的任务,效能看板提供对 Kubernetes 资源使用情况的监控,包括 Pod 数量、节点资源使用率等
- gRPC 连接状态:展示 gRPC Server 与 Client 之间的连接情况,追踪长连接的健康状态和响应时间
# 03.其他
# 1、服务架构
1、RabbitMQ
- 副本数量:考虑到高并发需求,建议至少使用 3 个副本(
RabbitMQ 3.9.x
)- CPU: 32 核心
- Memory: 128GB
- 存储: 512GB SSD
- 副本数量:考虑到高并发需求,建议至少使用 3 个副本(
2、Kafka
Broker 节点数量:建议至少 7 个 Broker 节点,以提高高可用性和分担负载 (
Kafka 3.5.x
)- CPU: 16 核心
- 内存: 64GB
- 存储: 2TB SSD
Partition 数量:对于 2000 万任务的高吞吐量,建议 Partition 数量设置为 200,以确保消息流量均匀分配
优化 ISR 和控制器:使用 Kafka 的控制器自动管理分区和副本,确保集群在节点故障时能够快速恢复
3、ES 8.x
主节点Mater:
至少三台单独机器(node.master: true )CPU: 4 核心
内存: 16GB(分配 50% 的内存给 JVM)
硬盘:SSD,大小根据需求,通常配置 256GB 或以上
协调节点Coordinating:
至少三台独立服务器(node.data: true )CPU: 8 核心
内存: 32GB(分配 50% 的内存给 JVM)
硬盘:SSD,大小建议 256GB 或以上
数据节点Data:
至少4台,后续不够用可以随时扩容,数据节点最好不要超过100台- CPU: 16 核心
内存: 64GB(分配 50% 的内存给 JVM)
存储:SSD,建议 1TB 或以上
- CPU: 16 核心
4、ETCD
- 节点数量:建议至少使用 5 个节点,以增强容错能力(
ETCD 3.5.x
)- CPU: 8 核心
- 内存: 32GB
- 存储: 256GB SSD
- 节点数量:建议至少使用 5 个节点,以增强容错能力(
5、Redis
- 节点数量:使用 Redis Cluster,建议至少 6 个节点配置(
Redis 7.x
)- CPU: 16 核心
- 内存: 128GB
- 存储: 512GB SSD
- 节点数量:使用 Redis Cluster,建议至少 6 个节点配置(
6、TiDB
TiDB Server:使用 5 个实例配置(
TiDB 7.x
)CPU: 32 核心
Memory: 128GB
PD Server:3 个实例,每个实例配置
- CPU: 8 核心
- Memory: 32GB
TiKV:至少 7 个实例,每个实例配置
- CPU: 32 核心
- Memory: 128GB
- 存储: 2TB SSD
- CPU: 32 核心
# 2、项目
技术架构:
- 技术栈: Python、Golang、Shell、gRPC、Gin、Gorm、Client-Go、OpenSSL、TiDB、Redis、RabbitMQ、Kafka、Etcd、ES、K8S、Apollo、Artifactory、S3
- 架构设计:系统采用
分布式微服务架构
,通过消息队列、DAG任务编排
、gRPC通信
等核心技术,构建高可用
、高性能
的任务调度与执行平台
项目背景:
- 传统调度主要针对内网K8S环境,对车载设备、工业台架等多终端跨网络环境无法支持
- 项目旨在构建一种支持
跨网段、多终端
模式的分布式
调度系统
项目介绍:
- 通用调度服务是一个高效、可靠的分布式调度系统,每日处理超过2000万任务
- 系统设计充分考虑到
去中心化
、容错机制
、高效调度
、计算加速
等核心能力
核心职责与贡献:
- 分布式一致性与去中心化
- 系统采用去中心化架构,确保每个调度实例具备独立调度能力,避免单点故障
- 充分利用RabbitMQ可靠性,结合Redis去重机制和分布式锁,保证调度一致性
- 容错机制
- 设计了健壮的 心跳服务、故障转移与重调度机制
- 确保在网络波动或设备故障时任务能够可靠地重新分配
- 高效调度与资源优化
- 系统利用RabbitMQ优先级队列和Redis动态调度策略
- 根据任务优先级和资源使用状况进行实时调度
- 支持高达20万的并发任务处理,确保计算资源的最大化利用
- 计算加速
- 利用Dragonfly实现P2P文件分发,降低带宽成本并提高下载稳定性
- 有效识别并聚合计算任务所需数据,支持有序并发下载,优化执行效率
- 海量日志处理
- 通过gRPC长连接将日志传输至Kafka,再由分布客户端写入ES,最终归档至S3
- 利用雪花算法唯一性,确保日志关联性与重调度后的数据合并
业务影响与成果:
- 调度效率提升:智能化调度,系统
调度效率提升超过50%
,实现了千万任务的高效管理 - 维护成本降低:模块化、可扩展的设计减少系统的开发与维护成本,维护成本降低超过40%
- 资源利用率提高:优化资源管理,设备
利用率提升50%以上
,为企业节省数千万的设备投资
# 3、优先级队列代码
- 配置 RabbitMQ 优先级队列
# 要创建优先级队列,需要在定义队列时设置 `x-max-priority` 属性例如,创建一个最大优先级为 10 的队列
channel.queue_declare(queue='task_queue', durable=True, arguments={'x-max-priority': 10})
# 消息发送时,可以为每个消息指定优先级
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message_body,
properties=pika.BasicProperties(
priority=message_priority # message_priority 是消息优先级 (0-9)
)
)
2
3
4
5
6
7
8
9
10
11
12
- 设置队列的消息获取频率:
权重设定:
GroupA
权重为0.1,GroupB
权重为0.9,表示GroupB
的调度频率远高于GroupA
预取值计算公式:
权重 * 总处理能力
(假设系统的总任务处理能力为100次/秒
)GroupA
的预取值应设为0.1 * 100 = 10
GroupB
的预取值应设为0.9 * 100 = 90
channel.queue_declare(queue='GroupA_task_queue')
channel.queue_declare(queue='GroupB_task_queue')
channel.basic_publish(
exchange='task_exchange',
routing_key='GroupA',
body=message_body
)
2
3
4
5
6
7
8
分组队列与调度权重
在分组调度中,每个用户组的调度权重决定了该组的任务消费频率
例如:GroupA 权重为 0.1,GroupB 权重为 0.9,这意味着在资源紧张时,GroupB 的任务调度频率更高
通过 RabbitMQ 的
basic_qos
方法限制每个组的消费者从队列中获取的消息数
,从而实现根据权重进行资源调度
channel.basic_qos(prefetch_count=int(10 * weight)) # weight 为该组的调度
import pika
import time
# RabbitMQ 连接参数
RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USER = 'guest'
RABBITMQ_PASSWORD = 'guest'
# 权重设定
weights = {
'GroupA': 0.1,
'GroupB': 0.9
}
# 总处理能力
TOTAL_CAPACITY = 100
# 创建连接和频道
def create_connection():
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
credentials=credentials
))
return connection
# 定义消费者类
class Consumer:
def __init__(self, group_name, queue_name, prefetch_count):
self.group_name = group_name
self.queue_name = queue_name
self.prefetch_count = prefetch_count
self.connection = create_connection()
self.channel = self.connection.channel()
# 声明队列
self.channel.queue_declare(queue=self.queue_name, durable=True)
# 设置 basic_qos, 控制每次获取的消息数量
self.channel.basic_qos(prefetch_count=self.prefetch_count)
# 开始消费
self.channel.basic_consume(queue=self.queue_name,
on_message_callback=self.callback,
auto_ack=False)
# 处理消息
def callback(self, ch, method, properties, body):
print(f"{self.group_name} received message: {body.decode()}")
time.sleep(1) # 模拟任务处理时间
ch.basic_ack(delivery_tag=method.delivery_tag)
# 启动消费者
def start_consuming(self):
print(f"Starting consumer for {self.group_name} with prefetch count {self.prefetch_count}")
self.channel.start_consuming()
# 启动消费者
if __name__ == "__main__":
consumers = []
# 为每个组创建消费者
for group, weight in weights.items():
prefetch_count = int(weight * TOTAL_CAPACITY)
queue_name = f'{group}-PriorityQueue'
consumer = Consumer(group, queue_name, prefetch_count)
consumers.append(consumer)
# 启动每个消费者
for consumer in consumers:
consumer.start_consuming()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74