一、为什么需要自定义调度器

Kubernetes自带的默认调度器已经能满足大部分场景的需求,它会根据资源请求、节点亲和性等规则把Pod分配到合适的节点上。但在实际生产环境中,我们经常会遇到一些特殊需求:

  1. 需要根据业务特性调度Pod(比如GPU密集型任务必须分配到有显卡的节点)
  2. 需要实现复杂的调度策略(比如让同一服务的Pod分散在不同可用区)
  3. 需要与内部系统集成(比如根据内部资源管理系统数据做调度决策)

这时候,默认调度器就显得力不从心了。好在Kubernetes提供了完善的扩展机制,让我们可以开发自己的调度器。

二、自定义调度器的工作原理

自定义调度器的核心工作流程其实很简单:

  1. 监听API Server中未被调度的Pod(即spec.nodeName为空的Pod)
  2. 根据自定义逻辑选择合适的节点
  3. 将调度决策写回API Server

整个过程就像是一个"决策者",不断查看有哪些Pod需要调度,然后根据自己的规则决定它们应该去哪。

技术栈:Golang(因为Kubernetes本身就是用Go写的,生态最好)

下面是一个最简单的调度器框架示例:

package main

import (
    "context"
    "fmt"
    "time"
    
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // 1. 创建k8s客户端
    config, _ := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    clientset, _ := kubernetes.NewForConfig(config)
    
    // 2. 监控未调度的Pod
    watch, _ := clientset.CoreV1().Pods("").Watch(context.TODO(), v1.ListOptions{
        FieldSelector: "spec.nodeName=",
    })
    
    // 3. 处理调度事件
    for event := range watch.ResultChan() {
        pod := event.Object.(*v1.Pod)
        
        // 4. 简单调度逻辑:总是选择第一个节点
        nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
        if len(nodes.Items) > 0 {
            selectedNode := nodes.Items[0].Name
            
            // 5. 绑定Pod到节点
            binding := &v1.Binding{
                ObjectMeta: v1.ObjectMeta{Name: pod.Name},
                Target:     v1.ObjectReference{Kind: "Node", Name: selectedNode},
            }
            _ = clientset.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, v1.CreateOptions{})
            
            fmt.Printf("Scheduled pod %s to node %s\n", pod.Name, selectedNode)
        }
    }
}

这个示例虽然简单,但包含了自定义调度器的所有关键要素。接下来我们会逐步完善它。

三、实现一个实用的调度策略

让我们实现一个更实用的调度策略:优先选择资源充足的节点,同时考虑亲和性要求。

// 扩展上面的示例,添加调度逻辑
func schedulePod(pod *v1.Pod, clientset *kubernetes.Clientset) (string, error) {
    // 1. 获取所有节点
    nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
    if err != nil {
        return "", err
    }
    
    // 2. 过滤符合条件的节点
    var candidateNodes []v1.Node
    for _, node := range nodes.Items {
        if meetsRequirements(pod, node) {
            candidateNodes = append(candidateNodes, node)
        }
    }
    
    // 3. 如果没有可用节点,返回错误
    if len(candidateNodes) == 0 {
        return "", fmt.Errorf("no available nodes meet the requirements")
    }
    
    // 4. 评分:选择资源最充足的节点
    bestNode := ""
    maxScore := 0
    for _, node := range candidateNodes {
        score := calculateNodeScore(pod, node)
        if score > maxScore {
            maxScore = score
            bestNode = node.Name
        }
    }
    
    return bestNode, nil
}

// 检查节点是否满足Pod的基本要求
func meetsRequirements(pod *v1.Pod, node v1.Node) bool {
    // 检查节点是否就绪
    for _, condition := range node.Status.Conditions {
        if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue {
            return false
        }
    }
    
    // 检查资源是否足够
    requestedCPU := getPodCPURequest(pod)
    requestedMemory := getPodMemoryRequest(pod)
    allocatableCPU := node.Status.Allocatable.Cpu().MilliValue()
    allocatableMemory := node.Status.Allocatable.Memory().Value()
    
    return requestedCPU <= allocatableCPU && requestedMemory <= allocatableMemory
}

// 计算节点得分(资源越充足得分越高)
func calculateNodeScore(pod *v1.Pod, node v1.Node) int {
    // 简单示例:只考虑CPU和内存
    cpuScore := node.Status.Allocatable.Cpu().MilliValue() - getPodCPURequest(pod)
    memoryScore := node.Status.Allocatable.Memory().Value() - getPodMemoryRequest(pod)
    
    return int(cpuScore + memoryScore/1000000) // 内存转换为MB
}

四、与默认调度器共存

在实际环境中,我们可能希望自定义调度器只处理特定的Pod,其他的仍然由默认调度器处理。这可以通过几种方式实现:

  1. 使用nodeSelector:给Pod添加特定的标签
  2. 使用schedulerName:在PodSpec中指定调度器名称

推荐使用第二种方式,因为它更明确。修改后的Pod YAML如下:

apiVersion: v1
kind: Pod
metadata:
  name: my-pod
spec:
  schedulerName: my-custom-scheduler # 指定使用我们的调度器
  containers:
  - name: nginx
    image: nginx

然后在我们的调度器代码中,只需要处理指定了schedulerName的Pod:

for event := range watch.ResultChan() {
    pod := event.Object.(*v1.Pod)
    
    // 只处理指定了我们调度器的Pod
    if pod.Spec.SchedulerName != "my-custom-scheduler" {
        continue
    }
    
    // 原有的调度逻辑...
}

五、高级调度策略示例

让我们看一个更复杂的场景:实现一个支持多租户的调度器,确保不同租户的Pod均匀分布在各个节点上。

// 多租户调度策略
type MultiTenantScheduler struct {
    clientset       *kubernetes.Clientset
    tenantNodeCount map[string]map[string]int // tenant -> node -> count
}

func (s *MultiTenantScheduler) Schedule(pod *v1.Pod) (string, error) {
    // 1. 获取租户信息
    tenant := pod.Labels["tenant"]
    if tenant == "" {
        return "", fmt.Errorf("pod missing tenant label")
    }
    
    // 2. 获取所有节点
    nodes, err := s.clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
    if err != nil {
        return "", err
    }
    
    // 3. 找出最适合的节点(当前租户Pod最少的节点)
    bestNode := ""
    minCount := int(^uint(0) >> 1) // 初始化为最大值
    
    for _, node := range nodes.Items {
        if !meetsRequirements(pod, node) {
            continue
        }
        
        // 获取该节点上当前租户的Pod数量
        count := s.getTenantNodeCount(tenant, node.Name)
        if count < minCount {
            minCount = count
            bestNode = node.Name
        }
    }
    
    if bestNode == "" {
        return "", fmt.Errorf("no suitable node found")
    }
    
    // 4. 更新计数
    s.updateTenantNodeCount(tenant, bestNode)
    return bestNode, nil
}

// 辅助方法:获取某租户在某节点上的Pod数量
func (s *MultiTenantScheduler) getTenantNodeCount(tenant, node string) int {
    if nodes, ok := s.tenantNodeCount[tenant]; ok {
        if count, ok := nodes[node]; ok {
            return count
        }
    }
    return 0
}

// 辅助方法:更新计数
func (s *MultiTenantScheduler) updateTenantNodeCount(tenant, node string) {
    if s.tenantNodeCount == nil {
        s.tenantNodeCount = make(map[string]map[string]int)
    }
    if s.tenantNodeCount[tenant] == nil {
        s.tenantNodeCount[tenant] = make(map[string]int)
    }
    s.tenantNodeCount[tenant][node]++
}

六、部署和运行自定义调度器

开发完成后,我们需要将调度器部署到集群中。有几种常见方式:

  1. 作为独立的Deployment运行
  2. 以静态Pod方式运行
  3. 在集群外运行(需要正确配置kubeconfig)

以下是作为Deployment运行的示例:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-scheduler
  labels:
    app: custom-scheduler
spec:
  replicas: 2  # 通常需要多个副本确保高可用
  selector:
    matchLabels:
      app: custom-scheduler
  template:
    metadata:
      labels:
        app: custom-scheduler
    spec:
      serviceAccountName: scheduler-sa  # 需要专门的ServiceAccount
      containers:
      - name: scheduler
        image: my-custom-scheduler:1.0
        imagePullPolicy: Always

注意要为调度器创建专门的ServiceAccount并授予必要的RBAC权限:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: scheduler-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: custom-scheduler-role
rules:
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["get", "list", "watch"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "watch", "update", "bind"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: custom-scheduler-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: custom-scheduler-role
subjects:
- kind: ServiceAccount
  name: scheduler-sa
  namespace: default

七、测试和验证

部署完成后,我们需要验证调度器是否正常工作。可以创建一个测试Pod:

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
  labels:
    tenant: tenant-a  # 用于多租户调度器测试
spec:
  schedulerName: my-custom-scheduler
  containers:
  - name: busybox
    image: busybox
    command: ["sleep", "3600"]
    resources:
      requests:
        cpu: "100m"
        memory: "100Mi"

然后检查Pod是否被正确调度:

kubectl get pod test-pod -o wide

查看调度器日志以了解调度决策过程:

kubectl logs -l app=custom-scheduler

八、性能优化建议

当集群规模变大时,调度器性能可能成为瓶颈。以下是一些优化建议:

  1. 缓存节点信息,减少API Server查询
  2. 实现调度器队列,避免同时处理太多Pod
  3. 对评分算法进行优化,减少计算量
  4. 考虑使用优先级队列,重要Pod优先调度

这里是一个带缓存的调度器改进示例:

type CachedScheduler struct {
    clientset    *kubernetes.Clientset
    nodeCache    []v1.Node
    lastUpdated  time.Time
    cacheTTL     time.Duration
}

func (s *CachedScheduler) getNodes() ([]v1.Node, error) {
    // 如果缓存未过期,直接返回缓存数据
    if time.Since(s.lastUpdated) < s.cacheTTL {
        return s.nodeCache, nil
    }
    
    // 否则从API Server获取最新数据
    nodes, err := s.clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
    if err != nil {
        return nil, err
    }
    
    // 更新缓存
    s.nodeCache = nodes.Items
    s.lastUpdated = time.Now()
    return s.nodeCache, nil
}

九、常见问题与解决方案

在实际使用中,可能会遇到以下问题:

  1. 调度器冲突:多个调度器同时修改同一个Pod

    • 解决方案:使用ResourceVersion实现乐观锁
  2. 调度决策不一致:不同副本的调度器缓存状态不同

    • 解决方案:实现分布式缓存或缩短缓存TTL
  3. 调度器成为单点故障

    • 解决方案:部署多个副本,实现领导者选举

领导者选举示例:

import (
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runWithLeaderElection(ctx context.Context, clientset *kubernetes.Clientset) {
    lock := &resourcelock.LeaseLock{
        LeaseMeta: v1.ObjectMeta{
            Name:      "custom-scheduler-lock",
            Namespace: "default",
        },
        Client: clientset.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: os.Getenv("POD_NAME"), // 每个副本需要唯一标识
        },
    }
    
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,
        LeaseDuration:   15 * time.Second,
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // 成为领导者后运行调度逻辑
                runScheduler(ctx, clientset)
            },
            OnStoppedLeading: func() {
                // 领导者变更处理
            },
        },
    })
}

十、总结与最佳实践

开发自定义调度器是一项强大的功能,但也需要谨慎使用。以下是一些最佳实践:

  1. 尽量复用默认调度器的功能,只在必要时自定义
  2. 确保调度器是幂等的,相同的输入总是产生相同的输出
  3. 实现完善的日志记录,便于问题排查
  4. 考虑调度器的性能影响,特别是大规模集群
  5. 为调度器添加监控指标,如调度延迟、错误率等

自定义调度器打开了Kubernetes调度能力的无限可能,从简单的资源分配到复杂的业务规则都可以实现。希望本文能帮助你理解其工作原理并成功实现自己的调度策略。