一、为什么需要自定义调度器
Kubernetes自带的默认调度器已经能满足大部分场景的需求,它会根据资源请求、节点亲和性等规则把Pod分配到合适的节点上。但在实际生产环境中,我们经常会遇到一些特殊需求:
- 需要根据业务特性调度Pod(比如GPU密集型任务必须分配到有显卡的节点)
- 需要实现复杂的调度策略(比如让同一服务的Pod分散在不同可用区)
- 需要与内部系统集成(比如根据内部资源管理系统数据做调度决策)
这时候,默认调度器就显得力不从心了。好在Kubernetes提供了完善的扩展机制,让我们可以开发自己的调度器。
二、自定义调度器的工作原理
自定义调度器的核心工作流程其实很简单:
- 监听API Server中未被调度的Pod(即spec.nodeName为空的Pod)
- 根据自定义逻辑选择合适的节点
- 将调度决策写回API Server
整个过程就像是一个"决策者",不断查看有哪些Pod需要调度,然后根据自己的规则决定它们应该去哪。
技术栈:Golang(因为Kubernetes本身就是用Go写的,生态最好)
下面是一个最简单的调度器框架示例:
package main
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 1. 创建k8s客户端
config, _ := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
clientset, _ := kubernetes.NewForConfig(config)
// 2. 监控未调度的Pod
watch, _ := clientset.CoreV1().Pods("").Watch(context.TODO(), v1.ListOptions{
FieldSelector: "spec.nodeName=",
})
// 3. 处理调度事件
for event := range watch.ResultChan() {
pod := event.Object.(*v1.Pod)
// 4. 简单调度逻辑:总是选择第一个节点
nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
if len(nodes.Items) > 0 {
selectedNode := nodes.Items[0].Name
// 5. 绑定Pod到节点
binding := &v1.Binding{
ObjectMeta: v1.ObjectMeta{Name: pod.Name},
Target: v1.ObjectReference{Kind: "Node", Name: selectedNode},
}
_ = clientset.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, v1.CreateOptions{})
fmt.Printf("Scheduled pod %s to node %s\n", pod.Name, selectedNode)
}
}
}
这个示例虽然简单,但包含了自定义调度器的所有关键要素。接下来我们会逐步完善它。
三、实现一个实用的调度策略
让我们实现一个更实用的调度策略:优先选择资源充足的节点,同时考虑亲和性要求。
// 扩展上面的示例,添加调度逻辑
func schedulePod(pod *v1.Pod, clientset *kubernetes.Clientset) (string, error) {
// 1. 获取所有节点
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
if err != nil {
return "", err
}
// 2. 过滤符合条件的节点
var candidateNodes []v1.Node
for _, node := range nodes.Items {
if meetsRequirements(pod, node) {
candidateNodes = append(candidateNodes, node)
}
}
// 3. 如果没有可用节点,返回错误
if len(candidateNodes) == 0 {
return "", fmt.Errorf("no available nodes meet the requirements")
}
// 4. 评分:选择资源最充足的节点
bestNode := ""
maxScore := 0
for _, node := range candidateNodes {
score := calculateNodeScore(pod, node)
if score > maxScore {
maxScore = score
bestNode = node.Name
}
}
return bestNode, nil
}
// 检查节点是否满足Pod的基本要求
func meetsRequirements(pod *v1.Pod, node v1.Node) bool {
// 检查节点是否就绪
for _, condition := range node.Status.Conditions {
if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue {
return false
}
}
// 检查资源是否足够
requestedCPU := getPodCPURequest(pod)
requestedMemory := getPodMemoryRequest(pod)
allocatableCPU := node.Status.Allocatable.Cpu().MilliValue()
allocatableMemory := node.Status.Allocatable.Memory().Value()
return requestedCPU <= allocatableCPU && requestedMemory <= allocatableMemory
}
// 计算节点得分(资源越充足得分越高)
func calculateNodeScore(pod *v1.Pod, node v1.Node) int {
// 简单示例:只考虑CPU和内存
cpuScore := node.Status.Allocatable.Cpu().MilliValue() - getPodCPURequest(pod)
memoryScore := node.Status.Allocatable.Memory().Value() - getPodMemoryRequest(pod)
return int(cpuScore + memoryScore/1000000) // 内存转换为MB
}
四、与默认调度器共存
在实际环境中,我们可能希望自定义调度器只处理特定的Pod,其他的仍然由默认调度器处理。这可以通过几种方式实现:
- 使用nodeSelector:给Pod添加特定的标签
- 使用schedulerName:在PodSpec中指定调度器名称
推荐使用第二种方式,因为它更明确。修改后的Pod YAML如下:
apiVersion: v1
kind: Pod
metadata:
name: my-pod
spec:
schedulerName: my-custom-scheduler # 指定使用我们的调度器
containers:
- name: nginx
image: nginx
然后在我们的调度器代码中,只需要处理指定了schedulerName的Pod:
for event := range watch.ResultChan() {
pod := event.Object.(*v1.Pod)
// 只处理指定了我们调度器的Pod
if pod.Spec.SchedulerName != "my-custom-scheduler" {
continue
}
// 原有的调度逻辑...
}
五、高级调度策略示例
让我们看一个更复杂的场景:实现一个支持多租户的调度器,确保不同租户的Pod均匀分布在各个节点上。
// 多租户调度策略
type MultiTenantScheduler struct {
clientset *kubernetes.Clientset
tenantNodeCount map[string]map[string]int // tenant -> node -> count
}
func (s *MultiTenantScheduler) Schedule(pod *v1.Pod) (string, error) {
// 1. 获取租户信息
tenant := pod.Labels["tenant"]
if tenant == "" {
return "", fmt.Errorf("pod missing tenant label")
}
// 2. 获取所有节点
nodes, err := s.clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
if err != nil {
return "", err
}
// 3. 找出最适合的节点(当前租户Pod最少的节点)
bestNode := ""
minCount := int(^uint(0) >> 1) // 初始化为最大值
for _, node := range nodes.Items {
if !meetsRequirements(pod, node) {
continue
}
// 获取该节点上当前租户的Pod数量
count := s.getTenantNodeCount(tenant, node.Name)
if count < minCount {
minCount = count
bestNode = node.Name
}
}
if bestNode == "" {
return "", fmt.Errorf("no suitable node found")
}
// 4. 更新计数
s.updateTenantNodeCount(tenant, bestNode)
return bestNode, nil
}
// 辅助方法:获取某租户在某节点上的Pod数量
func (s *MultiTenantScheduler) getTenantNodeCount(tenant, node string) int {
if nodes, ok := s.tenantNodeCount[tenant]; ok {
if count, ok := nodes[node]; ok {
return count
}
}
return 0
}
// 辅助方法:更新计数
func (s *MultiTenantScheduler) updateTenantNodeCount(tenant, node string) {
if s.tenantNodeCount == nil {
s.tenantNodeCount = make(map[string]map[string]int)
}
if s.tenantNodeCount[tenant] == nil {
s.tenantNodeCount[tenant] = make(map[string]int)
}
s.tenantNodeCount[tenant][node]++
}
六、部署和运行自定义调度器
开发完成后,我们需要将调度器部署到集群中。有几种常见方式:
- 作为独立的Deployment运行
- 以静态Pod方式运行
- 在集群外运行(需要正确配置kubeconfig)
以下是作为Deployment运行的示例:
apiVersion: apps/v1
kind: Deployment
metadata:
name: custom-scheduler
labels:
app: custom-scheduler
spec:
replicas: 2 # 通常需要多个副本确保高可用
selector:
matchLabels:
app: custom-scheduler
template:
metadata:
labels:
app: custom-scheduler
spec:
serviceAccountName: scheduler-sa # 需要专门的ServiceAccount
containers:
- name: scheduler
image: my-custom-scheduler:1.0
imagePullPolicy: Always
注意要为调度器创建专门的ServiceAccount并授予必要的RBAC权限:
apiVersion: v1
kind: ServiceAccount
metadata:
name: scheduler-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: custom-scheduler-role
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "update", "bind"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: custom-scheduler-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: custom-scheduler-role
subjects:
- kind: ServiceAccount
name: scheduler-sa
namespace: default
七、测试和验证
部署完成后,我们需要验证调度器是否正常工作。可以创建一个测试Pod:
apiVersion: v1
kind: Pod
metadata:
name: test-pod
labels:
tenant: tenant-a # 用于多租户调度器测试
spec:
schedulerName: my-custom-scheduler
containers:
- name: busybox
image: busybox
command: ["sleep", "3600"]
resources:
requests:
cpu: "100m"
memory: "100Mi"
然后检查Pod是否被正确调度:
kubectl get pod test-pod -o wide
查看调度器日志以了解调度决策过程:
kubectl logs -l app=custom-scheduler
八、性能优化建议
当集群规模变大时,调度器性能可能成为瓶颈。以下是一些优化建议:
- 缓存节点信息,减少API Server查询
- 实现调度器队列,避免同时处理太多Pod
- 对评分算法进行优化,减少计算量
- 考虑使用优先级队列,重要Pod优先调度
这里是一个带缓存的调度器改进示例:
type CachedScheduler struct {
clientset *kubernetes.Clientset
nodeCache []v1.Node
lastUpdated time.Time
cacheTTL time.Duration
}
func (s *CachedScheduler) getNodes() ([]v1.Node, error) {
// 如果缓存未过期,直接返回缓存数据
if time.Since(s.lastUpdated) < s.cacheTTL {
return s.nodeCache, nil
}
// 否则从API Server获取最新数据
nodes, err := s.clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
if err != nil {
return nil, err
}
// 更新缓存
s.nodeCache = nodes.Items
s.lastUpdated = time.Now()
return s.nodeCache, nil
}
九、常见问题与解决方案
在实际使用中,可能会遇到以下问题:
调度器冲突:多个调度器同时修改同一个Pod
- 解决方案:使用ResourceVersion实现乐观锁
调度决策不一致:不同副本的调度器缓存状态不同
- 解决方案:实现分布式缓存或缩短缓存TTL
调度器成为单点故障
- 解决方案:部署多个副本,实现领导者选举
领导者选举示例:
import (
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)
func runWithLeaderElection(ctx context.Context, clientset *kubernetes.Clientset) {
lock := &resourcelock.LeaseLock{
LeaseMeta: v1.ObjectMeta{
Name: "custom-scheduler-lock",
Namespace: "default",
},
Client: clientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: os.Getenv("POD_NAME"), // 每个副本需要唯一标识
},
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// 成为领导者后运行调度逻辑
runScheduler(ctx, clientset)
},
OnStoppedLeading: func() {
// 领导者变更处理
},
},
})
}
十、总结与最佳实践
开发自定义调度器是一项强大的功能,但也需要谨慎使用。以下是一些最佳实践:
- 尽量复用默认调度器的功能,只在必要时自定义
- 确保调度器是幂等的,相同的输入总是产生相同的输出
- 实现完善的日志记录,便于问题排查
- 考虑调度器的性能影响,特别是大规模集群
- 为调度器添加监控指标,如调度延迟、错误率等
自定义调度器打开了Kubernetes调度能力的无限可能,从简单的资源分配到复杂的业务规则都可以实现。希望本文能帮助你理解其工作原理并成功实现自己的调度策略。
评论