Istio Source Code: Pilot Source Code Analysis (Original)


Architecture

Introduction

For the complete YAML file, see pilot-yaml.

Dockerfile

FROM istionightly/base_debug

ADD pilot-discovery /usr/local/bin/
ADD cacert.pem /cacert.pem
ENTRYPOINT ["/usr/local/bin/pilot-discovery"]

Startup command line

# ps -ef -www|grep pilot
$/usr/local/bin/pilot-discovery discovery

Command-line help

# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery --help
Usage:
  pilot-discovery [command]

Available Commands:
  discovery   Start Istio proxy discovery service
  help        Help about any command
  request     Makes an HTTP request to Pilot metrics/debug endpoint
  version     Prints out build version information

Help for the discovery subcommand

# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help
Defaulting container name to discovery.

Start Istio proxy discovery service

Usage:
  pilot-discovery discovery [flags]

Flags:
  -a, --appNamespace string                 Restrict the applications namespace the controller manages; if not set, controller watches all namespaces
      --cfConfig string                     Cloud Foundry config file
      --clusterRegistriesConfigMap string   ConfigMap map for clusters config store
      --clusterRegistriesNamespace string   Namespace for ConfigMap which stores clusters configs
      --configDir string                    Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.
      --consulserverInterval duration       Interval (in seconds) for polling the Consul service registry (default 2s)
      --consulserverURL string              URL for the Consul server
      --disable-install-crds                Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected.  It is recommended to be disable for highly available setups.
      --discovery_cache                     Enable caching discovery service responses (default true)
      --domain string                       DNS domain suffix (default "cluster.local")
      --grpcAddr string                     Discovery service grpc address (default ":15010")
  -h, --help                                help for discovery
      --httpAddr string                     Discovery service HTTP address (default ":8080")
      --kubeconfig string                   Use a Kubernetes configuration file instead of in-cluster configuration
      --meshConfig string                   File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh")
      --monitoringAddr string               HTTP address to use for the exposing pilot self-monitoring information (default ":9093")
  -n, --namespace string                    Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable
      --plugins stringSlice                 comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter])
      --profile                             Enable profiling via web interface host:port/debug/pprof (default true)
      --registries stringSlice              Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes])
      --resync duration                     Controller resync interval (default 1m0s)
      --secureGrpcAddr string               Discovery service grpc address, with https (default ":15012")
      --webhookEndpoint string              Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket

Global Flags:
   (omitted)

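Main flags:

| Flag | Default | Notes |
| --- | --- | --- |
| --appNamespace | | Corresponds to oneNamespace in the Helm installation |
| --configDir | | Shows that Pilot has two config sources: config files and CRDs |
| --discovery_cache | true | Enables the cache, which helps performance |
| --domain | "cluster.local" | Domain suffix used in k8s |
| --grpcAddr | :15010 | |
| --httpAddr | :8080 | |
| --meshConfig | "/etc/istio/config/mesh" | |
| --registries | Kubernetes | |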
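
To make the defaults concrete, here is a minimal sketch that registers a handful of these flags and parses an argument list. It uses Go's standard `flag` package rather than cobra (which pilot-discovery actually uses), and the `discoveryArgs` struct and `parseDiscoveryArgs` function are hypothetical names introduced only for illustration.

```go
package main

import (
	"flag"
	"fmt"
)

// discoveryArgs is a hypothetical struct holding a few of the flags listed above.
type discoveryArgs struct {
	Domain         string
	GrpcAddr       string
	HTTPAddr       string
	MeshConfig     string
	DiscoveryCache bool
}

// parseDiscoveryArgs registers the flags with the defaults shown in the help
// output and parses argv. It relies on the standard flag package, not cobra.
func parseDiscoveryArgs(argv []string) (*discoveryArgs, error) {
	a := &discoveryArgs{}
	fs := flag.NewFlagSet("discovery", flag.ContinueOnError)
	fs.StringVar(&a.Domain, "domain", "cluster.local", "DNS domain suffix")
	fs.StringVar(&a.GrpcAddr, "grpcAddr", ":15010", "Discovery service grpc address")
	fs.StringVar(&a.HTTPAddr, "httpAddr", ":8080", "Discovery service HTTP address")
	fs.StringVar(&a.MeshConfig, "meshConfig", "/etc/istio/config/mesh", "File name for Istio mesh configuration")
	fs.BoolVar(&a.DiscoveryCache, "discovery_cache", true, "Enable caching discovery service responses")
	if err := fs.Parse(argv); err != nil {
		return nil, err
	}
	return a, nil
}

func main() {
	// Override one flag; every other flag keeps its default.
	a, err := parseDiscoveryArgs([]string{"--grpcAddr", ":9999"})
	if err != nil {
		panic(err)
	}
	fmt.Println(a.GrpcAddr, a.HTTPAddr, a.Domain, a.DiscoveryCache)
}
```

Flags that are not passed keep the defaults from the help output, which is why a bare `pilot-discovery discovery` is enough to start the service.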

Code analysis

Entry point

    discoveryCmd = &cobra.Command{
        Use:   "discovery",
        Short: "Start Istio proxy discovery service.",
        Args:  cobra.ExactArgs(0),
        RunE: func(c *cobra.Command, args []string) error {
            // ...

            // Create the stop channel for all of the servers.
            stop := make(chan struct{})

            // Create the server for the discovery service.
            discoveryServer, err := bootstrap.NewServer(serverArgs)
            if err != nil {
                return fmt.Errorf("failed to create discovery service: %v", err)
            }

            // Start the server
            if err := discoveryServer.Start(stop); err != nil {
                return fmt.Errorf("failed to start discovery service: %v", err)
            }

            cmd.WaitSignal(stop)
            return nil
        },
    }

istio.io/istio/pilot/pkg/bootstrap/server.go

For the default mesh configuration, see: https://gist.github.com/DavadDi/f110459d339e260f818250287fc78ccc#file-mesh

// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
    // ...
    s := &Server{
        filewatcher: filewatcher.NewWatcher(),
    }

    // error handling omitted
    s.initKubeClient(&args) // initialize the client for the k8s cluster, s.kubeClient
    s.initMesh(&args) // initialize the mesh config and add /etc/istio/config/mesh to the filewatcher

    // Not present in the 1.0.5 cmd; presumably added in 1.1:
    // serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks"
    // initialize the config and add it to the filewatcher
    s.initMeshNetworks(&args)

    // initMixerSan configures the mixerSAN configuration item. 
    // The mesh must already have been configured.
    s.initMixerSan(&args)

    // creates the config controller in the pilotConfig.
    // this eventually creates a crd.NewController instance
    s.initConfigController(&args)
    /* Main flow inside, using kube as the example:
        controller, err := s.makeKubeConfigController(args)
        s.configController = controller

        s.addStartFunc(func(stop <-chan struct{}) error {
            go s.configController.Run(stop)  // 1. configController is started first
            return nil
        })
    */

    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    /*
        s.createK8sServiceControllers(serviceControllers, args)
        --> kube.NewController(s.kubeClient, args.Config.ControllerOptions)

        s.addStartFunc(func(stop <-chan struct{}) error {
            go s.ServiceController.Run(stop)  // 2. ServiceController is started second
            return nil
        })
    */

    // initialize the DiscoveryService gRPC server, covered in detail later
    // adds 2 funcs to startFuncs
    s.initDiscoveryService(&args)

    // initializes the configuration for the pilot monitoring server.
    // adds 1 func to startFuncs
    s.initMonitor(&args)

    // starts the secret controller to watch for remote
    // clusters and initialize the multicluster structures.
    // This mainly sets the configuration for connecting to remote clusters and the
    // namespace to watch, and starts a secret controller that
    // watches secrets labeled istio/multiCluster=true
    // --clusterRegistriesConfigMap string   ConfigMap map for clusters config store
    // --clusterRegistriesNamespace string   Namespace for ConfigMap which stores clusters configs
    // not analyzed for now
    s.initClusterRegistries(&args)

    return s, nil
}

// Start runs, in order, the functions registered during NewServer
func (s *Server) Start(stop <-chan struct{}) error {
    // Now start all of the components.
    for _, fn := range s.startFuncs {
        if err := fn(stop); err != nil {
            return err
        }
    }
    return nil
}
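
The deferred-start pattern used by NewServer and Start can be sketched in isolation. The `miniServer` type and the component names below are hypothetical stand-ins; the sketch only shows that registration order determines start order and that the first error aborts startup.

```go
package main

import "fmt"

// startFunc mirrors the shape of the functions registered via addStartFunc.
type startFunc func(stop <-chan struct{}) error

// miniServer is a hypothetical stand-in for bootstrap.Server.
type miniServer struct {
	startFuncs []startFunc
}

func (s *miniServer) addStartFunc(fn startFunc) {
	s.startFuncs = append(s.startFuncs, fn)
}

// start runs the registered functions in registration order and stops at the first error.
func (s *miniServer) start(stop <-chan struct{}) error {
	for _, fn := range s.startFuncs {
		if err := fn(stop); err != nil {
			return err
		}
	}
	return nil
}

// startOrder registers three named components and returns the order in which they started.
func startOrder() ([]string, error) {
	var order []string
	s := &miniServer{}
	for _, name := range []string{"configController", "serviceController", "discoveryService"} {
		name := name // per-iteration copy for the closure
		s.addStartFunc(func(stop <-chan struct{}) error {
			order = append(order, name)
			return nil
		})
	}
	stop := make(chan struct{})
	defer close(stop)
	err := s.start(stop)
	return order, err
}

func main() {
	order, err := startOrder()
	fmt.Println(order, err)
}
```

In Pilot each registered function typically launches a goroutine that runs until `stop` is closed; the sketch records names instead so that the ordering is easy to observe.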

istio.io/api/mesh/v1alpha1/network.pb.go

The MeshNetworks struct and its documentation are as follows:

// MeshNetworks (config map) provides information about the set of networks
// inside a mesh and how to route to endpoints in each network. For example
//
// MeshNetworks(file/config map):
// networks:
// - network1:
//   - endpoints:
//     - fromRegistry: registry1 #must match secret name in kubernetes
//     - fromCidr: 192.168.100.0/22 #a VM network for example
//     gateways:
//     - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
//       port: 15443
//       locality: us-east-1a
type MeshNetworks struct {
    // REQUIRED: The set of networks inside this mesh. Each network should
    // have a unique name and information about how to infer the endpoints in
    // the network as well as the gateways associated with the network.
    Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
}
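
As an illustration of how such a configuration might be used to infer which network an endpoint belongs to, the following sketch matches an IP address against the `fromCidr` ranges. The `network` struct is a simplified, hypothetical stand-in for the generated protobuf types, and `networkForIP` is not an Istio function.

```go
package main

import (
	"fmt"
	"net"
)

// network is a simplified, hypothetical stand-in for one entry of MeshNetworks.Networks.
type network struct {
	FromCIDRs []string // values of fromCidr
	Gateway   string   // registryServiceName of the gateway
	Port      uint32
}

// networkForIP returns the name of a network whose fromCidr range contains ip.
func networkForIP(networks map[string]network, ip string) (string, bool) {
	addr := net.ParseIP(ip)
	if addr == nil {
		return "", false
	}
	for name, n := range networks {
		for _, cidr := range n.FromCIDRs {
			_, ipNet, err := net.ParseCIDR(cidr)
			if err != nil {
				continue // malformed ranges are skipped in this sketch
			}
			if ipNet.Contains(addr) {
				return name, true
			}
		}
	}
	return "", false
}

// exampleNetworks mirrors the values from the comment on MeshNetworks.
func exampleNetworks() map[string]network {
	return map[string]network{
		"network1": {
			FromCIDRs: []string{"192.168.100.0/22"},
			Gateway:   "istio-ingressgateway.istio-system.svc.cluster.local",
			Port:      15443,
		},
	}
}

func main() {
	name, ok := networkForIP(exampleNetworks(), "192.168.101.5")
	fmt.Println(name, ok)
}
```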

Stripped down, the NewServer function looks like this:

func NewServer(args PilotArgs) (*Server, error) {
    // error handling omitted
    s.initKubeClient(&args) // initialize the client for the k8s cluster, s.kubeClient

    // initialize the related configuration and watch it for changes
    // ...

    // creates the config controller in the pilotConfig.
    // this eventually creates a crd.NewController instance
    s.initConfigController(&args)
    // s.makeKubeConfigController(args)
    // s.configController.Run(stop)


    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    // kube.NewController(s.kubeClient, args.Config.ControllerOptions)
    // go s.ServiceController.Run(stop)

    // initialize the DiscoveryService gRPC server, covered in detail later
    // adds 2 funcs to startFuncs
    s.initDiscoveryService(&args)

    return s, nil
}


ConfigController

// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
    // initialization in k8s mode
    controller, err := s.makeKubeConfigController(args)
    s.configController = controller

    // Defer starting the controller until after the service is created.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop)
        return nil
    })

    //...

    // Create the config store.
    s.istioConfigStore = model.MakeIstioStore(s.configController)

    return nil
}
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {
    kubeCfgFile := s.getKubeCfgFile(args)
    configClient, err := crd.NewClient(
        kubeCfgFile, "", 
        model.IstioConfigTypes, // all related CRD definitions
        args.Config.ControllerOptions.DomainSuffix)

    if !args.Config.DisableInstallCRDs {
        // register the custom CRDs
        configClient.RegisterResources()
    }

    return crd.NewController(configClient, args.Config.ControllerOptions), nil
}

The model.IstioConfigTypes argument passed to crd.NewClient contains the definitions of all related CRDs:

istio.io/istio/pilot/pkg/model/config.go

    // IstioConfigTypes lists all Istio config types with schemas and validation
    IstioConfigTypes = ConfigDescriptor{
        VirtualService,
        Gateway,
        ServiceEntry,
        DestinationRule,
        EnvoyFilter,
        Sidecar,
        HTTPAPISpec,
        HTTPAPISpecBinding,
        QuotaSpec,
        QuotaSpecBinding,
        AuthenticationPolicy,
        AuthenticationMeshPolicy,
        ServiceRole,
        ServiceRoleBinding,
        RbacConfig,
        ClusterRbacConfig,
    }

crd.NewController is defined in istio.io/istio/pilot/pkg/config/kube/crd/controller.go:

// NewController creates a new Kubernetes controller for CRDs
// Use "" for namespace to listen for all namespace changes
func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache {
    log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace)

    // Queue requires a time duration for a retry delay after a handler error
    out := &controller{
        client: client,
        queue:  kube.NewQueue(1 * time.Second),
        kinds:  make(map[string]cacheHandler),
    }

    // add stores for CRD kinds
    for _, schema := range client.ConfigDescriptor() {
        out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod)
    }

    return out
}
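
The comment about the queue's retry delay can be illustrated with a small sketch. This is not the `kube.Queue` implementation; `retryQueue` and its methods are hypothetical, and the sketch only demonstrates the idea of re-enqueueing a task after a delay when its handler returns an error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// task is a unit of work; a non-nil error asks for a retry.
type task func() error

// retryQueue is a hypothetical, simplified work queue with a fixed retry delay.
type retryQueue struct {
	delay time.Duration
	tasks chan task
}

func newRetryQueue(delay time.Duration) *retryQueue {
	return &retryQueue{delay: delay, tasks: make(chan task, 16)}
}

func (q *retryQueue) push(t task) { q.tasks <- t }

// run processes tasks one at a time until stop is closed.
func (q *retryQueue) run(stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case t := <-q.tasks:
			if err := t(); err != nil {
				// Re-enqueue the failed task after the retry delay.
				time.AfterFunc(q.delay, func() { q.push(t) })
			}
		}
	}
}

// retryDemo pushes a task that fails twice and reports how many attempts it took.
func retryDemo() (int, bool) {
	q := newRetryQueue(time.Millisecond)
	stop := make(chan struct{})
	defer close(stop)
	go q.run(stop)

	attempts := 0
	done := make(chan struct{})
	q.push(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		close(done)
		return nil
	})

	select {
	case <-done:
		return attempts, true
	case <-time.After(2 * time.Second):
		return attempts, false
	}
}

func main() {
	attempts, ok := retryDemo()
	fmt.Println(attempts, ok)
}
```

Because a single goroutine drains the queue, handlers run sequentially, which keeps event processing ordered without extra locking in the handlers.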

Watching CRDs requires a separate client connection for each group of resources; the CRD groups currently include networking, config, authentication, and rbac.

// controller is a collection of synchronized resource watchers.
// Caches are thread-safe
type controller struct {
    client *Client  // one connection per resource group
    queue  kube.Queue
    kinds  map[string]cacheHandler
}

type cacheHandler struct {
    informer cache.SharedIndexInformer
    handler  *kube.ChainHandler
}
// Client is a basic REST client for CRDs implementing config store
type Client struct {
    // Map of apiVersion to restClient.
    clientset map[string]*restClient

    // domainSuffix for the config metadata
    domainSuffix string
}

ServiceController

istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go

The struct is defined as follows:

type Controller struct {
    domainSuffix string

    client    kubernetes.Interface
    queue     Queue
    services  cacheHandler
    endpoints cacheHandler
    nodes     cacheHandler

    pods *PodCache

    //....
}

NewController is defined as follows:

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroler).
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
        options.WatchedNamespace, options.ResyncPeriod)

    // Queue requires a time duration for a retry delay after a handler error
    out := &Controller{
        domainSuffix:               options.DomainSuffix,
        client:                     client,
        queue:                      NewQueue(1 * time.Second),
        ClusterID:                  options.ClusterID,
        XDSUpdater:                 options.XDSUpdater,
        servicesMap:                make(map[model.Hostname]*model.Service),
        externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
    }

    sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))

    svcInformer := sharedInformers.Core().V1().Services().Informer()
    out.services = out.createCacheHandler(svcInformer, "Services")

    epInformer := sharedInformers.Core().V1().Endpoints().Informer()
    out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")

    nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
    out.nodes = out.createCacheHandler(nodeInformer, "Nodes")

    podInformer := sharedInformers.Core().V1().Pods().Informer()
    out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)

    return out
}

The code above makes it clear that ServiceController watches the following resources in the given namespace:

  1. Services
  2. Endpoints
  3. Nodes
  4. Pods
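
Each watched resource gets its own handler chain, so that several consumers can react to the same add, update, or delete event. The sketch below shows that idea with hypothetical types (`chainHandler`, `event`); it is loosely modeled on the `cacheHandler` field seen earlier and is not the actual Istio code.

```go
package main

import "fmt"

// event is the kind of change observed on a resource.
type event int

const (
	eventAdd event = iota
	eventUpdate
	eventDelete
)

func (e event) String() string {
	switch e {
	case eventAdd:
		return "add"
	case eventUpdate:
		return "update"
	default:
		return "delete"
	}
}

// handler reacts to a change of obj.
type handler func(obj interface{}, e event) error

// chainHandler is a hypothetical ordered list of handlers for one resource kind.
type chainHandler struct {
	funcs []handler
}

func (c *chainHandler) add(h handler) { c.funcs = append(c.funcs, h) }

// apply calls every handler in order and stops at the first error.
func (c *chainHandler) apply(obj interface{}, e event) error {
	for _, f := range c.funcs {
		if err := f(obj, e); err != nil {
			return err
		}
	}
	return nil
}

// dispatchDemo wires one chain per watched kind and feeds two made-up events through them.
func dispatchDemo() []string {
	var seen []string
	kinds := map[string]*chainHandler{}
	for _, kind := range []string{"Services", "Endpoints", "Nodes", "Pods"} {
		kind := kind // per-iteration copy for the closure
		ch := &chainHandler{}
		ch.add(func(obj interface{}, e event) error {
			seen = append(seen, fmt.Sprintf("%s %s %v", kind, e, obj))
			return nil
		})
		kinds[kind] = ch
	}
	_ = kinds["Services"].apply("reviews", eventAdd)
	_ = kinds["Pods"].apply("reviews-v1-abc", eventDelete)
	return seen
}

func main() {
	for _, line := range dispatchDemo() {
		fmt.Println(line)
	}
}
```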

ServiceDiscovery

istio.io/istio/pilot/pkg/bootstrap/server.go

func (s *Server) initDiscoveryService(args *PilotArgs) error {
    // Note that environment holds the values used later, including
    // s.istioConfigStore and s.ServiceController
    environment := &model.Environment{
        Mesh:             s.mesh,
        MeshNetworks:     s.meshNetworks,
        IstioConfigStore: s.istioConfigStore, // istio routing rules
        ServiceDiscovery: s.ServiceController, // service list 
        ServiceAccounts:  s.ServiceController,
        MixerSAN:         s.mixerSAN,
    }

    // Set up discovery service
    discovery, err := envoy.NewDiscoveryService(
        environment,
        args.DiscoveryOptions,
    )

    s.mux = discovery.RestContainer.ServeMux

    // create the gRPC server corresponding to envoyv2.NewDiscoveryServer
    s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
        environment,
        istio_networking.NewConfigGenerator(args.Plugins),
        s.ServiceController, 
        s.configController)

    s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)

    // ...

    // create grpc/http server
    s.initGrpcServer(args.KeepaliveOptions)
    s.httpServer = &http.Server{
        Addr:    args.DiscoveryOptions.HTTPAddr,
        Handler: s.mux,
    }

    // create http listener
    listener, err := net.Listen("tcp", args.DiscoveryOptions.HTTPAddr)
    s.HTTPListeningAddr = listener.Addr()

    // create grpc listener
    grpcListener, err := net.Listen("tcp", args.DiscoveryOptions.GrpcAddr)
    s.GRPCListeningAddr = grpcListener.Addr()

    s.addStartFunc(func(stop <-chan struct{}) error {
        // start the HTTP server goroutine
        // start the gRPC server goroutine
        // start the goroutine that waits for shutdown

        return nil
    })

    // run secure grpc server

    return nil
}
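
The start function registered at the end of initDiscoveryService is only summarized by comments. A minimal sketch of what serving on a listener and shutting down on `stop` could look like for the HTTP side follows; `startHTTP`, `probe`, and the `/ready` path are made up for illustration, and the gRPC server is left out.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// startHTTP serves mux on addr and shuts the server down when stop is closed.
// It returns the actual listening address, which is useful when addr ends in ":0".
func startHTTP(addr string, mux *http.ServeMux, stop <-chan struct{}) (net.Addr, error) {
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	srv := &http.Server{Handler: mux}

	go func() {
		// Serve returns http.ErrServerClosed after Shutdown; ignored in this sketch.
		_ = srv.Serve(listener)
	}()
	go func() {
		<-stop
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()
		_ = srv.Shutdown(ctx)
	}()
	return listener.Addr(), nil
}

// probe starts a server with a made-up /ready endpoint, queries it once, and stops it.
func probe() (string, error) {
	mux := http.NewServeMux()
	mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	stop := make(chan struct{})
	defer close(stop)

	addr, err := startHTTP("127.0.0.1:0", mux, stop)
	if err != nil {
		return "", err
	}
	resp, err := http.Get("http://" + addr.String() + "/ready")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := probe()
	fmt.Println(body, err)
}
```

Creating the listener before registering the start function, as the original code does, means a port conflict is reported during NewServer rather than after other components have already started.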

envoyv2.NewDiscoveryServer is defined in istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go:

//  s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
//        environment,
//        istio_networking.NewConfigGenerator(args.Plugins),
//        s.ServiceController, 
//        s.configController)
// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(env *model.Environment, generator core.ConfigGenerator, ctl model.Controller, configCache model.ConfigStoreCache) *DiscoveryServer {
    // create the DiscoveryServer object
    out := &DiscoveryServer{
        Env:                     env,
        ConfigGenerator:         generator, // generates xDS responses
        ConfigController:        configCache,
        EndpointShardsByService: map[string]*EndpointShards{},
        WorkloadsByID:           map[string]*Workload{},
        edsUpdates:              map[string]*EndpointShards{},
        concurrentPushLimit:     make(chan struct{}, 20), 
        updateChannel:           make(chan *updateReq, 10),
    }
    env.PushContext = model.NewPushContext()

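    // Handle update events.
    // handleUpdates processes events from updateChannel. It ensures that at least minQuiet
    // has elapsed since the last event before processing, and that at most maxDelay
    // passes between receiving an event and processing it.
    // It finally calls doPush, which, depending on the full-push flag, sends the most recent
    // EDS updates either as an XDS incremental push or as a full push.
    go out.handleUpdates()

    // Register cache-clearing functions; the DiscoveryServer local cache is cleared when:
    // 1. service-related information changes
    // 2. the JWT public key changes
    // 3. an Istio CRD changes
    // When any of these three happens, an event is passed on to out.handleUpdates()

    // periodic refresh
    go out.periodicRefresh()

    // periodically refresh metrics
    go out.periodicRefreshMetrics()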

    out.DebugConfigs = pilot.DebugConfigs

    pushThrottle := intEnv(pilot.PushThrottle, 10)
    pushBurst := intEnv(pilot.PushBurst, 100)

    adsLog.Infof("Starting ADS server with rateLimiter=%d burst=%d", pushThrottle, pushBurst)
    out.rateLimiter = rate.NewLimiter(rate.Limit(pushThrottle), pushBurst)
    out.initRateLimiter = rate.NewLimiter(rate.Limit(pushThrottle*2), pushBurst*2)

    return out
}
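
The minQuiet/maxDelay rule described in the comments on handleUpdates can be expressed as a small decision function. The sketch below is an assumption about the rule's shape rather than the actual implementation, and the 100ms and 1s values are illustrative, not Pilot's defaults.

```go
package main

import (
	"fmt"
	"time"
)

// shouldPush reports whether pending events should be flushed at time now.
// firstEvent is when the oldest unprocessed event arrived, lastEvent the newest one.
func shouldPush(now, firstEvent, lastEvent time.Time, minQuiet, maxDelay time.Duration) bool {
	quietFor := now.Sub(lastEvent)   // time since the most recent event
	waitedFor := now.Sub(firstEvent) // time the oldest event has been waiting
	return quietFor >= minQuiet || waitedFor >= maxDelay
}

// debounceCases evaluates three scenarios with illustrative settings.
func debounceCases() [3]bool {
	const minQuiet = 100 * time.Millisecond // illustrative, not Pilot's default
	const maxDelay = time.Second            // illustrative, not Pilot's default
	t0 := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
	at := func(ms int) time.Time { return t0.Add(time.Duration(ms) * time.Millisecond) }
	return [3]bool{
		// One event at t0, checked 50ms later: still inside the quiet period.
		shouldPush(at(50), at(0), at(0), minQuiet, maxDelay),
		// One event at t0, checked 150ms later: quiet long enough.
		shouldPush(at(150), at(0), at(0), minQuiet, maxDelay),
		// Events keep arriving (latest at 980ms), but the first has waited 1s.
		shouldPush(at(1000), at(0), at(980), minQuiet, maxDelay),
	}
}

func main() {
	fmt.Println(debounceCases())
}
```

The quiet period batches bursts of changes into a single push, while the maximum delay keeps a steady stream of events from postponing the push indefinitely.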

ADS-related definitions:

https://github.com/envoyproxy/data-plane-api/blob/master/envoy/service/discovery/v2/ads.proto

// See https://github.com/lyft/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
  // This is a gRPC-only API.
  rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest)
      returns (stream envoy.api.v2.DiscoveryResponse) {
  }

  rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest)
      returns (stream envoy.api.v2.IncrementalDiscoveryResponse) {
  }
}

https://github.com/envoyproxy/data-plane-api/blob/master/envoy/api/v2/discovery.proto

// A DiscoveryRequest requests a set of versioned resources of the same type for
// a given Envoy node on some API.
message DiscoveryRequest {
  // The version_info provided in the request messages will be the version_info
  // received with the most recent successfully processed response or empty on
  // the first request. It is expected that no new request is sent after a
  // response is received until the Envoy instance is ready to ACK/NACK the new
  // configuration. ACK/NACK takes place by returning the new API config version
  // as applied or the previous API config version respectively. Each type_url
  // (see below) has an independent version associated with it.
  string version_info = 1;

  // The node making the request.
  core.Node node = 2;

  // List of resources to subscribe to, e.g. list of cluster names or a route
  // configuration name. If this is empty, all resources for the API are
  // returned. LDS/CDS expect empty resource_names, since this is global
  // discovery for the Envoy instance. The LDS and CDS responses will then imply
  // a number of resources that need to be fetched via EDS/RDS, which will be
  // explicitly enumerated in resource_names.
  repeated string resource_names = 3;

  // Type of the resource that is being requested, e.g.
  // "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit
  // in requests made via singleton xDS APIs such as CDS, LDS, etc. but is
  // required for ADS.
  string type_url = 4;

  // nonce corresponding to DiscoveryResponse being ACK/NACKed. See above
  // discussion on version_info and the DiscoveryResponse nonce comment. This
  // may be empty if no nonce is available, e.g. at startup or for non-stream
  // xDS implementations.
  string response_nonce = 5;

  // This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>`
  // failed to update configuration. The *message* field in *error_details* provides the Envoy
  // internal exception related to the failure. It is only intended for consumption during manual
  // debugging, the string provided is not guaranteed to be stable across Envoy versions.
  google.rpc.Status error_detail = 6;
}

message DiscoveryResponse {
  // The version of the response data.
  string version_info = 1;

  // The response resources. These resources are typed and depend on the API being called.
  repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false];

  // [#not-implemented-hide:]
  // Canary is used to support two Envoy command line flags:
  //
  // * --terminate-on-canary-transition-failure. When set, Envoy is able to
  //   terminate if it detects that configuration is stuck at canary. Consider
  //   this example sequence of updates:
  //   - Management server applies a canary config successfully.
  //   - Management server rolls back to a production config.
  //   - Envoy rejects the new production config.
  //   Since there is no sensible way to continue receiving configuration
  //   updates, Envoy will then terminate and apply production config from a
  //   clean slate.
  // * --dry-run-canary. When set, a canary response will never be applied, only
  //   validated via a dry run.
  bool canary = 3;

  // Type URL for resources. This must be consistent with the type_url in the
  // Any messages for resources if resources is non-empty. This effectively
  // identifies the xDS API when muxing over ADS.
  string type_url = 4;

  // For gRPC based subscriptions, the nonce provides a way to explicitly ack a
  // specific DiscoveryResponse in a following DiscoveryRequest. Additional
  // messages may have been sent by Envoy to the management server for the
  // previous version on the stream prior to this DiscoveryResponse, that were
  // unprocessed at response send time. The nonce allows the management server
  // to ignore any further DiscoveryRequests for the previous version until a
  // DiscoveryRequest bearing the nonce. The nonce is optional and is not
  // required for non-stream based xDS implementations.
  string nonce = 5;

  // [#not-implemented-hide:]
  // The control plane instance that sent the response.
  core.ControlPlane control_plane = 6;
}
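
The comments on `version_info`, `response_nonce`, and `error_detail` describe how a management server can tell an initial request from an ACK or a NACK. A simplified sketch of that classification follows; the `request` struct carries only the relevant fields, `ErrorDetail` is a plain string standing in for `google.rpc.Status`, and `classify` is a hypothetical helper, not Pilot's logic.

```go
package main

import "fmt"

// request carries just the DiscoveryRequest fields needed for the classification.
type request struct {
	VersionInfo   string
	ResponseNonce string
	ErrorDetail   string // stands in for google.rpc.Status
}

// classify interprets a request given the nonce of the last response sent
// on the stream for the same type_url.
func classify(req request, lastNonce string) string {
	switch {
	case req.ResponseNonce == "":
		return "initial" // no nonce available, e.g. at startup
	case req.ResponseNonce != lastNonce:
		return "stale" // refers to an older response and can be ignored
	case req.ErrorDetail != "":
		return "NACK" // the previous response failed to apply
	default:
		return "ACK"
	}
}

// classifyExamples runs four made-up requests through classify.
func classifyExamples() []string {
	return []string{
		classify(request{}, ""),
		classify(request{VersionInfo: "v1", ResponseNonce: "n1"}, "n1"),
		classify(request{VersionInfo: "v1", ResponseNonce: "n2", ErrorDetail: "bad listener"}, "n2"),
		classify(request{VersionInfo: "v1", ResponseNonce: "n1"}, "n2"),
	}
}

func main() {
	fmt.Println(classifyExamples())
}
```

In the NACK example the request still carries `v1`, the previously applied version, which matches the description of `version_info` above.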

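For how ADS guarantees consistency, see the "Eventual consistency considerations" section of envoy-的-xds-rest-和-grpc-协议详解:

In general, to avoid dropping traffic, updates should be sequenced following a make-before-break model, in which:
* CDS updates (if any) must always be pushed first.
* EDS updates (if any) must arrive after the CDS updates for the respective clusters.
* LDS updates must arrive after the corresponding CDS/EDS updates.
* RDS updates related to the newly added listeners must arrive last.
* Finally, stale CDS clusters and related EDS endpoints (those no longer referenced) are removed.

ADS allows a single management server to deliver all API updates over a single gRPC stream. Combined with careful sequencing of updates, ADS makes it possible to avoid traffic loss during an update.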
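
The ordering rules above can be sketched as a stable sort over pending update types. The `pushOrder` table and `orderUpdates` function are hypothetical; a real server also has to wait for each step to be acknowledged and handle removals, which this sketch does not attempt.

```go
package main

import (
	"fmt"
	"sort"
)

// pushOrder maps each xDS type to its position in a make-before-break sequence.
var pushOrder = map[string]int{"CDS": 0, "EDS": 1, "LDS": 2, "RDS": 3}

// rank returns the position of t; unknown types sort after the known ones.
func rank(t string) int {
	if r, ok := pushOrder[t]; ok {
		return r
	}
	return len(pushOrder)
}

// orderUpdates returns a copy of pending sorted into CDS, EDS, LDS, RDS order.
func orderUpdates(pending []string) []string {
	out := append([]string(nil), pending...)
	sort.SliceStable(out, func(i, j int) bool { return rank(out[i]) < rank(out[j]) })
	return out
}

func main() {
	fmt.Println(orderUpdates([]string{"RDS", "LDS", "EDS", "CDS"}))
}
```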

pilot/pkg/proxy/envoy/v2/ads.go

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...

    // InitContext returns immediately if the context was already initialized.
    // InitContext pulls from env the data that needs to be sent to clients; analyzed further below
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)
    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // start a new goroutine that receives xdsapi.DiscoveryRequest messages from the client
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // ...
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())

            case ListenerType:
                // ...
                adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
                con.LDSWatch = true
                err := s.pushLds(con, s.globalPushContext(), true, versionInfo())

            case RouteType:
                // ...
                adsLog.Debugf("ADS:RDS: REQ %s %s  routes: %d", peerAddr, con.ConID, len(con.Routes))
                err := s.pushRoute(con, s.globalPushContext())

            case EndpointType:
                // ...
                // various error handling

                for _, cn := range con.Clusters {
                    s.removeEdsCon(cn, con.ConID, con)
                }

                for _, cn := range clusters {
                    s.addEdsCon(cn, con.ConID, con)
                }

                con.Clusters = clusters
                adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
                err := s.pushEds(s.globalPushContext(), con, true, nil)
                if err != nil {
                    return err
                }

            default:
                adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
            }
            // ...
        case pushEv := <-con.pushChannel:
            // ...

            err := s.pushConnection(con, pushEv)
            if err != nil {
                return nil
            }

        }
    }
}
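
The `switch discReq.TypeUrl` dispatch can be sketched on its own. The type URL strings below are the Envoy v2 API type URLs; the `conn` struct and its `handle` method are simplified, hypothetical stand-ins that only record watch state and report which push function the real code would call.

```go
package main

import "fmt"

// Envoy v2 type URLs used to multiplex resource types over ADS.
const (
	clusterType  = "type.googleapis.com/envoy.api.v2.Cluster"
	endpointType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
	listenerType = "type.googleapis.com/envoy.api.v2.Listener"
	routeType    = "type.googleapis.com/envoy.api.v2.RouteConfiguration"
)

// conn is a hypothetical, reduced view of the per-connection state.
type conn struct {
	CDSWatch bool
	LDSWatch bool
	Clusters []string // cluster names watched via EDS
	Routes   []string // route names watched via RDS
}

// handle updates the watch state and returns the name of the push it would trigger.
func (c *conn) handle(typeURL string, resourceNames []string) string {
	switch typeURL {
	case clusterType:
		c.CDSWatch = true
		return "pushCds"
	case listenerType:
		c.LDSWatch = true
		return "pushLds"
	case routeType:
		c.Routes = resourceNames
		return "pushRoute"
	case endpointType:
		c.Clusters = resourceNames
		return "pushEds"
	default:
		return "unknown"
	}
}

// handleSequence replays a plausible request sequence; the resource names are made up.
func handleSequence() ([]string, *conn) {
	c := &conn{}
	out := []string{
		c.handle(clusterType, nil),
		c.handle(endpointType, []string{"outbound|9080||reviews.default.svc.cluster.local"}),
		c.handle(listenerType, nil),
		c.handle(routeType, []string{"9080"}),
	}
	return out, c
}

func main() {
	out, c := handleSequence()
	fmt.Println(out, c.CDSWatch, c.LDSWatch, len(c.Clusters), len(c.Routes))
}
```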

istio.io/istio/pilot/pkg/model/push_context.go

// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {

    // privateServices are reachable within the same namespace.
    privateServicesByNamespace map[string][]*Service
    // publicServices are services reachable within the mesh.
    publicServices []*Service

    privateVirtualServicesByNamespace map[string][]Config
    publicVirtualServices             []Config

    // destination rules are of three types:
    // namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace
    //  namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace
    //  allExportedDestRules: all (public) dest rules across all namespaces
    // We need the allExportedDestRules in addition to namespaceExportedDestRules because we select
    // the dest rule based on the most specific host match, and not just any destination rule
    namespaceLocalDestRules    map[string]*processedDestRules
    namespaceExportedDestRules map[string]*processedDestRules
    allExportedDestRules       *processedDestRules

    // sidecars for each namespace
    sidecarsByNamespace map[string][]*SidecarScope
    ////////// END ////////

    // The following data is either a global index or used in the inbound path.
    // Namespace specific views do not apply here.

    // ServiceByHostname has all services, indexed by hostname.
    ServiceByHostname map[Hostname]*Service `json:"-"`

    // AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
    // are no authorization policies in the cluster.
    AuthzPolicies *AuthorizationPolicies `json:"-"`

    // ServicePort2Name is used to keep track of service name and port mapping.
    // This is needed because ADS names use port numbers, while endpoints use
    // port names. The key is the service name. If a service or port are not found,
    // the endpoint needs to be re-evaluated later (eventual consistency)
    ServicePort2Name map[string]PortList `json:"-"`

    initDone bool
}


// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
func (ps *PushContext) InitContext(env *Environment) error {
    ps.Mutex.Lock()
    defer ps.Mutex.Unlock()
    if ps.initDone {
        return nil
    }
    ps.Env = env
    var err error

    // Caches list of services in the registry, and creates a map
    // of hostname to service -> ServicePort2Name map[string]PortList `json:"-"`
    ps.initServiceRegistry(env)

    // Caches list of virtual services -> publicVirtualServices []Config
    ps.initVirtualServices(env)

    // Split out of DestinationRule expensive conversions - once per push.
    // 最后保存到以下三个变量中:
    //  * namespaceLocalDestRules    map[string]*processedDestRules
    //  * namespaceExportedDestRules map[string]*processedDestRules
    //  * allExportedDestRules       *processedDestRules
    ps.initDestinationRules(env)

    // Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies
    ps.initAuthorizationPolicies(env)

    // Must be initialized in the end -> sidecarsByNamespace map[string][]*SidecarScope
    ps.InitSidecarScopes(env)

    ps.initDone = true
    return nil
}
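
InitContext is called for every new ADS stream, yet the expensive initialization runs only once thanks to the mutex and the `initDone` flag. A minimal sketch of that guard, with a hypothetical `pushContext` type and loader function:

```go
package main

import (
	"fmt"
	"sync"
)

// pushContext is a hypothetical, reduced version of model.PushContext.
type pushContext struct {
	mu       sync.Mutex
	initDone bool
	services []string
	initRuns int // counts how often the expensive part actually ran
}

// initContext loads the data once; later calls return immediately.
func (p *pushContext) initContext(load func() []string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.initDone {
		return
	}
	p.services = load()
	p.initRuns++
	p.initDone = true
}

// concurrentInit simulates n connections calling initContext at the same time.
func concurrentInit(n int) int {
	p := &pushContext{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.initContext(func() []string { return []string{"reviews", "ratings"} })
		}()
	}
	wg.Wait()
	return p.initRuns
}

func main() {
	fmt.Println(concurrentInit(10))
}
```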

CDS is the first message sent over ADS, so the rest of this analysis uses CDS as the example.

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...

    // InitContext returns immediately if the context was already initialized.
    // InitContext pulls from env the data that needs to be sent to clients; analyzed further below
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)
    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // start a new goroutine that receives xdsapi.DiscoveryRequest messages from the client
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            // Mainly calls `ParseServiceNodeWithMetadata`, which extracts
            // information from the request message and produces a `Proxy` object.
            // discReq.Node.Id currently has the format Type~IPAddress~ID~Domain
            err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // handling of the response message when CDS data has already been sent
                if con.CDSWatch {
                    // ...
                }
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
                if err != nil {
                    return err
                }

                //...
    }
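The structure above (one goroutine feeding a request channel, and a select loop that dispatches on the request type) can be sketched in isolation. This is only an illustration under simplifying assumptions, not Pilot's code: the request type, the type URL strings, and the receive and serve functions are hypothetical names.

```go
package main

import "fmt"

// request is a hypothetical stand-in for xdsapi.DiscoveryRequest.
type request struct {
	TypeURL string
}

// receive forwards requests into reqCh and closes the channel when the
// simulated stream ends, much like a dedicated receive goroutine would.
func receive(src []request, reqCh chan<- request) {
	defer close(reqCh)
	for _, r := range src {
		reqCh <- r
	}
}

// serve blocks until either a request arrives or a push is triggered and
// records the action taken for each event. It returns when the stream closes.
func serve(reqCh <-chan request, pushCh <-chan struct{}) []string {
	var actions []string
	for {
		select {
		case r, ok := <-reqCh:
			if !ok {
				return actions
			}
			switch r.TypeURL {
			case "cluster":
				actions = append(actions, "pushCds")
			case "endpoint":
				actions = append(actions, "pushEds")
			default:
				actions = append(actions, "ignored:"+r.TypeURL)
			}
		case <-pushCh:
			actions = append(actions, "pushAll")
		}
	}
}

func main() {
	reqCh := make(chan request, 1)
	go receive([]request{{TypeURL: "cluster"}, {TypeURL: "endpoint"}}, reqCh)
	// A nil push channel never fires, so only requests are handled here.
	fmt.Println(serve(reqCh, nil))
}
```

Keeping the blocking receive in its own goroutine lets the main loop react to config pushes even while no request is pending.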

core.Node

Identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.

{
  "id": "...",
  "cluster": "...",
  "metadata": "{...}",
  "locality": "{...}",
  "build_version": "..."
}

id
(string) An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.

cluster
(string) Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.

metadata
(Struct) Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.

locality
(core.Locality) Locality specifying where the Envoy instance is running.

build_version
(string) This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.

initConnectionNode mainly calls ParseServiceNodeWithMetadata, which extracts the node information from the request message and produces a Proxy object:

istio.io/istio/pilot/pkg/model/context.go

func ParseServiceNodeWithMetadata(s string, metadata map[string]string) (*Proxy, error) {
}
// Proxy contains information about an specific instance of a proxy (envoy sidecar, gateway,
// etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from
// 'node' info in the protocol as well as data extracted from registries.
//
// In current Istio implementation nodes use a 4-parts '~' delimited ID.
// Type~IPAddress~ID~Domain
type Proxy struct {
    // ClusterID specifies the cluster where the proxy resides.
    // TODO: clarify if this is needed in the new 'network' model, likely needs to
    // be renamed to 'network'
    ClusterID string

    // Type specifies the node type. First part of the ID.
    Type NodeType

    // IPAddresses is the IP addresses of the proxy used to identify it and its
    // co-located service instances. Example: "10.60.1.6". In some cases, the host
    // where the proxy and service instances reside may have more than one IP address
    IPAddresses []string

    // ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
    // namespace.
    ID string

    // Locality is the location of where Envoy proxy runs.
    Locality Locality

    // DNSDomain defines the DNS domain suffix for short hostnames (e.g.
    // "default.svc.cluster.local")
    DNSDomain string

    // TrustDomain defines the trust domain of the certificate
    TrustDomain string

    // ConfigNamespace defines the namespace where this proxy resides
    // for the purposes of network scoping.
    // NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
    ConfigNamespace string

    // Metadata key-value pairs extending the Node identifier
    Metadata map[string]string

    // the sidecarScope associated with the proxy
    SidecarScope *SidecarScope
}
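To make the 4-part '~' delimited ID concrete, the following sketch splits such an identifier into its components. It is a simplified illustration, not the body of ParseServiceNodeWithMetadata: the proxyInfo type, the parseNodeID function, and the sample ID are hypothetical, and metadata handling is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// proxyInfo is a hypothetical, trimmed-down stand-in for model.Proxy.
type proxyInfo struct {
	Type        string
	IPAddresses []string
	ID          string
	DNSDomain   string
}

// parseNodeID splits a "Type~IPAddress~ID~Domain" identifier into its parts.
func parseNodeID(s string) (*proxyInfo, error) {
	parts := strings.Split(s, "~")
	if len(parts) != 4 {
		return nil, fmt.Errorf("expected 4 '~' separated parts in %q, got %d", s, len(parts))
	}
	if parts[1] == "" {
		return nil, fmt.Errorf("no IP address in node ID %q", s)
	}
	return &proxyInfo{
		Type:        parts[0],
		IPAddresses: []string{parts[1]},
		ID:          parts[2],
		DNSDomain:   parts[3],
	}, nil
}

func main() {
	// Sample ID made up for illustration.
	p, err := parseNodeID("sidecar~10.60.1.6~helloworld-v1.default~default.svc.cluster.local")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *p)
}
```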

The core of the CDS push is implemented by the call s.pushCds(con, s.globalPushContext(), versionInfo())

istio.io/istio/pilot/pkg/proxy/envoy/v2/cds.go

func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
    // Generate the rawClusters object ([]*xdsapi.Cluster) from the current con.modelNode and the push context
    rawClusters, err := s.generateRawClusters(con.modelNode, push)

    // DebugConfigs controls saving snapshots of configs for /debug/adsz.
    // Defaults to false, can be enabled with PILOT_DEBUG_ADSZ_CONFIG=1
    // If enabled through the environment, the details can be viewed at /debug/adsz on port 9093, at the cost of extra memory
    if s.DebugConfigs {
        con.CDSClusters = rawClusters
    }

    response := con.clusters(rawClusters)
    err = con.send(response)
    return nil
}

The role of s.generateRawClusters(con.modelNode, push) is therefore clear: it collects the CDS-related data from the push context and packages it in the format of a CDS Response.

func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) ([]*xdsapi.Cluster, error) {
    rawClusters, err := s.ConfigGenerator.BuildClusters(s.Env, node, push)

    // Validate each entry in rawClusters
    return rawClusters, nil
}

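The rawClusters object is generated by s.ConfigGenerator.BuildClusters from the combined information in s.Env, node, and push:

  • s.Env holds the local caches of resources such as ServiceController and ConfigController

  • node is the object built from the current request that carries the Node-related information

  • push is the context of the current push, in which the information needed for the push has already been pre-processed (derived from s.Env)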

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

BuildClusters is implemented as follows:

// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution
// For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // GetProxyServiceInstances returns service instances co-located with the proxy
        // Fetch the service instances co-located with the proxy, including headless services
        instances, err := env.GetProxyServiceInstances(proxy)

        sidecarScope := proxy.SidecarScope
        recomputeOutboundClusters := true
        // Append the outbound clusters
        if recomputeOutboundClusters {
            clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
        }

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }

        // Append the inbound clusters, including those for the managementPorts on the proxy IPs
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)

    default: // Gateways
        // ...
    }

    // Add a blackhole and passthrough cluster for catching traffic to unresolved routes
    // DO NOT CALL PLUGINS for these two clusters.
    clusters = append(clusters, buildBlackHoleCluster())
    clusters = append(clusters, buildDefaultPassthroughCluster())

    // resolves cluster name conflicts. 
    return normalizeClusters(push, proxy, clusters), nil
}

The temporary debugging endpoints are served on port 9093, but whether ADS-related information is actually saved also depends on a Pilot option; see

istio.io/istio/pkg/features/pilot/pilot.go

// DebugConfigs controls saving snapshots of configs for /debug/adsz.
  // Defaults to false, can be enabled with PILOT_DEBUG_ADSZ_CONFIG=1
  // For larger clusters it can increase memory use and GC - useful for small tests.
  DebugConfigs = os.Getenv("PILOT_DEBUG_ADSZ_CONFIG") == "1"
  

istio.io/istio/pilot/pkg/proxy/envoy/v2/debug.go

// InitDebug initializes the debug handlers and adds a debug in-memory registry.
func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller) {
    // For debugging and load testing v2 we add an memory registry.
    s.MemRegistry = NewMemServiceDiscovery(
        map[model.Hostname]*model.Service{ // mock.HelloService.Hostname: mock.HelloService,
        }, 2)
    s.MemRegistry.EDSUpdater = s
    s.MemRegistry.ClusterID = "v2-debug"

    sctl.AddRegistry(aggregate.Registry{
        ClusterID:        "v2-debug",
        Name:             serviceregistry.ServiceRegistry("memAdapter"),
        ServiceDiscovery: s.MemRegistry,
        ServiceAccounts:  s.MemRegistry,
        Controller:       s.MemRegistry.controller,
    })

    mux.HandleFunc("/ready", s.ready)

    mux.HandleFunc("/debug/edsz", s.edsz)
    mux.HandleFunc("/debug/adsz", s.adsz)
    mux.HandleFunc("/debug/cdsz", cdsz)
    mux.HandleFunc("/debug/syncz", Syncz)

    mux.HandleFunc("/debug/registryz", s.registryz)
    mux.HandleFunc("/debug/endpointz", s.endpointz)
    mux.HandleFunc("/debug/endpointShardz", s.endpointShardz)
    mux.HandleFunc("/debug/workloadz", s.workloadz)
    mux.HandleFunc("/debug/configz", s.configz)

    mux.HandleFunc("/debug/authenticationz", s.authenticationz)
    mux.HandleFunc("/debug/config_dump", s.ConfigDump)
    mux.HandleFunc("/debug/push_status", s.PushStatusHandler)
}
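The combination of an environment-variable switch and handlers registered on an http.ServeMux can be tried out with a small sketch. This is an assumption-laden illustration rather than Pilot's handlers: newDebugMux, get, and the response bodies are hypothetical, and only the variable name PILOT_DEBUG_ADSZ_CONFIG and the two paths come from the listings above.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
)

// newDebugMux registers two illustrative handlers. The response bodies are
// made up; the real handlers return far richer data.
func newDebugMux(debugConfigs bool) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/debug/adsz", func(w http.ResponseWriter, r *http.Request) {
		if !debugConfigs {
			fmt.Fprintln(w, "config snapshots disabled")
			return
		}
		fmt.Fprintln(w, "config snapshots enabled")
	})
	return mux
}

// get issues an in-memory GET against the mux and returns status and body.
func get(mux *http.ServeMux, path string) (int, string) {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	// Same switch as the DebugConfigs variable shown above.
	debugConfigs := os.Getenv("PILOT_DEBUG_ADSZ_CONFIG") == "1"
	code, body := get(newDebugMux(debugConfigs), "/debug/adsz")
	fmt.Print(code, " ", body)
}
```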

PushContext initialization

istio.io/istio/pilot/pkg/model/push_context.go

func (ps *PushContext) InitContext(env *Environment) error {
    // Initialization happens only once; return immediately if it is already done
    if ps.initDone {
        return nil
    }

    ps.initServiceRegistry(env)
    ps.initVirtualServices(env)
    ps.initDestinationRules(env)
    ps.initAuthorizationPolicies(env)
    ps.InitSidecarScopes(env)

    ps.initDone = true
    return nil
}

Here env holds all the information that can be used, including the ServiceDiscovery interface.

istio.io/istio/pilot/pkg/model/context.go

// Environment provides an aggregate environmental API for Pilot
type Environment struct {
    // Discovery interface for listing services and instances.
    ServiceDiscovery

    ServiceAccounts
    IstioConfigStore

    Mesh *meshconfig.MeshConfig
    MixerSAN []string
    PushContext *PushContext
    MeshNetworks *meshconfig.MeshNetworks
}
  • initServiceRegistry
    • Source: env.Services() -> ServiceDiscovery::Services
    • Fields populated:
      • privateServicesByNamespace, keyed by namespace
      • publicServices, a list
      • ServiceByHostname, s.Hostname -> Service
      • ServicePort2Name, s.Hostname -> Ports
  • initVirtualServices
    • Source: env.List(VirtualService.Type, NamespaceAll) -> IstioConfigStore::ConfigStore::List
    • Converts the host short names of virtual services into FQDNs
    • Fields populated:
      • privateVirtualServicesByNamespace, keyed by namespace
      • publicVirtualServices, a list
    • Related definitions:

      VirtualService -> (Hosts, []*HTTPRoute) -> (HTTPMatchRequest, HTTPRouteDestination)

      apiVersion: networking.istio.io/v1alpha3
      kind: VirtualService
      metadata:
        name: reviews-route
      spec:
        hosts:
        - reviews.prod.svc.cluster.local
        http: # HTTPRoute
        - match:
          - uri:
              prefix: "/wpcatalog"
          - uri:
              prefix: "/consumercatalog"
          rewrite:
            uri: "/newcatalog"
          route:
          - destination: # HTTPRouteDestination
              host: reviews.prod.svc.cluster.local
              subset: v2
        - route:
          - destination:
              host: reviews.prod.svc.cluster.local
              subset: v1

      apiVersion: networking.istio.io/v1alpha3
      kind: DestinationRule
      metadata:
        name: reviews-destination
      spec:
        host: reviews.prod.svc.cluster.local
        subsets:
        - name: v1
          labels:
            version: v1
        - name: v2
          labels:
            version: v2

  • initDestinationRules
    • Source: env.List(DestinationRule.Type, NamespaceAll) -> IstioConfigStore::ConfigStore::List
    • Fields populated:
      • namespaceLocalDestRules, ns -> processedDestRules, ConfigScope == PRIVATE
      • namespaceExportedDestRules, ns -> processedDestRules, ConfigScope == PUBLIC
      • allExportedDestRules, a list
  • initAuthorizationPolicies
    • Source: env.IstioConfigStore.ClusterRbacConfig()
    • Fields populated:
      • AuthzPolicies
  • InitSidecarScopes
    • Source: env.List(Sidecar.Type, NamespaceAll) -> IstioConfigStore::ConfigStore::List
    • Fields populated:
      • sidecarsByNamespace, ns -> SidecarScope
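The indexing performed by initServiceRegistry, together with the run-once guard of InitContext, can be sketched as follows. This is a simplified model under stated assumptions: the service and pushContext types, the Private flag, and the visibleTo helper are hypothetical and only mirror the field names listed above.

```go
package main

import "fmt"

// service is a hypothetical stand-in for model.Service.
type service struct {
	Hostname  string
	Namespace string
	Private   bool // true when the service is visible only in its own namespace
}

// pushContext mirrors only the service-related fields discussed above.
type pushContext struct {
	initDone                   bool
	privateServicesByNamespace map[string][]*service
	publicServices             []*service
	serviceByHostname          map[string]*service
}

// initContext indexes the services once; later calls return immediately.
func (ps *pushContext) initContext(services []*service) {
	if ps.initDone {
		return
	}
	ps.privateServicesByNamespace = map[string][]*service{}
	ps.serviceByHostname = map[string]*service{}
	for _, s := range services {
		if s.Private {
			ns := s.Namespace
			ps.privateServicesByNamespace[ns] = append(ps.privateServicesByNamespace[ns], s)
		} else {
			ps.publicServices = append(ps.publicServices, s)
		}
		ps.serviceByHostname[s.Hostname] = s
	}
	ps.initDone = true
}

// visibleTo returns the public services plus the private services of the
// given namespace, which is the set a proxy in that namespace can see.
func (ps *pushContext) visibleTo(namespace string) []*service {
	out := append([]*service{}, ps.publicServices...)
	return append(out, ps.privateServicesByNamespace[namespace]...)
}

func main() {
	ps := &pushContext{}
	ps.initContext([]*service{
		{Hostname: "helloworld.default.svc.cluster.local", Namespace: "default"},
		{Hostname: "internal.prod.svc.cluster.local", Namespace: "prod", Private: true},
	})
	for _, s := range ps.visibleTo("default") {
		fmt.Println(s.Hostname)
	}
}
```

Splitting the index by visibility up front means each per-proxy lookup is a slice copy and a map read instead of a scan over every service.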

CDS initialization flow in detail

The call chain is DiscoveryServer::pushCds -> DiscoveryServer::generateRawClusters -> ConfigGenerator.BuildClusters; the main body of the work is implemented in BuildClusters.

BuildClusters

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution
// For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // 1. Fetch the services listening on the Pod that hosts the proxy; in k8s the Pod is looked up by proxyIP.
        // These are mainly used to generate the inbound clusters.
        // GetProxyServiceInstances returns the service instances that 
        // co-located(in the same network namespace and security context) 
        // with a given Proxy
        instances, err := env.GetProxyServiceInstances(proxy)

        sidecarScope := proxy.SidecarScope
        recomputeOutboundClusters := true
        if configgen.CanUsePrecomputedCDS(proxy) {
            // If the outbound clusters have already been computed and cached, reuse them
            // ...
        }
        // 2. If nothing is cached, compute them with configgen.buildOutboundClusters(env, proxy, push)
        if recomputeOutboundClusters {
            clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
        }

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs
        // managementPorts are mainly the ports of the Pod's Liveness and Readiness probes
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }

        // 3. Append the inbound clusters, including those for the management ports
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)

    default: // Gateways
        // ...
    }

    // Add a blackhole and passthrough cluster for catching traffic to unresolved routes
    // DO NOT CALL PLUGINS for these two clusters.
    clusters = append(clusters, buildBlackHoleCluster())
    clusters = append(clusters, buildDefaultPassthroughCluster())

    // resolves cluster name conflicts.
    return normalizeClusters(push, proxy, clusters), nil
}

Using the multi-version HelloWorld as an example, the generated output looks like the sample below; for the complete sample see istio_helloworld_v1_v2.json

  "clusters": {   
    "dynamic_active_clusters": [
     {
     "version_info": "2019-02-13T09:28:16Z/7",
     "cluster": {
      "name": "BlackHoleCluster",
      "connect_timeout": "1s"
     },
     "last_updated": "2019-02-13T09:28:37.971Z"
    },
    {
     "version_info": "2019-02-13T09:28:16Z/7",
     "cluster": {
               // fmt.Sprintf("%s|%d|%s|%s", direction, port, subsetName, hostname)
      "name": "inbound|5000||helloworld.default.svc.cluster.local",
      "connect_timeout": "1s",
      "hosts": [
       {
        "socket_address": {
         "address": "127.0.0.1",
         "port_value": 5000
        }
       }
      ],
      "circuit_breakers": {
       "thresholds": [
        {}
       ]
      }
     },
     "last_updated": "2019-02-13T09:28:37.971Z"
    },

    {
     "version_info": "2019-02-13T12:41:46Z/10",
     "cluster": {
      "name": "outbound|5000|v1|helloworld.default.svc.cluster.local",
      "type": "EDS",
      "eds_cluster_config": {
       "eds_config": {
        "ads": {} // EDS type: the endpoints must be fetched through EDS
       },
       "service_name": "outbound|5000|v1|helloworld.default.svc.cluster.local"
      },
      "connect_timeout": "1s",
      "circuit_breakers": {
       "thresholds": [
        {}
       ]
      }
     },
     "last_updated": "2019-02-13T12:41:46.640Z"
    },

    {
     "version_info": "2019-02-13T12:41:46Z/10",
     "cluster": {
      "name": "outbound|5000|v2|helloworld.default.svc.cluster.local",
      "type": "EDS",
      "eds_cluster_config": {
       "eds_config": {
        "ads": {}
       },
       "service_name": "outbound|5000|v2|helloworld.default.svc.cluster.local"
      },
      "connect_timeout": "1s",
      "circuit_breakers": {
       "thresholds": [
        {}
       ]
      }
     },
     "last_updated": "2019-02-13T12:41:46.641Z"
    },
    {
     "version_info": "2019-02-13T09:28:16Z/7",
     "cluster": {
      "name": "outbound|5000||helloworld.default.svc.cluster.local",
      "type": "EDS",
      "eds_cluster_config": {
       "eds_config": {
        "ads": {}
       },
       "service_name": "outbound|5000||helloworld.default.svc.cluster.local"
      },
      "connect_timeout": "1s",
      "circuit_breakers": {
       "thresholds": [
        {}
       ]
      }
     },
     "last_updated": "2019-02-13T09:28:37.928Z"
    },
   ]
  },
}
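The cluster names in the sample follow the direction|port|subset|hostname layout noted in the comment inside the dump. The sketch below builds and parses such names. It is an illustration only: buildSubsetKey and parseSubsetKey are hypothetical helpers written for this example, with the argument order of model.BuildSubsetKey as it is called in the listings.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildSubsetKey formats a cluster name as direction|port|subset|hostname.
func buildSubsetKey(direction, subsetName, hostname string, port int) string {
	return fmt.Sprintf("%s|%d|%s|%s", direction, port, subsetName, hostname)
}

// parseSubsetKey is the inverse of buildSubsetKey.
func parseSubsetKey(key string) (string, string, string, int, error) {
	parts := strings.Split(key, "|")
	if len(parts) != 4 {
		return "", "", "", 0, fmt.Errorf("expected 4 '|' separated parts in %q", key)
	}
	port, err := strconv.Atoi(parts[1])
	if err != nil {
		return "", "", "", 0, fmt.Errorf("invalid port in %q: %v", key, err)
	}
	return parts[0], parts[2], parts[3], port, nil
}

func main() {
	host := "helloworld.default.svc.cluster.local"
	for _, subset := range []string{"", "v1", "v2"} {
		fmt.Println(buildSubsetKey("outbound", subset, host, 5000))
	}
	direction, subset, hostname, port, err := parseSubsetKey("inbound|5000||" + host)
	fmt.Println(direction, subset, hostname, port, err)
}
```

An empty subset name yields the double bar seen in the default cluster names of the sample.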

GetProxyServiceInstances

In the kube registry, GetProxyServiceInstances finds the Pod that owns the proxy IP and then looks up the services for that Pod. The code is as follows:

istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go

// GetProxyServiceInstances returns service instances co-located with a given proxy
func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) ([]*model.ServiceInstance, error) {
    out := make([]*model.ServiceInstance, 0)

    // There is only one IP for kube registry
    proxyIP := proxy.IPAddresses[0]
    proxyNamespace := ""

    pod := c.pods.getPodByIP(proxyIP)
    if pod != nil {
        proxyNamespace = pod.Namespace
        // 1. find proxy service by label selector, 
        // if not any, there may exist headless service
        // failover to 2
        svcLister := listerv1.NewServiceLister(c.services.informer.GetIndexer())
        if services, err := svcLister.GetPodServices(pod); err == nil && len(services) > 0 {
            for _, svc := range services {
                item, exists, err := c.endpoints.informer.GetStore().GetByKey(KeyFunc(svc.Namespace, svc.Name))
                // ... (handling of err and !exists elided)

                ep := *item.(*v1.Endpoints)
                out = append(out, c.getProxyServiceInstancesByEndpoint(ep, proxy)...)
            }
            return out, nil
        }
    }

    // 2. Headless service
    endpointsForPodInSameNS := make([]*model.ServiceInstance, 0)
    endpointsForPodInDifferentNS := make([]*model.ServiceInstance, 0)
    for _, item := range c.endpoints.informer.GetStore().List() {
        ep := *item.(*v1.Endpoints)
        endpoints := &endpointsForPodInSameNS
        if ep.Namespace != proxyNamespace {
            endpoints = &endpointsForPodInDifferentNS
        }

        *endpoints = append(*endpoints, c.getProxyServiceInstancesByEndpoint(ep, proxy)...)
    }

    // Put the endpointsForPodInSameNS in front of endpointsForPodInDifferentNS so that Pilot will
    // first use endpoints from endpointsForPodInSameNS. This makes sure if there are two endpoints
    // referring to the same IP/port, the one in endpointsForPodInSameNS will be used. (The other one
    // in endpointsForPodInDifferentNS will thus be rejected by Pilot).
    out = append(endpointsForPodInSameNS, endpointsForPodInDifferentNS...)
    if len(out) == 0 {
        if c.Env != nil {
            c.Env.PushContext.Add(model.ProxyStatusNoService, proxy.ID, proxy, "")
            // ... (logging elided)
        }
    }
    return out, nil
}
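The fallback path orders its result so that instances from the proxy's own namespace come first. A minimal sketch of that ordering, assuming a hypothetical instance type and orderInstances function in place of the real endpoint iteration:

```go
package main

import "fmt"

// instance is a hypothetical stand-in for model.ServiceInstance.
type instance struct {
	Namespace string
	Service   string
}

// orderInstances puts instances from the proxy's namespace ahead of those
// from other namespaces, keeping the original order within each group.
func orderInstances(proxyNamespace string, all []instance) []instance {
	sameNS := make([]instance, 0)
	otherNS := make([]instance, 0)
	for _, in := range all {
		if in.Namespace == proxyNamespace {
			sameNS = append(sameNS, in)
		} else {
			otherNS = append(otherNS, in)
		}
	}
	return append(sameNS, otherNS...)
}

func main() {
	all := []instance{
		{Namespace: "other", Service: "helloworld"},
		{Namespace: "default", Service: "helloworld"},
	}
	fmt.Println(orderInstances("default", all))
}
```

With this ordering, when two endpoints refer to the same IP and port, a consumer that keeps the first match prefers the one from the proxy's namespace.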

buildOutboundClusters

This function finds the Services visible to the proxy, together with the public Services, and builds the corresponding Cluster for each of them.

func (configgen *ConfigGeneratorImpl) buildOutboundClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) []*apiv2.Cluster {
    clusters := make([]*apiv2.Cluster, 0)

    inputParams := &plugin.InputParams{
        Env:  env,
        Push: push,
        Node: proxy,
    }
    networkView := model.GetNetworkView(proxy)

    // push.Services(proxy) returns the private services visible to the proxy plus the public services
    for _, service := range push.Services(proxy) {
        config := push.DestinationRule(proxy, service)
        for _, port := range service.Ports {
            if port.Protocol == model.ProtocolUDP {
                continue
            }
            inputParams.Service = service
            inputParams.Port = port

            lbEndpoints := buildLocalityLbEndpoints(env, networkView, service, port.Port, nil)
            // create default cluster
            // outbound|5000||helloworld.default.svc.cluster.local
            discoveryType := convertResolution(service.Resolution)
            clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port)
            serviceAccounts := env.ServiceAccounts.GetIstioServiceAccounts(service.Hostname, []int{port.Port})
            defaultCluster := buildDefaultCluster(env, clusterName, discoveryType, lbEndpoints, model.TrafficDirectionOutbound)

            updateEds(defaultCluster)
            setUpstreamProtocol(defaultCluster, port)
            clusters = append(clusters, defaultCluster)

            // If the service has a matching destinationRule, the subsets defined in
            // that destinationRule are also added to the set of outbound clusters:
            // outbound|5000|v1|helloworld.default.svc.cluster.local
            // outbound|5000|v2|helloworld.default.svc.cluster.local
            // where v1 and v2 are subset names
            if config != nil {
                destinationRule := config.Spec.(*networking.DestinationRule)
                defaultSni := model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port)
                applyTrafficPolicy(env, defaultCluster, destinationRule.TrafficPolicy, port, serviceAccounts,
                    defaultSni, DefaultClusterMode, model.TrafficDirectionOutbound)
                setLocalityPriority := false
                if defaultCluster.OutlierDetection != nil {
                    setLocalityPriority = true
                }
                applyLocalityLBSetting(proxy, defaultCluster.LoadAssignment, env.Mesh.LocalityLbSetting, setLocalityPriority)
                for _, subset := range destinationRule.Subsets {
                    inputParams.Subset = subset.Name
                    subsetClusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, port.Port)
                    defaultSni := model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, port.Port)

                    // clusters with discovery type STATIC, STRICT_DNS or LOGICAL_DNS rely on cluster.hosts field
                    // ServiceEntry's need to filter hosts based on subset.labels in order to perform weighted routing
                    if discoveryType != apiv2.Cluster_EDS && len(subset.Labels) != 0 {
                        lbEndpoints = buildLocalityLbEndpoints(env, networkView, service, port.Port, []model.Labels{subset.Labels})
                    }
                    subsetCluster := buildDefaultCluster(env, subsetClusterName, discoveryType, lbEndpoints, model.TrafficDirectionOutbound)
                    updateEds(subsetCluster)
                    setUpstreamProtocol(subsetCluster, port)
                    applyTrafficPolicy(env, subsetCluster, destinationRule.TrafficPolicy, port, serviceAccounts, defaultSni,
                        DefaultClusterMode, model.TrafficDirectionOutbound)
                    applyTrafficPolicy(env, subsetCluster, subset.TrafficPolicy, port, serviceAccounts, defaultSni,
                        DefaultClusterMode, model.TrafficDirectionOutbound)
                    setLocalityPriority = false
                    if subsetCluster.OutlierDetection != nil {
                        setLocalityPriority = true
                    }
                    applyLocalityLBSetting(proxy, subsetCluster.LoadAssignment, env.Mesh.LocalityLbSetting, setLocalityPriority)
                    // call plugins
                    for _, p := range configgen.Plugins {
                        p.OnOutboundCluster(inputParams, subsetCluster)
                    }
                    clusters = append(clusters, subsetCluster)
                }
            }

            // call plugins for the default cluster
            for _, p := range configgen.Plugins {
                p.OnOutboundCluster(inputParams, defaultCluster)
            }
        }
    }

    return clusters
}
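Stripped of traffic policies and plugins, the loop above produces one default cluster per non-UDP port plus one cluster per DestinationRule subset. The following sketch enumerates only the resulting names. It is a simplification under stated assumptions: servicePort, subsetKey, and outboundClusterNames are hypothetical, and protocols are plain strings.

```go
package main

import "fmt"

// servicePort is a hypothetical stand-in for model.Port.
type servicePort struct {
	Number   int
	Protocol string
}

// subsetKey formats a cluster name as direction|port|subset|hostname.
func subsetKey(direction, subset, hostname string, port int) string {
	return fmt.Sprintf("%s|%d|%s|%s", direction, port, subset, hostname)
}

// outboundClusterNames lists the outbound cluster names for one service:
// the default cluster first, then one cluster per subset, for every
// non-UDP port.
func outboundClusterNames(hostname string, ports []servicePort, subsets []string) []string {
	names := make([]string, 0)
	for _, p := range ports {
		if p.Protocol == "UDP" {
			continue // UDP ports get no cluster
		}
		names = append(names, subsetKey("outbound", "", hostname, p.Number))
		for _, s := range subsets {
			names = append(names, subsetKey("outbound", s, hostname, p.Number))
		}
	}
	return names
}

func main() {
	ports := []servicePort{{Number: 5000, Protocol: "HTTP"}, {Number: 5353, Protocol: "UDP"}}
	for _, n := range outboundClusterNames("helloworld.default.svc.cluster.local", ports, []string{"v1", "v2"}) {
		fmt.Println(n)
	}
}
```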

buildInboundClusters

buildInboundClusters also has to add clusters for the management ports used by k8s. The ManagementPorts function finds the Pod whose IP matches the proxy IP, then reads the ports defined in the Liveness and Readiness probes from the Pod spec:

// ManagementPorts implements a service catalog operation
// addr is the IP address of the proxy
func (c *Controller) ManagementPorts(addr string) model.PortList {
    pod := c.pods.getPodByIP(addr)
    if pod == nil {
        return nil
    }

    managementPorts, err := convertProbesToPorts(&pod.Spec)
    return managementPorts
}

// convertProbesToPorts returns a PortList consisting of the ports where the
// pod is configured to do Liveness and Readiness probes
func convertProbesToPorts(t *v1.PodSpec) (model.PortList, error) {
    set := make(map[string]*model.Port)
    var errs error
    for _, container := range t.Containers {
        for _, probe := range []*v1.Probe{container.LivenessProbe, container.ReadinessProbe} {
            if probe == nil {
                continue
            }

            p, err := convertProbePort(&container, &probe.Handler)
            if err != nil {
                errs = multierror.Append(errs, err)
            } else if p != nil && set[p.Name] == nil {
                // Deduplicate along the way. We don't differentiate between HTTP vs TCP mgmt ports
                set[p.Name] = p
            }
        }
    }

    mgmtPorts := make(model.PortList, 0, len(set))
    for _, p := range set {
        mgmtPorts = append(mgmtPorts, p)
    }
    sort.Slice(mgmtPorts, func(i, j int) bool { return mgmtPorts[i].Port < mgmtPorts[j].Port })

    return mgmtPorts, errs
}
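The deduplicate-then-sort step of convertProbesToPorts can be isolated in a few lines. This sketch assumes a hypothetical mgmtPort type and dedupAndSort function, and takes already-converted ports as input rather than a Pod spec:

```go
package main

import (
	"fmt"
	"sort"
)

// mgmtPort is a hypothetical stand-in for model.Port.
type mgmtPort struct {
	Name string
	Port int
}

// dedupAndSort keeps the first port seen for each name, skips nil entries,
// and returns the survivors ordered by port number.
func dedupAndSort(ports []*mgmtPort) []*mgmtPort {
	set := make(map[string]*mgmtPort)
	for _, p := range ports {
		if p != nil && set[p.Name] == nil {
			set[p.Name] = p
		}
	}
	out := make([]*mgmtPort, 0, len(set))
	for _, p := range set {
		out = append(out, p)
	}
	// Map iteration order is random, so sort to get a stable result.
	sort.Slice(out, func(i, j int) bool { return out[i].Port < out[j].Port })
	return out
}

func main() {
	ports := []*mgmtPort{
		{Name: "http-8080", Port: 8080}, // liveness probe
		{Name: "tcp-3000", Port: 3000},  // readiness probe of another container
		{Name: "http-8080", Port: 8080}, // readiness probe on the same port
	}
	for _, p := range dedupAndSort(ports) {
		fmt.Println(p.Name, p.Port)
	}
}
```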

The main body of buildInboundClusters is as follows:

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

func (configgen *ConfigGeneratorImpl) buildInboundClusters(env *model.Environment, proxy *model.Proxy,
    push *model.PushContext, instances []*model.ServiceInstance, managementPorts []*model.Port) []*apiv2.Cluster {

    clusters := make([]*apiv2.Cluster, 0)

    sidecarScope := proxy.SidecarScope
    noneMode := proxy.GetInterceptionMode() == model.InterceptionNone

    // No sidecarScope is set, or it has no custom ingress listeners
    if sidecarScope == nil || !sidecarScope.HasCustomIngressListeners {
        // ...

        // Build the inbound clusters for the instances passed in
        for _, instance := range instances {
            pluginParams := &plugin.InputParams{
                Env:             env,
                Node:            proxy,
                ServiceInstance: instance,
                Port:            instance.Endpoint.ServicePort,
                Push:            push,
                Bind:            LocalhostAddress, // bind the hosts to 127.0.0.1
            }
            localCluster := configgen.buildInboundClusterForPortOrUDS(pluginParams)
            clusters = append(clusters, localCluster)
        }

        // Add the management ports; the cluster name has the form inbound|port|port_name|ManagementClusterHostname
        // Add a passthrough cluster for traffic to management ports (health check ports)
        for _, port := range managementPorts {
            clusterName := model.BuildSubsetKey(model.TrafficDirectionInbound, port.Name,
                ManagementClusterHostname, port.Port)

            localityLbEndpoints := buildInboundLocalityLbEndpoints(LocalhostAddress, port.Port)
            mgmtCluster := buildDefaultCluster(env, clusterName, apiv2.Cluster_STATIC, localityLbEndpoints,
            model.TrafficDirectionInbound)
            setUpstreamProtocol(mgmtCluster, port)
            clusters = append(clusters, mgmtCluster)
        }
    } else {
            // ....
    }

    return clusters
}

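Next, the various pieces of information are assembled into a plugin.InputParams value and passed to buildInboundClusterForPortOrUDS for final assembly. The plugins that are notified along the way implement the Plugin interface: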

// Plugin is called during the construction of a xdsapi.Listener which may alter the Listener in any
// way. Examples include AuthenticationPlugin that sets up mTLS authentication on the inbound Listener
// and outbound Cluster, the mixer plugin that sets up policy checks on the inbound listener, etc.
type Plugin interface {
    // OnOutboundListener is called whenever a new outbound listener is added to the LDS output for a given service.
    // Can be used to add additional filters on the outbound path.
    OnOutboundListener(in *InputParams, mutable *MutableObjects) error

    // OnInboundListener is called whenever a new listener is added to the LDS output for a given service
    // Can be used to add additional filters.
    OnInboundListener(in *InputParams, mutable *MutableObjects) error

    // OnOutboundCluster is called whenever a new cluster is added to the CDS output.
    // This is called once per push cycle, and not for every sidecar/gateway, except for gateways with non-standard
    // operating modes.
    OnOutboundCluster(in *InputParams, cluster *xdsapi.Cluster)

    // OnInboundCluster is called whenever a new cluster is added to the CDS output.
    // Called for each sidecar
    OnInboundCluster(in *InputParams, cluster *xdsapi.Cluster)

    // OnOutboundRouteConfiguration is called whenever a new set of virtual hosts (a set of virtual hosts with routes) is
    // added to RDS in the outbound path.
    OnOutboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration)

    // OnInboundRouteConfiguration is called whenever a new set of virtual hosts are added to the inbound path.
    OnInboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration)

    // OnInboundFilterChains is called whenever a plugin needs to setup the filter chains, including relevant filter chain
    // configuration, like FilterChainMatch and TLSContext.
    OnInboundFilterChains(in *InputParams) []FilterChain
}
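The hook mechanism can be illustrated with a much smaller interface. The sketch below is not the real plugin package: the cluster type, the single-method plugin interface, labelPlugin, and buildInbound are hypothetical and only show how each registered plugin gets a chance to mutate a freshly built cluster.

```go
package main

import "fmt"

// cluster is a hypothetical stand-in for xdsapi.Cluster.
type cluster struct {
	Name   string
	Labels map[string]string
}

// plugin is a one-method reduction of the Plugin interface shown above.
type plugin interface {
	OnInboundCluster(c *cluster)
}

// labelPlugin is an illustrative plugin that tags every inbound cluster.
type labelPlugin struct {
	key, value string
}

// OnInboundCluster adds the plugin's label to the cluster.
func (p labelPlugin) OnInboundCluster(c *cluster) {
	if c.Labels == nil {
		c.Labels = map[string]string{}
	}
	c.Labels[p.key] = p.value
}

// buildInbound creates a cluster and then lets each plugin alter it, in
// registration order.
func buildInbound(name string, plugins []plugin) *cluster {
	c := &cluster{Name: name}
	for _, p := range plugins {
		p.OnInboundCluster(c)
	}
	return c
}

func main() {
	plugins := []plugin{
		labelPlugin{key: "authn", value: "mtls"},
		labelPlugin{key: "policy", value: "checked"},
	}
	c := buildInbound("inbound|5000||helloworld.default.svc.cluster.local", plugins)
	fmt.Println(c.Name, c.Labels)
}
```

Because plugins run in order on the same object, a later plugin can observe or overwrite what an earlier one set.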

buildInboundClusterForPortOrUDS runs the last leg of the relay and assembles the final cluster object:

/*
        for _, instance := range instances {
            pluginParams := &plugin.InputParams{
                Env:             env,
                Node:            proxy,
                ServiceInstance: instance,
                Port:            instance.Endpoint.ServicePort,
                Push:            push,
                Bind:            LocalhostAddress,
            }
            localCluster := configgen.buildInboundClusterForPortOrUDS(pluginParams)
            clusters = append(clusters, localCluster)
        }
*/

func (configgen *ConfigGeneratorImpl) buildInboundClusterForPortOrUDS(pluginParams *plugin.InputParams) *apiv2.Cluster {
    instance := pluginParams.ServiceInstance
    clusterName := model.BuildSubsetKey(model.TrafficDirectionInbound, instance.Endpoint.ServicePort.Name,
        instance.Service.Hostname, instance.Endpoint.ServicePort.Port)

    // Set the locally bound address
    localityLbEndpoints := buildInboundLocalityLbEndpoints(pluginParams.Bind, instance.Endpoint.Port)

    localCluster := buildDefaultCluster(pluginParams.Env, clusterName, apiv2.Cluster_STATIC, localityLbEndpoints,
        model.TrafficDirectionInbound)
    setUpstreamProtocol(localCluster, instance.Endpoint.ServicePort)

    // call plugins, so that the registered handlers are notified
    for _, p := range configgen.Plugins {
        p.OnInboundCluster(pluginParams, localCluster)
    }

    // When users specify circuit breakers, they need to be set on the receiver end
    // (server side) as well as client side, so that the server has enough capacity
    // (not the defaults) to handle the increased traffic volume

    // DestinationRule returns a destination rule for a service name in a given domain.
    config := pluginParams.Push.DestinationRule(pluginParams.Node, instance.Service)
    if config != nil {
        destinationRule := config.Spec.(*networking.DestinationRule)
        if destinationRule.TrafficPolicy != nil {
            // only connection pool settings make sense on the inbound path.
            // upstream TLS settings/outlier detection/load balancer don't apply here.
            applyConnectionPool(pluginParams.Env, localCluster, destinationRule.TrafficPolicy.ConnectionPool,
                model.TrafficDirectionInbound)
        }
    }
    return localCluster
}

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

applyConnectionPool fills in the ConnectionPool part of the cluster from the destinationRule that matches the service. The function is defined as follows:

// FIXME: there isn't a way to distinguish between unset values and zero values
func applyConnectionPool(env *model.Environment, cluster *apiv2.Cluster, settings *networking.ConnectionPoolSettings, direction model.TrafficDirection) {
    if settings == nil {
        return
    }

    threshold := GetDefaultCircuitBreakerThresholds(direction)
    if settings.Http != nil {
        if settings.Http.Http2MaxRequests > 0 {
            // Envoy only applies MaxRequests in HTTP/2 clusters
            threshold.MaxRequests = &types.UInt32Value{Value: uint32(settings.Http.Http2MaxRequests)}
        }
        if settings.Http.Http1MaxPendingRequests > 0 {
            // Envoy only applies MaxPendingRequests in HTTP/1.1 clusters
            threshold.MaxPendingRequests = &types.UInt32Value{Value: uint32(settings.Http.Http1MaxPendingRequests)}
        }

        if settings.Http.MaxRequestsPerConnection > 0 {
            cluster.MaxRequestsPerConnection = &types.UInt32Value{Value: uint32(settings.Http.MaxRequestsPerConnection)}
        }

        // FIXME: zero is a valid value if explicitly set, otherwise we want to use the default
        if settings.Http.MaxRetries > 0 {
            threshold.MaxRetries = &types.UInt32Value{Value: uint32(settings.Http.MaxRetries)}
        }
    }

    if settings.Tcp != nil {
        if settings.Tcp.ConnectTimeout != nil {
            cluster.ConnectTimeout = util.GogoDurationToDuration(settings.Tcp.ConnectTimeout)
        }

        if settings.Tcp.MaxConnections > 0 {
            threshold.MaxConnections = &types.UInt32Value{Value: uint32(settings.Tcp.MaxConnections)}
        }

        applyTCPKeepalive(env, cluster, settings)
    }

    cluster.CircuitBreakers = &v2Cluster.CircuitBreakers{
        Thresholds: []*v2Cluster.CircuitBreakers_Thresholds{threshold},
    }
}
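
The two FIXME comments above point at a real limitation: because every field is only applied when it is strictly positive, an explicit 0 cannot be told apart from "unset". A minimal sketch of that behaviour follows. The types `httpSettings` and `thresholds` and the default values are hypothetical stand-ins invented for illustration, not the Istio or Envoy types.

```go
package main

import "fmt"

// httpSettings is a hypothetical, trimmed-down stand-in for the HTTP part of
// the connection pool settings; only the fields used below are modelled.
type httpSettings struct {
	HTTP2MaxRequests        int32
	HTTP1MaxPendingRequests int32
	MaxRetries              int32
}

// thresholds is a hypothetical stand-in for the circuit breaker thresholds.
type thresholds struct {
	MaxRequests        uint32
	MaxPendingRequests uint32
	MaxRetries         uint32
}

// applyHTTP overrides a default only when the setting is strictly positive,
// mirroring the "> 0" checks in applyConnectionPool.
func applyHTTP(def thresholds, s *httpSettings) thresholds {
	if s == nil {
		return def
	}
	if s.HTTP2MaxRequests > 0 {
		def.MaxRequests = uint32(s.HTTP2MaxRequests)
	}
	if s.HTTP1MaxPendingRequests > 0 {
		def.MaxPendingRequests = uint32(s.HTTP1MaxPendingRequests)
	}
	if s.MaxRetries > 0 {
		def.MaxRetries = uint32(s.MaxRetries)
	}
	return def
}

func main() {
	// The default values here are assumptions chosen for the example.
	def := thresholds{MaxRequests: 1024, MaxPendingRequests: 1024, MaxRetries: 3}
	// MaxRetries is explicitly 0, but that is indistinguishable from "unset",
	// so the default of 3 survives.
	got := applyHTTP(def, &httpSettings{HTTP2MaxRequests: 100, MaxRetries: 0})
	fmt.Printf("%+v\n", got)
}
```

Distinguishing the two cases would need a pointer or wrapper type on the settings side, which is what the FIXME hints at.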

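LDS initialization flow in detail

The flow is DiscoveryServer::pushLds -> DiscoveryServer::generateRawListeners -> ConfigGenerator.BuildListeners; most of the work happens in BuildListeners.

HelloWorld is again used as the example, with only two relevant samples kept: envoy_listeners.json

Environment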

# kubectl get pod -n istio-system -o wide
NAME                                READY   STATUS      RESTARTS   AGE    IP            NODE       NOMINATED NODE
helloworld-v1-8f8dd85-d59lh         2/2     Running     0          155m   10.128.5.4    node06     <none>
helloworld-v2-f9cf47df4-w9mfn       2/2     Running     0          155m   10.128.13.2   node04     <none>

# kubectl get svc -n istio-system -o wide
NAME            TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)                                 AGE    SELECTOR
helloworld      ClusterIP   10.0.40.71    <none>        5000/TCP                                155m   app=helloworld


# kubectl -n istio-system exec -ti  helloworld-v1-8f8dd85-d59lh -c istio-proxy -- netstat -t -anlp
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:5000            0.0.0.0:*               LISTEN      -
tcp        0      0 0.0.0.0:15090           0.0.0.0:*               LISTEN      23/envoy
tcp        0      0 127.0.0.1:15000         0.0.0.0:*               LISTEN      23/envoy
tcp        0      0 0.0.0.0:15001           0.0.0.0:*               LISTEN      23/envoy
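
| Port  | Service         | Description                                                     |
|-------|-----------------|-----------------------------------------------------------------|
| 5000  | helloworld      | Port the endpoint listens on                                    |
| 15090 | static listener | Serves /stats/prometheus and forwards to the admin port 15000   |
| 15000 | admin port      | Listens on localhost, 127.0.0.1                                 |
| 15001 | virtual port    | BlackHoleCluster; receives the traffic redirected by iptables   |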

The Envoy log shows the listeners that were loaded:

[lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[lds_api.cc:80] lds: add/update listener 'virtual'

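Pilot looks up the ports exposed by services in the relevant namespaces and generates a listener record for each, adds one more record for the local endpoint's port, and deduplicates the result.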

  "listeners": {
   "dynamic_active_listeners": [
    // Listener for endpoint_ip:5000, the helloworld endpoint itself
    {
     "version_info": "2019-02-13T09:28:16Z/7",
     "listener": {
      "name": "10.128.5.4_5000",
      "address": {
       "socket_address": {
        "address": "10.128.5.4",
        "port_value": 5000
       }
      },
      "filter_chains": [
      ],
      "deprecated_v1": {
       "bind_to_port": false
      },
      "listener_filters": [
       {
        "name": "envoy.listener.tls_inspector",
        "config": {}
       }
      ]
     },
     "last_updated": "2019-02-13T09:28:38.027Z"
    },

    // Listener for 0.0.0.0:5000, the helloworld service port
    {
     "version_info": "2019-02-15T05:44:18Z/6",
     "listener": {
      "name": "0.0.0.0_5000",
      "address": {
       "socket_address": {
        "address": "0.0.0.0",
        "port_value": 5000
       }
      },
      "filter_chains": [
      ],
      "deprecated_v1": {
       "bind_to_port": false
      }
     },
     "last_updated": "2019-02-15T05:44:18.648Z"
    },

    // Listeners for the ports exposed by the other visible services are generated the same way (omitted)

    // BlackHoleCluster
    {
     "version_info": "2019-02-13T09:28:16Z/7",
     "listener": {
      "name": "virtual",
      "address": {
       "socket_address": {
        "address": "0.0.0.0",
        "port_value": 15001
       }
      },
      "filter_chains": [
       {
        "filters": [
         {
          "name": "envoy.tcp_proxy",
          "config": {
           "stat_prefix": "BlackHoleCluster",
           "cluster": "BlackHoleCluster"
          }
         }
        ]
       }
      ],
      "use_original_dst": true
     },
     "last_updated": "2019-02-13T09:28:38.080Z"
    },
   ]
  }
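
The listener names in the dump follow an `<ip>_<port>` pattern, and duplicates are dropped. The following is a rough sketch of that naming and deduplication under stated assumptions: `target`, `listenerKey` and `dedupListeners` are hypothetical names invented here, and the real logic in pilot handles protocols and conflicts that this ignores.

```go
package main

import "fmt"

// target is a hypothetical (address, port) pair that needs a listener.
type target struct {
	IP   string
	Port int
}

// listenerKey builds a name in the "<ip>_<port>" form seen in the Envoy logs.
func listenerKey(ip string, port int) string {
	return fmt.Sprintf("%s_%d", ip, port)
}

// dedupListeners returns listener names in first-seen order, skipping any
// (ip, port) pair that already produced a listener.
func dedupListeners(targets []target) []string {
	seen := make(map[string]bool)
	var names []string
	for _, t := range targets {
		k := listenerKey(t.IP, t.Port)
		if seen[k] {
			continue
		}
		seen[k] = true
		names = append(names, k)
	}
	return names
}

func main() {
	names := dedupListeners([]target{
		{"10.128.5.4", 5000}, // local endpoint
		{"0.0.0.0", 5000},    // helloworld service port
		{"0.0.0.0", 8080},    // port exposed by another service
		{"0.0.0.0", 5000},    // duplicate, dropped
	})
	fmt.Println(names)
}
```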

BuildListeners is implemented as follows:

// BuildListeners produces a list of listeners and referenced clusters for all proxies
func (configgen *ConfigGeneratorImpl) BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) ([]*xdsapi.Listener, error) {
    switch node.Type {
    case model.SidecarProxy:
        // Look up the service endpoints on this proxy, then build listeners from endpoint.ip and port
        return configgen.buildSidecarListeners(env, node, push)
    case model.Router, model.Ingress:
        return configgen.buildGatewayListeners(env, node, push)
    }
    return nil, nil
}

The main flow of buildSidecarListeners is as follows:

// buildSidecarListeners produces a list of listeners for sidecar proxies
func (configgen *ConfigGeneratorImpl) buildSidecarListeners(env *model.Environment, node *model.Proxy,
    push *model.PushContext) ([]*xdsapi.Listener, error) {

    mesh := env.Mesh

    proxyInstances, err := env.GetProxyServiceInstances(node)
    // ... (error handling elided)

    noneMode := node.GetInterceptionMode() == model.InterceptionNone
    listeners := make([]*xdsapi.Listener, 0)

    if mesh.ProxyListenPort > 0 {
        // Build the inbound listeners
        inbound := configgen.buildSidecarInboundListeners(env, node, push, proxyInstances)

        // Build the outbound listeners. From the doc comment of
        // buildSidecarOutboundListeners:
        //
        // buildSidecarOutboundListeners generates http and tcp listeners for
        // outbound connections from the proxy based on the sidecar scope associated with the proxy.
        // TODO(github.com/istio/pilot/issues/237)
        //
        // Sharing tcp_proxy and http_connection_manager filters on the same port for
        // different destination services doesn't work with Envoy (yet). When the
        // tcp_proxy filter's route matching fails for the http service the connection
        // is closed without falling back to the http_connection_manager.
        //
        // Temporary workaround is to add a listener for each service IP that requires
        // TCP routing
        //
        // Connections to the ports of non-load balanced services are directed to
        // the connection's original destination. This avoids costly queries of instance
        // IPs and ports, but requires that ports of non-load balanced service be unique.
        outbound := configgen.buildSidecarOutboundListeners(env, node, push, proxyInstances)

        listeners = append(listeners, inbound...)
        listeners = append(listeners, outbound...)

        // Do not generate any management port listeners if the user has specified a SidecarScope object
        // with ingress listeners. Specifying the ingress listener implies that the user wants
        // to only have those specific listeners and nothing else, in the inbound path.
        generateManagementListeners := true

        sidecarScope := node.SidecarScope
        if sidecarScope != nil && sidecarScope.HasCustomIngressListeners ||
            noneMode {
            generateManagementListeners = false
        }

        if generateManagementListeners {
            // ...
        }

        tcpProxy := &tcp_proxy.TcpProxy{
            StatPrefix:       util.BlackHoleCluster,
            ClusterSpecifier: &tcp_proxy.TcpProxy_Cluster{Cluster: util.BlackHoleCluster},
        }

        if mesh.OutboundTrafficPolicy.Mode == meshconfig.MeshConfig_OutboundTrafficPolicy_ALLOW_ANY {
            // We need a passthrough filter to fill in the filter stack for orig_dst listener
            tcpProxy = &tcp_proxy.TcpProxy{
                StatPrefix:       util.PassthroughCluster,
                ClusterSpecifier: &tcp_proxy.TcpProxy_Cluster{Cluster: util.PassthroughCluster},
            }
        }
        var transparent *google_protobuf.BoolValue
        if node.GetInterceptionMode() == model.InterceptionTproxy {
            transparent = proto.BoolTrue
        }

        // add an extra listener that binds to the port that is the recipient of the iptables redirect
        listeners = append(listeners, &xdsapi.Listener{
            Name:           VirtualListenerName,
            Address:        util.BuildAddress(WildcardAddress, uint32(mesh.ProxyListenPort)),
            Transparent:    transparent,
            UseOriginalDst: proto.BoolTrue,
            FilterChains: []listener.FilterChain{
                {
                    Filters: []listener.Filter{
                        {
                            Name: xdsutil.TCPProxy,
                            ConfigType: &listener.Filter_Config{
                                Config: util.MessageToStruct(tcpProxy),
                            },
                        },
                    },
                },
            },
        })
    }

    httpProxyPort := mesh.ProxyHttpPort
    if httpProxyPort == 0 && noneMode { // make sure http proxy is enabled for 'none' interception.
        httpProxyPort = int32(pilot.DefaultPortHTTPProxy)
    }
    // enable HTTP PROXY port if necessary; this will add an RDS route for this port
    if httpProxyPort > 0 {
        useRemoteAddress := false
        traceOperation := http_conn.EGRESS
        listenAddress := LocalhostAddress

        opts := buildListenerOpts{
            env:            env,
            proxy:          node,
            proxyInstances: proxyInstances,
            bind:           listenAddress,
            port:           int(httpProxyPort),
            filterChainOpts: []*filterChainOpts{{
                httpOpts: &httpListenerOpts{
                    rds:              RDSHttpProxy,
                    useRemoteAddress: useRemoteAddress,
                    direction:        traceOperation,
                    connectionManager: &http_conn.HttpConnectionManager{
                        HttpProtocolOptions: &core.Http1ProtocolOptions{
                            AllowAbsoluteUrl: proto.BoolTrue,
                        },
                    },
                },
            }},
            bindToPort:      true,
            skipUserFilters: true,
        }
        l := buildListener(opts)
        // TODO: plugins for HTTP_PROXY mode, envoyfilter needs another listener match for SIDECAR_HTTP_PROXY
        // there is no mixer for http_proxy
        mutable := &plugin.MutableObjects{
            Listener:     l,
            FilterChains: []plugin.FilterChain{{}},
        }
        pluginParams := &plugin.InputParams{
            ListenerProtocol: plugin.ListenerProtocolHTTP,
            ListenerCategory: networking.EnvoyFilter_ListenerMatch_SIDECAR_OUTBOUND,
            Env:              env,
            Node:             node,
            ProxyInstances:   proxyInstances,
            Push:             push,
        }
        if err := buildCompleteFilterChain(pluginParams, mutable, opts); err != nil {
            log.Warna("buildSidecarListeners ", err.Error())
        } else {
            listeners = append(listeners, l)
        }
        // TODO: need inbound listeners in HTTP_PROXY case, with dedicated ingress listener.
    }

    return listeners, nil
}
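
Two small decisions in buildSidecarListeners are easy to miss: which cluster the catch-all tcp_proxy filter on the virtual listener points at, and when management listeners are skipped. The sketch below isolates both. It is an illustration only: `outboundMode`, `registryOnly` and `allowAny` are hypothetical names, and the cluster names are assumed to match the strings in the config dump above.

```go
package main

import "fmt"

// outboundMode is a hypothetical stand-in for the mesh-wide outbound traffic policy.
type outboundMode int

const (
	registryOnly outboundMode = iota // anything other than ALLOW_ANY
	allowAny                         // unknown destinations are passed through
)

// virtualListenerCluster picks the cluster used by the tcp_proxy filter on the
// virtual (iptables redirect) listener.
func virtualListenerCluster(mode outboundMode) string {
	if mode == allowAny {
		return "PassthroughCluster"
	}
	return "BlackHoleCluster"
}

// wantManagementListeners mirrors the condition in the excerpt: management
// listeners are skipped when the sidecar scope defines custom ingress
// listeners or when interception mode is "none".
func wantManagementListeners(hasCustomIngressListeners, noneMode bool) bool {
	return !(hasCustomIngressListeners || noneMode)
}

func main() {
	fmt.Println(virtualListenerCluster(registryOnly))
	fmt.Println(virtualListenerCluster(allowAny))
	fmt.Println(wantManagementListeners(false, false))
	fmt.Println(wantManagementListeners(true, false))
}
```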

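EDS initialization flow in detail

The tracking structure behind eds_clusters looks roughly like this:

cluster_name -> proxy.LocalityA -> eds_clusterA -> (node1, node2, ...)
             -> proxy.LocalityB -> eds_clusterB -> (node1, node2, ...)

The core structures, EdsCluster and ClusterLoadAssignment, are as follows: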

// EdsCluster tracks eds-related info for monitored clusters. In practice it'll include
// all clusters until we support on-demand cluster loading.
type EdsCluster struct {
    // ...
    LoadAssignment *xdsapi.ClusterLoadAssignment

    // Records which proxies are currently using this cluster
    // EdsClients keeps track of all nodes monitoring the cluster.
    EdsClients map[string]*XdsConnection `json:"-"`
}

type ClusterLoadAssignment struct {
    // Name of the cluster. This will be the :ref:`service_name
    // <envoy_api_field_Cluster.EdsClusterConfig.service_name>` value if specified
    // in the cluster :ref:`EdsClusterConfig
    // <envoy_api_msg_Cluster.EdsClusterConfig>`.
    ClusterName string 
    // List of endpoints to load balance to.
    Endpoints []endpoint.LocalityLbEndpoints 
    // Map of named endpoints that can be referenced in LocalityLbEndpoints.
    NamedEndpoints map[string]*endpoint.Endpoint 
    // Load balancing policy settings.
    Policy               *ClusterLoadAssignment_Policy 
}

The main body of the EDS request handling is as follows:

            case EndpointType:
                // ...
                // Remove this connection from the node list tracked by each
                // cluster it was previously watching
                for _, cn := range con.Clusters {
                    s.removeEdsCon(cn, con.ConID, con)
                }

                // Add this connection to the node list tracked by each
                // cluster named in the current request
                for _, cn := range clusters {
                    s.addEdsCon(cn, con.ConID, con)
                    // addEdsCon handles a cluster that is not tracked yet and initializes its eds_cluster (simplified):
                    // -> s.getOrAddEdsCluster(connection.modelNode, clusterName)
                    //      c := edsClusters[clusterName]
                    //      if c == nil {
                    //          c = &EdsCluster{
                    //              discovery:  s,
                    //              EdsClients: map[string]*XdsConnection{},
                    //              FirstUse:   time.Now(),
                    //          }
                    //          edsClusters[clusterName] = map[model.Locality]*EdsCluster{
                    //              proxy.Locality: c,
                    //          }
                    //      }
                    //      return c
                }

                con.Clusters = clusters
                adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
                err := s.pushEds(s.globalPushContext(), con, true, nil)
                if err != nil {
                    return err
                }

        // The config changed; if there are updates, trigger an incremental push
        case pushEv := <-con.pushChannel:
            // It is called when config changes.
            // This is not optimized yet - we should detect what changed based on event and only
            // push resources that need to be pushed.

            // TODO: possible race condition: if a config change happens while the envoy
            // was getting the initial config, between LDS and RDS, the push will miss the
            // monitored 'routes'. Same for CDS/EDS interval.
            // It is very tricky to handle due to the protocol - but the periodic push recovers
            // from it.

            err := s.pushConnection(con, pushEv)
            if err != nil {
                return nil
            }

        }
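
The remove-then-add bookkeeping in the EndpointType branch can be modelled with a map from cluster name to the set of watching connections. The following is a simplified sketch, not the pilot implementation: `edsTracker` and its methods are hypothetical, the locality level of the real structure is left out, and dropping a cluster once nobody watches it is a design choice of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// edsTracker is a hypothetical model: cluster name -> set of connection IDs.
type edsTracker struct {
	mu       sync.Mutex
	watchers map[string]map[string]bool
}

func newEdsTracker() *edsTracker {
	return &edsTracker{watchers: make(map[string]map[string]bool)}
}

// add registers conID as a watcher, creating the cluster entry on first use.
func (t *edsTracker) add(cluster, conID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.watchers[cluster] == nil {
		t.watchers[cluster] = make(map[string]bool)
	}
	t.watchers[cluster][conID] = true
}

// remove unregisters conID and drops the cluster entry once it has no watchers.
func (t *edsTracker) remove(cluster, conID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.watchers[cluster], conID) // deleting from a nil map is a no-op
	if len(t.watchers[cluster]) == 0 {
		delete(t.watchers, cluster)
	}
}

// resubscribe replaces the clusters watched by conID, as the request handler does.
func (t *edsTracker) resubscribe(conID string, old, requested []string) []string {
	for _, c := range old {
		t.remove(c, conID)
	}
	for _, c := range requested {
		t.add(c, conID)
	}
	return requested
}

// count reports how many connections watch the cluster.
func (t *edsTracker) count(cluster string) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.watchers[cluster])
}

func main() {
	t := newEdsTracker()
	watched := t.resubscribe("sidecar-1", nil, []string{"a", "b"})
	watched = t.resubscribe("sidecar-1", watched, []string{"b", "c"})
	fmt.Println(watched, t.count("a"), t.count("b"), t.count("c"))
}
```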

pushEds does the bulk of the push:

// pushEds is pushing EDS updates for a single connection. Called the first time
// a client connects, for incremental updates and for full periodic updates.
func (s *DiscoveryServer) pushEds(push *model.PushContext, con *XdsConnection,
   full bool, edsUpdatedServices map[string]*EndpointShards) error {
   loadAssignments := []*xdsapi.ClusterLoadAssignment{}

   emptyClusters := 0
   endpoints := 0

   // Loop over the cluster names sent on this connection and fetch the data for each
   for _, clusterName := range con.Clusters {
      _, _, hostname, _ := model.ParseSubsetKey(clusterName)
      if edsUpdatedServices != nil && edsUpdatedServices[string(hostname)] == nil {
         // Cluster was not updated, skip recomputing.
         continue
      }

      // Look up the EdsCluster object by cluster name and con.modelNode
      c := s.getEdsCluster(con.modelNode, clusterName)

      l := c.LoadAssignment

      // If no LoadAssignment has been computed yet (fresh cluster), build it now
      if l == nil { // fresh cluster
         edsClusters := map[model.Locality]*EdsCluster{
            con.modelNode.Locality: c,
         }
         s.updateCluster(push, clusterName, edsClusters)
         l = loadAssignment(c)
      }

      // If networks are set (by default they aren't) apply the Split Horizon
      // EDS filter on the endpoints
      // When networks are configured, filter the endpoints once more by network (Split Horizon EDS)
      if s.Env.MeshNetworks != nil && len(s.Env.MeshNetworks.Networks) > 0 {
         endpoints := EndpointsByNetworkFilter(l.Endpoints, con, s.Env)
         endpoints = LoadBalancingWeightNormalize(endpoints)
         filteredCLA := &xdsapi.ClusterLoadAssignment{
            ClusterName: l.ClusterName,
            Endpoints:   endpoints,
            Policy:      l.Policy,
         }
         l = filteredCLA
      }

      endpoints += len(l.Endpoints)
      loadAssignments = append(loadAssignments, l)
   }

   response := endpointDiscoveryResponse(loadAssignments)
   err := con.send(response)
   if err != nil {
      return err
   }
   // ... (logging and metrics elided)
   return nil
}
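
The skip logic at the top of the loop relies on the cluster name format `direction|port|subset|hostname`, as seen in names such as `outbound|5000|v1|helloworld.istio-system.svc.cluster.local`. Here is a minimal sketch of that filtering, assuming this four-field format. `hostnameOf` and `clustersToPush` are hypothetical helpers written for this example and do not replace model.ParseSubsetKey.

```go
package main

import (
	"fmt"
	"strings"
)

// hostnameOf extracts the hostname from a "direction|port|subset|hostname"
// cluster name. It returns "" when the name does not have four fields.
func hostnameOf(clusterName string) string {
	parts := strings.Split(clusterName, "|")
	if len(parts) != 4 {
		return ""
	}
	return parts[3]
}

// clustersToPush returns the watched clusters that need recomputing.
// A nil updated set means a full push: every watched cluster is returned.
func clustersToPush(watched []string, updated map[string]bool) []string {
	var out []string
	for _, name := range watched {
		if updated != nil && !updated[hostnameOf(name)] {
			continue // cluster was not updated, skip recomputing
		}
		out = append(out, name)
	}
	return out
}

func main() {
	watched := []string{
		"outbound|5000|v1|helloworld.istio-system.svc.cluster.local",
		"outbound|5000|v2|helloworld.istio-system.svc.cluster.local",
		"outbound|8080||istio-pilot.istio-system.svc.cluster.local",
	}
	updated := map[string]bool{"helloworld.istio-system.svc.cluster.local": true}
	fmt.Println(len(clustersToPush(watched, updated))) // incremental push
	fmt.Println(len(clustersToPush(watched, nil)))     // full push
}
```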

RDS initialization flow in detail

Recall the listener log lines from the LDS section:

[lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[lds_api.cc:80] lds: add/update listener 'virtual'

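The RDS request covers the LDS listeners whose IP address is 0.0.0.0, which marks them as ports used to reach other services. Their port numbers are sent to the ADS server as the route_names of the request, here "8080,9093,8060,5000,15010", five in total. Only the response for helloworld port 5000 is shown: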

   "dynamic_route_configs": [
    {
     "version_info": "2019-02-15T05:53:49Z/8",
     "route_config": {
      "name": "5000",
      "virtual_hosts": [
       {
        "name": "helloworld.istio-system.svc.cluster.local:5000",
        "domains": [
         "helloworld.istio-system.svc.cluster.local",
         "helloworld.istio-system.svc.cluster.local:5000",
         "helloworld",
         "helloworld:5000",
         "helloworld.istio-system.svc.cluster",
         "helloworld.istio-system.svc.cluster:5000",
         "helloworld.istio-system.svc",
         "helloworld.istio-system.svc:5000",
         "helloworld.istio-system",
         "helloworld.istio-system:5000",
         "10.0.40.71",
         "10.0.40.71:5000"
        ],
        "routes": [
         {
          "match": {
           "prefix": "/"
          },
          "route": {
           "weighted_clusters": {
            "clusters": [
             {
              "name": "outbound|5000|v1|helloworld.istio-system.svc.cluster.local",
              "weight": 90,
              "per_filter_config": {
              }
             },
             {
              "name": "outbound|5000|v2|helloworld.istio-system.svc.cluster.local",
              "weight": 10,
              "per_filter_config": {
              }
             }
            ]
           },
           "timeout": "0s",
           "max_grpc_timeout": "0s"
          },
          "decorator": {
           "operation": "helloworld:5000/*"
          },
          "per_filter_config": {
           "mixer": {
            "disable_check_calls": true
           }
          }
         }
        ]
       }
      ],
      "validate_clusters": false
     },
     "last_updated": "2019-02-15T05:53:49.910Z"
    },
]
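
The correspondence between the listener log and the requested route names can be reproduced with a few lines. This is only a sketch of the mapping described above, not how Envoy works internally: in practice each HTTP listener names its own RDS route in its connection manager config. `routeNamesFromListeners` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// routeNamesFromListeners returns the port part of every listener named
// "0.0.0.0_<port>", in input order. Other listeners are ignored.
func routeNamesFromListeners(listeners []string) []string {
	const wildcardPrefix = "0.0.0.0_"
	var routes []string
	for _, l := range listeners {
		if strings.HasPrefix(l, wildcardPrefix) {
			routes = append(routes, strings.TrimPrefix(l, wildcardPrefix))
		}
	}
	return routes
}

func main() {
	// Listener names taken from the Envoy log lines above.
	listeners := []string{
		"10.128.69.4_5000", "10.0.79.108_15011", "0.0.0.0_8080", "0.0.0.0_9093",
		"0.0.0.0_8060", "0.0.0.0_5000", "0.0.0.0_15010", "virtual",
	}
	fmt.Println(strings.Join(routeNamesFromListeners(listeners), ","))
}
```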

Entry point of the handling code:

            case RouteType:
                routes := discReq.GetResourceNames()

                // ...

                if sortedRoutes == nil {
                    sort.Strings(routes)
                    sortedRoutes = routes
                }
                con.Routes = sortedRoutes
                adsLog.Debugf("ADS:RDS: REQ %s %s  routes: %d", peerAddr, con.ConID, len(con.Routes))
                err := s.pushRoute(con, s.globalPushContext())

pushRoute is implemented as follows:

func (s *DiscoveryServer) pushRoute(con *XdsConnection, push *model.PushContext) error {
    rawRoutes, err := s.generateRawRoutes(con, push)

    response := routeDiscoveryResponse(rawRoutes)
    err = con.send(response)
    // ...
    return nil
}

generateRawRoutes mainly calls ConfigGenerator.BuildHTTPRoutes to prepare the data:

func (s *DiscoveryServer) generateRawRoutes(con *XdsConnection, push *model.PushContext) ([]*xdsapi.RouteConfiguration, error) {
   rc := make([]*xdsapi.RouteConfiguration, 0)
   for _, routeName := range con.Routes {
      r, err := s.ConfigGenerator.BuildHTTPRoutes(s.Env, con.modelNode, push, routeName)
      if err != nil {
         // ... (logging elided)
         return nil, err
      }

      // Validate and append (validation elided)
      rc = append(rc, r)
   }
   return rc, nil
}

ConfigGeneratorImpl.BuildHTTPRoutes dispatches on the proxy type (node.Type, for example model.SidecarProxy) to the corresponding function.

// BuildHTTPRoutes produces a list of routes for the proxy
func (configgen *ConfigGeneratorImpl) BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext,
    routeName string) (*xdsapi.RouteConfiguration, error) {
    proxyInstances, err := env.GetProxyServiceInstances(node)
    if err != nil {
        return nil, err
    }

    switch node.Type {
    case model.SidecarProxy:
        return configgen.buildSidecarOutboundHTTPRouteConfig(env, node, push, proxyInstances, routeName), nil
    case model.Router, model.Ingress:
        return configgen.buildGatewayHTTPRouteConfig(env, node, push, proxyInstances, routeName)
    }
    return nil, nil
}

Finally, the Route configuration is generated from routeName together with the matching VirtualServices; the output format was shown above:

// buildSidecarOutboundHTTPRouteConfig builds an outbound HTTP Route for sidecar.
// Based on port, will determine all virtual hosts that listen on the port.
func (configgen *ConfigGeneratorImpl) buildSidecarOutboundHTTPRouteConfig(env *model.Environment, node *model.Proxy, push *model.PushContext,
    proxyInstances []*model.ServiceInstance, routeName string) *xdsapi.RouteConfiguration {

    listenerPort := 0
    var err error
    listenerPort, err = strconv.Atoi(routeName)
    if err != nil {
        // ... (handling of non-numeric route names elided)
    }

    var virtualServices []model.Config
    var services []*model.Service

    // Get the list of services that correspond to this egressListener from the sidecarScope
    sidecarScope := node.SidecarScope
    // sidecarScope should never be nil
    if sidecarScope != nil && sidecarScope.Config != nil {
        // egress-listener handling (elided)
    } else { // fall back to the default mesh gateway
        meshGateway := map[string]bool{model.IstioMeshGateway: true}
        services = push.Services(node)
        virtualServices = push.VirtualServices(node, meshGateway)
    }

    nameToServiceMap := make(map[model.Hostname]*model.Service)
    for _, svc := range services {
        if listenerPort == 0 {
            nameToServiceMap[svc.Hostname] = svc
        } else {
            if svcPort, exists := svc.Ports.GetByPort(listenerPort); exists {

                nameToServiceMap[svc.Hostname] = &model.Service{
                    Hostname:     svc.Hostname,
                    Address:      svc.Address,
                    MeshExternal: svc.MeshExternal,
                    Ports:        []*model.Port{svcPort},
                }
            }
        }
    }

    // Collect all proxy labels for source match
    var proxyLabels model.LabelsCollection
    for _, w := range proxyInstances {
        proxyLabels = append(proxyLabels, w.Labels)
    }

    // Get list of virtual services bound to the mesh gateway
    virtualHostWrappers := istio_route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, nameToServiceMap, proxyLabels, virtualServices, listenerPort)
    vHostPortMap := make(map[int][]route.VirtualHost)

    for _, virtualHostWrapper := range virtualHostWrappers {
        virtualHosts := make([]route.VirtualHost, 0, len(virtualHostWrapper.VirtualServiceHosts)+len(virtualHostWrapper.Services))
        for _, host := range virtualHostWrapper.VirtualServiceHosts {
            virtualHosts = append(virtualHosts, route.VirtualHost{
                Name:    fmt.Sprintf("%s:%d", host, virtualHostWrapper.Port),
                Domains: []string{host, fmt.Sprintf("%s:%d", host, virtualHostWrapper.Port)},
                Routes:  virtualHostWrapper.Routes,
            })
        }

        for _, svc := range virtualHostWrapper.Services {
            virtualHosts = append(virtualHosts, route.VirtualHost{
                Name:    fmt.Sprintf("%s:%d", svc.Hostname, virtualHostWrapper.Port),
                Domains: generateVirtualHostDomains(svc, virtualHostWrapper.Port, node),
                Routes:  virtualHostWrapper.Routes,
            })
        }

        vHostPortMap[virtualHostWrapper.Port] = append(vHostPortMap[virtualHostWrapper.Port], virtualHosts...)
    }

    var virtualHosts []route.VirtualHost
    if listenerPort == 0 {
        virtualHosts = mergeAllVirtualHosts(vHostPortMap)
    } else {
        virtualHosts = vHostPortMap[listenerPort]
    }

    util.SortVirtualHosts(virtualHosts)
    out := &xdsapi.RouteConfiguration{
        Name:             routeName,
        VirtualHosts:     virtualHosts,
        ValidateClusters: proto.BoolFalse,
    }

    // call plugins
    for _, p := range configgen.Plugins {
        in := &plugin.InputParams{
            ListenerProtocol: plugin.ListenerProtocolHTTP,
            Env:              env,
            Node:             node,
            Push:             push,
        }
        p.OnOutboundRouteConfiguration(in, out)
    }

    return out
}
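
The `domains` list in the route dump shows what generateVirtualHostDomains produces for helloworld: the FQDN, the short name, progressively shorter suffix-trimmed names, and the cluster IP, each with and without the port. The sketch below reproduces that list for illustration. It is an assumption-laden approximation: `virtualHostDomains` is a hypothetical function, and it ignores the proxy's own domain, which the real function takes into account through its `node` argument.

```go
package main

import (
	"fmt"
	"strings"
)

// virtualHostDomains lists the host names a client may use for a service,
// each followed by its "host:port" variant.
func virtualHostDomains(fqdn, vip string, port int) []string {
	var domains []string
	add := func(host string) {
		domains = append(domains, host, fmt.Sprintf("%s:%d", host, port))
	}

	add(fqdn)
	labels := strings.Split(fqdn, ".")
	if len(labels) > 1 {
		add(labels[0]) // short service name
	}
	// Drop trailing labels one at a time, stopping at "name.namespace".
	for n := len(labels) - 1; n >= 2; n-- {
		add(strings.Join(labels[:n], "."))
	}
	if vip != "" {
		add(vip) // cluster IP of the service
	}
	return domains
}

func main() {
	for _, d := range virtualHostDomains("helloworld.istio-system.svc.cluster.local", "10.0.40.71", 5000) {
		fmt.Println(d)
	}
}
```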

Pilot connection logs after an Envoy restart

CDS -> EDS -> LDS -> RDS

2019-02-16T06:27:14.131219Z info    ads ADS:CDS: REQ 10.128.69.0:48816 sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local-29 66.451µs raw: node:<id:"sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local" cluster:"helloworld" metadata:<fields:<key:"INTERCEPTION_MODE" value:<string_value:"REDIRECT" > > fields:<key:"ISTIO_PROXY_SHA" value:<string_value:"istio-proxy:930841ca88b15365737acb7eddeea6733d4f98b9" > > fields:<key:"ISTIO_PROXY_VERSION" value:<string_value:"1.0.2" > > fields:<key:"ISTIO_VERSION" value:<string_value:"1.0.5" > > fields:<key:"POD_NAME" value:<string_value:"helloworld-v1-8f8dd85-f99wk" > > fields:<key:"app" value:<string_value:"helloworld" > > fields:<key:"istio" value:<string_value:"sidecar" > > fields:<key:"version" value:<string_value:"v1" > > > build_version:"0/1.8.0-dev//RELEASE" > type_url:"type.googleapis.com/envoy.api.v2.Cluster"

2019-02-16T06:27:14.131444Z info    ads CDS: PUSH 2019-02-15T05:53:49Z/8 for helloworld-v1-8f8dd85-f99wk.istio-system "10.128.69.0:48816", Clusters: 11, Services 3

2019-02-16T06:27:14.141574Z info    ads EDS: PUSH for sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local-29 clusters 9 endpoints 9 empty 0

2019-02-16T06:27:14.142920Z info    Uses TLS multiplexing for helloworld.istio-system.svc.cluster.local {http 5000 HTTP}

2019-02-16T06:27:14.149656Z info    ads LDS: PUSH for node:helloworld-v1-8f8dd85-f99wk.istio-system addr:"10.128.69.0:48816" listeners:8 10981

2019-02-16T06:27:14.163397Z info    ads ADS: RDS: PUSH for node: helloworld-v1-8f8dd85-f99wk.istio-system addr:10.128.69.0:48816 routes:5 ver:2019-02-15T05:53:49Z/8

envoy

2019-02-16T06:27:13.308594Z info    Version root@6f6ea1061f2b-docker.io/istio-1.0.5-c1707e45e71c75d74bf3a5dec8c7086f32f32fad-Clean

2019-02-16T06:27:13.308678Z info    Proxy role: model.Proxy{ClusterID:"", Type:"sidecar", IPAddress:"10.128.69.4", ID:"helloworld-v1-8f8dd85-f99wk.istio-system", Domain:"istio-system.svc.cluster.local", Metadata:map[string]string(nil)}

2019-02-16T06:27:13.309241Z info    Effective config: binaryPath: /usr/local/bin/envoy
configPath: /etc/istio/proxy
connectTimeout: 10s
discoveryAddress: istio-pilot.istio-system:15007
discoveryRefreshDelay: 1s
drainDuration: 45s
parentShutdownDuration: 60s
proxyAdminPort: 15000
serviceCluster: helloworld
zipkinAddress: zipkin.istio-system:9411

2019-02-16T06:27:13.309274Z info    Monitored certs: []envoy.CertSource{envoy.CertSource{Directory:"/etc/certs/", Files:[]string{"cert-chain.pem", "key.pem", "root-cert.pem"}}}

2019-02-16T06:27:13.309457Z info    Starting proxy agent
2019-02-16T06:27:13.309573Z info    Received new config, resetting budget
2019-02-16T06:27:13.310745Z info    Reconciling configuration (budget 10)
2019-02-16T06:27:13.310805Z info    Epoch 0 starting
2019-02-16T06:27:13.311883Z info    Envoy command: [-c /etc/istio/proxy/envoy-rev0.json --restart-epoch 0 --drain-time-s 45 --parent-shutdown-time-s 60 --service-cluster helloworld --service-node sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local --max-obj-name-len 189 --allow-unknown-fields -l warn --v2-config-only]

// ...

[2019-02-16 06:27:13.347][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:130] cm init: initializing cds
[2019-02-16 06:27:14.133][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|15010||istio-pilot.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.134][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|15011||istio-pilot.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.134][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|8080||istio-pilot.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.135][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|9093||istio-pilot.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.136][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|8060||istio-citadel.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.137][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|9093||istio-citadel.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.138][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000||helloworld.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.138][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000|v1|helloworld.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.139][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000|v2|helloworld.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.140][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster inbound|5000||helloworld.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.141][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster BlackHoleCluster during init
[2019-02-16 06:27:14.141][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:111] cm init: initializing secondary clusters

[2019-02-16 06:27:14.142][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:134] cm init: all clusters initialized
[2019-02-16 06:27:14.142][24][info][main] external/envoy/source/server/server.cc:401] all clusters initialized. initializing init manager
[2019-02-16 06:27:14.155][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[2019-02-16 06:27:14.156][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[2019-02-16 06:27:14.157][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[2019-02-16 06:27:14.158][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[2019-02-16 06:27:14.159][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[2019-02-16 06:27:14.160][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[2019-02-16 06:27:14.160][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[2019-02-16 06:27:14.161][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener 'virtual'
[2019-02-16 06:27:14.165][24][info][config] external/envoy/source/server/listener_manager_impl.cc:908] all dependencies initialized. starting workers
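
The cluster names in the log above (for example `outbound|5000|v1|helloworld.istio-system.svc.cluster.local`) appear to follow a `direction|port|subset|hostname` layout, with the subset left empty when no version is selected. The following is a minimal sketch that splits such a name into its fields. `ClusterKey` and `parseClusterName` are hypothetical names introduced for illustration and are not taken from the Istio or Envoy code base.

```go
package main

import (
	"fmt"
	"strings"
)

// ClusterKey is a hypothetical type holding the four "|"-separated
// fields observed in the cluster names of the log above.
type ClusterKey struct {
	Direction string // "outbound" or "inbound"
	Port      string // service port, kept as text
	Subset    string // e.g. "v1"; empty when no subset is selected
	Hostname  string // service FQDN
}

// parseClusterName splits a name of the assumed form
// direction|port|subset|hostname. It returns false for names that do
// not have exactly four fields, such as "BlackHoleCluster".
func parseClusterName(name string) (ClusterKey, bool) {
	parts := strings.Split(name, "|")
	if len(parts) != 4 {
		return ClusterKey{}, false
	}
	return ClusterKey{
		Direction: parts[0],
		Port:      parts[1],
		Subset:    parts[2],
		Hostname:  parts[3],
	}, true
}

func main() {
	names := []string{
		"outbound|5000|v1|helloworld.istio-system.svc.cluster.local",
		"inbound|5000||helloworld.istio-system.svc.cluster.local",
		"BlackHoleCluster",
	}
	for _, n := range names {
		key, ok := parseClusterName(n)
		fmt.Printf("%s -> ok=%v %+v\n", n, ok, key)
	}
}
```

Names without the four-field layout are reported as unparsed rather than guessed at, which keeps special clusters such as `BlackHoleCluster` out of the per-service view.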

configScope

configScope is implemented in the Istio 1.1 code; see PR 10278. Earlier versions implemented it as an annotation placed on the Service, shown below:

// ServiceConfigScopeAnnotation configs the scope the service visible to.
//   "PUBLIC" which is the default, indicates it is reachable within the mesh
//   "PRIVATE" indicates it is reachable within its namespace
ServiceConfigScopeAnnotation = "networking.istio.io/configScope"

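ServiceEntry, VirtualService, Gateway, DestinationRule, and similar resources can set their visibility through spec.configScope. ConfigScope accepts the values "PUBLIC" and "PRIVATE":

  • "PUBLIC" means the rule is visible to all workloads in the mesh. This is the default.

  • "PRIVATE" means the rule is visible only to workloads in the same namespace.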
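
The visibility rule described above can be sketched as a small filter. This is an illustration only, not Pilot's actual implementation: `Config`, `visibleTo`, and `filterVisible` are hypothetical names, and the sketch assumes an unset scope behaves like "PUBLIC", matching the stated default.

```go
package main

import "fmt"

// Config is a hypothetical, simplified stand-in for a networking
// resource (VirtualService, DestinationRule, ...) carrying a scope.
type Config struct {
	Name      string
	Namespace string
	Scope     string // "PUBLIC", "PRIVATE", or "" (treated as PUBLIC here)
}

// visibleTo reports whether cfg applies to a workload running in
// workloadNamespace under the PUBLIC/PRIVATE rules described above.
func visibleTo(cfg Config, workloadNamespace string) bool {
	if cfg.Scope == "PRIVATE" {
		// PRIVATE: only workloads in the same namespace see the rule.
		return cfg.Namespace == workloadNamespace
	}
	// PUBLIC or unset: visible across the whole mesh.
	return true
}

// filterVisible returns the configs a workload in workloadNamespace sees.
func filterVisible(all []Config, workloadNamespace string) []Config {
	var out []Config
	for _, c := range all {
		if visibleTo(c, workloadNamespace) {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	configs := []Config{
		{Name: "virtual-service-scope-private", Namespace: "bookinfo", Scope: "PRIVATE"},
		{Name: "virtual-service-public", Namespace: "bookinfo", Scope: "PUBLIC"},
		{Name: "virtual-service-default", Namespace: "bookinfo"},
	}
	for _, ns := range []string{"bookinfo", "istio-system"} {
		fmt.Println(ns, "sees:")
		for _, c := range filterVisible(configs, ns) {
			fmt.Println("  ", c.Name)
		}
	}
}
```

A workload in `bookinfo` sees all three rules in this sketch, while a workload in any other namespace sees only the public and default-scoped ones.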

Example

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: virtual-service-scope-private
spec:
  configScope: PRIVATE
  hosts:
  - "bookinfo.com"
  http:
  - route:
    - destination:
        host: "bookinfo.com"
    headers:
      request:
        add: 
          scope: private

References

  1. debug-istio
