MENU

Kubelet

• 2021 年 11 月 23 日 • Kubernetes

Kubelet

进入 cmd/kubelet/app/server.go 调用 NewKubeletCommand 函数,使用默认的参数创建一个 cobra.Command 指针:

func NewKubeletCommand() *cobra.Command {
    ...
    cmd := &cobra.Command{
        Use: componentKubelet,
        ...
        // Kubelet具有特殊的flag解析要求来强制执行flag优先级规则,因此我们在下面的Run中手动完成所有解析。DisableFlagParsing=true提供传递给`args` arg中的kubelet的完整flagset来运行,没有Cobra的干扰。
        DisableFlagParsing: true,
        Run: func(cmd *cobra.Command, args []string) {
            // 初始化flag解析,因为我们禁用了cobra的flag解析
            if err := cleanFlagSet.Parse(args); err != nil {
                cmd.Usage()
                klog.Fatal(err)
            }
            ......
            // 校验初始的 KubeletFlags
            // 加载 kubelet config 文件(如果开启)
            // 使用动态的 kubelet config(如果开启)
            // 通过kubeletFlags和kubeletConfig封装KubeletServer结构
            // 使用 kubeletServer 来构造默认的 KubeletDeps
            // 添加 kubelet 配置控制器到 kubeletDeps
            // 添加 stopCh Channel 供 kubelet 和 docker shim 复用
            // 开启docker shim特性(如果开启)
            if kubeletServer.KubeletFlags.ExperimentalDockershim {
                if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
                    klog.Fatal(err)
                }
                // 注意这里的return,意味着启用 dockershim only 模式,kubelet只会启动 dockershim 进程。此标志仅用于测试目的,除非您意识到自己在做什么,否则请不要使用它。 [默认= FALSE]
                return
            }
            // 运行 kubelet
            if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
                klog.Fatal(err)
            }
        }
    }
    ...
    return cmd
}

其中 RunDockershim 函数仅在当前进程中启动 dockershim。这仅用于验证测试目的。

func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration, stopCh <-chan struct{}) error {
    // 初始化 docker 客户端配置
    // 初始化网络插件配置
    // 初始化 streaming 配置(现在不使用TLS)
    // 独立的 dockershim 将始终会启动本地的streaming server。
    ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings,
        f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, true /*startLocalStreamingServer*/)
    if err != nil {
        return err
    }
    // 为 docker CRI shim 启动 GRPC 服务器
    server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
    if err := server.Start(); err != nil {
        return err
    }
    <-stopCh
    return nil
}

然后是最核心的 kubelet 启动函数 Run:

// Run 函数运行一个具有给定依赖项的 KubeletServer,永远不会退出。
// kubeDeps 参数可能为 nil - 如果为空,则需要从 KubeletServer 上的设置进行初始化。
// 如果调用者已经设置了依赖对象了则不会生成默认的对象了。
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
    if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
        return fmt.Errorf("failed OS init: %v", err)
    }
    if err := run(s, kubeDeps, stopCh); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)
    }
    return nil
}

调用下面的核心的 run 函数:

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
    // 基于初始的KubeletServer的值设置全局的 feature gates 特性
    // 校验初始KubeletServer(我们首先设置feature gates,因为该验证依赖feature gates)
    if err := options.ValidateKubeletServer(s); err != nil {
        return err
    }
    // 获取 Kubelet 锁文件
    // 使用 /configz 端口注册当前配置
    // 获取客户端等信息,检测独立(standaloneMode)模式
    // 获取主机名、获取节点名
    // 如果是独立模式,尽可能将所有客户端设置成nil
    // 否则
    //      创建 kubelet 客户端
    //      为事件创建一个独立的客户端
    //      为禁用限流和附加超时的心跳创建一个单独的客户端
    //          如果启用了NodeLease功能,则超时是租约持续时间和状态更新频率的最小值
    //          NodeLease: Kubelet 使用新的 Lease API 报告节点心跳,节点生命周期控制器使用这些心跳作为节点健康信号。
    // 如果 kubelet 配置控制器可用,并且启用了动态配置,则启动配置和状态同步循环
    // DynamicConfigDir:
    //      Kubelet 将使用此目录检查下载的配置并跟踪配置运行状况。
    //      Kubelet 将创建此目录(如果该目录尚不存在), 路径可以是绝对的或相对的; 相对路径位于 Kubelet 的当前工作目录下。
    //      提供此标志可启用动态 kubelet 配置,要使用此标志,必须启用DynamicKubeletConfig feature gates。
    if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
        kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
        // StartSync 函数告诉控制器开一个goroutine从apiserver同步status/config 到 /(位于pkg/kubelt/kubeletconfig/controller.go)
        if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
            return err
        }
    }

    // 然后调用 BuildAuth 函数创建一个验证器,一个授权器,以及一个 kubelet的authorizer.RequestAttributesGetter
    // 安装事件记录器
    // 解析资源预留配置
    // 构造 ContainerManager

    // 然后调用 RunKubelet 函数
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }

    // 如果开启了 healthzPort 端口(本地 healthz endpint 端口,0表示禁用),则开启 healthz server

    // 如果使用了 systemd , 通知它我们已经启动了
    go daemon.SdNotify(false, "READY=1")

    select {
    case <-done:
        break
    case <-stopCh:
        break
    }

    return nil
}

其中校验 KubeletServer 的函数 ValidateKubeletServer 位于cmd/kubelet/app/options/options.go

// ValidateKubeletServer 函数校验 KubeletServer 的配置,如果配置无效则返回error。
func ValidateKubeletServer(s *KubeletServer) error {
    // 任何的 KubeletConfiguration 校验都使用kubeletconfigvalidation.ValidateKubeletConfiguration函数
    // ValidateKubeletConfiguration 函数位于 pkg/kubelet/apis/config/validation/validation.go
    if err := kubeletconfigvalidation.ValidateKubeletConfiguration(&s.KubeletConfiguration); err != nil {
        return err
    }
    // ValidateKubeletFlags 函数校验 Kubelet 的配置 flag,如果无效则返回 error
    if err := ValidateKubeletFlags(&s.KubeletFlags); err != nil {
        return err
    }
    return nil
}

其中启动 Kubelet 核心的 RunKubelet 函数如下:

// RunKubelet 函数负责配置和运行 kubelet。它用于三种不同的应用:
//  1. 集成测试
//  2. Kubelet 二进制
//  3. 独立的 kubernetes 二进制
// 最终,#2将会被#3的实例所替换
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    ......
    // 核心仍然是调用 createAndInitKubelet 函数将 Kubelet 配置、依赖等构造成 kubelet 的启动接口

    // 然后调用 startKubelet 函数启动 Kubelet
    startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)

    return nil
}

其中 createAndInitKubelet 函数调用了 kubelet.NewMainKubelet 函数来构造 kubelet 的启动接口,NewMainKubelet 函数位于pkg/kubelet/kubelet.go

// NewMainKubelet 函数实例化一个新的 Kubelet 对象以及所有必需的内部模块。此处不会初始化 Kubelet 及其模块。
func NewMainKubelet(...) {
    // 一些常规校验
    // 指定容器垃圾回收策略
    // 指定镜像垃圾回收策略
    // 配置驱逐相关策略
    // 配置kubelet客户端
    // 配置集群DNS
    // 组装Kubelet结构体
    // 设置 secretManager
    // 设置 configMapManager
    // 设置 livenessManager
    // 设置 podManager,podManager 还负责保持 secretManager 和 configMapManager 内容为最新。
    // 设置 statusManager
    // 启动 docker CRI shim GRPC 服务(包括RuntimeService和ImageService两个客户端):
    //      默认 remoteRuntimeEndpoint=unix:///var/run/dockershim.sock,即默认使用本地的docker作为容器运行时;如果remoteImageEndpoint没有指定,则和remoteRuntimeEndpoint保持一致
    // 然后创建一个新的 kubeGenericRuntimeManager 
    // 初始化一个新的 GenericPLEG 对象,用于配置pod生命周期事件生成器
    // 然后配置容器回收策略
    // 配置镜像回收策略
    // 如果配置了 Kubelet 证书自动轮转(开启RotateKubeletServerCertificate特性),则初始化证书管理器
    // 然后配置probe管理器(探针管理)
    // 生成一个对pod的service account token的管理器
    // 调用如下方法配置volume插件管理器:
    //      NewInitializedVolumePluginMgr函数初始化Kubelet runtimeState上的一些storageErrors(在csi_plugin.go init中),这会影响节点d的就绪状态。必须在初始化Kubelet之前调用此函数,以便节点ReadyState在存储状态下准确无误。
    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
    // 然后配置volume管理器
    // 配置eviction(驱逐)管理器
    // 如果开启了NodeLease特性(节点生命周期控制器使用lease api报告的心跳来作为节点健康的信号)),则配置nodeLease控制器
    // 最后,将最新版本的配置放在 Kubelet 上,以便可以看到它的配置方式:
    klet.kubeletConfiguration = *kubeCfg
    // 最后的最后,生成状态函数应该是我们做的最后一件事,因为这依赖于已经构造的Kubelet的其他部分。
    // defaultNodeStatusFuncs 是一个工厂函数(位于pkg/kubelet/kubelet_node_status.go),它生成一组默认的 setNodeStatus 函数
    klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

    return klet, nil
}

其中 startKubelet 函数在调用上面的 createAndInitKubelet 函数获取到最新的 kubelet 启动对象以后,直接启动 kubelet 的一个server:

 // startKubelet 函数很简单就是直接启动一个 kubelet 的server,
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
    // 开启一个goroutine,调用Run函数,不退出。
    go wait.Until(func() {
        k.Run(podCfg.Updates())
    }, 0, wait.NeverStop)

    // 开启 kubelet server
    if enableServer {
        go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)

    }
    // 如果开启了只读端口,然后也会在一个gorouting 里面开启一个只读的 server
    if kubeCfg.ReadOnlyPort > 0 {
        go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
    }
    // 如果开启了 kubelet 的 pod 资源 grpc 服务,则同样运行 一个kubelet 的 podresource grpc 服务
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
        go k.ListenAndServePodResources()
    }
}

其中 Run 函数,位于pkg/kubelet/kubelet.go,运行启动 kubelet 响应配置更新:

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    // 配置 logServer
    // 检查 kubeClient
    // 开启 cloud provider 同步管理器
    // 调用 initializeModules 函数初始化内部模块(不需要启动容器运行时)
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        klog.Fatal(err)
    }
    // 启动 volume 管理器
    if kl.kubeClient != nil {
        // 立即开始同步节点状态,这可能会设置运行时需要运行的内容
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)

        // FastStatusUpdateOnce 函数启动一个循环,在应用CIDR时检查内部节点索引器缓存,并尝试立即更新POD CIDR。更新pod CIDR后,它会触发运行时更新和节点状态更新。函数在一次成功的节点状态更新后直接返回。该功能仅在 kubelet 启动期间执行,通过尽快更新 pod cidr、运行时状态和节点状态来提高准备就绪节点的延迟。
        go kl.fastStatusUpdateOnce()

        // 开始同步 lease
        if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
            go kl.nodeLeaseController.Run(wait.NeverStop)
        }
    }
    // 每5s调用一次updateRuntimeUp函数检查容器运行时状态
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // 启动循环同步 iptables 规则
    if kl.makeIPTablesUtilChains {
        // 不过当前syncNetworkUtil函数是一个空实现
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
    }

    // 当 pod 没有被 podworker 正确处理的时候,启动一个 goroutine 杀死pod。
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // 启动组件同步循环
    //      使用apiserver同步pods状态; 也用作状态缓存。
    kl.statusManager.Start()
    //      处理容器探针
    kl.probeManager.Start()

    // 启动 RuntimeClass 控制器
    if kl.runtimeClassManager != nil {
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    // 启动 Pod 生命周期事件生成器。
    // PodLifecycleEventGenerator 是一个包含下面函数的pod生命周期事件生成器接口:
    // type PodLifecycleEventGenerator interface {
    //     Start()
    //     Watch() chan *PodLifecycleEvent
    //     Healthy() (bool, error)
    // }
    kl.pleg.Start()
    // 调用updates的同步循环
    kl.syncLoop(updates, kl)
}

其中 initializeModules 函数用来初始化内部得一些模块(不需要启动容器运行时)

func (kl *Kubelet) initializeModules() error {
    // 注册 prometheus metrics 接口
    // 创建文件系统目录:
    //      1.根目录 2.pods目录 3.插件目录 4.pod-resources 目录
    if err := kl.setupDataDirs(); err != nil {
        return err
    }
    // 如果容器日志目录不存在,则创建该目录(/var/log/containers)

    // 然后启动镜像管理器
    kl.imageManager.Start()

    // 开启 certificate 管理器
    if kl.serverCertificateManager != nil {
        kl.serverCertificateManager.Start()
    }

    // 开启 out of memory 监听器
    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        return fmt.Errorf("Failed to start OOM watcher %v", err)
    }

    // 开启资源分析器
    kl.resourceAnalyzer.Start()

    return nil
}

然后就是调用 syncNodeStatus 函数失效节点状态同步,位于pkg/kubelet/kubelet_node_status.go:

// syncNodeStatus 函数定期被一个goroutine所调用。
// 如果和上次同步对比有任何变化或者上次同步经过的时间足够长,它会将节点状态同步到 master,如果需要请先注册 kubelet 。
func (kl *Kubelet) syncNodeStatus() {
    kl.syncNodeStatusMux.Lock()
    defer kl.syncNodeStatusMux.Unlock()

    if kl.kubeClient == nil || kl.heartbeatClient == nil {
        return
    }
    // 如果开启了registerNode
    if kl.registerNode {
        // 如果不需要执行任何操作,它将立即退出。registerWithApiServer 函数向群集 APIServer 注册节点。可以安全地多次调用,但不能同时调用(kl.registrationcompleted 未加锁)。
        kl.registerWithAPIServer()
    }
    if err := kl.updateNodeStatus(); err != nil {
        klog.Errorf("Unable to update node status: %v", err)
    }
}

调用 updateNodeStatus 函数更新节点状态:

// updateNodeStatus 函数更新节点状态到 master,如果上次同步有任何更改或经过足够的时间,则重试一次,最多重试5次
func (kl *Kubelet) updateNodeStatus() error {
    klog.V(5).Infof("Updating node status")
    for i := 0; i < nodeStatusUpdateRetry; i++ {
        // 调用 tryUpdateNodeStatus 函数尝试进行状态更新
        if err := kl.tryUpdateNodeStatus(i); err != nil {
            // 当心跳操作失败多次时,调用 onRepeatedHeartBeatFailure 函数
            if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
                kl.onRepeatedHeartbeatFailure()
            }
            klog.Errorf("Error updating node status, will retry: %v", err)
        } else {
            return nil
        }
    }
    return fmt.Errorf("update node status exceeds retry count")
}

调用 tryUpdateNodeStatus 函数尝试去进行真正的节点状态更新:

func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
    // 在大型集群中,这里的节点对象的 GET 和 PUT 操作是 apiserver 和 etcd 上的主要负载。
    // 为了减少 etcd 上的负载,我们提供了从 apiserver 缓存获取操作(数据可能稍有延迟,但似乎不会造成更多冲突-延迟非常小)。
    // 如果导致冲突,则所有重试都直接从 etcd 提供。
    opts := metav1.GetOptions{}
    if tryNumber == 0 {
        util.FromApiserverCache(&opts)
    }
    node, err := kl.heartbeatClient.CoreV1().Nodes().Get(string(kl.nodeName), opts)
    if err != nil {
        return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
    }

    originalNode := node.DeepCopy()
    if originalNode == nil {
        return fmt.Errorf("nil %q node object", kl.nodeName)
    }

    podCIDRChanged := false
    if node.Spec.PodCIDR != "" {
        // Pod CIDR 之前就已经更新过了,因此我们需要判断node.Spec.PodCIDR是否为空,还需要知道 pod CIDR 是否真的已经改变了。
        if podCIDRChanged, err = kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
            klog.Errorf(err.Error())
        }
    }

    // setNodeStatus 函数就是调用上面我们设置的 setNodeStatusFuncs 函数列表来进行状态更新
    kl.setNodeStatus(node)

    // 如果开启了 NodeLease 特性
    now := kl.clock.Now()
    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
        if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
            // 将指定的卷标记为已在节点的卷状态中成功报告为“正在使用”。
            kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
            return nil
        }
    }

    // Patch 现在节点的状态到 APIServer
    updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
    if err != nil {
        return err
    }
    // 设置当前时间为最近一次状态上报时间
    kl.lastStatusReportTime = now
    kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
    // 如果更新成功完成,请将卷使用标记为ReportedInUse以表明这些卷已在节点状态下更新
    kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
    return nil
}