本文源码跟踪基于 1.12.6
CRI
创建 Pod 入口
k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
// kubeGenericRuntimeManager::runtimeService: newInstrumentedRuntimeService(runtimeService)
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// NOTE(review): this listing is abridged. The "// ..." markers stand for
// elided code, which includes the computation of podContainerChanges and the
// declarations of msg and err that are assigned further down.
// Step 1: Compute sandbox and container changes.
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
// ...
} else {
// Step 3: kill any running containers in this pod which are not to keep.
// ...
}
// Keep terminated init containers fairly aggressively controlled
// This is an optimization because container removals are typically handled
// by container garbage collector.
m.pruneInitContainersBeforeStart(pod, podStatus)
// We pass the value of the podIP down to generatePodSandboxConfig and
// generateContainerConfig, which in turn passes it to various other
// functions, in order to facilitate functionality that requires this
// value (hosts file and downward API) and avoid races determining
// the pod IP in cases where a container requires restart but the
// podIP isn't in the status manager yet.
//
// We default to the IP in the passed-in pod status, and overwrite it if the
// sandbox needs to be (re)started.
podIP := ""
if podStatus != nil {
podIP = podStatus.IP
}
// Step 4: Create a sandbox for the pod if necessary.
// Per the original author's note, the sandbox is created from the pause image.
// When no new sandbox is needed, the existing podContainerChanges.SandboxID is reused.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
// ...
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
// Under the hood this calls m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler).
glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
// NOTE(review): the use of podSandboxStatus is elided from this excerpt.
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
}
// Get podSandboxConfig for containers to start.
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
// Step 5: start the init container. Only the single container referenced by
// podContainerChanges.NextInitContainerToStart is started in this call.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
// ...
}
// Successfully started the container; clear the entry in the failure
glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}
// Step 6: start containers in podContainerChanges.ContainersToStart.
// Each entry is an index into pod.Spec.Containers; these are the regular
// (non-init) containers.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
// ...
glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
// ...
}
}
// Naked return: the named result is what the caller receives.
return
}
CRI 接口调用和实现
CRI 相关文档: Introducing Container Runtime Interface (CRI) in Kubernetes
k8s.io/kubernetes/pkg/kubelet/dockershim/docker_sandbox.go
dockerService 结构体实现了 CRI 接口:
// dockerService is the dockershim type on which the CRI entry points shown in
// this document (for example RunPodSandbox) are implemented.
type dockerService struct {
// client talks to Docker; RunPodSandbox uses it to create and start the
// sandbox container.
client libdocker.Interface
os kubecontainer.OSInterface
podSandboxImage string
streamingRuntime *streamingRuntime
streamingServer streaming.Server
// network is the plugin manager that sets up pod networking; it is assigned
// in NewDockerService and invoked via SetUpPod from RunPodSandbox.
network *network.PluginManager
// Map of podSandboxID :: network-is-ready
networkReady map[string]bool
// NOTE(review): presumably guards networkReady — confirm against the accessors.
networkReadyLock sync.Mutex
containerManager cm.ContainerManager
// cgroup driver used by Docker runtime.
cgroupDriver string
// checkpointManager stores the sandbox checkpoint created in RunPodSandbox.
checkpointManager checkpointmanager.CheckpointManager
// caches the version of the runtime.
// To be compatible with multiple docker versions, we need to perform
// version checking for some operations. Use this cache to avoid querying
// the docker daemon every time we need to do such checks.
versionCache *cache.ObjectCache
// startLocalStreamingServer indicates whether dockershim should start a
// streaming server on localhost.
startLocalStreamingServer bool
}
RuntimeService 接口定义:
// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
// It is composed of four embedded interfaces plus two runtime-level methods.
type RuntimeService interface {
// Embedded interfaces. NOTE(review): of these, only PodSandboxManager is
// listed in this document; the other three are declared elsewhere.
RuntimeVersioner
ContainerManager
PodSandboxManager
ContainerStatsManager
// UpdateRuntimeConfig updates runtime configuration if specified
UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
// Status returns the status of the runtime.
Status() (*runtimeapi.RuntimeStatus, error)
}
PodSandboxManager 接口定义:
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
// It is embedded in RuntimeService.
type PodSandboxManager interface {
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
// NOTE(review): the returned string is presumably the ID of the new
// sandbox — confirm against the implementation.
RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be force terminated.
StopPodSandbox(podSandboxID string) error
// RemovePodSandbox removes the sandbox. If there are running containers in the
// sandbox, they should be forcibly removed.
RemovePodSandbox(podSandboxID string) error
// PodSandboxStatus returns the Status of the PodSandbox.
PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
// ListPodSandbox returns a list of Sandbox.
// The filter argument selects which sandboxes are returned.
ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}
以 dockerService 对 RunPodSandbox 接口的实现为例:
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
// For docker, PodSandbox is implemented by a container holding the network
// namespace for the pod.
// Note: docker doesn't use LogDirectory (yet).
//
// The request's config describes the sandbox to create. On success the
// response carries the ID of the created sandbox container. Any step that
// fails aborts the call and its error is returned to the caller; no response
// is returned alongside an error.
//
// NOTE(review): this listing is abridged from the upstream source.
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
	config := r.GetConfig()

	// Step 1: Pull the image for the sandbox.
	image := defaultSandboxImage
	if err := ensureSandboxImageExists(ds.client, image); err != nil {
		return nil, err
	}

	// Step 2: Create the sandbox container.
	createConfig, err := ds.makeSandboxDockerConfig(config, image)
	if err != nil {
		return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.GetMetadata().Name, err)
	}
	createResp, err := ds.client.CreateContainer(*createConfig)
	if err != nil {
		// Bail out before createResp is dereferenced below.
		return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.GetMetadata().Name, err)
	}
	resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}

	// Mark the sandbox network as not ready until setup has completed.
	// NOTE(review): nothing in this excerpt flips the flag back to true;
	// presumably that code is elided — confirm against upstream.
	ds.setNetworkReady(createResp.ID, false)

	// Step 3: Create Sandbox Checkpoint.
	if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
		return nil, err
	}

	// Step 4: Start the sandbox container.
	// Assume kubelet's garbage collector would remove the sandbox later, if
	// startContainer failed.
	if err = ds.client.StartContainer(createResp.ID); err != nil {
		return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.GetMetadata().Name, err)
	}

	// (The DNS config setup that happens at this point is elided from this
	// excerpt.)

	// Do not invoke network plugins if in hostNetwork mode.
	if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
		return resp, nil
	}

	// Step 5: Setup networking for the sandbox.
	// All pod networking is setup by a CNI plugin discovered at startup time.
	// This plugin assigns the pod ip, sets up routes inside the sandbox,
	// creates interfaces etc. In theory, its jurisdiction ends with pod
	// sandbox networking, but it might insert iptables rules or open ports
	// on the host as well, to satisfy parts of the pod spec that aren't
	// recognized by the CNI standard yet.
	cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
	if err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations); err != nil {
		return nil, fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.GetMetadata().Name, err)
	}
	return resp, nil
}
CNI
CNI 接口调用和实现
在 dockershim 模式下,cniNetworkPlugin 实现了 kubelet 定义的 NetworkPlugin 接口(其内部通过 cniNetwork.CNIConfig 持有 libcni.CNI 接口),SetUpPod 函数定义如下:
k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/cni.go
// SetUpPod attaches the pod sandbox identified by id to the loopback network
// (when one is configured) and then to the default CNI network.
//
// namespace and name identify the pod, and annotations are forwarded to
// addToNetwork unchanged. The first failing step aborts the call and its
// error is returned; nil is returned only when every step succeeded.
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
	netnsPath, err := plugin.host.GetNetNS(id.ID)
	if err != nil {
		return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
	}

	// Windows doesn't have loNetwork. It comes only with Linux
	if plugin.loNetwork != nil {
		// Return here: otherwise the assignment below would overwrite and
		// silently discard a loopback failure.
		if _, err = plugin.addToNetwork(plugin.loNetwork, name, namespace, id, netnsPath, annotations); err != nil {
			return err
		}
	}

	_, err = plugin.addToNetwork(plugin.getDefaultNetwork(), name, namespace, id, netnsPath, annotations)
	return err
}
network 相关的字段在函数 NewDockerService 中初始化:
k8s.io/kubernetes/pkg/kubelet/dockershim/docker_service.go
func NewDockerService(...){
// NOTE(review): abridged pseudo-listing — the parameter list, the surrounding
// setup, and the handling of the error from InitNetworkPlugin are all elided.
// dockershim currently only supports CNI plugins.
// Build the list of candidate plugins from the configured CNI conf directory
// and bin directories, so that one can be chosen afterwards.
pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString)
cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginBinDirs)
cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs))
// Every candidate plugin is passed in, together with pluginSettings.PluginName
// (presumably used to pick one of them — confirm in InitNetworkPlugin).
plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)
// The chosen plugin is wrapped in a plugin manager and stored on the service.
ds.network = network.NewPluginManager(plug)
}
CNI 接口定义
在 DockerShim 中由 cniNetworkPlugin 实现网络插件接口,其结构体定义如下:
k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/cni.go
// cniNetworkPlugin is the CNI-backed network plugin used by dockershim. It
// embeds network.NoopNetworkPlugin and holds the two CNI networks that
// SetUpPod attaches a pod sandbox to.
type cniNetworkPlugin struct {
	network.NoopNetworkPlugin

	// loNetwork is the loopback CNI network, built by getLoNetwork.
	loNetwork *cniNetwork

	// NOTE(review): the embedded RWMutex presumably guards defaultNetwork,
	// which the visible code only touches through getDefaultNetwork and
	// setDefaultNetwork — confirm against those accessors.
	sync.RWMutex
	// defaultNetwork is the default CNI network; it is installed by
	// syncNetworkConfig via setDefaultNetwork. It is declared exactly once:
	// a second declaration of the same field name does not compile.
	defaultNetwork *cniNetwork

	host        network.Host
	execer      utilexec.Interface
	nsenterPath string
	// confDir is the directory scanned for CNI configuration files.
	confDir string
	// binDirs are the plugin binary directories handed to libcni.
	binDirs []string
	podCidr string
}
NetworkPlugin 接口如下:
k8s.io/kubernetes/pkg/kubelet/dockershim/network/plugins.go
面向 Pod 的 NetworkPlugin 接口,CNI 规范参见:https://github.com/containernetworking/cni/blob/master/SPEC.md#network-configuration
// NetworkPlugin is an interface to network plugins for the kubelet
type NetworkPlugin interface {
// Init initializes the plugin. This will be called exactly once
// before any other methods are called.
Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error
// Event is called on various events like:
// NET_PLUGIN_EVENT_POD_CIDR_CHANGE
Event(name string, details map[string]interface{})
// Name returns the plugin's name. This will be used when searching
// for a plugin by name.
Name() string
// Capabilities returns a set of NET_PLUGIN_CAPABILITY_*
Capabilities() utilsets.Int
// SetUpPod is the method called after the infra container of
// the pod has been created but before the other containers of the
// pod are launched.
SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations map[string]string) error
// TearDownPod is the method called before a pod's infra container will be deleted
TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error
// GetPodNetworkStatus is the method called to obtain the ipv4 or ipv6 addresses of the container
GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error)
// Status returns error if the network plugin is in error state
Status() error
}
cniNetworkPlugin 结构体中的 loNetwork 和 defaultNetwork 均为 *cniNetwork 类型;cniNetwork 包含字段 CNIConfig,其类型为 libcni.CNI 接口:
// cniNetworkPlugin, abridged: only the embedded no-op plugin and the two
// network fields are repeated here.
type cniNetworkPlugin struct {
network.NoopNetworkPlugin
loNetwork *cniNetwork // loopback (lo) network
defaultNetwork *cniNetwork // default network
}
// cniNetwork bundles a named CNI network configuration with the libcni
// handle stored alongside it.
type cniNetwork struct {
// name is the network's name ("lo" for the loopback network, otherwise the
// Name of the loaded config list).
name string
// NetworkConfig is the parsed CNI configuration list for this network.
NetworkConfig *libcni.NetworkConfigList
// CNIConfig holds a value satisfying the libcni.CNI interface.
CNIConfig libcni.CNI
}
CNI 接口定义
// CNI is the libcni interface stored in cniNetwork.CNIConfig. It offers add
// and delete operations for both a configuration list and a single
// configuration, each taking the runtime configuration for the target
// container.
type CNI interface {
// AddNetworkList adds using a configuration list and returns the result.
AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
// DelNetworkList is the delete counterpart of AddNetworkList.
DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error
// AddNetwork adds using a single configuration and returns the result.
AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
// DelNetwork is the delete counterpart of AddNetwork.
DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
}
CNIConfig 的初始化:
// ProbeNetworkPlugins builds the CNI network plugin for the given
// configuration directory and plugin binary directories, and returns it as
// the only element of the result slice.
//
// The loopback network is created eagerly. The default network is then loaded
// from confDir by syncNetworkConfig.
func ProbeNetworkPlugins(confDir string, binDirs []string) []network.NetworkPlugin {
	plugin := &cniNetworkPlugin{
		defaultNetwork: nil,
		loNetwork:      getLoNetwork(binDirs), // loopback network
		execer:         utilexec.New(),
		confDir:        confDir,
		binDirs:        binDirs,
	}

	// Populate defaultNetwork during probing. This is best effort:
	// syncNetworkConfig has no return value, so a failure here cannot abort
	// plugin creation.
	plugin.syncNetworkConfig()
	return []network.NetworkPlugin{plugin}
}
getLoNetwork 函数定义:
// getLoNetwork returns the built-in loopback network. Its configuration is a
// hardcoded single-plugin config list of type "loopback", and its libcni
// handle is given binDirs as the plugin search path.
//
// It panics if the hardcoded configuration fails to parse.
func getLoNetwork(binDirs []string) *cniNetwork {
	const loopbackConf = `{
"cniVersion": "0.2.0",
"name": "cni-loopback",
"plugins":[{
"type": "loopback"
}]
}`

	confList, err := libcni.ConfListFromBytes([]byte(loopbackConf))
	if err != nil {
		// The configuration is a compile-time constant, so a parse failure
		// is a programming error that unit tests are expected to catch.
		panic(err)
	}

	return &cniNetwork{
		name:          "lo",
		NetworkConfig: confList,
		CNIConfig:     &libcni.CNIConfig{Path: binDirs},
	}
}
cniNetworkPlugin 结构体中默认网络字段 defaultNetwork 的初始化如下,该函数(syncNetworkConfig)在 cni.ProbeNetworkPlugins 函数中被调用:
// syncNetworkConfig reloads the default CNI network from plugin.confDir and
// installs it with setDefaultNetwork.
//
// If loading fails, a warning is logged and the currently installed default
// network is left untouched rather than being replaced with nil.
func (plugin *cniNetworkPlugin) syncNetworkConfig() {
	defaultNet, err := getDefaultCNINetwork(plugin.confDir, plugin.binDirs)
	if err != nil {
		glog.Warningf("Unable to update cni config: %s", err)
		return
	}
	plugin.setDefaultNetwork(defaultNet)
}
getDefaultCNINetwork 的定义如下,主要工作是从 CNI 的 conf 目录中读取 conf 文件,完成默认 network 的配置初始化:
// getDefaultCNINetwork picks the default CNI network from the configuration
// files in confDir.
//
// Files ending in .conf, .conflist or .json are considered in sorted order.
// The first one that loads successfully and contains at least one plugin is
// returned as a cniNetwork whose libcni handle searches binDirs. A file that
// cannot be loaded or converted, has no "type", or has no plugins is logged
// and skipped.
//
// An error is returned when the directory cannot be listed, holds no
// candidate files, or none of the candidates is usable.
func getDefaultCNINetwork(confDir string, binDirs []string) (*cniNetwork, error) {
	files, err := libcni.ConfFiles(confDir, []string{".conf", ".conflist", ".json"})
	switch {
	case err != nil:
		return nil, err
	case len(files) == 0:
		return nil, fmt.Errorf("No networks found in %s", confDir)
	}

	// Sort so that the choice among several valid files is deterministic.
	sort.Strings(files)
	for _, confFile := range files {
		var confList *libcni.NetworkConfigList
		if strings.HasSuffix(confFile, ".conflist") {
			confList, err = libcni.ConfListFromFile(confFile)
			if err != nil {
				glog.Warningf("Error loading CNI config list file %s: %v", confFile, err)
				continue
			}
		} else {
			conf, err := libcni.ConfFromFile(confFile)
			if err != nil {
				// Skip before conf is dereferenced below.
				glog.Warningf("Error loading CNI config file %s: %v", confFile, err)
				continue
			}
			// Ensure the config has a "type" so we know what plugin to run.
			// Also catches the case where somebody put a conflist into a conf file.
			if conf.Network.Type == "" {
				glog.Warningf("Error loading CNI config file %s: no 'type'; perhaps this is a .conflist?", confFile)
				continue
			}

			confList, err = libcni.ConfListFromConf(conf)
			if err != nil {
				glog.Warningf("Error converting CNI config file %s to list: %v", confFile, err)
				continue
			}
		}
		if len(confList.Plugins) == 0 {
			glog.Warningf("CNI config list %s has no networks, skipping", confFile)
			continue
		}

		glog.V(4).Infof("Using CNI configuration file %s", confFile)
		return &cniNetwork{
			name:          confList.Name,
			NetworkConfig: confList,
			CNIConfig:     &libcni.CNIConfig{Path: binDirs},
		}, nil
	}
	return nil, fmt.Errorf("No valid networks found in %s", confDir)
}