Kubelet – Pod 创建之 CRI 和 CNI 源码剖析

本文源码跟踪基于 Kubernetes 1.12.6。

CRI

创建 Pod 入口

k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go

// Note: kubeGenericRuntimeManager's runtimeService field is set to
// newInstrumentedRuntimeService(runtimeService), i.e. the CRI runtime service
// wrapped by that constructor; the m.runtimeService calls below go through it.

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create init containers.
//  6. Create normal containers.
//
// NOTE(excerpt): this body is abridged. Statements marked "// ..." are
// omitted, including the Step 1 code that computes podContainerChanges and the
// declarations of msg/err, so it does not compile as shown.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
    // ...
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        // ...
    }

    // Keep terminated init containers fairly aggressively controlled
    // This is an optimization because container removals are typically handled
    // by container garbage collector.
    m.pruneInitContainersBeforeStart(pod, podStatus)

    // We pass the value of the podIP down to generatePodSandboxConfig and
    // generateContainerConfig, which in turn passes it to various other
    // functions, in order to facilitate functionality that requires this
    // value (hosts file and downward API) and avoid races determining
    // the pod IP in cases where a container requires restart but the
    // podIP isn't in the status manager yet.
    //
    // We default to the IP in the passed-in pod status, and overwrite it if the
    // sandbox needs to be (re)started.
    podIP := ""
    if podStatus != nil {
        podIP = podStatus.IP
    }

    // Step 4: Create a sandbox for the pod if necessary. The sandbox is the
    // infra container started from the pause image.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        // ...
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        // Under the hood createPodSandbox calls
        // m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler).

        glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

        // NOTE(excerpt): podSandboxStatus is unused in this abridged form; the
        // elided code presumably reads the new sandbox's IP from it to
        // overwrite podIP (see the podIP comment above).
        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
    }

    // Get podSandboxConfig for containers to start.
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)

    // Step 5: start the init container. Only the single next init container
    // (podContainerChanges.NextInitContainerToStart) is started per SyncPod
    // call, not all of them; presumably the rest are started by later syncs.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
                // ...
        }

        // Successfully started the container; clear the entry in the failure
        glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }

    // Step 6: start containers in podContainerChanges.ContainersToStart, i.e.
    // the regular (non-init) containers taken from pod.Spec.Containers.
    for _, idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        // ...

        glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
            // ...
        }
    }

    return
}

CRI 接口调用和实现

CRI 相关文档: Introducing Container Runtime Interface (CRI) in Kubernetes

k8s.io/kubernetes/pkg/kubelet/dockershim/docker_sandbox.go

dockerService 结构体实现了 CRI 接口:

// dockerService is dockershim's implementation of the CRI services on top of
// a Docker client (see RunPodSandbox below for an example method).
type dockerService struct {
    client           libdocker.Interface
    os               kubecontainer.OSInterface
    podSandboxImage  string
    streamingRuntime *streamingRuntime
    streamingServer  streaming.Server

    // network is the network plugin manager; RunPodSandbox calls
    // ds.network.SetUpPod on it to set up the sandbox's networking.
    network *network.PluginManager
    // Map of podSandboxID :: network-is-ready
    // (presumably guarded by networkReadyLock — confirm).
    networkReady     map[string]bool
    networkReadyLock sync.Mutex

    containerManager cm.ContainerManager
    // cgroup driver used by Docker runtime.
    cgroupDriver      string
    checkpointManager checkpointmanager.CheckpointManager
    // caches the version of the runtime.
    // To be compatible with multiple docker versions, we need to perform
    // version checking for some operations. Use this cache to avoid querying
    // the docker daemon every time we need to do such checks.
    versionCache *cache.ObjectCache
    // startLocalStreamingServer indicates whether dockershim should start a
    // streaming server on localhost.
    startLocalStreamingServer bool
}

RuntimeService 接口定义:

// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
//
// It is composed of the embedded manager interfaces below. The sandbox calls
// used by SyncPod (RunPodSandbox, PodSandboxStatus) come from the embedded
// PodSandboxManager.
type RuntimeService interface {
    RuntimeVersioner
    ContainerManager
    PodSandboxManager
    ContainerStatsManager

    // UpdateRuntimeConfig updates runtime configuration if specified
    UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
    // Status returns the status of the runtime.
    Status() (*runtimeapi.RuntimeStatus, error)
}

PodSandboxManager 接口定义:

// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
type PodSandboxManager interface {
    // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
    // the sandbox is in ready state. The returned string is the sandbox ID
    // (dockershim returns the ID of the sandbox container it created).
    RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
    // StopPodSandbox stops the sandbox. If there are any running containers in the
    // sandbox, they should be force terminated.
    StopPodSandbox(podSandboxID string) error
    // RemovePodSandbox removes the sandbox. If there are running containers in the
    // sandbox, they should be forcibly removed.
    RemovePodSandbox(podSandboxID string) error
    // PodSandboxStatus returns the Status of the PodSandbox.
    PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
    // ListPodSandbox returns a list of Sandbox.
    ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
    // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
    PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}

以 RunPodSandbox 接口为例:

// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
// For docker, PodSandbox is implemented by a container holding the network
// namespace for the pod.
// Note: docker doesn't use LogDirectory (yet).
//
// Steps: (1) make sure the sandbox image is present, (2) create the sandbox
// container, (3) checkpoint it, (4) start it and, unless the pod uses the host
// network, (5) set up its network through ds.network. Any failure is returned
// to the caller; the sandbox is marked network-ready only when every step
// succeeded.
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
    config := r.GetConfig()

    // Step 1: Pull the image for the sandbox. An image configured on the
    // service (ds.podSandboxImage) takes precedence over the built-in default.
    image := defaultSandboxImage
    if ds.podSandboxImage != "" {
        image = ds.podSandboxImage
    }
    if err := ensureSandboxImageExists(ds.client, image); err != nil {
        return nil, err
    }

    // Step 2: Create the sandbox container.
    createConfig, err := ds.makeSandboxDockerConfig(config, image)
    if err != nil {
        return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.GetMetadata().Name, err)
    }
    createResp, err := ds.client.CreateContainer(*createConfig)
    if err != nil || createResp == nil {
        return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.GetMetadata().Name, err)
    }

    resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}

    // Keep the sandbox marked as not network-ready until every remaining step
    // has succeeded. The deferred func inspects err at return time, so every
    // failure path below must leave its error in err.
    ds.setNetworkReady(createResp.ID, false)
    defer func(e *error) {
        if *e == nil {
            ds.setNetworkReady(createResp.ID, true)
        }
    }(&err)

    // Step 3: Create Sandbox Checkpoint.
    if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
        return nil, err
    }

    // Step 4: Start the sandbox container.
    // Assume kubelet's garbage collector would remove the sandbox later, if
    // startContainer failed.
    err = ds.client.StartContainer(createResp.ID)
    if err != nil {
        return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.GetMetadata().Name, err)
    }

    // NOTE(excerpt): setting up the sandbox's DNS config is elided here.

    // Do not invoke network plugins if in hostNetwork mode.
    if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
        return resp, nil
    }

    // Step 5: Setup networking for the sandbox.
    // All pod networking is setup by a CNI plugin discovered at startup time.
    // This plugin assigns the pod ip, sets up routes inside the sandbox,
    // creates interfaces etc. In theory, its jurisdiction ends with pod
    // sandbox networking, but it might insert iptables rules or open ports
    // on the host as well, to satisfy parts of the pod spec that aren't
    // recognized by the CNI standard yet.
    cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
    err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)
    if err != nil {
        // NOTE(review): partially configured network resources are not torn
        // down here; the sandbox container is left for kubelet's garbage
        // collector (see Step 4).
        return nil, fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.GetMetadata().Name, err)
    }
    return resp, nil
}

CNI

CNI 接口调用和实现

在 dockershim 模式下,cniNetworkPlugin 实现了 kubelet 定义的 NetworkPlugin 网络插件接口(见下文),其 SetUpPod 方法定义如下:

k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/cni.go

// SetUpPod attaches the pod sandbox identified by id to the loopback network
// (when one is configured) and then to the default CNI network. The sandbox's
// network namespace path is looked up through plugin.host.
//
// It fails fast: it returns an error if no default network is configured, if
// the netns path cannot be resolved, or on the first failed attachment (a
// failed loopback attachment means the default network is not attempted).
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
    // Check up front so we never attach lo and then fail on a nil default network.
    defaultNet := plugin.getDefaultNetwork()
    if defaultNet == nil {
        return fmt.Errorf("cni config uninitialized")
    }

    netnsPath, err := plugin.host.GetNetNS(id.ID)
    if err != nil {
        return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
    }

    // Windows doesn't have loNetwork. It comes only with Linux
    if plugin.loNetwork != nil {
        if _, err := plugin.addToNetwork(plugin.loNetwork, name, namespace, id, netnsPath, annotations); err != nil {
            return err
        }
    }

    // The addToNetwork error is returned unwrapped, as before, so callers see
    // the same error value.
    if _, err := plugin.addToNetwork(defaultNet, name, namespace, id, netnsPath, annotations); err != nil {
        return err
    }
    return nil
}

network 相关字段(ds.network)在函数 NewDockerService 中初始化:

k8s.io/kubernetes/pkg/kubelet/dockershim/docker_service.go

// NewDockerService — excerpt: the signature and most of the body are elided;
// only the part that wires up dockershim's pod networking is shown.
func NewDockerService(...){
    // dockershim currently only supports CNI plugins.
    // Build the candidate plugins from the configured CNI config directory and
    // plugin binary directories; one of them is selected below.
    pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString)
    cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginBinDirs)
    cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs))

    // cniPlugins holds every candidate plugin (the CNI plugin plus kubenet).
    // InitNetworkPlugin presumably picks the one whose Name() matches
    // pluginSettings.PluginName and calls its Init with the hairpin mode,
    // non-masquerade CIDR and MTU — TODO confirm.
    plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)
    // NOTE(excerpt): handling of err is elided here.

    ds.network = network.NewPluginManager(plug)
}

CNI 接口定义

在 dockershim 中,由 cniNetworkPlugin 结构体实现网络插件接口,其结构定义如下:

k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/cni.go

// cniNetworkPlugin is dockershim's CNI-backed network plugin.
type cniNetworkPlugin struct {
    network.NoopNetworkPlugin

    // loNetwork is the loopback network built by getLoNetwork; it is nil on
    // Windows, which has no loopback CNI network.
    loNetwork *cniNetwork

    // The embedded RWMutex presumably guards defaultNetwork, which is
    // (re)set by syncNetworkConfig via setDefaultNetwork — confirm.
    sync.RWMutex
    // defaultNetwork is built from the first usable config file in confDir
    // (see getDefaultCNINetwork). This field was previously declared twice,
    // which does not compile.
    defaultNetwork *cniNetwork

    host        network.Host
    execer      utilexec.Interface
    nsenterPath string
    confDir     string
    binDirs     []string
    podCidr     string
}

kubelet 的网络插件接口 NetworkPlugin 如下:

k8s.io/kubernetes/pkg/kubelet/dockershim/network/plugins.go 中定义了面向 Pod 的 NetworkPlugin 接口。CNI 规范参见: https://github.com/containernetworking/cni/blob/master/SPEC.md#network-configuration

// NetworkPlugin is the interface the kubelet uses to talk to network plugins.
// In dockershim, the candidates are cniNetworkPlugin and kubenet.
type NetworkPlugin interface {
    // Init initializes the plugin.  This will be called exactly once
    // before any other methods are called.
    Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error

    // Called on various events like:
    // NET_PLUGIN_EVENT_POD_CIDR_CHANGE
    Event(name string, details map[string]interface{})

    // Name returns the plugin's name. This will be used when searching
    // for a plugin by name, e.g. presumably when network.InitNetworkPlugin
    // selects the plugin named by pluginSettings.PluginName in NewDockerService.
    Name() string

    // Returns a set of NET_PLUGIN_CAPABILITY_*
    Capabilities() utilsets.Int

    // SetUpPod is the method called after the infra container of
    // the pod has been created but before the other containers of the
    // pod are launched. In dockershim, dockerService.RunPodSandbox reaches it
    // through ds.network after starting the sandbox container, unless the pod
    // uses host networking.
    SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations map[string]string) error

    // TearDownPod is the method called before a pod's infra container will be deleted
    TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error

    // GetPodNetworkStatus is the method called to obtain the ipv4 or ipv6 addresses of the container
    GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error)

    // Status returns error if the network plugin is in error state
    Status() error
}

cniNetworkPlugin 结构体的 loNetwork/defaultNetwork 字段类型为 *cniNetwork;cniNetwork 中的 CNIConfig 字段类型为 libcni.CNI 接口(在 getLoNetwork 等处由 *libcni.CNIConfig 实现):

// Abridged view of cniNetworkPlugin showing only its two networks; the full
// struct definition appears earlier in this article.
type cniNetworkPlugin struct {
    network.NoopNetworkPlugin

    loNetwork *cniNetwork        // loopback (lo) network
  defaultNetwork *cniNetwork  // default network, loaded from the CNI config directory
}

// cniNetwork pairs a named CNI network configuration list with the libcni.CNI
// handle used to add or delete that network.
type cniNetwork struct {
    name          string
    NetworkConfig *libcni.NetworkConfigList
    CNIConfig     libcni.CNI   // libcni.CNI interface; set to &libcni.CNIConfig{Path: binDirs}
}

libcni 中的 CNI 接口定义

// CNI is libcni's interface for invoking CNI plugins. The *List variants take
// a NetworkConfigList (a chain of plugins); the others take a single
// NetworkConfig. RuntimeConf presumably carries the per-container data
// (container ID, netns path, interface name) — confirm against libcni.
type CNI interface {
    AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
    DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error

    AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
    DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
}

CNIConfig 的初始化:

// ProbeNetworkPlugins builds dockershim's CNI network plugin.
//
// confDir is the CNI network configuration directory and binDirs the list of
// directories holding CNI plugin binaries. The loopback network comes from
// getLoNetwork; the default network is loaded from confDir right away on a
// best-effort basis via syncNetworkConfig. The returned slice always holds
// exactly one plugin.
func ProbeNetworkPlugins(confDir string, binDirs []string) []network.NetworkPlugin {
    plugin := &cniNetworkPlugin{
        defaultNetwork: nil,
        loNetwork:      getLoNetwork(binDirs),
        execer:         utilexec.New(),
        confDir:        confDir,
        binDirs:        binDirs,
    }

    // Previously missing: without this call defaultNetwork stayed nil even
    // when confDir already held a valid configuration.
    plugin.syncNetworkConfig()

    return []network.NetworkPlugin{plugin}
}

getLoNetwork 函数定义:

// getLoNetwork returns the loopback CNI network, named "lo". Its config is a
// hardcoded list named "cni-loopback" (cniVersion 0.2.0) containing a single
// "loopback" plugin. binDirs becomes the libcni.CNIConfig Path, presumably
// the directories searched for the loopback plugin binary.
func getLoNetwork(binDirs []string) *cniNetwork {
    loConfig, err := libcni.ConfListFromBytes([]byte(`{
  "cniVersion": "0.2.0",
  "name": "cni-loopback",
  "plugins":[{
    "type": "loopback"
  }]
}`))
    if err != nil {
        // The hardcoded config above should always be valid and unit tests will
        // catch this
        panic(err)
    }
    loNetwork := &cniNetwork{
        name:          "lo",
        NetworkConfig: loConfig,
        CNIConfig:     &libcni.CNIConfig{Path: binDirs},
    }

    return loNetwork
}

cniNetworkPlugin 结构体中默认网络字段 defaultNetwork 的初始化如下,该函数在 cni.ProbeNetworkPlugins 函数中被调用:

// syncNetworkConfig (re)loads the default CNI network from plugin.confDir and
// installs it with setDefaultNetwork.
//
// It is best effort: on failure the error is logged and the current default
// network is left untouched, rather than being replaced by nil.
func (plugin *cniNetworkPlugin) syncNetworkConfig() {
    defaultNet, err := getDefaultCNINetwork(plugin.confDir, plugin.binDirs)
    if err != nil {
        glog.Warningf("Unable to update cni config: %s", err)
        return
    }
    plugin.setDefaultNetwork(defaultNet)
}

getDefaultCNINetwork(即 syncNetworkConfig 中调用的函数)的定义如下。它从 CNI 的 conf 目录中读取 .conf、.conflist、.json 配置文件,按文件名排序后选取第一个有效的配置,完成默认 network 的初始化:

// getDefaultCNINetwork builds the default CNI network from the config files in
// confDir. The name now matches its caller in syncNetworkConfig; it was
// previously misspelled "getDefaulgotCNINetwork".
//
// Files ending in .conf, .conflist or .json are tried in sort.Strings order.
// The first one that loads into a config list with at least one plugin wins.
// Files that fail to load or convert are logged and skipped. binDirs becomes
// the libcni plugin search path of the returned network.
//
// It returns an error if confDir cannot be listed, has no candidate files, or
// has no usable configuration.
func getDefaultCNINetwork(confDir string, binDirs []string) (*cniNetwork, error) {
    files, err := libcni.ConfFiles(confDir, []string{".conf", ".conflist", ".json"})
    switch {
    case err != nil:
        return nil, err
    case len(files) == 0:
        return nil, fmt.Errorf("No networks found in %s", confDir)
    }

    // Sort so the choice of default network is deterministic.
    sort.Strings(files)
    for _, confFile := range files {
        var confList *libcni.NetworkConfigList
        if strings.HasSuffix(confFile, ".conflist") {
            confList, err = libcni.ConfListFromFile(confFile)
            if err != nil {
                // Previously unchecked: confList would be nil below.
                glog.Warningf("Error loading CNI config list file %s: %v", confFile, err)
                continue
            }
        } else {
            conf, err := libcni.ConfFromFile(confFile)
            if err != nil {
                // Previously unchecked: conf.Network would dereference nil.
                glog.Warningf("Error loading CNI config file %s: %v", confFile, err)
                continue
            }

            // Ensure the config has a "type" so we know what plugin to run.
            // Also catches the case where somebody put a conflist into a conf file.
            if conf.Network.Type == "" {
                glog.Warningf("Error loading CNI config file %s: no 'type'; perhaps this is a .conflist?", confFile)
                continue
            }

            confList, err = libcni.ConfListFromConf(conf)
            if err != nil {
                glog.Warningf("Error converting CNI config file %s to list: %v", confFile, err)
                continue
            }
        }
        if len(confList.Plugins) == 0 {
            glog.Warningf("CNI config list %s has no networks, skipping", confFile)
            continue
        }

        glog.V(4).Infof("Using CNI configuration file %s", confFile)

        network := &cniNetwork{
            name:          confList.Name,
            NetworkConfig: confList,
            CNIConfig:     &libcni.CNIConfig{Path: binDirs},
        }
        return network, nil
    }
    return nil, fmt.Errorf("No valid networks found in %s", confDir)
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注