1. 当默认调度器不够用的时候

假设你的团队管理着同时运行AI训练任务和Web服务的混合集群。某个凌晨2点,运维群里突然炸锅:10台带有A100显卡的服务器明明还有剩余算力,但新提交的GPU任务全被分配到太平洋对岸的机房了——这就是默认调度器用一刀切的策略带来的困扰。

Kubernetes默认的调度器就像个严格执行考勤的HR,它只会机械地检查"CPU够不够用""内存是否达标"这类硬性指标。但真实的业务场景中我们往往需要:

  • 优先使用本区域的GPU节点
  • 把同服务的Pod分散在不同故障域
  • 根据节点的实时温度调整负载 这时就需要自定义调度器登场了

2. 调度器是怎样"思考"的

Kubernetes调度器本质是个持续运行的决策程序,整个决策过程可以拆解为三个阶段:

// 技术栈:Go语言 + client-go库
type Scheduler struct {
    client     kubernetes.Interface  // 与API Server通讯的客户端
    podQueue   chan *v1.Pod          // 待调度Pod队列
    nodeLister corelisters.NodeLister // 节点信息监听器
}

// 核心决策循环(简化版)
func (s *Scheduler) Run(stopCh <-chan struct{}) {
    for {
        pod := <-s.podQueue                  // 接收新Pod
        nodes, _ := s.nodeLister.List()      // 获取所有节点
        feasibleNodes := filterNodes(pod, nodes)  // 筛选合格节点
        scores := prioritize(pod, feasibleNodes) // 给节点打分
        bestNode := selectHost(scores)           // 选择最高分节点
        bindPodToNode(pod, bestNode)             // 执行绑定
    }
}

这段伪代码展现的正是调度器的骨架逻辑,真实的实现需要处理事件监听、错误重试等复杂情况。其中的关键点在于filter和prioritize这两个阶段,这正是我们可以大做文章的地方。

3. 从零搭建调度器框架

让我们用Go语言配合client-go库搭建最小可行版的调度器:

// main.go - 调度器基础框架
package main

import (
    "context"
    "flag"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"
)

func main() {
    // 加载kubeconfig配置
    kubeconfig := flag.String("kubeconfig", "/root/.kube/config", "kubeconfig文件路径")
    flag.Parse()
    
    config, _ := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    clientset, _ := kubernetes.NewForConfig(config)
    
    // 创建共享Informer工厂
    factory := informers.NewSharedInformerFactory(clientset, 0)
    
    // 初始化调度器
    scheduler := NewScheduler(clientset, factory)
    
    // 启动Informer监听
    ctx := context.Background()
    factory.Start(ctx.Done())
    factory.WaitForCacheSync(ctx.Done())
    
    // 运行调度器主循环
    scheduler.Run(ctx)
}

// NewScheduler创建调度器实例
func NewScheduler(client kubernetes.Interface, factory informers.SharedInformerFactory) *Scheduler {
    podInformer := factory.Core().V1().Pods()
    nodeInformer := factory.Core().V1().Nodes()
    
    return &Scheduler{
        client:     client,
        podQueue:   make(chan *v1.Pod, 100),
        podLister:  podInformer.Lister(),
        nodeLister: nodeInformer.Lister(),
    }
}

这个基础框架已经具备监听集群状态的能力。接下来我们要实现具体的调度逻辑,以处理以下业务需求:

  • 优先选择标签带有"gpu-model=a100"的节点
  • 跨可用区(availability zone)部署同一服务的Pod
  • 避开最近5分钟内有硬件告警的节点

4. 实现高级调度算法

让我们实现一个兼顾资源利用率和业务稳定性的调度算法:

// scheduler_algorithm.go - 核心算法实现
const (
    ZoneWeight      = 0.3   // 可用区分布权重
    GPUScore        = 0.4   // GPU型号得分
    WarningPenalty  = -100  // 告警节点惩罚分
)

func (s *Scheduler) prioritize(pod *v1.Pod, nodes []*v1.Node) map[string]float64 {
    scores := make(map[string]float64)
    existingZones := getExistingZones(pod) // 获取同服务Pod所在可用区
    
    for _, node := range nodes {
        score := 0.0
        
        // 计算可用区得分
        currentZone := node.Labels["topology.kubernetes.io/zone"]
        if !existingZones[currentZone] {
            score += ZoneWeight * 100  // 新可用区加满分
        } else {
            score += ZoneWeight * (100 - 20*len(existingZones)) // 已有区域按密度减分
        }
        
        // GPU型号匹配度
        if node.Labels["gpu-model"] == "a100" {
            score += GPUScore * 100
        } else if node.Labels["gpu-model"] != "" {
            score += GPUScore * 50  // 其他GPU型号得50%分
        }
        
        // 节点健康状态检查
        if hasRecentWarnings(node) {
            score += WarningPenalty
        }
        
        // 资源剩余率计算
        allocatable := node.Status.Allocatable
        requested := calculateRequest(pod)
        cpuScore := (allocatable.Cpu().MilliValue() - requested.Cpu) / allocatable.Cpu().MilliValue()
        memScore := (allocatable.Memory().Value() - requested.Memory) / allocatable.Memory().Value()
        score += (cpuScore + memScore) * 0.15 * 100  // 资源利用率占比15%
        
        scores[node.Name] = score
    }
    return scores
}

// 获取同服务Pod所在可用区的工具方法
func getExistingZones(pod *v1.Pod) map[string]bool {
    zones := make(map[string]bool)
    if pod.Labels["app"] == "" {
        return zones
    }
    // 查询集群中同服务的其他Pod
    pods, _ := s.podLister.Pods(pod.Namespace).List(labels.Set{"app": pod.Labels["app"]}.AsSelector())
    for _, p := range pods {
        if p.Spec.NodeName == "" || p.Name == pod.Name {
            continue
        }
        node, _ := s.nodeLister.Get(p.Spec.NodeName)
        zones[node.Labels["topology.kubernetes.io/zone"]] = true
    }
    return zones
}

这个算法实现了:

  1. 防止单个可用区的集中部署
  2. 优先选择高性能GPU节点
  3. 避免使用不健康节点
  4. 考虑资源利用率平衡

5. 部署与实战演练

将调度器打包为容器后,通过Deployment部署到集群:

# custom-scheduler.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gpu-scheduler
  labels:
    app: gpu-scheduler
spec:
  replicas: 2
  selector:
    matchLabels:
      app: gpu-scheduler
  template:
    metadata:
      labels:
        app: gpu-scheduler
    spec:
      serviceAccountName: scheduler-sa
      containers:
      - name: scheduler
        image: registry.example.com/gpu-scheduler:v1.2
        args:
        - --kubeconfig=/var/lib/scheduler/kubeconfig

在Pod配置中指定自定义调度器:

apiVersion: v1
kind: Pod
metadata:
  name: ai-training-job
spec:
  schedulerName: gpu-scheduler  # 关键配置项
  containers:
  - name: trainer
    image: nvidia-training:v3
    resources:
      limits:
        nvidia.com/gpu: 4

6. 调度器扩展高级玩法

Kubernetes 1.19+版本引入了Scheduler Framework,开发者可以像插件一样扩展调度器:

// 调度插件示例:节点预处理插件
type NodeSanitizerPlugin struct{}

func (sp *NodeSanitizerPlugin) Name() string {
    return "NodeSanitizer"
}

// 在过滤阶段前执行预处理
func (sp *NodeSanitizerPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
    // 检查节点是否处于维护模式
    for _, node := range nodeList {
        if node.Labels["maintenance-mode"] == "true" {
            framework.NewStatus(framework.Unschedulable, "node in maintenance")
        }
    }
    return nil
}

// 在打分阶段后调整分数
func (sp *NodeSanitizerPlugin) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    // 给SSD存储节点额外加分
    if node.Labels["disk-type"] == "ssd" {
        return 10, nil
    }
    return 0, nil
}

这种方式可以更精细地控制调度流程,同时复用主调度器的核心逻辑。

7. 应用场景全景图

自定义调度器的典型用武之地: AI训练场景
在混合GPU机型集群中,根据任务类型动态选择机型(如A100适合大模型训练,T4适合推理任务),同时考虑GPU显存的碎片整理。

混合云调度
私有云节点优先部署敏感数据处理任务,当本地资源不足时自动将非敏感任务引流到公有云节点,且保证同一服务的所有Pod不跨云部署。

成本优化调度
根据节点的竞价实例到期时间,动态调整调度策略,在到期前30分钟停止调度新Pod到该节点,并逐步迁移现有工作负载。

8. 技术方案优劣分析

优势侧写

  • 灵活性:可以针对任何节点属性(甚至自定义指标)做调度决策
  • 深度整合:可以直接调用企业内部系统(如CMDB、监控平台)获取决策数据
  • 渐进式改进:可通过多个调度器分阶段替换,降低风险

潜在挑战

  • 开发复杂度:需要深入理解Kubernetes调度机制
  • 性能瓶颈:复杂的算法可能影响调度吞吐量
  • 版本适配:Kubernetes API变更可能影响调度器兼容性

9. 血泪经验总结

在多个生产环境中实施自定义调度器后,我们总结了这些黄金法则:

  1. 多调度器共存策略
    同时运行自定义调度器与默认调度器时,通过nodeAffinity划分管辖范围,防止调度冲突。例如:
apiVersion: v1
kind: Node
metadata:
  labels:
    scheduler-group: gpu  # 节点分组标签
---
apiVersion: v1
kind: Pod
metadata:
  name: normal-pod
spec:
  schedulerName: default-scheduler
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: scheduler-group
            operator: NotIn
            values: ["gpu"]
  1. 资源预留缓冲
    无论算法多么智能,永远要为系统组件保留至少10%的资源空间。通过Pod优先级机制确保关键系统Pod优先获得资源:
// 在打分函数中添加优先级加权
if pod.Spec.Priority != nil && *pod.Spec.Priority > 1000000 {
    finalScore *= 1.5  // 系统关键Pod获得50%加分
}
  1. 灰度发布机制
    新调度器上线时,先通过标签选择部分节点和Pod进行试运行:
kubectl label nodes node-001 scheduler-canary=enabled
kubectl label pods test-job-1 scheduler-group=experimental

10. 未来展望

随着Kueue等批量调度项目的成熟,未来我们可以实现:

  • 多级调度:首层调度决定资源池分配,二层调度选择具体节点
  • 动态策略:根据实时负载自动切换调度算法
  • 意图预测:通过历史数据分析提前预调度资源