不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • 项目

    • 01.分布式调度系统
    • 02.GPU调度
      • 01.背景
        • 1、概述
        • 2、现状
        • 3、目标和非目标
      • 02.任务类型
        • 1、MPI job
        • 2、Spark Job
        • 1)Spark 概述
        • 2)普通K8s Job
        • 3)Spark Job
        • 4)平台调度职责
        • 5)go执行器运行Spark
        • 6)Spark Job 原理
        • 3、Service
        • 4、GPU任务调度差异
        • 1)资源隔离
        • 2)亲和性与反亲和性
        • 3)数据本地性
        • 4)任务隔离
      • 03.存储方案
        • 1、TB数据 长时间任务
        • 2、短时任务 临时存储
        • 3、 Redis-FUSE 挂载
        • 1)K8s 任务 YAML 配置
        • 2)业务脚本 inference.py
        • 4、脚本主动访问 Redis
        • 1)K8s 任务 YAML 配置
        • 2)业务脚本 inference.py
        • 3)调度系优化点
      • 04.技术架构
        • 1、资源管理
        • 1)资源划分
        • 2)资源申请
        • 3)任务优先级
        • 4)Quota 计算
        • 2、任务抢占
        • 1)任务抢占实现
        • 2)被抢占任务处理
        • 3、日志收集
      • 05.部门职能
        • 1、生产链路
        • 2、路测链路
        • 3、地图业务
        • 4、PnC (Planning and Control,规划与控制)
        • 5、大规模仿真
    • 03.车端构建系统
    • 04.资源管理平台
    • 05.CICD
    • 06.Mage平台
    • 07.MLOps
    • 10.发布系统使用
    • 11.自动驾驶业务
目录

02.GPU调度

# 统一资源调度平台

# 01.背景

# 1、概述

  • 平台希望能将整个 AA 的机器资源统一管理,实现统一调度
  • 将内部硬件资源动态分配,通过Quota等方式进行资源限制
  • 让闲置的资源流动起来,同时要满足不同部门,不同团队的具体使用场景

# 2、现状

  • 目前评测、数据处理等任务的现有服务只是在k8s上直接部署,没有一个系统进行统一调度资源管理
  • 而且有些机器占用后只是特定时间使用,造成资源浪费
  • 同时新加一个服务需要运维审批,然后再自行k8s部署,调度过程并不可追踪且繁琐

# 3、目标和非目标

目标

  • 实现对评测、数据处理相关任务所需机器资源在集群上的统一调度,简化当前调度机器资源的流程,减少人力成本,优化资源使用率
  • P1: 对平台进行优化,拓宽各种任务所需的功能,如支持服务的实例数量的在线扩/缩容等
  • P2: 将 NTS 训练平台的功能以及用户迁移到该平台
  • P3: 对整个AA的机器资源进行统一调度管理,支持更多业务的任务机器资源调度

非目标

  • 不做调度完机器资源之后具体业务上的调度
    • 平台单次资源调度在 10s 级别,即资源充足情况下,从获取到服务所需资源到服务启动消耗的时间
  • 不做具体业务的上线、维护
  • 不提供有状态服务或任务的调度,如 mysql 数据库等

# 02.任务类型

平台可调度的任务主要分为 MPI Job / Spark Job 和 Service

# 1、MPI job

  • 定义:即须运行一段时间只执行一次的 MPI 任务,任务有完整的生命周期(如训练任务(MPI)、Datafilter)
  • 平台职责:
    • 调度平台负责将 mpi job 调度到合适的节点上执行,并对 job 的生命周期进行监控,job 执行结束后立即释放资源
    • 同时平台支持调度多个节点用于多机运算
  • 存储方案:
    • 为用户提供一种可持久化的网络存储,支持用户申请后,在 job 启动时挂载到 job 上
    • 同时也为每个 job 提供一个生命周期与 job 相同的临时存储
  • 使用形式:
    • 用户通过将环境打成 image,在指定 image 下运行指定 command
  • 调度策略:
    • 根据任务的资源需求以及集群资源空余情况,调度 job 到合适的机器
    • 对于多节点任务,会在确保多个节点资源都满足的情况下一起调度

# 2、Spark Job

# 1)Spark 概述

  • Spark 任务定义

    • Spark 是一个 分布式计算框架,用于处理大规模数据
    • Spark Job 自动切分数据 并 智能调度计算
    • 相比 K8s Job,无需手动拆分数据,还能 动态调整资源
  • 例

    • 自动划分 100GB LiDAR 数据,每个 Executor 只处理一部分数据
    • Driver 统一调度,根据集群资源动态调整 Executor 数量(如 5 个 Executor)
    • 计算完成后自动聚合结果,无需手动管理数据拆分和合并
  • Spark Job 的作用(主要用于 处理大规模数据集)

    • 批处理计算:用于定期处理历史数据,如日志分析、数据统计等

    • 流式计算:处理实时数据流,如异常检测、事件分析等

    • 数据转换:从多个数据源提取数据,进行转换和清洗,再存入数据仓库或数据库

  • 应用场景: 清洗 LiDAR 点云数据

    • Spark 读取 100GB 点云数据,自动分配多个计算节点并行处理

    • 去除无效点(如反射过弱的噪声点)

    • 转换标准格式,方便下游算法使用

# 2)普通K8s Job

如果用 K8s Job 直接处理 100GB LiDAR 数据,可能需要手动拆分数据

  • 手动切分数据,让每个 Pod 只处理一部分
  • K8s 会启动 5 个 Pod,每个 Pod 负责不同的 lidar_part_{index}.bin 数据
  • 缺点:
    • 数据划分需手动管理,资源固定,无法动态扩展
apiVersion: batch/v1
kind: Job
metadata:
  name: lidar-processing
spec:
  completions: 5  # 5个并行任务
  parallelism: 5
  template:
    spec:
      containers:
      - name: lidar-processor
        image: myrepo/lidar:v1
        args: ["--input", "s3://dataset/lidar_part_{index}.bin"]
1
2
3
4
5
6
7
8
9
10
11
12
13

# 3)Spark Job

如果用 Spark Job 处理 100GB LiDAR 数据,流程完全不同

  • K8s 启动 Spark Driver,Driver 发现数据大,向 K8s 申请 5 个 Executor

  • Executor 由 Spark 负责调度,K8s 只是分配 Pod 资源,不关心具体计算任务

  • Spark 会自动分割数据,并均匀分配给 5 个 Executor(无需手动指定每个 Executor 处理哪部分数据)

  • 任务完成后,Driver 触发结果聚合,并存储到 HDFS/S3

  • ① 构建 Image

FROM apache/spark:latest
RUN pip install opencv-python numpy pyspark
COPY camera_processing.py /app/
ENTRYPOINT ["spark-submit", "--master", "k8s://kubernetes.api.server", "/app/camera_processing.py"]
1
2
3
4
  • ② 任务提交(K8s YAML 示例)
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: camera-processing
spec:
  type: Python
  image: myrepo/spark-camera:v1
  mainApplicationFile: /app/camera_processing.py
  sparkConf:
    spark.executor.instances: "5"
    spark.executor.memory: "8g"
    spark.executor.cores: "4"
  driver:
    cores: 2
    memory: "4g"
  executor:
    cores: 4
    memory: "8g"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  • ③ 任务执行流程

    • K8s 调度 Spark Driver Pod,运行 camera_processing.py

    • Driver 启动 5 个 Executor,每个 Executor 处理不同的视频段

    • 任务结束后,自动清理所有 Pod,释放资源

# 4)平台调度职责

  • 基于资源需求选择最佳节点

    • 任务提交时,调度器检查 CPU/内存/GPU 需求

    • 选择最合适的 Worker 节点运行 Spark Driver

    • Executor 的调度由 Spark 自身 动态分配(Spark on K8s 会自动请求 K8s 分配 Executor Pod)

  • 动态调整 Executor 数量

    • 任务启动时,默认申请 5 个 Executor

    • 发现数据量大,Spark 自动向 K8s 申请更多 Executor(最多扩展到 20 个)

    • 任务快结束时,Spark 释放多余 Executor,只保留核心计算资源

    • sparkConf:
        spark.dynamicAllocation.enabled: "true"
        spark.dynamicAllocation.initialExecutors: "5"
        spark.dynamicAllocation.minExecutors: "2"
        spark.dynamicAllocation.maxExecutors: "20"
      
      1
      2
      3
      4
      5
  • 调度服务指定调度资源

    • K8s 集群、Namespace
    • ① nodeSelector
      • 在 YAML 里添加 nodeSelector,让 K8s 选择合适的计算节点
      • 适用场景:需要 GPU 加速时,可以限定 Job 只跑在带 GPU 的机器上
    • ② 打污点
      • 让 Spark Job 运行在 带 Taint 限制的特殊节点(如高性能计算节点)
    • ③ 使用 Affinity 控制调度
      • 让 Spark Job 只运行在 特定云厂商的节点(如 AWS/GCP)

# 5)go执行器运行Spark

  • 通过 Go 执行器启动 Spark 脚本

    • # 原本镜像是这样启动的
      ENTRYPOINT ["spark-submit", "--master", "k8s://kubernetes.api.server", "/app/camera_processing.py"]
      # 现在要改为
      ENTRYPOINT ["/app/atom"]
      
      1
      2
      3
      4
    • cmd := exec.Command(
          "spark-submit",    // 启动 Spark 应用的命令行工具
          // 指定 Spark 的 Master 为 Kubernetes 模式,并提供 K8s API Server 的地址(在集群内部通常是 kubernetes.default.svc)
          "--master", "k8s://https://kubernetes.default.svc",
          "--deploy-mode", "cluster",
          "--name", "gpu-video-job",  // 指定 Spark 应用的名称,便于追踪和日志查看
      
          "--conf", "spark.executor.instances=3",  // 设置 Spark 执行器数量(即启动多少个 executor pod)
          "--conf", "spark.executor.memory=16g",   // 设置每个 executor 的内存大小
          "--conf", "spark.executor.cores=8",      // 设置每个 executor 使用的 CPU 核心数
      
          // GPU 配置
          "--conf", "spark.executor.resource.gpu.amount=1",
          "--conf", "spark.executor.resource.gpu.vendor=nvidia",
          "--conf", "spark.executor.resource.gpu.discoveryScript=/opt/spark/scripts/getGpus.sh",
          "--conf", "spark.kubernetes.executor.limit.nvidia.com/gpu=1",
      
          // nodeSelector(匹配有 GPU 的节点)
          "--conf", "spark.kubernetes.executor.nodeSelector.gpu=true",
      
          "/app/camera_processing.py",
      )
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
  • 问题 1:Go 启动的 Spark 任务,有权限根据配置自动创建多个 Executor 吗?

    • 只要 Driver Pod(go执行器) 本身有合适权限
    • 使用 spark 的 SparkApplication 或容器内 spark-submit,绑定了 edit 或 admin 角色,具备创建 Pod 权限
    • 你也可以显示在 YAML 里配置 serviceAccount: spark-runner
  • 问题 2:如果需要 GPU,我们只需调度到有 GPU 的节点就可以了吗?是否还需要额外操作?

    • 除了调度,还需配置 Spark 的 GPU 资源使用方式

    • 调度只是第一步,让 Pod 上到有 GPU 的节点,但 Spark 不会自动使用 GPU

    • 需要做以下配置必要操作(完整的 GPU 支持)

    • # 1. Executor Pod 配置使用 GPU
      "--conf", "spark.executor.resource.gpu.amount=1", # 每个 executor 使用一个 GPU
      "--conf", "spark.executor.resource.gpu.vendor=nvidia",  # 标识资源类型(仅 NVIDIA 支持)
      # Spark 必须指定一个脚本来让 Pod 内发现 GPU 资源(NVIDIA 容器通常挂好此脚本)
      "--conf", "spark.executor.resource.gpu.discoveryScript=/opt/spark/scripts/getGpus.sh",
      
      # 2. K8s 容器运行时支持 GPU
      # Driver / Executor Pod 的容器必须加上
      resources:
        limits:
          nvidia.com/gpu: 1
      
      # 3. nodeSelector 选 GPU 节点(可选但推荐)
      "--conf", "spark.kubernetes.executor.nodeSelector.gpu=true",
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14

# 6)Spark Job 原理

  • ① 数据分区

    • 任务开始时,Driver 读取数据源(如 S3/HDFS)
    • 数据被切分为多个 Partition,每个 Partition 由一个 Executor 负责处理
    • 如果 100GB LiDAR 数据存储在 S3(默认 block 128MB)
      • 则 Spark 可能会创建 100GB / 128MB ≈ 800 个 Partition
      • 这些 Partition 会均匀分配给多个 Executor
  • ② 数据如何分布式处理

    • spark.read.parquet 自动将数据切分为 Partition 并分发给 Executor

    • df.filter() 和 df.select() 在 Executor 内部 并行执行

    • df.write.parquet 触发 Shuffle 阶段(数据聚合)

    • from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.appName("LiDARProcessing").getOrCreate()
      
      # Step 1: 读取数据(Spark 自动切分 Partition)
      df = spark.read.parquet("s3://dataset/lidar_data.parquet")
      
      # Step 2: 数据处理(每个 Partition 并行处理)
      cleaned_df = df.filter(df["intensity"] > 0.1).select("x", "y", "z")
      
      # Step 3: 保存处理后的数据(Spark 自动聚合)
      cleaned_df.write.parquet("s3://output/processed_lidar.parquet")
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
  • ③ 计算完成后,如何聚合数据

    • 窄依赖

      • 如果 Task 之间没有数据交换,数据可以在本地 Executor 内部完成计算(无需网络传输)
      • 例如 filter() 仅处理当前 Partition 的数据,不需要跨 Executor 交互
    • 宽依赖

      • 例如 groupBy()、reduceByKey() 需要跨 Partition 进行数据合并

      • Spark 需要 重新分区(Shuffle),将相同 key 的数据汇总到同一个 Executor 进行最终计算

# 3、Service

  • 定义:即需要长期保持运行,执行日常任务的无状态的服务
  • 平台职责:
    • 平台负责维持用户定义数量的服务实例,实例挂掉后自动重启,保证服务的稳定运行
    • 用户能够动态扩缩服务实例的数量
  • 例子:目前训练系统所使用的 WebTerminal、Web 后端等
  • 存储方案:持久化存储形式同 mpi job
  • 调度策略:只要能满足运行服务的最小需求,即minAvailable即可调度
  • 相关组件:用户可以创建和管理一些相关组件,然后在启动 service 时选择对应的组件
    • config:service 所需的配置文件
    • secret:包含少量敏感信息例如密码、令牌或密钥的对象

# 4、GPU任务调度差异

  • 从调度系统的角度来看,CPU 任务和 GPU 任务的调度流程基本相同

  • 都是通过 YAML 指定 集群(Cluster)、命名空间(Namespace)、资源需求(CPU/GPU/内存)、污点/亲和性等约束条件

  • 但是,GPU 任务调度相比 CPU 任务,需要额外考虑一些因素,以确保任务能够正确、高效地运行

# 1)资源隔离

  • CPU 任务 只需要 CPU/内存,可以随意调度到任何有空闲 CPU 的节点上

  • GPU 任务 需要 GPU 资源(如 NVIDIA A100, V100)

  • 并且 必须确保 GPU 资源不会被多个 Pod 共享(即 GPU 不能像 CPU 那样在多个任务间动态分配)

  • ① 使用 nvidia.com/gpu 资源请求,让 K8s 确保每个 GPU Pod 绑定到特定 GPU

    • resources:
        limits:
          nvidia.com/gpu: 1  # 申请 1 张 GPU
      
      1
      2
      3
  • ② 启用 device-plugin,让 K8s 正确分配 GPU

    • kubectl apply -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/master/nvidia-device-plugin.yml
      
      1
  • ③ 使用 nodeSelector 或 taints 确保 GPU 任务不会被调度到 没有 GPU 的普通节点

# 2)亲和性与反亲和性

  • 某些任务一定要跑在特定类型的节点上(亲和性)

  • 某些任务不能和其他任务跑在一起(反亲和性)

  • ① 任务必须运行在某些节点上(如 GPU 任务只能跑在 GPU 服务器上)

  • ② 任务避免调度到相同的物理机上(提高可靠性)

# 3)数据本地性

  • 对于大规模数据处理(如 Spark 任务),数据存储在 HDFS、S3、CephFS 这类分布式存储,任务需要从远程存储读取数据

  • 如果调度不当,可能会导致:

    • 任务调度到数据存储位置很远的节点,导致 I/O 开销大

    • 同一个任务的 Executor 可能被分布到不同的数据中心,影响性能

  • ① 使用 nodeAffinity 让任务优先调度到存储节点附近

  • ② 使用 Spark spark.locality.wait 参数,减少远程数据访问开销

    • spark.locality.wait=10ms  # 让 Spark 尽量等一小段时间,看是否能调度到数据本地节点
      
      1
  • ③ 如果数据在 S3 等远程存储,可以提前缓存数据到本地存储(如 NVMe 盘)

# 4)任务隔离

在 K8s 里,不同团队可能会共享 GPU 资源,但我们需要

  • 确保 GPU 任务不会影响 CPU 任务

  • 同一个团队的 GPU 任务可以共享资源,但不同团队的任务不能抢占资源

  • ① 使用 K8s ResourceQuota 限制每个团队的 GPU 资源

    • apiVersion: v1
      kind: ResourceQuota
      metadata:
        name: gpu-quota
        namespace: team-a
      spec:
        hard:
          nvidia.com/gpu: "4"  # 该团队最多使用 4 块 GPU
      
      1
      2
      3
      4
      5
      6
      7
      8
  • ② 使用 PriorityClass 让重要任务优先调度

    • apiVersion: scheduling.k8s.io/v1
      kind: PriorityClass
      metadata:
        name: high-priority
      value: 1000  # 优先级越高,调度时越靠前
      
      1
      2
      3
      4
      5

# 03.存储方案

# 1、TB数据 长时间任务

  • 对 Ceph RBD CSI 进行集成,用户可以申请一些“硬盘”,在申请资源时可以选择挂载,存储内容持久化

  • 大规模深度学习训练(如自动驾驶模型训练)

    • 训练任务往往 持续数小时到数天,需要 大数据集(TB级别) 作为输入

    • 训练过程中会不断生成 模型 checkpoint(断点续训)、日志、训练中间结果

    • 需要 高吞吐、低延迟存储,并且存储内容 训练结束后仍然需要保留(如模型参数)

  • 解决方案

    • 使用 Ceph RBD CSI 进行持久化存储,挂载到训练任务的 Pod

    • 在任务调度时,自动检查是否有 Ceph RBD 挂载请求,并绑定到合适的节点

  • 具体 YAML 配置

    • 任务提交时,调度系统检查是否需要挂载 Ceph RBD 持久化存储

    • 如果任务需要 GPU 计算,调度系统会选择 GPU 计算节点,并确保 Ceph RBD 可以挂载到该节点

    • 如果 Ceph RBD 已被其他任务占用,调度系统会等待,或者调度到其他空闲节点

    • apiVersion: v1
      kind: PersistentVolumeClaim
      metadata:
        name: model-training-storage
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 500Gi  # 申请 500GB 存储
        storageClassName: ceph-rbd  # 指定 Ceph RBD 存储
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11

# 2、短时任务 临时存储

  • 需求

    • 数据清洗、数据增强、格式转换 任务只运行几分钟到几小时

    • 需要 高性能磁盘,但 数据可以临时存储,任务完成后可释放

    • 计算过程中可能会产生 临时缓存(如解压文件、转换后的格式),不需要持久化存储

  • 解决方案

    • 使用 Local Path CSI 作为临时存储,每个物理机上的 NVMe 或 SSD 盘提供存储
    • 如果本地存储资源不足,调度系统需要考虑负载均衡,避免某些节点磁盘使用率过高
  • 具体 YAML 配置

    • apiVersion: v1
      kind: PersistentVolumeClaim
      metadata:
        name: data-processing-storage
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 100Gi  # 申请 100GB 临时存储
        storageClassName: local-path  # 使用 Local Path CSI
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11

# 3、 Redis-FUSE 挂载

方案 特点 适用场景
业务代码主动访问 Redis 需要使用 redis-py SDK 适用于 存储结构化数据(string、hash、list)
Redis-FUSE 挂载 直接 open("/mnt/cache/file") 访问 适用于 存储大文件(LiDAR 点云、视频帧)
  • 业务代码希望直接 open("/mnt/cache/data.bin") 访问 Redis,而不用 SDK
  • Redis 主要存储 二进制文件或大数据块,如 LiDAR 点云、图片、视频帧
  • 需要 高吞吐、低延迟访问

# 1)K8s 任务 YAML 配置

  • Pod 启动时,自动挂载 Redis-FUSE,/mnt/cache 变成 Redis 的“本地目录”

    业务代码直接 open() 访问 Redis 数据,无需修改

apiVersion: batch/v1
kind: Job
metadata:
  name: redis-fuse-job
spec:
  template:
    spec:
      containers:
        - name: inference
          image: inference-image
          command: ["/bin/bash", "-c"]
          args:
            - "redisfs mount redis://redis-server:6379 /mnt/cache && python inference.py"
          volumeMounts:
            - mountPath: /mnt/cache
              name: inference-cache
      volumes:
        - name: inference-cache
          emptyDir: {}
      restartPolicy: Never
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 2)业务脚本 inference.py

with open("/mnt/cache/lidar_frame_001.bin", "rb") as f:
    data = f.read()

print(f"Loaded {len(data)} bytes from Redis-backed cache")
1
2
3
4

# 4、脚本主动访问 Redis

  • 需求(如自动驾驶感知计算)

    • 低延迟,高吞吐

    • 需要缓存最近一批推理数据,避免重复读取远程存储(如 S3、HDFS)

    • 需要 存储系统提供 Python SDK 访问能力(如 TensorFlow 训练数据加载)

# 1)K8s 任务 YAML 配置

  • 环境变量 REDIS_HOST & REDIS_PORT → 脚本可以动态连接 Redis
  • Pod 运行 inference.py 业务脚本,从 Redis 拉取数据并处理
apiVersion: batch/v1
kind: Job
metadata:
  name: redis-inference-job
spec:
  template:
    spec:
      containers:
        - name: inference
          image: inference-image
          command: ["/bin/bash", "-c"]
          args:
            - "python inference.py"
          env:
            - name: REDIS_HOST
              value: "redis-server"
            - name: REDIS_PORT
              value: "6379"
      restartPolicy: Never
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 2)业务脚本 inference.py

  • 自动判断 Redis 数据类型(r.type(key))
  • 针对不同类型(string、hash、list、set、zset)调用不同的 API
  • 避免硬编码 key 的类型,提高灵活性
import os
import redis

# 连接 Redis
redis_host = os.getenv("REDIS_HOST", "localhost")
redis_port = int(os.getenv("REDIS_PORT", 6379))
r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
1
2
3
4
5
6
7

# 3)调度系优化点

  • 假设我们有 3 个 K8s 集群,每个集群都可能运行 Redis 缓存,同时任务需要访问激光雷达数据 (frame_001.bin)

  • 🔹 任务提交前,调度系统的决策流程

  • ① 任务提交:请求处理 frame_001

    • 任务申请 2 个 GPU,10GB 内存
  • ② 调度系统检查缓存情况

    • 发现 frame_001 已经缓存在 A 集群的 Redis 上

    • B 和 C 没有缓存该数据,但 C 空闲 GPU 资源最多

  • ③ 智能调度决策

    • 如果 A 集群有空闲 GPU,任务优先调度到 A 集群,直接从 Redis 读取数据(最快)

    • 如果 A 集群 GPU 资源不足

      • 检查 A 集群是否有本地 emptyDir 缓存(如 /mnt/cache)
      • 如果 A 也没有缓存,才调度到 C,并触发缓存预热机制(将 frame_001 拉取到 C 的 Redis)

# 04.技术架构

# 1、资源管理

  • 资源划分:将集群资源划分为 Public 资源池和 Project 资源池
    • Public 资源为整个部门共同使用的资源,大部分资源都在 public 资源中,比如提供给算法训练的机器资源
    • Project 资源为某个项目或业务的保留资源,也可以是组内特殊用途的机器
  • Public 资源 Quota 限制:
    • 用户可以选择以个人名义或 project 名义申请 public 资源,个人和 project 都有一定限额的 quota 限制
    • 分发到个人和 project 的总 quota 会大于 public 资源数量,来提高集群利用率
    • 任务在 running 或者 pending 都会占用相应 quota
    • Quota 将以CPU,内存,不同加速硬件来划分
  • Project 资源:
    • 用户在 project 内调度任务不受 quota 限制,只会受 project 资源数量限制
    • 用户在 project 内的任务全 project 共享,可以共同对 project 内任务进行操作
  • 任务优先级:用户调度任务时可以选择 normal 和 low 两种优先级
    • low 任务可以在整个集群上调度,不受 quota 的限制
    • normal 任务可以在 project 或 public 资源中调度
    • Normal 任务始终会抢占 low 任务

# 1)资源划分

平台所调度的机器资源主要分为含加速硬件节点和 cpu 节点两类

  • 加速硬件节点:指带 GPU 节点或端侧开发板等带加速硬件的节点
    • 申请任务时需选择需要申请的加速硬件类型
    • 这类节点在申请时,节点的 CPU,内存数会和加速硬件个数绑定
    • 例如:A100节点可调度的资源有 8 块 gpu,56 核 cpu,944 GB 内存于是每 1 块卡就和 7 核 cpu、118 GB 内存绑定
  • CPU节点: 指只带 CPU 和内存的节点
    • 这类节点申请时只受单个节点最大 CPU 和内存数限制
  • 在申请节点时可以通过加上 tag 来指定调度相应设备的节点(例如:通过 tag:A100 去专门调度 A100机器)
资源池 适用任务 受 Quota 限制 资源类型 适用场景
Public 资源池 共享任务、训练任务 ✅ 受限 CPU / GPU / NPU / 端侧设备 共享计算资源、实验任务
Project 资源池 项目独占任务 ❌ 无限制 CPU / GPU / NPU 业务核心任务,避免资源抢占
加速硬件资源池 仅加速任务 ✅ 受限 A100 / V100 / 端侧设备 机器学习、深度学习推理
CPU 资源池 仅 CPU 任务 ✅ 受限 CPU + 内存 传统计算任务、调度系统任务

# 2)资源申请

  • 任务提交:
    • 检查 Quota 余额,确定可用资源池
    • 确认任务需求(CPU / GPU / 端侧设备)
    • 若申请 GPU 资源,检查 绑定的 CPU & 内存是否足够
  • 资源选择:
    • 优先调度 Project 资源(不受 Quota 限制)
    • 若 Project 资源不足,则尝试 Public 资源
    • 若 Public 资源也不足,则任务 进入 Pending 队列,等待抢占或资源释放
  • 加速硬件调度策略
    • 若任务请求 GPU,调度系统将按照GPU 绑定的 CPU & 内存分配
    • Eg: 任务申请 A100 资源,则调度 7 核 CPU + 118 GB 内存 + 1 GPU

# 3)任务优先级

  • 任务调度优先级

    • 任务类型 资源限制 是否抢占 low 任务 适用场景
      High(高优先级) 仅 Project 资源 ❌ 不抢占 关键业务任务
      Normal(默认) 可用 Project & Public 资源 ✅ 可抢占 low 任务 训练、推理任务
      Low(低优先级) 仅 Public 空闲资源 ❌ 可能被 normal 抢占 可中断任务,如批量任务
  • 任务抢占逻辑

    • Normal 任务可抢占 Low 任务,确保资源合理利用

    • Low 任务仅能使用闲置资源,不会影响正常业务

# 4)Quota 计算

  • 在 K8s 资源调度中,每个用户或项目的 Quota 限制 按不同资源类型 计算

    • ① CPU-only 任务 → 仅计算 CPU & 内存
    • ② GPU 任务 → 计算 GPU 数量,同时绑定 CPU & 内存
    • ③ 端侧设备任务 → 直接计算设备数量
  • Quota 计算规则

    • 资源类型 计算方式 是否受配额限制 示例
      GPU 按 GPU 数量绑定 CPU & 内存 ✅ 是 1 块 A100 = 7 核 CPU + 118 GB 内存
      CPU 直接计算 CPU 核数 & 内存大小 ✅ 是 32 核 CPU + 64 GB 内存
      端侧设备 直接计算设备数量 ✅ 是 Jetson Xavier 2 台
  • eg:GPU 任务 (申请 A100-80G 2 块)

    • 每块 A100 绑定 7 核 CPU + 118 GB 内存

# 2、任务抢占

# 1)任务抢占实现

  • 任务抢占的核心在于 K8s 自带的优先级调度机制

  • 当高优先级任务需要资源时,K8s 会自动驱逐低优先级任务,释放资源供高优先级任务运行

  • ① K8s 使用 PriorityClass 机制,定义不同任务的优先级

    • value 值越大,优先级越高
    • low-priority 任务容易被 normal-priority 任务抢占
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: normal-priority
value: 1000  # Normal 任务优先级较高
globalDefault: false
description: "Normal priority jobs"
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: low-priority
value: 500  # Low 任务优先级较低,容易被抢占
globalDefault: false
description: "Low priority jobs"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  • ② 高优先级任务(Normal)
apiVersion: batch/v1
kind: Job
metadata:
  name: normal-task
spec:
  template:
    spec:
      priorityClassName: normal-priority
      containers:
      - name: job
        image: job-image
        resources:
          limits:
            cpu: "16"
            memory: "128Gi"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  • ③ 低优先级任务(Low,可能被抢占)
apiVersion: batch/v1
kind: Job
metadata:
  name: low-task
spec:
  template:
    spec:
      priorityClassName: low-priority
      containers:
      - name: job
        image: job-image
        resources:
          limits:
            cpu: "16"
            memory: "128Gi"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 2)被抢占任务处理

  • 方案 1:Pending 超时自动失败
    • 如果任务长时间 Pending,可以通过 activeDeadlineSeconds 设置超时失败
    • 任务 Pending 超过 30 分钟会自动失败,避免积压任务
apiVersion: batch/v1
kind: Job
metadata:
  name: low-task
spec:
  activeDeadlineSeconds: 1800  # 30 分钟超时
  template:
    spec:
      priorityClassName: low-priority
      containers:
      - name: job
        image: task-image
1
2
3
4
5
6
7
8
9
10
11
12
  • 其他方案

    问题 解决方案 实现方式
    高优先级任务抢占低优先级任务 使用 PriorityClass 机制 priorityClassName
    Pending 任务长时间无资源 超时失败,重新提交 activeDeadlineSeconds
    资源紧张导致任务一直等待 降级 GPU/CPU 需求,重新调度 调度系统修改 Job 资源
    集群资源不足 迁移任务到其他集群 调度系统跨集群调度
    过期任务积压 智能回收无用任务 调度系统定期清理

# 3、日志收集

  • 确定日志存储路径

    • 对于 containerd 运行时:/var/log/pods/{namespace}_{pod-name}_{pod-uid}/{container-name}/0.log
    • 对于 Docker 运行时:/var/lib/docker/containers/{container-id}/*.log
    • 可以使用 kubectl get pod -o wide 获取 Pod 运行的 Node,并登录对应 Node
  • 安装 Vector 在 Kubernetes Node 上安装 Vector

    • curl -L https://packages.timber.io/vector/latest/vector-amd64.deb -o vector.deb
      sudo dpkg -i vector.deb
      
      1
      2
  • 优缺点

优势 缺点
高效、资源占用低 需要手动安装 Vector
可本地处理日志,减少 ClickHouse 压力 配置较复杂
支持 ClickHouse、Kafka、Loki 等多个存储 需要调优以避免数据丢失

# 05.部门职能

# 1、生产链路

  • 主要职责

    • 自动驾驶数据的 采集、处理、标注、存储等工作
    • 为模型训练和算法优化提供高质量的基础数据
  • 数据采集与清洗:

    • 利用车载传感器(如摄像头、激光雷达等)采集大量的道路行驶数据
    • 并进行数据清洗和筛选,去除无效或噪声数据。
  • 数据标注:

    • 将采集的数据进行人工或半自动的标注,如对图片或视频中的行人、车辆、交通信号等进行标记,以支持机器学习模型的训练
    • 例如,手动标注行人过马路、红绿灯状态等,确保数据的准确性
  • 数据存储与管理:

    • 维护大规模的数据存储系统,确保数据的有效存储、备份和高效读取,如在分布式存储系统中管理PB级的数据。

具体应用场景:

  • 车队通过传感器每天采集上百TB的行驶数据,数据采集链路团队负责从车辆传感器到云端的整个数据流的处理,包括数据上传、清洗和存储。

# 2、路测链路

主要负责: 自动驾驶车辆的道路测试,验证自动驾驶系统在实际道路环境中的表现,确保算法在不同场景下的可靠性与安全性。

具体职责和业务举例:

  • 路测计划制定: 根据不同的测试需求,设计测试路线和场景,覆盖城市道路、高速公路、停车场等多种复杂场景,确保自动驾驶系统的全面测试。
  • 路测数据采集: 在路测过程中,实时采集车辆传感器数据、行驶轨迹、障碍物信息等,用于分析算法在实际环境中的表现。例如,测试自动驾驶车辆在拥挤城市路况中的避障能力。
  • 结果分析与反馈: 对路测数据进行分析,如车辆的加速、刹车、转向、变道等行为是否符合预期。如果发现问题,将数据反馈至研发部门进行算法优化。

具体应用场景:

  • 在自动驾驶车辆行驶过程中,路测链路团队安排车辆在不同天气条件下的复杂路况进行测试,并通过数据分析检查车辆的表现,保证自动驾驶系统的稳定性。

# 3、地图业务

主要负责: 高精度地图的构建和维护,为自动驾驶车辆提供精确的道路信息,辅助车辆进行路径规划和导航。

具体职责和业务举例:

  • 高精地图制作: 利用激光雷达和其他传感器采集的道路信息,生成高精度的3D地图。这些地图包含详细的车道线、路标、交通信号等信息,精度通常达到厘米级。
  • 实时地图更新: 确保地图数据的实时更新,反映道路上的最新变化(如施工、路障、新增交通设施等),以便自动驾驶系统能够及时调整路径规划。
  • 地图融合: 将实时传感器数据与预先制作的高精地图进行融合,为车辆提供更准确的定位和环境感知支持。例如,车辆可以通过高精地图提前知道即将到来的弯道或交通灯位置。

具体应用场景:

  • 地图业务部门构建的高精地图帮助车辆准确识别道路标线、交通信号等细节,使车辆能够在城市中的复杂交叉路口正确导航和通行。

# 4、PnC (Planning and Control,规划与控制)

主要负责: 自动驾驶系统中的决策、路径规划和车辆控制,确保车辆安全高效地从起点行驶到目的地。

具体职责和业务举例:

  • 路径规划: 基于实时感知信息和高精地图,规划最优行驶路线,避免障碍物和其他交通参与者。例如,在拥堵的城市路段中,系统可以规划绕开拥堵路段的路线。
  • 行为决策: 在自动驾驶过程中,做出如变道、减速、停车等行为决策。例如,当车辆遇到前方有行人过马路时,系统会通过感知和决策模块发出减速并停车的指令。
  • 车辆控制: 根据规划的路径和行为决策,实时控制车辆的加速、转向和刹车,确保车辆平稳行驶。例如,在高速公路行驶时,车辆控制模块确保车辆在车道内保持稳定的车速和方向。

具体应用场景:

  • PnC团队负责设计算法来处理复杂的交通场景,如当车辆遇到前方车辆突然减速时,系统能够实时规划变道动作,并控制车辆平稳变道,避免碰撞风险。

# 5、大规模仿真

主要负责: 构建大规模的虚拟环境,通过仿真测试自动驾驶系统的性能,评估系统在各种极端条件下的行为,减少实际路测的成本和风险。

具体职责和业务举例:

  • 仿真场景构建: 创建不同的虚拟驾驶场景,包括城市道路、乡村道路、高速公路、恶劣天气等,模拟真实的驾驶环境。例如,模拟大雾天气下的高速公路行驶,测试自动驾驶系统的感知和决策能力。
  • 仿真测试与迭代: 通过仿真平台批量测试自动驾驶系统的行为,评估车辆在不同路况下的决策准确性和行驶稳定性。仿真测试可以进行数百万次,并根据测试结果对算法进行优化。
  • 异常情况处理: 在仿真过程中,模拟突发状况(如突然出现的障碍物、交通事故等),测试系统的应急反应能力。例如,测试车辆在紧急情况下的刹车反应是否足够快,能否有效避免碰撞。

具体应用场景:

  • 仿真团队可以在虚拟环境中模拟真实世界中的千种驾驶场景,如车辆在冰雪路面行驶时的刹车测试,确保算法的鲁棒性,避免将未成熟的技术直接投入路测。
上次更新: 2025/4/29 17:38:19
01.分布式调度系统
03.车端构建系统

← 01.分布式调度系统 03.车端构建系统→

最近更新
01
300.整体设计
06-10
02
06.LangGraph
06-09
03
202.AI销售智能体
06-07
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式