不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • 项目

    • 01.分布式调度系统
      • 01.服务设计要点
        • 0、项目介绍
        • 1)自我介绍
        • 2)项目分层介绍
        • 3)系统设计
        • 4)带着伤疤反思
        • 5)架构图
        • 6)Q & A
        • 1、分布式一致性(去中心化)
        • 1)设计概述
        • 2)去中心化调度
        • 2、容错机制
        • 3、高效调度
        • 4、计算加速
        • 1)缓存与镜像加速
        • 2)计算加速与调度优化
        • 5、海量日志处理
        • 1)日志收集流程
        • 2)日志数据流设计
        • 6、跨网络调度
      • 02.项目模块说明
        • 1、配置管理层
        • 1)配置管理
        • 2)流水线管理
        • 3)原子配置
        • 4)DAG编排和触发
        • 5)效能大盘
        • 2、调度引擎层
        • 1)派发引擎服务
        • 2)调度服务
        • 3、任务执行层(GRPC服务层)
        • 1)GRPC Server(任务引擎服务)
        • 2)GRPC Client(执行器Agent)
        • 4、服务保障层
        • 1)原子状态同步服务
        • 2)原子执行器
        • 3)监控服务
        • 4)效能看板
      • 03.其他
        • 1、服务架构
        • 2、项目
        • 3、优先级队列代码
    • 02.GPU调度
    • 03.车端构建系统
    • 04.资源管理平台
    • 05.CICD
    • 06.Mage平台
    • 10.发布系统使用
    • 11.自动驾驶业务
目录

01.分布式调度系统

# 01.服务设计要点

20万核  每个node 256核   大概有 780 node节点
单个node节点 10个pod每秒
780 * 10 = 7800 个pod的创建速度
1
2
3

# 0、项目介绍

# 1)自我介绍

您好,我是 xxx,毕业于 xx 大学,专业是计算机通信方向

这几年主要在做平台侧开发,比如自动驾驶的分布式调度系统
京东的 DevOps 平台,以及机器学习平台 和 运维自动化 工作

现在在xx汽车,主要负责自动驾驶团队的调度系统,
这个系统每天的任务规模 2000 万,峰值并发 20 万
除了要调度 K8s,还要支持测试车、台架这些终端设备,
很多设备跑在外网或者弱网环境,所以调度的容错、稳定性要求会更高

系统整体是去中心化设计,支持横向扩展,也做了很多幂等控制和故障转移机制

我用得比较多的语言是 Go 和 Python,也做过一些 Java 项目的维护,
数据库、消息队列 这些常用组件的使用和原理 都还OK

主要情况是这些,如果可以的话,我先简单介绍一下目前这个调度系统的设计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2)项目分层介绍

  • 资源说明

总核数:20 万核

每 node 核数:256 核

总 node数:约 780(30个k8s集群规模)

每 node 每秒创建 pod 数:10

总 pod 创建速度:780 * 10 = 7800 pod/s

  • 背景
# 背景
这个项目是我在 xx 汽车主要开发的一套通用分布式调度系统,
支撑自动驾驶团队从研发到测试的全流程任务调度
系统每天处理任务量超过 2000 万,最大并发可达 20 万,不仅需要调度 K8s 环境任务,
还要覆盖弱网环境下的测试车、台架等物理终端
这样的场景对系统的稳定性、扩展性和容错能力提出了非常高的要求
1
2
3
4
5
6
  • 项目介绍
# 我们采用了分层架构设计,整个系统核心分为三层:
# 第一层,配置管理层(流水线配置&实例触发)
提供友好的界面,用户通过拖拽可快速完成DAG任务的编排,轻松实现串并行任务定制
支持多种方式触发调度实例,包括GitLab、Webhook、定时触发和人工触发
提供资源配置(K8S、测试车、台架等)、仓库配置、通知管理等功能,满足任务执行的多种需求

# 第二层,调度引擎层(优先级队列+分组+抢占式消费)
实例创建后,我们通过 TiDB 事务将实例 ID 推送到 RabbitMQ 的队列中(确保任务投递过程具备强一致性和事务保障)
调度引擎会根据实例的优先级和 DAG 拓扑结构,解析出待调度的原子
我们引入了一个预处理服务,它会解析任务参数、生成可执行结构体(如 K8s Job 的 YAML 文件),
再把这些封装好的任务放入下游的原子封装队列
  
# 第三层,任务执行层
这一层通过 gRPC 与各类终端设备通信
执行节点带有 Tag 标识,比如“测试车”、“K8s”、“台架”等,任务下发时会自动根据标签匹配合适的设备
系统具备去中心化调度能力,调度节点之间可横向扩展,且支持自动容错
比如在设备掉线、弱网延迟等情况下,调度器可以检测失败状态并自动重新分配任务,确保关键任务不会丢失或卡死
我们还设计了任务状态回传机制,确保每个任务状态都能完整上报、可追踪

# 服务保障层(正常情况可以不说)
1.原子状态同步
2.原子执行器
3.监控服务
4.弹性扩容服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 3)系统设计

我主要负责的项目是xx汽车 自动驾驶团队的分布式调度系统

这个系统主要解决的是——分布式、多终端、跨网络环境中,高效可靠地调度(大规模计算任务)

主要挑战包括:

  • 1)任务量大、设备多,如何横向扩展,避免单点瓶颈
  • 2)网络复杂,如何做到稳定、高可用调度
  • 3)如何保证多节点任务调度的分布式一致性问题
  • 4)终端资源有限,如何提升任务 执行效率与资源利用率
  • 5)如何确保海量日志处理,不影响系统处理性能

为了应对上面问题,我们在设计上融入了六个关键优化

  • ① 去中心化

    • 在架构上我们设计了一套去中心化调度模型
    • 通过队列抢占式消费,让每个调度节点具备独立调度能力,避免单点瓶颈
  • ② 高效调度+分布式一致性

    • 任务下发通过RabbitMQ + Redis 实现任务分布式一致性
    • 利用 RabbitMQ 分组机制和优先级队列,实现任务优先级高效调度能力
    • 在 K8s 层面采用优先级和抢占式调度策略,高优先级可以抢占低优任务资源
  • ③ 容错机制

    • 引入心跳上报机制,实时监控各终端设备状态(Agent心跳)
    • 借助失败检测与自动重试机制,设备掉线或执行失败时能自动重新分配任务,保证关键任务不会丢失
  • ④ 计算加速

    • 为了提升调度效率,我们引入了 Dragonfly ([ˈdræɡənflaɪ]) 做 P2P 文件分发,加快任务依赖资源的下发
    • 对计算任务涉及的数据集进行预处理,加快数据加载和处理速度
  • ⑤ 海量日志

    • 日志链路方面,我们做了从 gRPC 实时上传 → Kafka 聚合 → ES 检索 → S3 存档的全链路日志系统
  • ⑥ 高可用

    • 各节点均支持水平扩展,消除单点故障隐患,保证系统在高并发情况下依然具备优异的可用性
  • 这个项目的主要设计思路是这样的,如果有兴趣,我可以详细介绍一下项目的分层架构

# 4)带着伤疤反思

  • ① 然后事故是怎么发生的呢(北京怀来机房 到 合肥机房 专线打满)
    • 有一天,高峰期突然涌入十万级的 K8s 任务,因为模型镜像大、数据要跨机房拉
    • 专线(20G)带宽一下子就被拉满了,北京到合肥的链路延迟狂飙,丢包、抖动特别严重
    • RabbitMQ 消费确认 ACK丢失,立马又往队列里重发了一遍
    • 原子执行状态未能及时回传上报,导致误判超时失败重新调度
    • 一下子多了几倍的突增流量,结果一下子就把 MQ 打爆了(任务大量积压)
    • 告警爆炸:原子超时执行告警,数据库告警,告警电话
  • ② 后来我们怎么解决的?
    • 专线隔离

      • 在北京怀来机房,我们Agent划分到独立VLAN,使用独立专线通信
      • 这样我们调度任务,心跳,状态同步不会受到其他业务流量影响
    • 动态限流 & 调度熔断

      • 入队速率超过 3000 条/秒或队列长度超过 20000 条时,触发流控机制(低优先级任务记录到TiDB 待调度状态)
      • 动态调整消费者的 prefetch_count(预处理数量),在 100–400 范围内自适应,控制消费端负载
      • 滑动窗口统计 Pod 启动延迟,若1 分钟内启动耗时 > 3 秒的 Pod 占比超过 30%
      • 暂停低优先级任务的派发和重试 1–2 分钟
      • 超时重试数量在 5% 时,将低优先级任务快速失败 或 写入TiDB标记为 待重试 状态
    • 告警收敛

      • 同类告警合并:同一任务类型/服务模块/异常原因的告警,在单位时间内合并为1条告警

      • 指数退避策略:重复告警频率按照 1min ➜ 5min ➜ 15min 等递增推迟告警推送频率,避免刷屏

      • 缩略通知:当短时间内告警量过大,仅发送统计摘要(如「30秒内有 342 条原子任务失败」)

      • 新告警数据实时新到redis进行缓存,在页面中进行告实时警统计和展示

    • 全链路可观测

      • 调度系统中通过 SkyWalking 注入链路追踪,在关键节点打上追踪点
      • 每条任务从下发到状态同步都带同一 TraceID
      • 并将 traceID/spanID 写入日志,从而实现任务全流程的可观测、可追踪、可回溯
    • Redis 写入失败的容错与降级处理策略

      • 当 Agent 启动 Pod 执行任务时,如 Redis 写入失败,系统会触发容错机制

      • 任务消息将 不被 RabbitMQ 确认消费(ACK),确保任务状态未误标

      • 同时,Pod 名称和相关信息会 记录到 ES(Elasticsearch),便于后续分析

      • 后台服务定时 扫描异常 Pod 记录,并进行自动清理与回收,确保系统健康

  • ③ 最后总结一句
    • 高并发调度靠的不是算法有多快,而是能否控得住每个任务的状态。
    • 状态一致、执行幂等、异常可熔断、链路可观测,才能让系统稳得住、跑得远
  • 到底出了哪些幺蛾子?
    • RabbitMQ 队列大量积压

      • RabbitMQ 瞬间从 5 千条/秒飙到 5 万条/秒,CPU、磁盘都撑不住
    • 重复执行灾难:Redis 去重失灵

      • 网络延时导致部分任务没能及时标记为“已执行”,导致重复调度
    • 故障排查痛点:看不到全链路

      • 没有统一的 TraceID,怀来和合肥两地日志分散
      • 排查一个 ACK 丢失到底是哪条消息得靠人工翻多套日志

# 5)架构图

  • processon 原图 (opens new window)

# 6)Q & A

面试官可能追问 应对角度及说明
异构资源如何统一抽象调度的? 讲解如何利用统一的 Tag 机制和分层执行器模型,将 K8s、测试车、台架等不同终端抽象成统一调度对象
多终端设备如何自动发现和注册? 基于 gRPC 的心跳机制和动态注册表,每个终端启动时自动注册并定期上报状态,确保设备状态实时可见
弱网环境下如何保障任务不丢? 依靠心跳监控、超时检测和自动重调度机制,以及任务状态的幂等处理,保证设备离线或网络波动时任务可以重新分配
为什么选择 TiDB + RabbitMQ? TiDB 保证了分布式事务一致性;RabbitMQ 具备强大的多消费者并发处理、优先级队列及可靠消息机制,满足高并发任务投递需求
如何实现调度的监控与可观测? 采用链路追踪、任务状态上报,再结合 Prometheus 和 Grafana 实现实时监控、报警以及全链路性能分析
去中心化调度具体如何实现? 说明通过多实例调度、队列抢占式消费和无状态设计,实现了调度节点间的横向扩展和分布式调度,无单点故障
海量日志如何保证处理效率与实时性? 通过独立的日志处理服务,实现异步日志收集和分布式存储,日志采集与主业务流程解耦,并利用 ELK/ClickHouse 建立快速查询索引
计算加速部分如何优化容器镜像及数据处理? 引入镜像加速方案(例如使用 dfdaemon),提前对计算任务数据集做预处理,降低任务启动时的数据加载开销
跨数据中心调度如何设计? 基于分布式架构设计,利用全局一致性协议和分布式事务确保数据一致性,同时采用延时容忍机制应对跨区域通信延迟
如何确保系统数据和任务状态的一致性? 通过分布式事务、心跳同步和任务状态上报(幂等处理),利用 TiDB 保证强一致性,再辅以状态对比修正机制
DAG任务编排如何避免循环依赖问题? 在任务提交环节进行依赖检查和校验,利用 DAG 算法保证任务之间的关系是有向无环的,必要时采用人工审批或回滚机制
当 RabbitMQ 队列堵塞时如何应对? 采用限流、消息回退及队列监控策略,必要时结合自动扩容机制,及时发现并调整队列负载,确保任务持续流畅
如何调试和排查分布式调度系统中的问题? 通过全链路日志和分布式追踪系统,对各节点的调用链、状态上报及异常日志进行集中收集与分析,从而快速定位瓶颈或故障点
对于任务优先级分配,你们是如何设计任务打分的? 讨论如何根据业务权重、任务重要性动态调整优先级,并结合 RabbitMQ 分组机制和 Kubernetes 的抢占式调度,保障高优先级任务先行
系统高可用性的设计中有哪些关键考虑? 强调所有节点支持水平扩展、无状态设计、自动故障检测与重调度机制,以及跨区域冗余和网络负载均衡策略

# 1、分布式一致性(去中心化)

# 1)设计概述

  • 基于 RabbitMQ + Redis + ETCD 构建的去中心化、高可靠、支持横向扩展的分布式任务调度系统
  • 核心通过Redis分布式锁实现任务幂等消费,确保在多实例并发下也能保持一致性
  • 并通过RabbitMQ的ack机制确保任务消息不会丢失
  • 设计目标

    • 去中心化 调度服务水平扩展,避免单点故障

    • 水平扩展 支持通过增加调度服务实例来提高并发处理能力

    • 任务幂等 任务在多个实例消费时,不会被重复处理(redis去重机制)

    • 高可靠性 确保即使调度服务实例故障,消息仍然不会丢失(MQ Ack,Redis缓存检测)

    • 负载均衡 各调度实例合理分担任务,避免某些实例过载

  • 系统流程

    • 所有调度服务实例并行监听 RabbitMQ 队列,尝试抢占消费任务

    • 调度服务获取 Redis 锁(任务ID为 key)防止重复消费

    • Redis去重机制 在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态

    • 如果多次执行失败,任务会写入到Redis中,由监控告警人工介入处理

# 2)去中心化调度

  • 多实例消费 RabbitMQ,所有调度服务实例可以同时从同一个 RabbitMQ 队列中消费任务
  • 抢占式消费机制,
    • RabbitMQ 的特性允许多个消费者从同一个队列并行抢占式消费消息,但每条消息只能被一个消费者成功消费
  • 消息确认机制:
    • 每个调度实例通过GrpcServer分发给GrpcClient后,会使用 RabbitMQ 的 ack 确认机制
    • 如果未能成功执行,消息会重新放回队列,其他实例可继续消费
  • Redis去重机制
    • 在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态
      • 消费端在获取任务后,将任务加入“正在执行”Set
      • 如果任务执行失败,则从“正在执行”Set移除
      • 若执行成功,则将其移入“已执行成功”Set
    • 消费消息前会先在这两个Set中进行检查,若任务已经存在,则自动放弃调度,以避免重复执行

# 2、容错机制

  • 为了在 gRPC 客户端网络抖动时,既能保证任务被可靠调度,又能避免过度敏感导致重复调度
  • 需要设计合理的失联检测和任务重调度机制
  • 客户端心跳机制

    • 客户端定期发送心跳包(如每10秒)以保持连接
    • 服务端缓存心跳时间戳,更新最后接收时间
    • 若60秒内未收到心跳,服务端将触发失联检测
    • 失联 Agent任务会有 超时机制接管
  • 任务超时机制

    • 客户端在执行任务期间,每隔 1 分钟 发送心跳请求到服务端,告知任务仍在执行

    • 服务端收到心跳后,更新该任务的最后执行时间戳

    • 如果超过一定时间(3分钟)未收到心跳,才会认为任务可能失败(网络抖动处理)

  • 任务重调度策略

    • 在消费者端维护一个全局Redis Set,分别记录“正在执行”和“已执行成功”的任务状态

    • 若任务已经存在,则自动放弃调度,以避免重复执行

    • 若判断当前任务需要重新调度,则从“正在执行”Set移除;并将这些任务重新分配给其他可用客户端

  • 故障转移

    • 在系统发生故障时,任务引擎服务会接管调度功能

    • 通过Etcd获取其他可用的gRPC服务端进行任务派发,确保任务不中断

    • 系统在每次任务重新调度时,会增加其TTL值当TTL值超过预设限度时

    • 系统会将TTL超时状态上报到RabbitMQ,以便及时处理异常情况

  • 任务失败自动重试

  • 系统支持为任务设置失败后的重试次数,任务失败时系统会根据预设的重试策略自动进行重试

  • 同时设置重试最大次数,以避免系统资源浪费

# 3、高效调度

  • 任务发布:用户提交任务,通过RabbitMQ发布,包含优先级和组信息,任务被路由到对应队列
  • 调度:RabbitMQ根据优先级进行任务调度,Redis实时监控调度状态并动态调整资源分配
  • 执行:Worker节点接收任务并反馈结果至RabbitMQ,结果异步写入TiDB实现持久化存储
  • RabbitMQ 优先级队列

    • 优先级范围:可以设置一个优先级范围,通常为 [0-9] 或 [0-255],0 为最低优先级,最大值为最高优先级

    • 任务的分配:当多个任务同时到达时,优先级高的任务会优先被调度和执行

  • 分组队列机制

  • 每个用户组对应一个队列(例如,GroupA有GroupA-PriorityQueue,GroupB有GroupB-PriorityQueue)

    • 不同组的队列消息根据业务规则分配不同的优先级区间,例如0-9优先级
    • 使用basic_qos方法为每个队列设置不同的预取值,控制每次消费者从队列中取出消息的频率
    • 预取值计算公式:权重 * 总处理能力(假设系统的总任务处理能力为100次/秒)
      • GroupA的预取值应设为0.1 * 100 = 10
      • GroupB的预取值应设为0.9 * 100 = 90
  • 动态分片

    • 系统支持将一次任务中的多个并行原子任务打包并放入队列中

    • 调度服务会优先将这些任务分配给不同的Agent执行器随着用户执行器数量的增加,任务分片的数量也会相应增加

    • 从而加快任务处理速度这种机制提高了任务的并行度和执行效率

  • 路由策略

    • 每个原子任务可以设置Tag标记,用于指引任务分配到特定的执行器节点这有助于优化资源分配,提升系统调度效率

    • 支持用户部署私有Agent,用户只需具备足够资源,即可保证任务的高效调度此设计增强了系统的灵活性和定制化能力

# 4、计算加速

# 1)缓存与镜像加速

  • NIOFS、Ceph、Alluxio,高成本(需要三份数据备份)、复杂的运维和部署

  • Dragonfly:

    • 通过利用 P2P 技术,减少跨机房和跨境带宽成本
    • 无缝支持容器镜像分发,容易集成
    • 支持机器整体带宽管理,提升稳定性
    • 提高下载稳定性,支持高度一致性校验
  • Dragonfly 具体实现

    • 在 Kubernetes 集群中的每个 Node 节点上安装配置 dfdaemon 代理

    • sudo vim /etc/docker/daemon.json
      # 添加以下内容,将 Docker 的镜像拉取代理指向 dfdaemon:
      {
        "registry-mirrors": ["http://127.0.0.1:65001"]
      }
      
      sudo systemctl restart docker
      
      # dfdaemon 启动:确保 dfdaemon 在每个节点上监听 65001 端口,拦截 Docker 的镜像拉取请求
      
      1
      2
      3
      4
      5
      6
      7
      8
      9

# 2)计算加速与调度优化

  • 实现过程:

    • 计算加速组件首先识别处于待处理状态(pending)的计算任务(即 step instance)
    • 每个任务通常会有其所需的数据集(例如,模型文件、输入数据等)
    • 这些数据集合需要被识别并聚合在一起,以便后续处理
  • 示例: 假设有一个机器学习任务,它有多个步骤(step),每个步骤需要不同的数据

    • Step A 需要数据集 dataset_A

    • Step B 需要模型文件 model_B

    • Step C 需要特征文件 features_C

    • 计算加速组件会将这些数据收集起来,形成一个待处理的数据集合

    • {
          "pending_steps": ["Step A", "Step B", "Step C"],
          "data_requirements": [
              "dataset_A",
              "model_B",
              "features_C"
          ]
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
  • 利用缓存加速模块有序并发下载 Data 集合

    • 计算加速组件会利用缓存加速模块,根据用户配置的缓存空间和跨域下载需求,进行有序的并发下载
    • 如果某个数据已经被缓存,则可以直接从缓存中获取,而不需要再次下载
    • 缓存加速模块会将数据写入到pod高性能 pvc挂载盘中,缓存结束后会把缓存映射路径写入到TiDB
    • 当原子被调度运行起来时检查是否有映射,有就直接使用无需再次下载

# 5、海量日志处理

# 1)日志收集流程

  • ① K8S内网Job日志收集

    • k8s集群名/Job名称 采集写入 ClickHouse
  • ② 外网测试车日志收集

    • grpc client通过长连接将日志传送给grpc Server,grpc server将日志写入Kafka
    • 再由分布客户端消费Kafka中日志数据,将日志写入到ClickHouse中
  • ③ 任务重新调度日志处理

    • 每个任务派发时会生成一个唯一TaskID,将日志写入Kafka时使用TaskID作为标识

    • 每次任务重试,不管多少次,都会将执行事件写入到TiDB中,每次对应一个唯一TaskID

    • 任务被重新调度,TaskID不会改变,即使被重新调度,也可以将两次调度日志合并到一起

# 2)日志数据流设计

  • Kafka 分区与 TaskID 映射

    • Kafka 可以基于 TaskID 进行哈希分区,保证同一个任务的所有日志会写入到同一个分区,方便后续的消费和处理

    • Kafka 的生产者可以通过设置批量发送日志,减少频繁的网络交互,提升吞吐量

  • 日志批量消费与存储

    • 可以设置多个消费者实例来并行消费不同分区的日志,保证高并发任务的日志能够实时处理

    • 同时,消费日志时可以选择批量拉取,减轻对下游存储系统的压力

  • 日志存储与查询

    • 实时日志存储 (ES),能够支持基于 TaskID 查询日志,并且可以根据时间、节点等条件进行过滤查询

    • 归档日志存储 (S3),任务执行结束后,ES 中的日志会按批次推送到 S3 或 OSS 对象存储中

# 6、跨网络调度

  • 跨网段通信
    • 通过gRPC Stream的双向流式传输,保持客户端与服务端的持续连接,减少频繁创建连接的开销
    • 外网Agent利用OpenSSL自建CA和特定算法,采用外网私钥加密、内网公钥解密,确保建联安全
      • 外网测试车首次启动gRPC Agent,Agent生成公钥 并 上报到内网 gRPC服务端(人工审批)
  • gRPC通信和负载均衡
    • 当客户端断开连接时,系统自动重连,迅速恢复连接
    • 采用轮询,结合Ingress的负载均衡,动态分配流量,并将连接信息存储在内存中和ETCD
    • 通过心跳监控连接状态,若多次失败则移除失效连接
    • 并将AgentID和未完成任务记录在Redis中,超时未重连则重新派发任务,同时保证任务幂等性

# 02.项目模块说明

# 1、配置管理层

# 1)配置管理

  • 团队管理,项目管理(流水线最终归属一个项目里)
  • 通知管理(飞书通知、邮件通知、通知模版定义)
  • Webhook管理(回调URL,Header、Body)
  • 仓库配置(仓库地址,认证token)
  • 制品库配置(制品地址、认证秘钥)
  • 资源配置(K8S、测试车、台架、实机、虚机)
  • 应用配置(打通与Apollo配置中心数据拉取覆盖,区分dev、prod环境,用户部署时将参数设置到原子环境变量中,或者k8s secret中)
  • 扩展属性配置(支持用户自定义实例页面属性显示)

# 2)流水线管理

  • 流水线配置
    • 配置 实例变量
    • 触发规则(GitLab触发、Webhook触发、定时触发、人工触发)
    • 重试规则(重试条件 变量=某一个值或者正则时,重试间隔,最多多少次)
    • 并发规则(流水线最多能够启动多少实例),终止最早实例 或 禁止新实例触发
    • 通知规则(实例成功,实例失败,变量符合指定值或正则)
    • 回调规则(header、body数据根据规则替换成环境变量数据)
    • 管理权限(可编辑人、可触发人、运营权限)

# 3)原子配置

  • 原子配置
    • 源原子(指定要拉取的Gitlab仓库地址,分支名,Tag)
    • 自定义原子(支持Python、Shell、Java等多种环境)
    • 远程原子
      • 启动原子(URL、Method、Header、Body)
      • 终止原子(URL、Method、Header、Body)
      • 原子终止回调(支持变量替换)
    • 子流程原子
      • 原子触发子流程实例创建(将父流水线参数携带到子流程中)
      • 重试策略(创建新实例,重试旧实例)
      • 支持任意节点重试(调度服务逐级启动父实例,原子逐级启动子实例)
    • Docker构建原子
      • 关联源原子、镜像仓库、镜像名称、镜像Tag
      • Dockerfile、Dockerfile路径(Kaniko构建完成自动推送Harbor)
    • 部署原子
      • 关联“应用参数”+环境“dev/prod”等(原子部署时会引用 替换标记)
      • 指定 “集群资源”,命名空间、部署名称
      • 或者指定gitops路径参数执行部署
    • 流控原子(指定哪些原子不执行直接跳过)
    • 人工原子(重要步骤用来人工审批节点,对接内部工单系统)

# 4)DAG编排和触发

  • DAG任务编排
    • 实例包含多个阶段,阶段包含多个原子,原子根据依赖关系编排并行串行
    • 编排好的任务最终生成 DAG(有向无环图)
  • 触发流水线
    • 提供用户接口,用于触发调度实例(GitLab触发、Webhook触发、定时触发、人工触发)
    • 在实例创建后,通过 TiDB 事务将实例ID推送至 RabbitMQ 的 实例ID队列 中

# 5)效能大盘

  • 流水线效能大盘

    • 开始时间、结束时间、调度等待耗时、业务执行耗时、业务等待耗时、业务执行时间
  • 效能统计(统计大盘、实时大盘、组大盘)

    • 实例数量、原子数量、耗时
    • CPU、GPU、内存 使用量 使用率
    • 车辆、台架 资源使情况
  • 资源Quota

    • 1.流水线实例, 2.原子实例, 3.CPU requests, 4.CPU limits, 5.MEM requests, 6.MEM limits, 7.GPU', 台架数量,测试车数量

# 2、调度引擎层

# 1)派发引擎服务

  • 在实例创建后,通过 TiDB 事务将实例ID推送至 RabbitMQ 的 实例ID队列 中

  • RabbitMQ 优先级队列

    • 优先级范围:可以设置一个优先级范围,通常为 [0-9] 或 [0-255],0 为最低优先级,最大值为最高优先级

    • 任务的分配:当多个任务同时到达时,优先级高的任务会优先被调度和执行

  • 分组队列机制

    • 每个用户组对应一个队列(例如,GroupA有GroupA-PriorityQueue,GroupB有GroupB-PriorityQueue)

    • 不同组的队列消息根据业务规则分配不同的优先级区间,例如0-9优先级

    • 使用basic_qos方法为每个队列设置不同的预取值,控制每次消费者从队列中取出消息的频率

    • 预取值计算公式:权重 * 总处理能力(假设系统的总任务处理能力为100次/秒)

      • GroupA的预取值应设为0.1 * 100 = 10
      • GroupB的预取值应设为0.9 * 100 = 90
  • 预处理服务 根据实例ID、原子ID,获取原子详细配置和要执行的命令参数

    • 调度集群信息 or 车辆类型 or 台架类型(根据Agent Tag标记识别)
  • 根据上面参数生成包括创建K8S Job的YAML文件等最终执行结构体,并推送到 原子封装队列

# 2)调度服务

  • 去中心化设计(大型项目)
1、每个调度服务实例启动后注册到 ETCD,维护服务列表和健康状态
2、所有调度服务实例并行监听 RabbitMQ 队列,尝试抢占消费任务
3、成功消费任务后,调度服务获取 Redis 锁(任务ID为 key)防止重复消费
4、如果锁获取成功,调度服务执行任务;如果锁获取失败,丢弃该任务
5、任务执行完成后,调度服务发送 RabbitMQ `ack`,并更新任务状态到 Redis
6、如果任务执行失败,RabbitMQ 重新将任务放回队列,等待其他实例消费
7、如果多次执行失败,任务会写入到redis中,由监控告警人工介入处理
1
2
3
4
5
6
7

# 3、任务执行层(GRPC服务层)

# 1)GRPC Server(任务引擎服务)

  • 任务下发
    • gRPC服务端接收来自调度服务的任务,根据内存中 gRPC客户端 上报的Tag标记,将任务发送给匹配的客户端
  • 去中心化调度&故障转移
    • 如果检查发现自己没有可用的指定Tag客户端,或者客户端异常无法正常执行
    • 任务引擎服务会担当调度服务的功能,从etct中获取其他可用的gRPC服务端派发
    • 每次被重新调度TTL值加1,当超过预设TTL值时会上报TTL超时状态到RabbitMQ
  • 客户端连接管理
    • 客户端启动后会根据公网域名和路由策略分配到指定gRPC服务端,并建立gRPC长连接
    • 服务端根据客户端ID把连接写入到内存中,并同步到Redis中(服务端、客户端、Tag标记对应关系)
    • 客户端会与服务端保持心跳机制,服务端更新客户端信息到ETCD中,ETCD中有过期策略,心跳3次失败就会自动过期
  • 日志收集
    • 客户端使用gRPC长连接把执行日志上报给服务端,服务端根据 实例ID/原子ID 存储日志到挂载盘
    • 原子结束后把原子日志文件上报到OSS存储服务中,并将存储URL替换TiDB中的挂载盘路径
    • 运行时日志通过挂载盘直接读取给客户端,超大日志只读取指定长度日志,同时提供存储地址,用户可自行下载
  • 原子状态同步
    • 客户端将原子成功 & 失败 等状态同步到服务端,服务端将状态写入RabbitMQ中,并同步到Kafka中,方便用户订阅

# 2)GRPC Client(执行器Agent)

  • 接收任务
    • 客户端接收并解析来自 Server 的原子任务,并根据任务类型执行任务
      • K8S任务:K8S Job任务,使用上面封装好的K8S Job结构体直接启动执行
      • 脚本任务:直接在本机执行(台架和测试车等都是脚本任务)
    • 将执行任务POD Name或者台架信息上报到服务端,服务端更新绑定信息到TiDB和Redis缓存中
  • 上报日志
    • K8S任务日志:直接根据POD NAME从K8S接口中拉取数据流发送给服务端
    • 脚本执行日志:通过gRPC长连接实时同步到服务端,并同步写入到本地(失败重传)
  • 执行状态上报
    • 客户端根据执行成功失败情况将原子执行状态上报到服务端
  • 建立连接
    • gRPC客户端启动后会尝试与服务端建立连接,如果失败会一直重试直到建联成功
    • 客户端根据直接执行能力上报Tag标记(测试车、P1台架、P2台架、K8S等)
    • 建联成功后会有一个goroutine一直与服务端保持心跳服务
  • 公网认证服务
    • 如果是车端Agent或者外网台架,客户端通过Openssl生成公钥私钥对
    • 通过认证Token认证,调用http外网域名接口上报到云端,云端审批后即可完成认证
    • 完成认证后客户端使用私钥对URL参数加密,云端解密,成功后即可完成gRPC连接

# 4、服务保障层

# 1)原子状态同步服务

  • 原子状态同步,负责同步任务执行的状态
  • 消费 RabbitMQ 的状态队列,将任务的执行状态更新至 TiDB 数据库中,确保任务状态的实时监控
    • GRPC Server将原子执行状态上报到 RabbitMQ 的状态队列
    • priorityAtomSync JOB 消费状态队列,并将状态更新到TiDB
  • 原子状态同步充分考虑了幂等性问题
    • 原子状态上报时携带当时上报时间的毫秒级时间撮,保证久状态不会覆盖更新新状态

# 2)原子执行器

  • 原子执行器由执行器和代码片段两部分组成,代码片段可以是官方的,也支持用户自定义
  • 通过挂载盘或 wget 动态下载的方式,执行指定路径下的 GO 二进制文件或脚本
  • 执行脚本时,会根据环境变量传递的实例和原子ID,获取并注入任务参数,实现原子任务之间的数据共享
  • 执行完成后,将产物变量信息上报至平台,原子Agent封装了执行器中的逻辑,可以直接执行
  • 官方代码片段主要包括
    • 源原子(指定要拉取的Gitlab仓库地址,分支名,Tag)
    • 远程原子(定义启动,终止的调用URL,类似与Postman,串联业务)
    • 子流程原子(用来触发其他子流程,实时同步执行百分比)
    • Docker构建原子(利用Kaniko构建镜像并推送到Harbor)
    • 部署原子(使用K8S配置将服务部署到K8S中)
    • 流控原子(指定哪些原子不执行直接跳过)
    • 人工原子(重要步骤用来人工审批节点,对接内部工单系统)
  • 命令行执行,自定义原子(支持Python、Shell、Java等多种环境)
    • 比如用户可以git clone下载一个python脚本,执行python脚本
    • 也可以直接执行Shell脚本等

# 3)监控服务

  • 告警服务

    • 实例超时启动告警

    • 原子超时启动告警

    • 资源使用率超过预定阀值告警

    • 原子执行时长超过遇到值报警

    • 报警数据展示、报警收敛、报警升级

  • 重新调度服务(主要用来做兜底)

    • 日志再收集:
      • 定时任务检查原子执行结束但没有日志的原子
      • K8S Job类型:重新从K8S中拉取数据流,上传到OSS,更新日志链接
      • 脚本类型:告警,并等待测试车、台架等在线后重新上传(会在本地留存日志数据)
    • 原子重新调度
      • 检查最近2小时内失败原子配置,如果设置了重试策略,根据重试策略重新调度

# 4)效能看板

  • 流水线效能大盘

    • 开始时间、结束时间、调度等待耗时、业务执行耗时、业务等待耗时、业务执行时间
  • 效能统计(统计大盘、实时大盘、组大盘)

    • 实例数量、原子数量、耗时
    • CPU、GPU、内存 使用量 使用率
    • 车辆、台架 资源使情况
  • 资源Quota

    • 1.流水线实例, 2.原子实例, 3.CPU requests, 4.CPU limits, 5.MEM requests, 6.MEM limits, 7.GPU', 台架数量,测试车数量
  • 任务执行监控

    • 任务状态追踪:实时跟踪各个实例的执行状态,包括正在执行、已完成、失败、等待中等状态每个任务的执行进度可以在看板上清晰展示

    • 任务详情展示:对于每个实例和原子任务,提供详细的执行日志和步骤信息,帮助用户快速定位和解决问题

  • 性能指标监控

    • 系统负载:展示调度系统的当前负载情况,包括消息队列的长度、服务器的CPU和内存使用率等关键资源的使用情况
    • 执行延迟:分析和展示任务从触发到完成的延迟时间,帮助用户识别和解决潜在的性能瓶颈
    • 吞吐量:展示单位时间内系统处理的任务数量,帮助用户评估系统的处理能力
  • 资源利用情况

    • Kubernetes 资源监控:对于采用 Kubernetes 作为执行环境的任务,效能看板提供对 Kubernetes 资源使用情况的监控,包括 Pod 数量、节点资源使用率等
    • gRPC 连接状态:展示 gRPC Server 与 Client 之间的连接情况,追踪长连接的健康状态和响应时间

# 03.其他

# 1、服务架构

  • 1、RabbitMQ

    • 副本数量:考虑到高并发需求,建议至少使用 3 个副本(RabbitMQ 3.9.x)
      • CPU: 32 核心
      • Memory: 128GB
      • 存储: 512GB SSD
  • 2、Kafka

    • Broker 节点数量:建议至少 7 个 Broker 节点,以提高高可用性和分担负载 (Kafka 3.5.x)

      • CPU: 16 核心
      • 内存: 64GB
      • 存储: 2TB SSD
    • Partition 数量:对于 2000 万任务的高吞吐量,建议 Partition 数量设置为 200,以确保消息流量均匀分配

    • 优化 ISR 和控制器:使用 Kafka 的控制器自动管理分区和副本,确保集群在节点故障时能够快速恢复

  • 3、ES 8.x

    • 主节点Mater:至少三台单独机器(node.master: true )

      • CPU: 4 核心

      • 内存: 16GB(分配 50% 的内存给 JVM)

      • 硬盘:SSD,大小根据需求,通常配置 256GB 或以上

      • 协调节点Coordinating:至少三台独立服务器(node.data: true )

        • CPU: 8 核心

        • 内存: 32GB(分配 50% 的内存给 JVM)

        • 硬盘:SSD,大小建议 256GB 或以上

    • 数据节点Data:至少4台,后续不够用可以随时扩容,数据节点最好不要超过100台

      • CPU: 16 核心
        • 内存: 64GB(分配 50% 的内存给 JVM)

        • 存储:SSD,建议 1TB 或以上

  • 4、ETCD

    • 节点数量:建议至少使用 5 个节点,以增强容错能力(ETCD 3.5.x)
      • CPU: 8 核心
      • 内存: 32GB
      • 存储: 256GB SSD
  • 5、Redis

    • 节点数量:使用 Redis Cluster,建议至少 6 个节点配置(Redis 7.x)
      • CPU: 16 核心
      • 内存: 128GB
      • 存储: 512GB SSD
  • 6、TiDB

    • TiDB Server:使用 5 个实例配置(TiDB 7.x)

      • CPU: 32 核心

      • Memory: 128GB

      • PD Server:3 个实例,每个实例配置

        • CPU: 8 核心
        • Memory: 32GB
    • TiKV:至少 7 个实例,每个实例配置

      • CPU: 32 核心
        • Memory: 128GB
        • 存储: 2TB SSD

# 2、项目

技术架构:

  • 技术栈: Python、Golang、Shell、gRPC、Gin、Gorm、Client-Go、OpenSSL、TiDB、Redis、RabbitMQ、Kafka、Etcd、ES、K8S、Apollo、Artifactory、S3
  • 架构设计:系统采用分布式微服务架构,通过消息队列、DAG任务编排、gRPC通信等核心技术,构建高可用、高性能的任务调度与执行平台

项目背景:

  • 传统调度主要针对内网K8S环境,对车载设备、工业台架等多终端跨网络环境无法支持
  • 项目旨在构建一种支持跨网段、多终端模式的分布式调度系统

项目介绍:

  • 通用调度服务是一个高效、可靠的分布式调度系统,每日处理超过2000万任务
  • 系统设计充分考虑到 去中心化、容错机制、高效调度、计算加速 等核心能力

核心职责与贡献:

  • 分布式一致性与去中心化
    • 系统采用去中心化架构,确保每个调度实例具备独立调度能力,避免单点故障
    • 充分利用RabbitMQ可靠性,结合Redis去重机制和分布式锁,保证调度一致性
  • 容错机制
    • 设计了健壮的 心跳服务、故障转移与重调度机制
    • 确保在网络波动或设备故障时任务能够可靠地重新分配
  • 高效调度与资源优化
    • 系统利用RabbitMQ优先级队列和Redis动态调度策略
    • 根据任务优先级和资源使用状况进行实时调度
    • 支持高达20万的并发任务处理,确保计算资源的最大化利用
  • 计算加速
    • 利用Dragonfly实现P2P文件分发,降低带宽成本并提高下载稳定性
    • 有效识别并聚合计算任务所需数据,支持有序并发下载,优化执行效率
  • 海量日志处理
    • 通过gRPC长连接将日志传输至Kafka,再由分布客户端写入ES,最终归档至S3
    • 利用雪花算法唯一性,确保日志关联性与重调度后的数据合并

业务影响与成果:

  • 调度效率提升:智能化调度,系统调度效率提升超过50%,实现了千万任务的高效管理
  • 维护成本降低:模块化、可扩展的设计减少系统的开发与维护成本,维护成本降低超过40%
  • 资源利用率提高:优化资源管理,设备利用率提升50%以上,为企业节省数千万的设备投资

# 3、优先级队列代码

  • 配置 RabbitMQ 优先级队列
# 要创建优先级队列,需要在定义队列时设置 `x-max-priority` 属性例如,创建一个最大优先级为 10 的队列
channel.queue_declare(queue='task_queue', durable=True, arguments={'x-max-priority': 10})

# 消息发送时,可以为每个消息指定优先级
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message_body,
    properties=pika.BasicProperties(
        priority=message_priority  # message_priority 是消息优先级 (0-9)
    )
)
1
2
3
4
5
6
7
8
9
10
11
12
  • 设置队列的消息获取频率:
    • 权重设定:GroupA权重为0.1,GroupB权重为0.9,表示GroupB的调度频率远高于GroupA

    • 预取值计算公式:权重 * 总处理能力(假设系统的总任务处理能力为100次/秒)

      • GroupA的预取值应设为0.1 * 100 = 10
      • GroupB的预取值应设为0.9 * 100 = 90
channel.queue_declare(queue='GroupA_task_queue')
channel.queue_declare(queue='GroupB_task_queue')

channel.basic_publish(
    exchange='task_exchange',
    routing_key='GroupA',
    body=message_body
)
1
2
3
4
5
6
7
8
  • 分组队列与调度权重

    • 在分组调度中,每个用户组的调度权重决定了该组的任务消费频率

    • 例如:GroupA 权重为 0.1,GroupB 权重为 0.9,这意味着在资源紧张时,GroupB 的任务调度频率更高

    • 通过 RabbitMQ 的 basic_qos 方法限制每个组的消费者从队列中获取的消息数,从而实现根据权重进行资源调度

channel.basic_qos(prefetch_count=int(10 * weight))  # weight 为该组的调度
1
import pika
import time

# RabbitMQ 连接参数
RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USER = 'guest'
RABBITMQ_PASSWORD = 'guest'

# 权重设定
weights = {
    'GroupA': 0.1,
    'GroupB': 0.9
}

# 总处理能力
TOTAL_CAPACITY = 100

# 创建连接和频道
def create_connection():
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASSWORD)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        credentials=credentials
    ))
    return connection

# 定义消费者类
class Consumer:

    def __init__(self, group_name, queue_name, prefetch_count):
        self.group_name = group_name
        self.queue_name = queue_name
        self.prefetch_count = prefetch_count
        self.connection = create_connection()
        self.channel = self.connection.channel()

        # 声明队列
        self.channel.queue_declare(queue=self.queue_name, durable=True)

        # 设置 basic_qos, 控制每次获取的消息数量
        self.channel.basic_qos(prefetch_count=self.prefetch_count)

        # 开始消费
        self.channel.basic_consume(queue=self.queue_name,
                                   on_message_callback=self.callback,
                                   auto_ack=False)

    # 处理消息
    def callback(self, ch, method, properties, body):
        print(f"{self.group_name} received message: {body.decode()}")
        time.sleep(1)  # 模拟任务处理时间
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # 启动消费者
    def start_consuming(self):
        print(f"Starting consumer for {self.group_name} with prefetch count {self.prefetch_count}")
        self.channel.start_consuming()

# 启动消费者
if __name__ == "__main__":
    consumers = []

    # 为每个组创建消费者
    for group, weight in weights.items():
        prefetch_count = int(weight * TOTAL_CAPACITY)
        queue_name = f'{group}-PriorityQueue'
        consumer = Consumer(group, queue_name, prefetch_count)
        consumers.append(consumer)

    # 启动每个消费者
    for consumer in consumers:
        consumer.start_consuming()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
上次更新: 2025/4/29 17:38:19
02.GPU调度

02.GPU调度→

最近更新
01
06.Mage平台
05-30
02
16.区块链交易所
05-28
03
01.常识梳理
05-28
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式